@@ -28,15 +28,13 @@ use datafusion_common::{
2828 JoinConstraint , Result ,
2929} ;
3030use datafusion_expr:: expr_rewriter:: replace_col;
31- use datafusion_expr:: logical_plan:: {
32- CrossJoin , Join , JoinType , LogicalPlan , TableScan , Union ,
33- } ;
31+ use datafusion_expr:: logical_plan:: { CrossJoin , Join , JoinType , LogicalPlan , Union } ;
3432use datafusion_expr:: utils:: {
3533 conjunction, expr_to_columns, split_conjunction, split_conjunction_owned,
3634} ;
3735use datafusion_expr:: {
3836 and, build_join_schema, or, BinaryExpr , Expr , Filter , LogicalPlanBuilder , Operator ,
39- Projection , TableProviderFilterPushDown ,
37+ Projection , TableProviderFilterPushDown , TableScan ,
4038} ;
4139
4240use crate :: optimizer:: ApplyOrder ;
@@ -897,23 +895,103 @@ impl OptimizerRule for PushDownFilter {
897895 . map ( |( pred, _) | pred) ;
898896 let new_scan_filters: Vec < Expr > =
899897 new_scan_filters. unique ( ) . cloned ( ) . collect ( ) ;
898+
899+ let source_schema = scan. source . schema ( ) ;
900+ let mut additional_projection = HashSet :: new ( ) ;
900901 let new_predicate: Vec < Expr > = zip
901- . filter ( |( _, res) | res != & TableProviderFilterPushDown :: Exact )
902+ . filter ( |( expr, res) | {
903+ if * res == TableProviderFilterPushDown :: Exact {
904+ return false ;
905+ }
906+ expr. apply ( |expr| {
907+ if let Expr :: Column ( column) = expr {
908+ if let Ok ( idx) = source_schema. index_of ( column. name ( ) ) {
909+ if scan
910+ . projection
911+ . as_ref ( )
912+ . is_some_and ( |p| !p. contains ( & idx) )
913+ {
914+ additional_projection. insert ( idx) ;
915+ }
916+ }
917+ }
918+ Ok ( TreeNodeRecursion :: Continue )
919+ } )
920+ . unwrap ( ) ;
921+ true
922+ } )
902923 . map ( |( pred, _) | pred. clone ( ) )
903924 . collect ( ) ;
904925
905- let new_scan = LogicalPlan :: TableScan ( TableScan {
906- filters : new_scan_filters,
907- ..scan
908- } ) ;
909-
910- Transformed :: yes ( new_scan) . transform_data ( |new_scan| {
911- if let Some ( predicate) = conjunction ( new_predicate) {
912- make_filter ( predicate, Arc :: new ( new_scan) ) . map ( Transformed :: yes)
926+ // Wraps with a filter if some filters are not supported exactly.
927+ let filtered = move |plan| {
928+ if let Some ( new_predicate) = conjunction ( new_predicate) {
929+ Filter :: try_new ( new_predicate, Arc :: new ( plan) )
930+ . map ( LogicalPlan :: Filter )
913931 } else {
914- Ok ( Transformed :: no ( new_scan ) )
932+ Ok ( plan )
915933 }
916- } )
934+ } ;
935+
936+ if additional_projection. is_empty ( ) {
937+ // No additional projection is required.
938+ let new_scan = LogicalPlan :: TableScan ( TableScan {
939+ filters : new_scan_filters,
940+ ..scan
941+ } ) ;
942+ return filtered ( new_scan) . map ( Transformed :: yes) ;
943+ }
944+
945+ let scan_table_name = & scan. table_name ;
946+ let new_scan = filtered (
947+ LogicalPlanBuilder :: scan_with_filters_fetch (
948+ scan_table_name. clone ( ) ,
949+ Arc :: clone ( & scan. source ) ,
950+ scan. projection . clone ( ) . map ( |mut projection| {
951+ // Extend a projection.
952+ projection. extend ( additional_projection) ;
953+ projection
954+ } ) ,
955+ new_scan_filters,
956+ scan. fetch ,
957+ ) ?
958+ . build ( ) ?,
959+ ) ?;
960+
961+ // Project fields required by the initial projection.
962+ let new_plan = LogicalPlan :: Projection ( Projection :: try_new_with_schema (
963+ scan. projection
964+ . as_ref ( )
965+ . map ( |projection| {
966+ projection
967+ . into_iter ( )
968+ . cloned ( )
969+ . map ( |idx| {
970+ Expr :: Column ( Column :: new (
971+ Some ( scan_table_name. clone ( ) ) ,
972+ source_schema. field ( idx) . name ( ) ,
973+ ) )
974+ } )
975+ . collect ( )
976+ } )
977+ . unwrap_or_else ( || {
978+ source_schema
979+ . fields ( )
980+ . iter ( )
981+ . map ( |field| {
982+ Expr :: Column ( Column :: new (
983+ Some ( scan_table_name. clone ( ) ) ,
984+ field. name ( ) ,
985+ ) )
986+ } )
987+ . collect ( )
988+ } ) ,
989+ Arc :: new ( new_scan) ,
990+ // Preserve a projected schema.
991+ scan. projected_schema ,
992+ ) ?) ;
993+
994+ Ok ( Transformed :: yes ( new_plan) )
917995 }
918996 LogicalPlan :: Extension ( extension_plan) => {
919997 let prevent_cols =
@@ -1206,8 +1284,8 @@ mod tests {
12061284 use datafusion_expr:: logical_plan:: table_scan;
12071285 use datafusion_expr:: {
12081286 col, in_list, in_subquery, lit, ColumnarValue , Extension , ScalarUDF ,
1209- ScalarUDFImpl , Signature , TableSource , TableType , UserDefinedLogicalNodeCore ,
1210- Volatility ,
1287+ ScalarUDFImpl , Signature , TableScan , TableSource , TableType ,
1288+ UserDefinedLogicalNodeCore , Volatility ,
12111289 } ;
12121290
12131291 use crate :: optimizer:: Optimizer ;
@@ -2452,6 +2530,34 @@ mod tests {
24522530 . build ( )
24532531 }
24542532
2533+ #[ test]
2534+ fn projection_is_updated_when_filter_becomes_unsupported ( ) -> Result < ( ) > {
2535+ let test_provider = PushDownProvider {
2536+ filter_support : TableProviderFilterPushDown :: Unsupported ,
2537+ } ;
2538+
2539+ let projeted_schema = test_provider. schema ( ) . project ( & [ 0 ] ) ?;
2540+ let table_scan = LogicalPlan :: TableScan ( TableScan {
2541+ table_name : "test" . into ( ) ,
2542+ // Emulate that there were pushed filters but now
2543+ // provider cannot support it.
2544+ filters : vec ! [ col( "b" ) . eq( lit( 1i64 ) ) ] ,
2545+ projected_schema : Arc :: new ( DFSchema :: try_from ( projeted_schema) ?) ,
2546+ projection : Some ( vec ! [ 0 ] ) ,
2547+ source : Arc :: new ( test_provider) ,
2548+ fetch : None ,
2549+ } ) ;
2550+
2551+ let plan = LogicalPlanBuilder :: from ( table_scan)
2552+ . filter ( col ( "a" ) . eq ( lit ( 1i64 ) ) ) ?
2553+ . build ( ) ?;
2554+
2555+ let expected = "Projection: test.a\
2556+ \n Filter: a = Int64(1) AND b = Int64(1)\
2557+ \n TableScan: test projection=[a, b]";
2558+ assert_optimized_plan_eq ( plan, expected)
2559+ }
2560+
24552561 #[ test]
24562562 fn filter_with_table_provider_exact ( ) -> Result < ( ) > {
24572563 let plan = table_scan_with_pushdown_provider ( TableProviderFilterPushDown :: Exact ) ?;
@@ -2514,7 +2620,7 @@ mod tests {
25142620 projected_schema : Arc :: new ( DFSchema :: try_from (
25152621 ( * test_provider. schema ( ) ) . clone ( ) ,
25162622 ) ?) ,
2517- projection : Some ( vec ! [ 0 ] ) ,
2623+ projection : Some ( vec ! [ 0 , 1 ] ) ,
25182624 source : Arc :: new ( test_provider) ,
25192625 fetch : None ,
25202626 } ) ;
@@ -2526,7 +2632,7 @@ mod tests {
25262632
25272633 let expected = "Projection: a, b\
25282634 \n Filter: a = Int64(10) AND b > Int64(11)\
2529- \n TableScan: test projection=[a], partial_filters=[a = Int64(10), b > Int64(11)]";
2635+ \n TableScan: test projection=[a, b ], partial_filters=[a = Int64(10), b > Int64(11)]";
25302636
25312637 assert_optimized_plan_eq ( plan, expected)
25322638 }
0 commit comments