Skip to content

Commit cdd24f1

Browse files
committed
Record compute time for CoalescePartitionsExec
1 parent 6402200 commit cdd24f1

File tree

2 files changed

+23
-10
lines changed

2 files changed

+23
-10
lines changed

datafusion/src/physical_plan/coalesce_partitions.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
9797
}
9898

9999
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
100-
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
101-
102100
// CoalescePartitionsExec produces a single partition
103101
if 0 != partition {
104102
return Err(DataFusionError::Internal(format!(
@@ -113,10 +111,16 @@ impl ExecutionPlan for CoalescePartitionsExec {
113111
"CoalescePartitionsExec requires at least one input partition".to_owned(),
114112
)),
115113
1 => {
116-
// bypass any threading if there is a single partition
114+
// bypass any threading / metrics if there is a single partition
117115
self.input.execute(0).await
118116
}
119117
_ => {
118+
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
119+
// record the (very) minimal work done so that
120+
// elapsed_compute is not reported as 0
121+
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
122+
let _timer = elapsed_compute.timer();
123+
120124
// use a stream that allows each sender to put in at
121125
// least one result in an attempt to maximize
122126
// parallelism.

datafusion/tests/sql.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2352,13 +2352,17 @@ async fn explain_analyze_baseline_metrics() {
23522352
register_aggregate_csv_by_sql(&mut ctx).await;
23532353
// a query with as many operators as we have metrics for
23542354
let sql = "EXPLAIN ANALYZE \
2355-
select count(*) from \
2356-
(SELECT count(*), c1 \
2357-
FROM aggregate_test_100 \
2358-
WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \
2359-
GROUP BY c1 \
2360-
ORDER BY c1) \
2361-
LIMIT 1";
2355+
SELECT count(*) as cnt FROM \
2356+
(SELECT count(*), c1 \
2357+
FROM aggregate_test_100 \
2358+
WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \
2359+
GROUP BY c1 \
2360+
ORDER BY c1 ) \
2361+
UNION ALL \
2362+
SELECT 1 as cnt \
2363+
UNION ALL \
2364+
SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) \
2365+
LIMIT 2";
23622366
println!("running query: {}", sql);
23632367
let plan = ctx.create_logical_plan(sql).unwrap();
23642368
let plan = ctx.optimize(&plan).unwrap();
@@ -2409,6 +2413,8 @@ async fn explain_analyze_baseline_metrics() {
24092413
"metrics=[output_rows=5, elapsed_compute"
24102414
);
24112415

2416+
assert!(false, "TODO: Test for cpaslce partitoon, union and window agg exec");
2417+
24122418
fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
24132419
use datafusion::physical_plan;
24142420

@@ -2418,6 +2424,9 @@ async fn explain_analyze_baseline_metrics() {
24182424
|| plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
24192425
|| plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>().is_some()
24202426
|| plan.as_any().downcast_ref::<physical_plan::coalesce_batches::CoalesceBatchesExec>().is_some()
2427+
|| plan.as_any().downcast_ref::<physical_plan::coalesce_partitions::CoalescePartitionsExec>().is_some()
2428+
|| plan.as_any().downcast_ref::<physical_plan::union::UnionExec>().is_some()
2429+
|| plan.as_any().downcast_ref::<physical_plan::windows::WindowAggExec>().is_some()
24212430
|| plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
24222431
}
24232432

0 commit comments

Comments
 (0)