Commit b64c13d

chore: Include first ScanExec batch in metrics (#1105)
* include first batch in ScanExec metrics
* record row count metric
* fix regression
1 parent ca3a529 commit b64c13d
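
For context: this commit threads a DataFusion BaselineMetrics through ScanExec so that the first batch, which is fetched eagerly in the constructor, is counted in elapsed_compute and output_rows rather than only the batches polled later by ScanStream. Below is a minimal sketch of that metrics pattern, assuming the datafusion crate; fetch_batch is a hypothetical stand-in for ScanExec::get_next that returns only a row count.

    use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

    // Hypothetical stand-in for ScanExec::get_next; returns a row count.
    fn fetch_batch() -> usize {
        1024
    }

    fn main() {
        let metrics_set = ExecutionPlanMetricsSet::new();
        // Partition 0, matching the constructor context in this commit.
        let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);

        // Time the fetch and credit its rows to the output metrics,
        // mirroring the change to ScanExec::new in the diff below.
        let mut timer = baseline_metrics.elapsed_compute().timer();
        let num_rows = fetch_batch();
        timer.stop();
        baseline_metrics.record_output(num_rows);

        // Both metrics now reflect the first batch.
        let metrics = metrics_set.clone_inner();
        println!(
            "output_rows={:?} elapsed_compute_ns={:?}",
            metrics.output_rows(),
            metrics.elapsed_compute()
        );
    }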

File tree

1 file changed: +40 -10 lines

  • native/core/src/execution/operators/scan.rs


native/core/src/execution/operators/scan.rs

Lines changed: 40 additions & 10 deletions
@@ -65,13 +65,16 @@ pub struct ScanExec {
     pub input_source_description: String,
     /// The data types of columns of the input batch. Converted from Spark schema.
     pub data_types: Vec<DataType>,
+    /// Schema of first batch
+    pub schema: SchemaRef,
     /// The input batch of input data. Used to determine the schema of the input data.
     /// It is also used in unit test to mock the input data from JVM.
     pub batch: Arc<Mutex<Option<InputBatch>>>,
     /// Cache of expensive-to-compute plan properties
     cache: PlanProperties,
     /// Metrics collector
     metrics: ExecutionPlanMetricsSet,
+    baseline_metrics: BaselineMetrics,
 }
 
 impl ScanExec {
@@ -81,22 +84,30 @@ impl ScanExec {
         input_source_description: &str,
         data_types: Vec<DataType>,
     ) -> Result<Self, CometError> {
+        let metrics_set = ExecutionPlanMetricsSet::default();
+        let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
+
         // Scan's schema is determined by the input batch, so we need to set it before execution.
         // Note that we determine if arrays are dictionary-encoded based on the
         // first batch. The array may be dictionary-encoded in some batches and not others, and
         // ScanExec will cast arrays from all future batches to the type determined here, so we
         // may end up either unpacking dictionary arrays or dictionary-encoding arrays.
         // Dictionary-encoded primitive arrays are always unpacked.
         let first_batch = if let Some(input_source) = input_source.as_ref() {
-            ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?
+            let mut timer = baseline_metrics.elapsed_compute().timer();
+            let batch =
+                ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?;
+            timer.stop();
+            baseline_metrics.record_output(batch.num_rows());
+            batch
         } else {
             InputBatch::EOF
         };
 
         let schema = scan_schema(&first_batch, &data_types);
 
         let cache = PlanProperties::new(
-            EquivalenceProperties::new(schema),
+            EquivalenceProperties::new(Arc::clone(&schema)),
             // The partitioning is not important because we are not using DataFusion's
             // query planner or optimizer
             Partitioning::UnknownPartitioning(1),
@@ -110,7 +121,9 @@ impl ScanExec {
             data_types,
             batch: Arc::new(Mutex::new(Some(first_batch))),
             cache,
-            metrics: ExecutionPlanMetricsSet::default(),
+            metrics: metrics_set,
+            baseline_metrics,
+            schema,
         })
     }
 
@@ -276,11 +289,15 @@ impl ExecutionPlan for ScanExec {
     }
 
     fn schema(&self) -> SchemaRef {
-        // `unwrap` is safe because `schema` is only called during converting
-        // Spark plan to DataFusion plan. At the moment, `batch` is not EOF.
-        let binding = self.batch.try_lock().unwrap();
-        let input_batch = binding.as_ref().unwrap();
-        scan_schema(input_batch, &self.data_types)
+        if self.exec_context_id == TEST_EXEC_CONTEXT_ID {
+            // `unwrap` is safe because `schema` is only called during converting
+            // Spark plan to DataFusion plan. At the moment, `batch` is not EOF.
+            let binding = self.batch.try_lock().unwrap();
+            let input_batch = binding.as_ref().unwrap();
+            scan_schema(input_batch, &self.data_types)
+        } else {
+            Arc::clone(&self.schema)
+        }
     }
 
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
@@ -303,6 +320,7 @@ impl ExecutionPlan for ScanExec {
             self.clone(),
             self.schema(),
             partition,
+            self.baseline_metrics.clone(),
         )))
     }
 
@@ -352,8 +370,12 @@ struct ScanStream<'a> {
 }
 
 impl<'a> ScanStream<'a> {
-    pub fn new(scan: ScanExec, schema: SchemaRef, partition: usize) -> Self {
-        let baseline_metrics = BaselineMetrics::new(&scan.metrics, partition);
+    pub fn new(
+        scan: ScanExec,
+        schema: SchemaRef,
+        partition: usize,
+        baseline_metrics: BaselineMetrics,
+    ) -> Self {
         let cast_time = MetricBuilder::new(&scan.metrics).subset_time("cast_time", partition);
         Self {
             scan,
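
One subtlety in the ScanStream change above: BaselineMetrics is cheaply cloneable and its clones share the underlying atomic counters, which is what lets the constructor-side recording in ScanExec::new and the per-partition stream accumulate into the same output_rows metric. A small sketch of that sharing, assuming the datafusion crate:

    use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

    fn main() {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let constructor_side = BaselineMetrics::new(&metrics_set, 0);
        // The clone handed to the stream shares counters with the original.
        let stream_side = constructor_side.clone();

        constructor_side.record_output(10); // first batch, recorded in ScanExec::new
        stream_side.record_output(32);      // later batches, recorded by ScanStream

        // Rows from both sides accumulate in the same metric.
        assert_eq!(metrics_set.clone_inner().output_rows(), Some(42));
    }
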
@@ -465,4 +487,12 @@ impl InputBatch {
 
         InputBatch::Batch(columns, num_rows)
     }
+
+    /// Get the number of rows in this batch
+    fn num_rows(&self) -> usize {
+        match self {
+            Self::EOF => 0,
+            Self::Batch(_, num_rows) => *num_rows,
+        }
+    }
 }
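
The num_rows helper added at the end treats EOF as zero rows, which lets ScanExec::new record output for whatever get_next returned, including an immediate end-of-stream. A self-contained mirror of the shape, with the Arrow column vector replaced by a plain Vec so the sketch runs without the arrow crate:

    enum InputBatch {
        EOF,
        Batch(Vec<String>, usize), // columns (stand-in type) and row count
    }

    impl InputBatch {
        fn num_rows(&self) -> usize {
            match self {
                Self::EOF => 0,
                Self::Batch(_, num_rows) => *num_rows,
            }
        }
    }

    fn main() {
        assert_eq!(InputBatch::EOF.num_rows(), 0);
        assert_eq!(InputBatch::Batch(vec![], 5).num_rows(), 5);
    }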
