@@ -186,15 +186,15 @@ impl FileOpener for ParquetOpener {
186
186
if let Ok ( rewritten) = filter_rewriter
187
187
. rewrite ( Arc :: clone ( & file_schema) , original_predicate)
188
188
{
189
- let mut filter_schema_builder = FilterSchemaBuilder :: new (
190
- & file_schema,
191
- & table_schema,
192
- ) ;
189
+ let mut filter_schema_builder =
190
+ FilterSchemaBuilder :: new ( & file_schema, & table_schema) ;
193
191
rewritten. visit ( & mut filter_schema_builder) ?;
194
192
filter_schema = filter_schema_builder. build ( ) ;
195
193
// If we rewrote the filter we need to recompute the pruning predicates to match the new filter.
196
- page_pruning_predicate =
197
- Some ( build_page_pruning_predicate ( & rewritten, & filter_schema) ) ;
194
+ page_pruning_predicate = Some ( build_page_pruning_predicate (
195
+ & rewritten,
196
+ & filter_schema,
197
+ ) ) ;
198
198
pruning_predicate = build_pruning_predicate (
199
199
rewritten. clone ( ) ,
200
200
& filter_schema,
@@ -409,24 +409,33 @@ impl<'schema> FilterSchemaBuilder<'schema> {
409
409
}
410
410
}
411
411
412
- fn sort_fields ( fields : & mut Vec < Arc < Field > > , table_schema : & Schema , file_schema : & Schema ) {
412
+ fn sort_fields (
413
+ fields : & mut Vec < Arc < Field > > ,
414
+ table_schema : & Schema ,
415
+ file_schema : & Schema ,
416
+ ) {
413
417
fields. sort_by_key ( |f| f. name ( ) . to_string ( ) ) ;
414
418
fields. dedup_by_key ( |f| f. name ( ) . to_string ( ) ) ;
415
419
fields. sort_by_key ( |f| {
416
- let table_schema_index = table_schema. index_of ( f. name ( ) ) . unwrap_or ( std:: usize:: MAX ) ;
417
- let file_schema_index = file_schema. index_of ( f. name ( ) ) . unwrap_or ( std:: usize:: MAX ) ;
420
+ let table_schema_index =
421
+ table_schema. index_of ( f. name ( ) ) . unwrap_or ( std:: usize:: MAX ) ;
422
+ let file_schema_index =
423
+ file_schema. index_of ( f. name ( ) ) . unwrap_or ( std:: usize:: MAX ) ;
418
424
( table_schema_index, file_schema_index)
419
425
} ) ;
420
426
}
421
427
422
428
fn build ( self ) -> SchemaRef {
423
429
let mut fields = self . filter_schema_fields . into_iter ( ) . collect :: < Vec < _ > > ( ) ;
424
- FilterSchemaBuilder :: sort_fields ( & mut fields, self . table_schema , self . file_schema ) ;
430
+ FilterSchemaBuilder :: sort_fields (
431
+ & mut fields,
432
+ self . table_schema ,
433
+ self . file_schema ,
434
+ ) ;
425
435
Arc :: new ( Schema :: new ( fields) )
426
436
}
427
437
}
428
438
429
-
430
439
impl < ' node > TreeNodeVisitor < ' node > for FilterSchemaBuilder < ' _ > {
431
440
type Node = Arc < dyn PhysicalExpr > ;
432
441
@@ -441,19 +450,33 @@ impl<'node> TreeNodeVisitor<'node> for FilterSchemaBuilder<'_> {
441
450
self . filter_schema_fields . insert ( Arc :: new ( field. clone ( ) ) ) ;
442
451
} else {
443
452
// valid fields are the table schema's fields + the file schema's fields, preferring the table schema's fields when there is a conflict
444
- let mut valid_fields = self . table_schema . fields ( ) . iter ( ) . chain ( self . file_schema . fields ( ) . iter ( ) ) . cloned ( ) . collect :: < Vec < _ > > ( ) ;
445
- FilterSchemaBuilder :: sort_fields ( & mut valid_fields, self . table_schema , self . file_schema ) ;
446
- let valid_fields = valid_fields. into_iter ( ) . map ( |f| datafusion_common:: Column :: new_unqualified ( f. name ( ) ) ) . collect ( ) ;
453
+ let mut valid_fields = self
454
+ . table_schema
455
+ . fields ( )
456
+ . iter ( )
457
+ . chain ( self . file_schema . fields ( ) . iter ( ) )
458
+ . cloned ( )
459
+ . collect :: < Vec < _ > > ( ) ;
460
+ FilterSchemaBuilder :: sort_fields (
461
+ & mut valid_fields,
462
+ self . table_schema ,
463
+ self . file_schema ,
464
+ ) ;
465
+ let valid_fields = valid_fields
466
+ . into_iter ( )
467
+ . map ( |f| datafusion_common:: Column :: new_unqualified ( f. name ( ) ) )
468
+ . collect ( ) ;
447
469
let field = datafusion_common:: Column :: new_unqualified ( column. name ( ) ) ;
448
- return Err (
449
- datafusion_common:: DataFusionError :: SchemaError (
450
- SchemaError :: FieldNotFound { field : Box :: new ( field) , valid_fields } ,
451
- Box :: new ( None ) ,
452
- )
453
- )
470
+ return Err ( datafusion_common:: DataFusionError :: SchemaError (
471
+ SchemaError :: FieldNotFound {
472
+ field : Box :: new ( field) ,
473
+ valid_fields,
474
+ } ,
475
+ Box :: new ( None ) ,
476
+ ) ) ;
454
477
}
455
478
}
456
479
457
480
Ok ( TreeNodeRecursion :: Continue )
458
481
}
459
- }
482
+ }
0 commit comments