Skip to content

Commit 4c0d430

Browse files
authored
Add metrics for FilterExec (#960)
1 parent dd5f936 commit 4c0d430

File tree

2 files changed

+35
-4
lines changed

2 files changed

+35
-4
lines changed

datafusion/src/physical_plan/filter.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use std::task::{Context, Poll};
2626
use super::{RecordBatchStream, SendableRecordBatchStream};
2727
use crate::error::{DataFusionError, Result};
2828
use crate::physical_plan::{
29+
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
2930
DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
3031
};
3132
use arrow::array::BooleanArray;
@@ -46,6 +47,8 @@ pub struct FilterExec {
4647
predicate: Arc<dyn PhysicalExpr>,
4748
/// The input plan
4849
input: Arc<dyn ExecutionPlan>,
50+
/// Execution metrics
51+
metrics: ExecutionPlanMetricsSet,
4952
}
5053

5154
impl FilterExec {
@@ -58,6 +61,7 @@ impl FilterExec {
5861
DataType::Boolean => Ok(Self {
5962
predicate,
6063
input: input.clone(),
64+
metrics: ExecutionPlanMetricsSet::new(),
6165
}),
6266
other => Err(DataFusionError::Plan(format!(
6367
"Filter predicate must return boolean values, not {:?}",
@@ -115,10 +119,13 @@ impl ExecutionPlan for FilterExec {
115119
}
116120

117121
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
122+
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
123+
118124
Ok(Box::pin(FilterExecStream {
119125
schema: self.input.schema().clone(),
120126
predicate: self.predicate.clone(),
121127
input: self.input.execute(partition).await?,
128+
baseline_metrics,
122129
}))
123130
}
124131

@@ -133,6 +140,10 @@ impl ExecutionPlan for FilterExec {
133140
}
134141
}
135142
}
143+
144+
fn metrics(&self) -> Option<MetricsSet> {
145+
Some(self.metrics.clone_inner())
146+
}
136147
}
137148

138149
/// The FilterExec stream wraps the input iterator and applies the predicate expression to
@@ -144,6 +155,8 @@ struct FilterExecStream {
144155
predicate: Arc<dyn PhysicalExpr>,
145156
/// The input partition to filter.
146157
input: SendableRecordBatchStream,
158+
/// Baseline runtime metrics, updated each time this stream is polled
159+
baseline_metrics: BaselineMetrics,
147160
}
148161

149162
fn batch_filter(
@@ -176,10 +189,16 @@ impl Stream for FilterExecStream {
176189
mut self: Pin<&mut Self>,
177190
cx: &mut Context<'_>,
178191
) -> Poll<Option<Self::Item>> {
179-
self.input.poll_next_unpin(cx).map(|x| match x {
180-
Some(Ok(batch)) => Some(batch_filter(&batch, &self.predicate)),
192+
let poll = self.input.poll_next_unpin(cx).map(|x| match x {
193+
Some(Ok(batch)) => {
194+
let timer = self.baseline_metrics.elapsed_compute().timer();
195+
let filtered_batch = batch_filter(&batch, &self.predicate);
196+
timer.done();
197+
Some(filtered_batch)
198+
}
181199
other => other,
182-
})
200+
});
201+
self.baseline_metrics.record_poll(poll)
183202
}
184203

185204
fn size_hint(&self) -> (usize, Option<usize>) {

datafusion/tests/sql.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2247,7 +2247,14 @@ async fn explain_analyze_baseline_metrics() {
22472247
let mut ctx = ExecutionContext::with_config(config);
22482248
register_aggregate_csv_by_sql(&mut ctx).await;
22492249
// a query with as many operators as we have metrics for
2250-
let sql = "EXPLAIN ANALYZE select count(*) from (SELECT count(*), c1 FROM aggregate_test_100 group by c1 ORDER BY c1)";
2250+
let sql = "EXPLAIN ANALYZE \
2251+
select count(*) from \
2252+
(SELECT count(*), c1 \
2253+
FROM aggregate_test_100 \
2254+
WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \
2255+
GROUP BY c1 \
2256+
ORDER BY c1)";
2257+
println!("running query: {}", sql);
22512258
let plan = ctx.create_logical_plan(sql).unwrap();
22522259
let plan = ctx.optimize(&plan).unwrap();
22532260
let physical_plan = ctx.create_physical_plan(&plan).unwrap();
@@ -2275,6 +2282,11 @@ async fn explain_analyze_baseline_metrics() {
22752282
"SortExec: [c1@0 ASC]",
22762283
"metrics=[output_rows=5, elapsed_compute="
22772284
);
2285+
assert_metrics!(
2286+
&formatted,
2287+
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
2288+
"metrics=[output_rows=99, elapsed_compute="
2289+
);
22782290

22792291
fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
22802292
use datafusion::physical_plan::{

0 commit comments

Comments
 (0)