@@ -506,107 +506,6 @@ fn schema() -> SchemaRef {
     Arc::clone(&TEST_SCHEMA)
 }
 
-#[tokio::test]
-async fn test_hashjoin_parent_filter_pushdown() {
-    use datafusion_common::JoinType;
-    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
-
-    // Create build side with limited values
-    let build_batches = vec![record_batch!(
-        ("a", Utf8, ["aa", "ab"]),
-        ("b", Utf8, ["ba", "bb"]),
-        ("c", Float64, [1.0, 2.0])
-    )
-    .unwrap()];
-    let build_side_schema = Arc::new(Schema::new(vec![
-        Field::new("a", DataType::Utf8, false),
-        Field::new("b", DataType::Utf8, false),
-        Field::new("c", DataType::Float64, false),
-    ]));
-    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
-        .with_support(true)
-        .with_batches(build_batches)
-        .build();
-
-    // Create probe side with more values
-    let probe_batches = vec![record_batch!(
-        ("d", Utf8, ["aa", "ab", "ac", "ad"]),
-        ("e", Utf8, ["ba", "bb", "bc", "bd"]),
-        ("f", Float64, [1.0, 2.0, 3.0, 4.0])
-    )
-    .unwrap()];
-    let probe_side_schema = Arc::new(Schema::new(vec![
-        Field::new("d", DataType::Utf8, false),
-        Field::new("e", DataType::Utf8, false),
-        Field::new("f", DataType::Float64, false),
-    ]));
-    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
-        .with_support(true)
-        .with_batches(probe_batches)
-        .build();
-
-    // Create HashJoinExec
-    let on = vec![(
-        col("a", &build_side_schema).unwrap(),
-        col("d", &probe_side_schema).unwrap(),
-    )];
-    let join = Arc::new(
-        HashJoinExec::try_new(
-            build_scan,
-            probe_scan,
-            on,
-            None,
-            &JoinType::Inner,
-            None,
-            PartitionMode::Partitioned,
-            datafusion_common::NullEquality::NullEqualsNothing,
-        )
-        .unwrap(),
-    );
-
-    // Create filters that can be pushed down to different sides
-    // We need to create filters in the context of the join output schema
-    let join_schema = join.schema();
-
-    // Filter on build side column: a = 'aa'
-    let left_filter = col_lit_predicate("a", "aa", &join_schema);
-    // Filter on probe side column: e = 'ba'
-    let right_filter = col_lit_predicate("e", "ba", &join_schema);
-    // Filter that references both sides: a = d (should not be pushed down)
-    let cross_filter = Arc::new(BinaryExpr::new(
-        col("a", &join_schema).unwrap(),
-        Operator::Eq,
-        col("d", &join_schema).unwrap(),
-    )) as Arc<dyn PhysicalExpr>;
-
-    let filter =
-        Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap());
-    let filter = Arc::new(FilterExec::try_new(right_filter, filter).unwrap());
-    let plan = Arc::new(FilterExec::try_new(cross_filter, filter).unwrap())
-        as Arc<dyn ExecutionPlan>;
-
-    // Test that filters are pushed down correctly to each side of the join
-    insta::assert_snapshot!(
-        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
-        @r"
-    OptimizationTest:
-      input:
-        - FilterExec: a@0 = d@3
-        -   FilterExec: e@4 = ba
-        -     FilterExec: a@0 = aa
-        -       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
-        -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
-        -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
-      output:
-        Ok:
-          - FilterExec: e@4 = ba AND a@0 = d@3 AND a@0 = aa
-          -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
-          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa
-          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba
-    "
-    );
-}
-
 /// Returns a predicate that is a binary expression col = lit
 fn col_lit_predicate(
     column_name: &str,