Skip to content

Commit 8a0c97f

Browse files
committed
handle post-filtering within vortex source
Signed-off-by: Andrew Duffy <andrew@a10y.dev>
1 parent 17a6089 commit 8a0c97f

File tree

3 files changed

+89
-89
lines changed

3 files changed

+89
-89
lines changed

vortex-datafusion/src/convert/exprs.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,6 @@ use vortex::scalar::Scalar;
3838
use crate::convert::FromDataFusion;
3939
use crate::convert::TryFromDataFusion;
4040

41-
/// Tries to convert the expressions into a vortex conjunction. Will return Ok(None) iff the input conjunction is empty.
42-
pub(crate) fn make_vortex_predicate(
43-
predicate: &[&Arc<dyn PhysicalExpr>],
44-
) -> VortexResult<Option<Expression>> {
45-
let exprs = predicate
46-
.iter()
47-
.map(|e| Expression::try_from_df(e.as_ref()))
48-
.collect::<VortexResult<Vec<_>>>()?;
49-
50-
Ok(exprs.into_iter().reduce(and))
51-
}
52-
5341
// TODO(joe): Don't return an error when we hit an unsupported node; instead, bubble up "TRUE" (i.e. keep)
5442
// for that node, up to any `and` or `or` node.
5543
impl TryFromDataFusion<dyn PhysicalExpr> for Expression {

vortex-datafusion/src/persistent/opener.rs

Lines changed: 58 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,15 @@ use arrow_schema::SchemaRef;
1111
use datafusion_common::DataFusionError;
1212
use datafusion_common::Result as DFResult;
1313
use datafusion_common::arrow::array::RecordBatch;
14+
use datafusion_common::arrow::compute::filter_record_batch;
15+
use datafusion_common::cast::as_boolean_array;
1416
use datafusion_datasource::PartitionedFile;
1517
use datafusion_datasource::file_meta::FileMeta;
1618
use datafusion_datasource::file_stream::FileOpenFuture;
1719
use datafusion_datasource::file_stream::FileOpener;
1820
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
1921
use datafusion_physical_expr::PhysicalExprRef;
22+
use datafusion_physical_expr::conjunction;
2023
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
2124
use datafusion_physical_expr::split_conjunction;
2225
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
@@ -32,8 +35,9 @@ use object_store::path::Path;
3235
use tracing::Instrument;
3336
use vortex::dtype::FieldName;
3437
use vortex::error::VortexError;
38+
use vortex::expr;
39+
use vortex::expr::Expression;
3540
use vortex::expr::root;
36-
use vortex::expr::select;
3741
use vortex::layout::LayoutReader;
3842
use vortex::metrics::VortexMetrics;
3943
use vortex::scan::ScanBuilder;
@@ -42,8 +46,8 @@ use vortex_utils::aliases::dash_map::DashMap;
4246
use vortex_utils::aliases::dash_map::Entry;
4347

4448
use super::cache::VortexFileCache;
49+
use crate::convert::TryFromDataFusion;
4550
use crate::convert::exprs::can_be_pushed_down;
46-
use crate::convert::exprs::make_vortex_predicate;
4751
use crate::convert::ranges::apply_byte_range;
4852

4953
#[derive(Clone)]
@@ -228,6 +232,9 @@ impl FileOpener for VortexOpener {
228232
DataFusionError::Execution(format!("Failed to convert file schema to arrow: {e}"))
229233
})?);
230234

