@@ -11,12 +11,15 @@ use arrow_schema::SchemaRef;
1111use datafusion_common:: DataFusionError ;
1212use datafusion_common:: Result as DFResult ;
1313use datafusion_common:: arrow:: array:: RecordBatch ;
14+ use datafusion_common:: arrow:: compute:: filter_record_batch;
15+ use datafusion_common:: cast:: as_boolean_array;
1416use datafusion_datasource:: PartitionedFile ;
1517use datafusion_datasource:: file_meta:: FileMeta ;
1618use datafusion_datasource:: file_stream:: FileOpenFuture ;
1719use datafusion_datasource:: file_stream:: FileOpener ;
1820use datafusion_datasource:: schema_adapter:: SchemaAdapterFactory ;
1921use datafusion_physical_expr:: PhysicalExprRef ;
22+ use datafusion_physical_expr:: conjunction;
2023use datafusion_physical_expr:: simplifier:: PhysicalExprSimplifier ;
2124use datafusion_physical_expr:: split_conjunction;
2225use datafusion_physical_expr_adapter:: PhysicalExprAdapterFactory ;
@@ -32,8 +35,9 @@ use object_store::path::Path;
3235use tracing:: Instrument ;
3336use vortex:: dtype:: FieldName ;
3437use vortex:: error:: VortexError ;
38+ use vortex:: expr;
39+ use vortex:: expr:: Expression ;
3540use vortex:: expr:: root;
36- use vortex:: expr:: select;
3741use vortex:: layout:: LayoutReader ;
3842use vortex:: metrics:: VortexMetrics ;
3943use vortex:: scan:: ScanBuilder ;
@@ -42,8 +46,8 @@ use vortex_utils::aliases::dash_map::DashMap;
4246use vortex_utils:: aliases:: dash_map:: Entry ;
4347
4448use super :: cache:: VortexFileCache ;
49+ use crate :: convert:: TryFromDataFusion ;
4550use crate :: convert:: exprs:: can_be_pushed_down;
46- use crate :: convert:: exprs:: make_vortex_predicate;
4751use crate :: convert:: ranges:: apply_byte_range;
4852
4953#[ derive( Clone ) ]
@@ -228,6 +232,9 @@ impl FileOpener for VortexOpener {
228232 DataFusionError :: Execution ( format ! ( "Failed to convert file schema to arrow: {e}" ) )
229233 } ) ?) ;
230234
235+ let logical_file_schema =
236+ compute_logical_file_schema ( & physical_file_schema, & logical_schema) ;
237+
231238 if let Some ( expr_adapter_factory) = expr_adapter_factory {
232239 let partition_values = partition_fields
233240 . iter ( )
@@ -239,13 +246,8 @@ impl FileOpener for VortexOpener {
239246 // for schema evolution and divergence between the table's schema and individual files.
240247 filter = filter
241248 . map ( |filter| {
242- let logical_file_schema = compute_logical_file_schema (
243- & physical_file_schema. clone ( ) ,
244- & logical_schema,
245- ) ;
246-
247249 let expr = expr_adapter_factory
248- . create ( logical_file_schema, physical_file_schema. clone ( ) )
250+ . create ( logical_file_schema. clone ( ) , physical_file_schema. clone ( ) )
249251 . with_partition_values ( partition_values)
250252 . rewrite ( filter) ?;
251253
@@ -261,52 +263,19 @@ impl FileOpener for VortexOpener {
261263 // Create the initial mapping from physical file schema to projected schema.
262264 // This gives us the field reordering and tells us which logical schema fields
263265 // to select.
264- let ( _schema_mapping , adapted_projections) =
265- schema_adapter. map_schema ( & physical_file_schema ) ?;
266+ let ( schema_mapping , adapted_projections) =
267+ schema_adapter. map_schema ( & logical_file_schema ) ?;
266268
267269 // Build the Vortex projection expression using the adapted projections.
268270 // This will reorder the fields to match the target order.
269271 let fields = adapted_projections
270272 . iter ( )
271- . map ( |idx| {
272- let field = logical_schema . field ( * idx) ;
273+ . map ( |& idx| {
274+ let field = logical_file_schema . field ( idx) ;
273275 FieldName :: from ( field. name ( ) . as_str ( ) )
274276 } )
275277 . collect :: < Vec < _ > > ( ) ;
276- let projection_expr = select ( fields, root ( ) ) ;
277-
278- // After Vortex applies the projection, the batch will have fields in the target
279- // order (matching adapted_projections), but with the physical file types.
280- // We need a second schema mapping for type casting only, not reordering.
281- // Build a schema that represents what Vortex will return: fields in target order
282- // with physical types.
283- let projected_physical_fields: Vec < Field > = adapted_projections
284- . iter ( )
285- . map ( |& idx| {
286- let logical_field = logical_schema. field ( idx) ;
287- let field_name = logical_field. name ( ) ;
288-
289- // Find this field in the physical schema to get its physical type
290- physical_file_schema
291- . field_with_name ( field_name)
292- . map ( |phys_field| {
293- Field :: new (
294- field_name,
295- merge_field_types ( phys_field, logical_field) ,
296- phys_field. is_nullable ( ) ,
297- )
298- } )
299- . unwrap_or_else ( |_| ( * logical_field) . clone ( ) )
300- } )
301- . collect ( ) ;
302-
303- let projected_physical_schema =
304- Arc :: new ( arrow_schema:: Schema :: new ( projected_physical_fields) ) ;
305-
306- // Create a second mapping from the projected physical schema (what Vortex returns)
307- // to the final projected schema. This mapping will handle type casting without reordering.
308- let ( batch_schema_mapping, _) =
309- schema_adapter. map_schema ( & projected_physical_schema) ?;
278+ let projection_expr = expr:: select ( fields, root ( ) ) ;
310279
311280 // We share our layout readers with other partitions in the scan, so we only need to read each layout in each file once.
312281 let layout_reader = match layout_reader. entry ( file_meta. object_meta . location . clone ( ) ) {
@@ -346,35 +315,57 @@ impl FileOpener for VortexOpener {
346315 ) ;
347316 }
348317
349- let filter = filter
350- . and_then ( |f| {
351- let exprs = split_conjunction ( & f)
352- . into_iter ( )
353- . filter ( |expr| can_be_pushed_down ( expr, & predicate_file_schema) )
354- . collect :: < Vec < _ > > ( ) ;
318+ // Split the filter expressions into those that can be applied within the file scan,
319+ // and those that need to be applied afterward in-memory.
320+ let mut pushed_filters = Vec :: new ( ) ;
321+ let mut post_filters = Vec :: new ( ) ;
322+
323+ if let Some ( filter) = filter {
324+ for expr in split_conjunction ( & filter) {
325+ if can_be_pushed_down ( expr, & predicate_file_schema)
326+ && let Ok ( vortex_expr) = Expression :: try_from_df ( expr. as_ref ( ) )
327+ {
328+ pushed_filters. push ( vortex_expr) ;
329+ } else {
330+ post_filters. push ( expr. clone ( ) ) ;
331+ }
332+ }
333+ }
355334
356- make_vortex_predicate ( & exprs) . transpose ( )
357- } )
358- . transpose ( )
359- . map_err ( |e| DataFusionError :: External ( e. into ( ) ) ) ?;
335+ let pushed_filter = pushed_filters. into_iter ( ) . reduce ( expr:: and) ;
336+ let post_filter: Box < dyn FnMut ( DFResult < RecordBatch > ) -> DFResult < RecordBatch > + Send > =
337+ if post_filters. is_empty ( ) {
338+ Box :: new ( |batch : DFResult < RecordBatch > | batch)
339+ } else {
340+ let conjunction = conjunction ( post_filters. clone ( ) ) ;
341+ Box :: new (
342+ move |batch : DFResult < RecordBatch > | -> DFResult < RecordBatch > {
343+ let batch = batch?;
344+ let filter = conjunction. evaluate ( & batch) ?;
345+ let filter = filter. into_array ( batch. num_rows ( ) ) ?;
346+ let filter = as_boolean_array ( & filter) ?;
347+ filter_record_batch ( & batch, filter) . map_err ( DataFusionError :: from)
348+ } ,
349+ )
350+ } ;
360351
361352 tracing:: debug!(
362- ?filter ,
363- ?projection ,
353+ ?pushed_filter ,
354+ ?post_filters ,
364355 ?projection_expr,
365- "opening file with predicate and projection"
356+ "opening file with predicates and projection"
366357 ) ;
367358
368359 if let Some ( limit) = limit
369- && filter . is_none ( )
360+ && pushed_filter . is_none ( )
370361 {
371362 scan_builder = scan_builder. with_limit ( limit) ;
372363 }
373364
374365 let stream = scan_builder
375366 . with_metrics ( metrics)
376367 . with_projection ( projection_expr)
377- . with_some_filter ( filter )
368+ . with_some_filter ( pushed_filter )
378369 . with_ordered ( has_output_ordering)
379370 . map ( |chunk| RecordBatch :: try_from ( chunk. as_ref ( ) ) )
380371 . into_stream ( )
@@ -409,7 +400,12 @@ impl FileOpener for VortexOpener {
409400 ) ) ) )
410401 } )
411402 . try_flatten ( )
412- . map ( move |batch| batch. and_then ( |b| batch_schema_mapping. map_batch ( b) ) )
403+ . map ( move |batch| batch. and_then ( |b| schema_mapping. map_batch ( b) ) )
404+ // Apply the post-filter step, which will execute any filters that couldn't
405+ // be pushed down for this file. This is applicable for any filters over fields
406+ // missing from the file schema that exist in the table schema, and are filled in
407+ // from the schema adapter.
408+ . map ( post_filter)
413409 . boxed ( ) ;
414410
415411 Ok ( stream)
0 commit comments