Skip to content

Commit

Permalink
Add coalescing into FilterExec
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 25, 2024
1 parent 5e59804 commit fdfd2cc
Showing 1 changed file with 34 additions and 21 deletions.
55 changes: 34 additions & 21 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
Expand All @@ -44,6 +44,7 @@ use datafusion_physical_expr::{
analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr,
};

use crate::coalescer::BatchCoalescer;
use futures::stream::{Stream, StreamExt};
use log::trace;

Expand Down Expand Up @@ -278,10 +279,12 @@ impl ExecutionPlan for FilterExec {
trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
Ok(Box::pin(FilterExecStream {
schema: self.input.schema(),
done: false,
predicate: Arc::clone(&self.predicate),
input: self.input.execute(partition, context)?,
baseline_metrics,
// TODO use actual target batch size, for now hardcode the default size
coalescer: BatchCoalescer::new(self.input.schema(), 8192),
}))
}

Expand Down Expand Up @@ -336,14 +339,16 @@ fn collect_new_statistics(
/// Stream for [`FilterExec`]: wraps the input stream, applies the filter
/// predicate to each input batch, and coalesces the surviving rows into
/// larger output batches before emitting them
struct FilterExecStream {
/// Output schema, which is the same as the input schema for this operator
schema: SchemaRef,
/// Is the stream done? Set once the input stream is exhausted
done: bool,
/// The expression to filter on. This expression must evaluate to a boolean value.
predicate: Arc<dyn PhysicalExpr>,
/// The input partition to filter.
input: SendableRecordBatchStream,
/// Runtime metrics recording (e.g. elapsed compute time)
baseline_metrics: BaselineMetrics,
/// Builds up output batches incrementally from filtered rows; constructed
/// in `execute` with the input schema and a target batch size
/// (currently hardcoded to 8192 — see TODO there)
coalescer: BatchCoalescer,
}

pub(crate) fn batch_filter(
Expand Down Expand Up @@ -373,28 +378,36 @@ impl Stream for FilterExecStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}
let poll;
loop {
match self.input.poll_next_unpin(cx) {
Poll::Ready(value) => match value {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
let filtered_batch = batch_filter(&batch, &self.predicate)?;
// skip entirely filtered batches
if filtered_batch.num_rows() == 0 {
continue;
}
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
// clone timer so we can borrow self mutably
let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
let timer = elapsed_compute.timer();
let filtered_batch = batch_filter(&batch, &self.predicate)?;
if let Some(batch) = self.coalescer.push_batch(filtered_batch)? {
timer.done();
poll = Poll::Ready(Some(Ok(filtered_batch)));
poll = Poll::Ready(Some(Ok(batch)));
break;
}
_ => {
poll = Poll::Ready(value);
break;
// otherwise still need more rows to coalesce
else {
continue;
}
},
Poll::Pending => {
poll = Poll::Pending;
}
// end of input, see if we have any remaining batches
None => {
self.done = true;
let maybe_batch = self.coalescer.finish().transpose();
poll = Poll::Ready(maybe_batch);
break;
}
err => {
poll = Poll::Ready(err);
break;
}
}
Expand All @@ -410,7 +423,7 @@ impl Stream for FilterExecStream {

impl RecordBatchStream for FilterExecStream {
    /// Returns the output schema of this stream.
    ///
    /// Filtering removes rows but never changes the schema, so this is the
    /// input schema; the coalescer was constructed with `self.input.schema()`
    /// in `execute`, so we delegate to it rather than storing a second copy.
    fn schema(&self) -> SchemaRef {
        self.coalescer.schema()
    }
}

Expand Down

0 comments on commit fdfd2cc

Please sign in to comment.