@@ -52,18 +52,20 @@ use datafusion_common::{
5252use datafusion_execution:: {
5353 object_store:: ObjectStoreUrl , SendableRecordBatchStream , TaskContext ,
5454} ;
55- use datafusion_physical_expr:: expressions:: Column ;
55+ use datafusion_physical_expr:: { expressions:: Column , utils :: reassign_predicate_columns } ;
5656use datafusion_physical_expr:: { EquivalenceProperties , Partitioning } ;
5757use datafusion_physical_expr_adapter:: PhysicalExprAdapterFactory ;
5858use datafusion_physical_expr_common:: physical_expr:: PhysicalExpr ;
5959use datafusion_physical_expr_common:: sort_expr:: { LexOrdering , PhysicalSortExpr } ;
60- use datafusion_physical_plan:: filter_pushdown:: FilterPushdownPropagation ;
6160use datafusion_physical_plan:: {
6261 display:: { display_orderings, ProjectSchemaDisplay } ,
6362 metrics:: ExecutionPlanMetricsSet ,
6463 projection:: { all_alias_free_columns, new_projections_for_columns, ProjectionExec } ,
6564 DisplayAs , DisplayFormatType , ExecutionPlan ,
6665} ;
66+ use datafusion_physical_plan:: {
67+ filter:: collect_columns_from_predicate, filter_pushdown:: FilterPushdownPropagation ,
68+ } ;
6769
6870use datafusion_physical_plan:: coop:: cooperative;
6971use datafusion_physical_plan:: execution_plan:: SchedulingType ;
@@ -577,8 +579,31 @@ impl DataSource for FileScanConfig {
577579
578580 fn eq_properties ( & self ) -> EquivalenceProperties {
579581 let ( schema, constraints, _, orderings) = self . project ( ) ;
580- EquivalenceProperties :: new_with_orderings ( schema, orderings)
581- . with_constraints ( constraints)
582+ let mut eq_properties =
583+ EquivalenceProperties :: new_with_orderings ( Arc :: clone ( & schema) , orderings)
584+ . with_constraints ( constraints) ;
585+ if let Some ( filter) = self . file_source . filter ( ) {
586+ // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
587+ // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
588+ match reassign_predicate_columns ( filter, & schema, true ) {
589+ Ok ( filter) => {
590+ match Self :: add_filter_equivalence_info ( filter, & mut eq_properties) {
591+ Ok ( ( ) ) => { }
592+ Err ( e) => {
593+ warn ! ( "Failed to add filter equivalence info: {e}" ) ;
594+ #[ cfg( debug_assertions) ]
595+ panic ! ( "Failed to add filter equivalence info: {e}" ) ;
596+ }
597+ }
598+ }
599+ Err ( e) => {
600+ warn ! ( "Failed to reassign predicate columns: {e}" ) ;
601+ #[ cfg( debug_assertions) ]
602+ panic ! ( "Failed to reassign predicate columns: {e}" ) ;
603+ }
604+ } ;
605+ }
606+ eq_properties
582607 }
583608
584609 fn scheduling_type ( & self ) -> SchedulingType {
@@ -724,6 +749,17 @@ impl FileScanConfig {
724749 ) )
725750 }
726751
752+ fn add_filter_equivalence_info (
753+ filter : Arc < dyn PhysicalExpr > ,
754+ eq_properties : & mut EquivalenceProperties ,
755+ ) -> Result < ( ) > {
756+ let ( equal_pairs, _) = collect_columns_from_predicate ( & filter) ;
757+ for ( lhs, rhs) in equal_pairs {
758+ eq_properties. add_equal_conditions ( Arc :: clone ( lhs) , Arc :: clone ( rhs) ) ?
759+ }
760+ Ok ( ( ) )
761+ }
762+
727763 pub fn projected_constraints ( & self ) -> Constraints {
728764 let indexes = self . projection_indices ( ) ;
729765 self . constraints . project ( & indexes) . unwrap_or_default ( )
0 commit comments