235+
let logical_file_schema =
236+
compute_logical_file_schema(&physical_file_schema, &logical_schema);
237+
231238
if let Some(expr_adapter_factory) = expr_adapter_factory {
232239
let partition_values = partition_fields
233240
.iter()
@@ -239,13 +246,8 @@ impl FileOpener for VortexOpener {
239246
// for schema evolution and divergence between the table's schema and individual files.
240247
filter = filter
241248
.map(|filter| {
242-
let logical_file_schema = compute_logical_file_schema(
243-
&physical_file_schema.clone(),
244-
&logical_schema,
245-
);
246-
247249
let expr = expr_adapter_factory
248-
.create(logical_file_schema, physical_file_schema.clone())
250+
.create(logical_file_schema.clone(), physical_file_schema.clone())
249251
.with_partition_values(partition_values)
250252
.rewrite(filter)?;
251253

@@ -261,52 +263,19 @@ impl FileOpener for VortexOpener {
261263
// Create the initial mapping from physical file schema to projected schema.
262264
// This gives us the field reordering and tells us which logical schema fields
263265
// to select.
264-
let (_schema_mapping, adapted_projections) =
265-
schema_adapter.map_schema(&physical_file_schema)?;
266+
let (schema_mapping, adapted_projections) =
267+
schema_adapter.map_schema(&logical_file_schema)?;
266268

267269
// Build the Vortex projection expression using the adapted projections.
268270
// This will reorder the fields to match the target order.
269271
let fields = adapted_projections
270272
.iter()
271-
.map(|idx| {
272-
let field = logical_schema.field(*idx);
273+
.map(|&idx| {
274+
let field = logical_file_schema.field(idx);
273275
FieldName::from(field.name().as_str())
274276
})
275277
.collect::<Vec<_>>();
276-
let projection_expr = select(fields, root());
277-
278-
// After Vortex applies the projection, the batch will have fields in the target
279-
// order (matching adapted_projections), but with the physical file types.
280-
// We need a second schema mapping for type casting only, not reordering.
281-
// Build a schema that represents what Vortex will return: fields in target order
282-
// with physical types.
283-
let projected_physical_fields: Vec<Field> = adapted_projections
284-
.iter()
285-
.map(|&idx| {
286-
let logical_field = logical_schema.field(idx);
287-
let field_name = logical_field.name();
288-
289-
// Find this field in the physical schema to get its physical type
290-
physical_file_schema
291-
.field_with_name(field_name)
292-
.map(|phys_field| {
293-
Field::new(
294-
field_name,
295-
merge_field_types(phys_field, logical_field),
296-
phys_field.is_nullable(),
297-
)
298-
})
299-
.unwrap_or_else(|_| (*logical_field).clone())
300-
})
301-
.collect();
302-
303-
let projected_physical_schema =
304-
Arc::new(arrow_schema::Schema::new(projected_physical_fields));
305-
306-
// Create a second mapping from the projected physical schema (what Vortex returns)
307-
// to the final projected schema. This mapping will handle type casting without reordering.
308-
let (batch_schema_mapping, _) =
309-
schema_adapter.map_schema(&projected_physical_schema)?;
278+
let projection_expr = expr::select(fields, root());
310279

311280
// We share our layout readers with other partitions in the scan, so we only need to read each layout in each file once.
312281
let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) {
@@ -346,35 +315,57 @@ impl FileOpener for VortexOpener {
346315
);
347316
}
348317

349-
let filter = filter
350-
.and_then(|f| {
351-
let exprs = split_conjunction(&f)
352-
.into_iter()
353-
.filter(|expr| can_be_pushed_down(expr, &predicate_file_schema))
354-
.collect::<Vec<_>>();
318+
// Split the filter expressions into those that can be applied within the file scan,
319+
// and those that need to be applied afterward in-memory.
320+
let mut pushed_filters = Vec::new();
321+
let mut post_filters = Vec::new();
322+
323+
if let Some(filter) = filter {
324+
for expr in split_conjunction(&filter) {
325+
if can_be_pushed_down(expr, &predicate_file_schema)
326+
&& let Ok(vortex_expr) = Expression::try_from_df(expr.as_ref())
327+
{
328+
pushed_filters.push(vortex_expr);
329+
} else {
330+
post_filters.push(expr.clone());
331+
}
332+
}
333+
}
355334

356-
make_vortex_predicate(&exprs).transpose()
357-
})
358-
.transpose()
359-
.map_err(|e| DataFusionError::External(e.into()))?;
335+
let pushed_filter = pushed_filters.into_iter().reduce(expr::and);
336+
let post_filter: Box<dyn FnMut(DFResult<RecordBatch>) -> DFResult<RecordBatch> + Send> =
337+
if post_filters.is_empty() {
338+
Box::new(|batch: DFResult<RecordBatch>| batch)
339+
} else {
340+
let conjunction = conjunction(post_filters.clone());
341+
Box::new(
342+
move |batch: DFResult<RecordBatch>| -> DFResult<RecordBatch> {
343+
let batch = batch?;
344+
let filter = conjunction.evaluate(&batch)?;
345+
let filter = filter.into_array(batch.num_rows())?;
346+
let filter = as_boolean_array(&filter)?;
347+
filter_record_batch(&batch, filter).map_err(DataFusionError::from)
348+
},
349+
)
350+
};
360351

361352
tracing::debug!(
362-
?filter,
363-
?projection,
353+
?pushed_filter,
354+
?post_filters,
364355
?projection_expr,
365-
"opening file with predicate and projection"
356+
"opening file with predicates and projection"
366357
);
367358

368359
if let Some(limit) = limit
369-
&& filter.is_none()
360+
&& pushed_filter.is_none()
370361
{
371362
scan_builder = scan_builder.with_limit(limit);
372363
}
373364

374365
let stream = scan_builder
375366
.with_metrics(metrics)
376367
.with_projection(projection_expr)
377-
.with_some_filter(filter)
368+
.with_some_filter(pushed_filter)
378369
.with_ordered(has_output_ordering)
379370
.map(|chunk| RecordBatch::try_from(chunk.as_ref()))
380371
.into_stream()
@@ -409,7 +400,12 @@ impl FileOpener for VortexOpener {
409400
))))
410401
})
411402
.try_flatten()
412-
.map(move |batch| batch.and_then(|b| batch_schema_mapping.map_batch(b)))
403+
.map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b)))
404+
// Apply the post-filter step, which will execute any filters that couldn't
405+
// be pushed down for this file. This applies to any filters over fields that are
406+
// missing from the file schema but exist in the table schema, and are filled in
407+
// by the schema adapter.
408+
.map(post_filter)
413409
.boxed();
414410

