@@ -122,7 +122,123 @@ fn test_pushdown_into_scan_with_config_options() {
122122}
123123
124124#[ tokio:: test]
125- async fn test_hashjoin_parent_filter_pushdown ( ) {
125+ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk ( ) {
126+ use datafusion_common:: JoinType ;
127+ use datafusion_physical_plan:: joins:: { HashJoinExec , PartitionMode } ;
128+
129+ // Create build side with limited values
130+ let build_batches = vec ! [ record_batch!(
131+ ( "a" , Utf8 , [ "aa" , "ab" ] ) ,
132+ ( "b" , Utf8 , [ "ba" , "bb" ] ) ,
133+ ( "c" , Float64 , [ 1.0 , 2.0 ] )
134+ )
135+ . unwrap( ) ] ;
136+ let build_side_schema = Arc :: new ( Schema :: new ( vec ! [
137+ Field :: new( "a" , DataType :: Utf8 , false ) ,
138+ Field :: new( "b" , DataType :: Utf8 , false ) ,
139+ Field :: new( "c" , DataType :: Float64 , false ) ,
140+ ] ) ) ;
141+ let build_scan = TestScanBuilder :: new ( Arc :: clone ( & build_side_schema) )
142+ . with_support ( true )
143+ . with_batches ( build_batches)
144+ . build ( ) ;
145+
146+ // Create probe side with more values
147+ let probe_batches = vec ! [ record_batch!(
148+ ( "d" , Utf8 , [ "aa" , "ab" , "ac" , "ad" ] ) ,
149+ ( "e" , Utf8 , [ "ba" , "bb" , "bc" , "bd" ] ) ,
150+ ( "f" , Float64 , [ 1.0 , 2.0 , 3.0 , 4.0 ] )
151+ )
152+ . unwrap( ) ] ;
153+ let probe_side_schema = Arc :: new ( Schema :: new ( vec ! [
154+ Field :: new( "d" , DataType :: Utf8 , false ) ,
155+ Field :: new( "e" , DataType :: Utf8 , false ) ,
156+ Field :: new( "f" , DataType :: Float64 , false ) ,
157+ ] ) ) ;
158+ let probe_scan = TestScanBuilder :: new ( Arc :: clone ( & probe_side_schema) )
159+ . with_support ( true )
160+ . with_batches ( probe_batches)
161+ . build ( ) ;
162+
163+ // Create HashJoinExec
164+ let on = vec ! [ (
165+ col( "a" , & build_side_schema) . unwrap( ) ,
166+ col( "d" , & probe_side_schema) . unwrap( ) ,
167+ ) ] ;
168+ let join = Arc :: new (
169+ HashJoinExec :: try_new (
170+ build_scan,
171+ probe_scan,
172+ on,
173+ None ,
174+ & JoinType :: Inner ,
175+ None ,
176+ PartitionMode :: Partitioned ,
177+ datafusion_common:: NullEquality :: NullEqualsNothing ,
178+ )
179+ . unwrap ( ) ,
180+ ) ;
181+
182+ let join_schema = join. schema ( ) ;
183+
184+ // Finally let's add a SortExec on the outside to test pushdown of dynamic filters
185+ let sort_expr =
186+ PhysicalSortExpr :: new ( col ( "e" , & join_schema) . unwrap ( ) , SortOptions :: default ( ) ) ;
187+ let plan = Arc :: new (
188+ SortExec :: new ( LexOrdering :: new ( vec ! [ sort_expr] ) . unwrap ( ) , join)
189+ . with_fetch ( Some ( 2 ) ) ,
190+ ) as Arc < dyn ExecutionPlan > ;
191+
192+ let mut config = ConfigOptions :: default ( ) ;
193+ config. optimizer . enable_dynamic_filter_pushdown = true ;
194+ config. execution . parquet . pushdown_filters = true ;
195+
196+ // Apply the FilterPushdown optimizer rule
197+ let plan = FilterPushdown :: new_post_optimization ( )
198+ . optimize ( Arc :: clone ( & plan) , & config)
199+ . unwrap ( ) ;
200+
201+ // Before execution: the probe-side scan (d, e, f) should carry a placeholder dynamic filter (`true`), while the build-side scan keeps `predicate=true`
202+ insta:: assert_snapshot!(
203+ format_plan_for_test( & plan) ,
204+ @r"
205+ - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false]
206+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
207+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=true
208+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
209+ "
210+ ) ;
211+
212+ // Put some data through the plan to check that the filter is updated to reflect the TopK state
213+ let session_ctx = SessionContext :: new_with_config ( SessionConfig :: new ( ) ) ;
214+ session_ctx. register_object_store (
215+ ObjectStoreUrl :: parse ( "test://" ) . unwrap ( ) . as_ref ( ) ,
216+ Arc :: new ( InMemory :: new ( ) ) ,
217+ ) ;
218+ let state = session_ctx. state ( ) ;
219+ let task_ctx = state. task_ctx ( ) ;
220+ let mut stream = plan. execute ( 0 , Arc :: clone ( & task_ctx) ) . unwrap ( ) ;
221+ // Pull a single batch so the TopK operator gets a chance to update its dynamic filter
222+ stream. next ( ) . await . unwrap ( ) . unwrap ( ) ;
223+
224+ // After one batch: the TopK threshold on `e` should now appear both in the SortExec filter and in the probe-side scan's dynamic predicate
225+ insta:: assert_snapshot!(
226+ format_plan_for_test( & plan) ,
227+ @r"
228+ - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
229+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
230+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=true
231+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
232+ "
233+ ) ;
234+ }
235+
236+ // Test both static and dynamic filter pushdown in HashJoinExec.
237+ // Note that static filter pushdown is rare: it should have already happened in the logical optimizer phase.
238+ // However users may manually construct plans that could result in a FilterExec -> HashJoinExec -> Scan setup.
239+ // Dynamic filters arise in cases such as nested inner joins or TopK -> HashJoinExec -> Scan setups.
240+ #[ tokio:: test]
241+ async fn test_static_filter_pushdown_through_hash_join ( ) {
126242 use datafusion_common:: JoinType ;
127243 use datafusion_physical_plan:: joins:: { HashJoinExec , PartitionMode } ;
128244
@@ -245,7 +361,8 @@ async fn test_hashjoin_parent_filter_pushdown() {
245361
246362 let join_schema = join. schema ( ) ;
247363 let filter = col_lit_predicate ( "a" , "aa" , & join_schema) ;
248- let plan = Arc :: new ( FilterExec :: try_new ( filter, join) . unwrap ( ) ) ;
364+ let plan =
365+ Arc :: new ( FilterExec :: try_new ( filter, join) . unwrap ( ) ) as Arc < dyn ExecutionPlan > ;
249366
250367 // Test that filters are NOT pushed down for left join
251368 insta:: assert_snapshot!(
0 commit comments