Skip to content

Commit ad998ff

Browse files
committed
Instrument WindowAggExec, UnionExec
Fixup tests and implementation
1 parent cdd24f1 commit ad998ff

File tree

3 files changed

+128
-24
lines changed

3 files changed

+128
-24
lines changed

datafusion/src/physical_plan/union.rs

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,36 @@
2323
2424
use std::{any::Any, sync::Arc};
2525

26-
use arrow::datatypes::SchemaRef;
26+
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
27+
use futures::StreamExt;
2728

2829
use super::{
29-
ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning,
30+
metrics::{ExecutionPlanMetricsSet, MetricsSet},
31+
ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
3032
SendableRecordBatchStream, Statistics,
3133
};
32-
use crate::{error::Result, physical_plan::expressions};
34+
use crate::{
35+
error::Result,
36+
physical_plan::{expressions, metrics::BaselineMetrics},
37+
};
3338
use async_trait::async_trait;
3439

3540
/// UNION ALL execution plan
3641
#[derive(Debug)]
3742
pub struct UnionExec {
3843
/// Input execution plan
3944
inputs: Vec<Arc<dyn ExecutionPlan>>,
45+
/// Execution metrics
46+
metrics: ExecutionPlanMetricsSet,
4047
}
4148

4249
impl UnionExec {
4350
/// Create a new UnionExec
4451
pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
45-
UnionExec { inputs }
52+
UnionExec {
53+
inputs,
54+
metrics: ExecutionPlanMetricsSet::new(),
55+
}
4656
}
4757
}
4858

@@ -82,11 +92,18 @@ impl ExecutionPlan for UnionExec {
8292
}
8393