415411
Ok(stream)

vortex-datafusion/src/persistent/source.rs

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use datafusion_physical_plan::DisplayFormatType;
2424
use datafusion_physical_plan::PhysicalExpr;
2525
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
2626
use datafusion_physical_plan::filter_pushdown::PushedDown;
27+
use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
2728
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
2829
use object_store::ObjectStore;
2930
use object_store::path::Path;
@@ -233,8 +234,6 @@ impl FileSource for VortexSource {
233234
filters: Vec<Arc<dyn PhysicalExpr>>,
234235
_config: &ConfigOptions,
235236
) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
236-
let num_filters = filters.len();
237-
238237
if filters.is_empty() {
239238
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
240239
vec![],
@@ -262,28 +261,45 @@ impl FileSource for VortexSource {
262261
// Update the predicate with any pushed filters
263262
let supported_filters = filters
264263
.into_iter()
265-
.filter(|expr| can_be_pushed_down(expr, schema))
264+
.map(|expr| {
265+
if can_be_pushed_down(&expr, schema) {
266+
PushedDownPredicate::supported(expr)
267+
} else {
268+
PushedDownPredicate::unsupported(expr)
269+
}
270+
})
266271
.collect::<Vec<_>>();
267272

273+
if supported_filters
274+
.iter()
275+
.all(|p| matches!(p.discriminant, PushedDown::No))
276+
{
277+
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
278+
vec![PushedDown::No; supported_filters.len()],
279+
)
280+
.with_updated_node(Arc::new(source) as _));
281+
}
282+
283+
let supported = supported_filters
284+
.iter()
285+
.filter_map(|p| match p.discriminant {
286+
PushedDown::Yes => Some(&p.predicate),
287+
PushedDown::No => None,
288+
})
289+
.cloned();
290+
268291
let predicate = match source.pushed_predicate {
269-
Some(predicate) => conjunction(std::iter::once(predicate).chain(supported_filters)),
270-
None => conjunction(supported_filters),
292+
Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
293+
None => conjunction(supported),
271294
};
272295

273296
tracing::debug!(%predicate, "updating predicate with new filters");
274297

275298
source.pushed_predicate = Some(predicate);
276299

277-
// NOTE: we always report no pushdown to DataFusion, which forces it to postfilter our
278-
// results. Due to schema evolution and schema adapters/expression adapters, we can't
279-
// guarantee that filters over missing columns can be executed directly in Vortex.
280-
//
281-
// But, we still return the updated source node so that the filters are used for
282-
// zone map pruning.
283-
Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![
284-
PushedDown::No;
285-
num_filters
286-
])
300+
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
301+
supported_filters.iter().map(|f| f.discriminant).collect(),
302+
)
287303
.with_updated_node(Arc::new(source) as _))
288304
}
289305

0 commit comments

Comments
 (0)