WIP: Test DataFusion with experimental IncrementalRecordBatchBuilder #16208

Closed · wants to merge 4 commits
63 changes: 19 additions & 44 deletions Cargo.lock


30 changes: 30 additions & 0 deletions Cargo.toml
@@ -218,3 +218,33 @@ uninlined_format_args = "warn"
[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] }
unused_qualifications = "deny"


# Patch to pull in https://github.com/apache/arrow-rs/pull/7513
[patch.crates-io]
arrow = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/cache_filter_result" }
arrow-array = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/cache_filter_result" }
arrow-buffer = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/cache_filter_result" }
arrow-cast = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/cache_filter_result" }
arrow-data = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/cache_filter_result" }
arrow-ipc = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/cache_filter_result" }
arrow-schema = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/cache_filter_result" }
arrow-select = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/cache_filter_result" }
arrow-string = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/cache_filter_result" }
arrow-ord = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/cache_filter_result" }
arrow-flight = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/cache_filter_result" }
parquet = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/cache_filter_result" }
# /Users/andrewlamb/Software/arrow-rs
#arrow = { path= "/Users/andrewlamb/Software/arrow-rs/arrow" }
#arrow-array = { path= "/Users/andrewlamb/Software/arrow-rs/arrow-array" }
#arrow-buffer = { path= "/Users/andrewlamb/Software/arrow-rs/arrow-buffer" }
#arrow-cast = { path= "/Users/andrewlamb/Software/arrow-rs/arrow-cast" }
#arrow-data = { path= "/Users/andrewlamb/Software/arrow-rs/arrow-data" }
#arrow-ipc = { path= "/Users/andrewlamb/Software/arrow-rs/arrow-ipc" }
#arrow-schema = { path= "/Users/andrewlamb/Software/arrow-rs/arrow-schema" }
#arrow-select = { path= "/Users/andrewlamb/Software/arrow-rs/arrow-select" }
#arrow-string = { path= "/Users/andrewlamb/Software/arrow-rs/arrow-string" }
#arrow-ord = { path= "/Users/andrewlamb/Software/arrow-rs/arrow-ord" }
#arrow-flight = { path= "/Users/andrewlamb/Software/arrow-rs/arrow-flight" }
#parquet = { path= "/Users/andrewlamb/Software/arrow-rs/parquet" }

109 changes: 91 additions & 18 deletions datafusion/physical-plan/src/filter.rs
@@ -15,12 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll};

use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
@@ -38,6 +32,13 @@ use crate::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
DisplayFormatType, ExecutionPlan,
};
use arrow::array::AsArray;
use arrow::compute::IncrementalRecordBatchBuilder;
use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll};

use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
@@ -393,6 +394,11 @@ impl ExecutionPlan for FilterExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
// TODO: thread the target batch size through
let batch_size = 8192;
let output_batch_builder =
IncrementalRecordBatchBuilder::try_new(self.schema(), batch_size)?;

trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
Ok(Box::pin(FilterExecStream {
Expand All @@ -401,6 +407,7 @@ impl ExecutionPlan for FilterExec {
input: self.input.execute(partition, context)?,
baseline_metrics,
projection: self.projection.clone(),
output_batch_builder: Some(output_batch_builder),
}))
}

Expand Down Expand Up @@ -647,6 +654,12 @@ struct FilterExecStream {
baseline_metrics: BaselineMetrics,
/// The projection indices of the columns in the input schema
projection: Option<Vec<usize>>,
/// The currently in-progress output batch
///
/// This structure produces cleanly sized batches of target_size
///
/// When None, the input is exhausted and filtering is complete
output_batch_builder: Option<IncrementalRecordBatchBuilder>,
}

pub fn batch_filter(
@@ -689,6 +702,46 @@ fn filter_and_project(
})
}

impl FilterExecStream {
/// Evaluates the predicate filter on the given batch and appends any rows that match
/// to the in-progress output batch builder.
fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
self.predicate
.evaluate(&batch)
.and_then(|v| v.into_array(batch.num_rows()))
.and_then(|filter| {
let Some(filter) = filter.as_boolean_opt() else {
return internal_err!(
"Cannot create filter_array from non-boolean predicates"
);
};

let batch = match self.projection.as_ref() {
Some(projection) => {
let projected_columns = projection
.iter()
.map(|i| Arc::clone(batch.column(*i)))
.collect();
// Safety -- the input was a valid RecordBatch and thus the projection is too
unsafe {
RecordBatch::new_unchecked(
Arc::clone(&self.schema),
projected_columns,
batch.num_rows(),
)
}
}
None => batch,
};
let output_batch_builder = self
.output_batch_builder
.as_mut()
.expect("output_batch_builder should be Some");
Ok(output_batch_builder.append_filtered(batch, filter)?)

Dandandan (Contributor) commented on May 30, 2025:

I expect it should be faster, rather than incrementally building a new batch from a number of arrays, to first evaluate a number of filters (self.predicate.evaluate(&batch)) until the number of true values reaches the target batch size, and then have a filter API to filter a list of batches. This avoids reallocations / copying, as the target capacity can be calculated.

To avoid buffering too many batches, we would probably have to limit this: create a batch anyway after x batches, or when x megabytes are held in memory.
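
A minimal sketch of that idea, not part of this PR: arrow has no multi-batch filter kernel today, so filter_record_batch plus concat_batches stands in for one, and the flush_buffered helper and its row target are illustrative assumptions.

```rust
use arrow::array::{BooleanArray, RecordBatch};
use arrow::compute::{concat_batches, filter_record_batch};
use arrow::error::Result;

/// Buffer (batch, mask) pairs until enough rows survive the filters,
/// then materialize a single output batch in one pass.
fn flush_buffered(
    buffered: &mut Vec<(RecordBatch, BooleanArray)>,
    target_rows: usize,
) -> Result<Option<RecordBatch>> {
    // true_count() gives the selected rows per mask, so the output size
    // is known before any values are copied.
    let selected: usize = buffered.iter().map(|(_, mask)| mask.true_count()).sum();
    if buffered.is_empty() || selected < target_rows {
        return Ok(None); // keep buffering input batches
    }
    let filtered = buffered
        .drain(..)
        .map(|(batch, mask)| filter_record_batch(&batch, &mask))
        .collect::<Result<Vec<_>>>()?;
    let schema = filtered[0].schema();
    Ok(Some(concat_batches(&schema, filtered.iter())?))
}
```

As the comment notes, a real implementation would also need a cap on buffered batches or bytes to bound memory.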

Contributor:

My feeling is this should do the "multi batch filter" and then concat anyway if smaller batches are generated by this approach, rather than using a builder approach.

Contributor (PR author):

> This avoids reallocations / copying, as the target capacity can be calculated.
> To avoid buffering too many batches, we would probably have to limit this: create a batch anyway after x batches, or when x megabytes are held in memory.

I was trying to avoid having any reallocations in the IncrementalRecordBatchBuilder -- since we know the target output batch size (batch_size), it knows how much space each batch will take up front and can simply allocate it (the instantiate_builder function creates the builders with with_capacity).

However, now that I think about it, after a call to finish() the updated builder doesn't have the right allocation 🤔

I'll look into that more later today.
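
A hedged sketch of the preallocation described above; instantiate_builder is the PR's function, but this standalone single-column version is an illustration, not the PR's code.

```rust
use arrow::array::Int64Builder;

/// Illustrative stand-in for instantiate_builder for one fixed-width
/// column: reserving exactly batch_size slots up front means appending
/// filtered rows never reallocates.
fn instantiate_primitive_builder(batch_size: usize) -> Int64Builder {
    Int64Builder::with_capacity(batch_size)
}
```

The gap noted above is that once finish() hands back a completed array, the builder's buffers start fresh and would need the same with_capacity reservation again.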

Dandandan (Contributor) commented on May 30, 2025:

> it knows how much space each batch will take up front and can simply allocate it

Hm, yeah, with primitive and view types this might be okay -- we might have to test other data sources though (with normal binary types, where the data buffer can't be preallocated).
})
}
}

impl Stream for FilterExecStream {
type Item = Result<RecordBatch>;

@@ -698,23 +751,43 @@
) -> Poll<Option<Self::Item>> {
let poll;
loop {
// If the builder has been taken, the input is exhausted and all
// batches have been emitted, so we are done
let Some(output_batch_builder) = self.output_batch_builder.as_mut() else {
poll = Poll::Ready(None);
break;
};

// If we had a batch ready, return it
if let Some(batch) = output_batch_builder.next_batch() {
poll = Poll::Ready(Some(Ok(batch)));
break;
}

// poll next input batch
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
let filtered_batch = filter_and_project(
&batch,
&self.predicate,
self.projection.as_ref(),
&self.schema,
)?;
// do the actual work of filtering the batch
let time = self.baseline_metrics.elapsed_compute().clone(); // the clone shares the same underlying counter
let timer = time.timer();
self.filter_batch(batch)?;
timer.done();
// Skip entirely filtered batches
if filtered_batch.num_rows() == 0 {
continue;
}
poll = Poll::Ready(Some(Ok(filtered_batch)));
continue; // Continue to the next batch
}
None => {
// end of input stream, finalize the output batch
let output_batch_builder = self
.output_batch_builder
.take()
.expect("output_batch_builder should be Some");
let mut completed_batches = output_batch_builder.build()?;
assert!(
completed_batches.len() <= 1,
"FilterExecStream should produce at most one batch"
);
poll = Poll::Ready(completed_batches.pop_front().map(Ok));
break;
}
// error
value => {
poll = Poll::Ready(value);
break;