8494
async fn execute(&self, mut partition: usize) -> Result<SendableRecordBatchStream> {
95+
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
96+
// record the tiny amount of work done in this function so
97+
// elapsed_compute is reported as non zero
98+
let timer = baseline_metrics.elapsed_compute().timer();
99+
85100
// find partition to execute
86101
for input in self.inputs.iter() {
87102
// Calculate whether partition belongs to the current partition
88103
if partition < input.output_partitioning().partition_count() {
89-
return input.execute(partition).await;
104+
let stream = input.execute(partition).await?;
105+
drop(timer);
106+
return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
90107
} else {
91108
partition -= input.output_partitioning().partition_count();
92109
}
@@ -110,6 +127,10 @@ impl ExecutionPlan for UnionExec {
110127
}
111128
}
112129

130+
fn metrics(&self) -> Option<MetricsSet> {
131+
Some(self.metrics.clone_inner())
132+
}
133+
113134
fn statistics(&self) -> Statistics {
114135
self.inputs
115136
.iter()
@@ -119,6 +140,40 @@ impl ExecutionPlan for UnionExec {
119140
}
120141
}
121142

143+
/// Stream wrapper that records `BaselineMetrics` for a particular
144+
/// partition
145+
struct ObservedStream {
146+
inner: SendableRecordBatchStream,
147+
baseline_metrics: BaselineMetrics,
148+
}
149+
150+
impl ObservedStream {
151+
fn new(inner: SendableRecordBatchStream, baseline_metrics: BaselineMetrics) -> Self {
152+
Self {
153+
inner,
154+
baseline_metrics,
155+
}
156+
}
157+
}
158+
159+
impl RecordBatchStream for ObservedStream {
160+
fn schema(&self) -> arrow::datatypes::SchemaRef {
161+
self.inner.schema()
162+
}
163+
}
164+
165+
impl futures::Stream for ObservedStream {
166+
type Item = arrow::error::Result<RecordBatch>;
167+
168+
fn poll_next(
169+
mut self: std::pin::Pin<&mut Self>,
170+
cx: &mut std::task::Context<'_>,
171+
) -> std::task::Poll<Option<Self::Item>> {
172+
let poll = self.inner.poll_next_unpin(cx);
173+
self.baseline_metrics.record_poll(poll)
174+
}
175+
}
176+
122177
fn col_stats_union(
123178
mut left: ColumnStatistics,
124179
right: ColumnStatistics,

datafusion/src/physical_plan/windows/window_agg_exec.rs

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
//! Stream and channel implementations for window function expressions.
1919
2020
use crate::error::{DataFusionError, Result};
21+
use crate::physical_plan::metrics::{
22+
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
23+
};
2124
use crate::physical_plan::{
2225
common, ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan,
2326
Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
@@ -30,7 +33,7 @@ use arrow::{
3033
};
3134
use async_trait::async_trait;
3235
use futures::stream::Stream;
33-
use futures::Future;
36+
use futures::FutureExt;
3437
use pin_project_lite::pin_project;
3538
use std::any::Any;
3639
use std::pin::Pin;
@@ -48,6 +51,8 @@ pub struct WindowAggExec {
4851
schema: SchemaRef,
4952
/// Schema before the window
5053
input_schema: SchemaRef,
54+
/// Execution metrics
55+
metrics: ExecutionPlanMetricsSet,
5156
}
5257

5358
impl WindowAggExec {
@@ -59,11 +64,12 @@ impl WindowAggExec {
5964
) -> Result<Self> {
6065
let schema = create_schema(&input_schema, &window_expr)?;
6166
let schema = Arc::new(schema);
62-
Ok(WindowAggExec {
67+
Ok(Self {
6368
input,
6469
window_expr,
6570
schema,
6671
input_schema,
72+
metrics: ExecutionPlanMetricsSet::new(),
6773
})
6874
}
6975

@@ -140,6 +146,7 @@ impl ExecutionPlan for WindowAggExec {
140146
self.schema.clone(),
141147
self.window_expr.clone(),
142148
input,
149+
BaselineMetrics::new(&self.metrics, partition),
143150
));
144151
Ok(stream)
145152
}
@@ -163,6 +170,10 @@ impl ExecutionPlan for WindowAggExec {
163170
Ok(())
164171
}
165172

173+
fn metrics(&self) -> Option<MetricsSet> {
174+
Some(self.metrics.clone_inner())
175+
}
176+
166177
fn statistics(&self) -> Statistics {
167178
let input_stat = self.input.statistics();
168179
let win_cols = self.window_expr.len();
@@ -214,6 +225,7 @@ pin_project! {
214225
#[pin]
215226
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
216227
finished: bool,
228+
baseline_metrics: BaselineMetrics,
217229
}
218230
}
219231

@@ -223,31 +235,41 @@ impl WindowAggStream {
223235
schema: SchemaRef,
224236
window_expr: Vec<Arc<dyn WindowExpr>>,
225237
input: SendableRecordBatchStream,
238+
baseline_metrics: BaselineMetrics,
226239
) -> Self {
227240
let (tx, rx) = futures::channel::oneshot::channel();
228241
let schema_clone = schema.clone();
242+
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
229243
tokio::spawn(async move {
230244
let schema = schema_clone.clone();
231-
let result = WindowAggStream::process(input, window_expr, schema).await;
245+
let result =
246+
WindowAggStream::process(input, window_expr, schema, elapsed_compute)
247+
.await;
232248
tx.send(result)
233249
});
234250

235251
Self {
252+
schema,
236253
output: rx,
237254
finished: false,
238-
schema,
255+
baseline_metrics,
239256
}
240257
}
241258

242259
async fn process(
243260
input: SendableRecordBatchStream,
244261
window_expr: Vec<Arc<dyn WindowExpr>>,
245262
schema: SchemaRef,
263+
elapsed_compute: crate::physical_plan::metrics::Time,
246264
) -> ArrowResult<RecordBatch> {
247265
let input_schema = input.schema();
248266
let batches = common::collect(input)
249267
.await
250268
.map_err(DataFusionError::into_arrow_external_error)?;
269+
270+
// record compute time on drop
271+
let _timer = elapsed_compute.timer();
272+
251273
let batch = common::combine_batches(&batches, input_schema.clone())?;
252274
if let Some(batch) = batch {
253275
// calculate window cols
@@ -267,18 +289,31 @@ impl WindowAggStream {
267289
impl Stream for WindowAggStream {
268290
type Item = ArrowResult<RecordBatch>;
269291

270-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
292+
fn poll_next(
293+
mut self: Pin<&mut Self>,
294+
cx: &mut Context<'_>,
295+
) -> Poll<Option<Self::Item>> {
296+
let poll = self.poll_next_inner(cx);
297+
self.baseline_metrics.record_poll(poll)
298+
}
299+
}
300+
301+
impl WindowAggStream {
302+
#[inline]
303+
fn poll_next_inner(
304+
self: &mut Pin<&mut Self>,
305+
cx: &mut Context<'_>,
306+
) -> Poll<Option<ArrowResult<RecordBatch>>> {
271307
if self.finished {
272308
return Poll::Ready(None);
273309
}
274310

275311
// is the output ready?
276-
let this = self.project();
277-
let output_poll = this.output.poll(cx);
312+
let output_poll = self.output.poll_unpin(cx);
278313

279314
match output_poll {
280315
Poll::Ready(result) => {
281-
*this.finished = true;
316+
self.finished = true;
282317
// check for error in receiving channel and unwrap actual result
283318
let result = match result {
284319
Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving

datafusion/tests/sql.rs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2362,7 +2362,7 @@ async fn explain_analyze_baseline_metrics() {
23622362
SELECT 1 as cnt \
23632363
UNION ALL \
23642364
SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) \
2365-
LIMIT 2";
2365+
LIMIT 3";
23662366
println!("running query: {}", sql);
23672367
let plan = ctx.create_logical_plan(sql).unwrap();
23682368
let plan = ctx.optimize(&plan).unwrap();
@@ -2372,11 +2372,6 @@ async fn explain_analyze_baseline_metrics() {
23722372
println!("Query Output:\n\n{}", formatted);
23732373
let formatted = normalize_for_explain(&formatted);
23742374

2375-
assert_metrics!(
2376-
&formatted,
2377-
"CoalescePartitionsExec",
2378-
"metrics=[output_rows=5, elapsed_compute=NOT RECORDED"
2379-
);
23802375
assert_metrics!(
23812376
&formatted,
23822377
"HashAggregateExec: mode=Partial, gby=[]",
@@ -2389,7 +2384,7 @@ async fn explain_analyze_baseline_metrics() {
23892384
);
23902385
assert_metrics!(
23912386
&formatted,
2392-
"SortExec: [c1@0 ASC]",
2387+
"SortExec: [c1@1 ASC]",
23932388
"metrics=[output_rows=5, elapsed_compute="
23942389
);
23952390
assert_metrics!(
@@ -2399,9 +2394,14 @@ async fn explain_analyze_baseline_metrics() {
23992394
);
24002395
assert_metrics!(
24012396
&formatted,
2402-
"GlobalLimitExec: limit=1, ",
2397+
"GlobalLimitExec: limit=3, ",
24032398
"metrics=[output_rows=1, elapsed_compute="
24042399
);
2400+
assert_metrics!(
2401+
&formatted,
2402+
"LocalLimitExec: limit=3",
2403+
"metrics=[output_rows=3, elapsed_compute="
2404+
);
24052405
assert_metrics!(
24062406
&formatted,
24072407
"ProjectionExec: expr=[COUNT(UInt8(1))",
@@ -2412,8 +2412,21 @@ async fn explain_analyze_baseline_metrics() {
24122412
"CoalesceBatchesExec: target_batch_size=4096",
24132413
"metrics=[output_rows=5, elapsed_compute"
24142414
);
2415-
2416-
assert!(false, "TODO: Test for cpaslce partitoon, union and window agg exec");
2415+
assert_metrics!(
2416+
&formatted,
2417+
"CoalescePartitionsExec",
2418+
"metrics=[output_rows=5, elapsed_compute="
2419+
);
2420+
assert_metrics!(
2421+
&formatted,
2422+
"UnionExec",
2423+
"metrics=[output_rows=3, elapsed_compute="
2424+
);
2425+
assert_metrics!(
2426+
&formatted,
2427+
"WindowAggExec",
2428+
"metrics=[output_rows=1, elapsed_compute="
2429+
);
24172430

24182431
fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
24192432
use datafusion::physical_plan;
@@ -2422,12 +2435,13 @@ async fn explain_analyze_baseline_metrics() {
24222435
|| plan.as_any().downcast_ref::<physical_plan::hash_aggregate::HashAggregateExec>().is_some()
24232436
// CoalescePartitionsExec doesn't do any work so is not included
24242437
|| plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
2438+
|| plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
2439+
|| plan.as_any().downcast_ref::<physical_plan::limit::LocalLimitExec>().is_some()
24252440
|| plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>().is_some()
24262441
|| plan.as_any().downcast_ref::<physical_plan::coalesce_batches::CoalesceBatchesExec>().is_some()
24272442
|| plan.as_any().downcast_ref::<physical_plan::coalesce_partitions::CoalescePartitionsExec>().is_some()
24282443
|| plan.as_any().downcast_ref::<physical_plan::union::UnionExec>().is_some()
24292444
|| plan.as_any().downcast_ref::<physical_plan::windows::WindowAggExec>().is_some()
2430-
|| plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
24312445
}
24322446

24332447
// Validate that the recorded elapsed compute time was more than

0 commit comments

Comments
 (0)