@@ -27,21 +27,31 @@ use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::physical_plan::{
    ArrowSource, CsvSource, FileSource, JsonSource,
};
+use datafusion::logical_expr::{col, lit};
use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
+use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::config::CsvOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{ColumnStatistics, ScalarValue};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::schema_adapter::{
    SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};
+
+use datafusion::assert_batches_eq;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::TableSchema;
use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_expr::Expr;
use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::planner::logical2physical;
+use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_optimizer::{
+    filter_pushdown::FilterPushdown, OptimizerContext, PhysicalOptimizerRule,
+};
+use datafusion_physical_plan::filter::FilterExec;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;

@@ -214,12 +224,24 @@ impl PhysicalExprAdapter for UppercasePhysicalExprAdapter {
    }
}

-/// Test reading a Parquet file where the table schema is flipped (c, b, a) vs. the physical file schema (a, b, c)
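+/// Test helper: wraps `plan` in a [`FilterExec`] for `filter` and runs the
+/// [`FilterPushdown`] physical optimizer rule with
+/// `datafusion.execution.parquet.pushdown_filters` enabled so the predicate
+/// can be pushed into the Parquet scan.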
+fn push_down_filters(
+    plan: Arc<dyn ExecutionPlan>,
+    filter: Expr,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    let filter_expr = logical2physical(&filter, &plan.schema());
+    let plan = Arc::new(FilterExec::try_new(filter_expr, plan)?);
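+    // Parquet filter pushdown is opt-in; enable it so the FilterPushdown rule
+    // can push the predicate into the scan instead of leaving it in the FilterExec.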
+    let cfg = SessionConfig::new()
+        .set_str("datafusion.execution.parquet.pushdown_filters", "true");
+    let optimizer_context = OptimizerContext::new(cfg);
+    let optimizer = FilterPushdown::new();
+    optimizer.optimize_plan(plan, &optimizer_context)
+}
+
+/// Test reading and filtering a Parquet file where the table schema is flipped (c, b, a) vs. the physical file schema (a, b, c)
#[cfg(feature = "parquet")]
#[tokio::test]
async fn test_parquet_flipped_projection() -> Result<()> {
    // Create test data
-    use datafusion::assert_batches_eq;
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
@@ -284,10 +306,7 @@ async fn test_parquet_flipped_projection() -> Result<()> {
    assert_batches_eq!(expected, &batches);

    // And now with a projection applied that selects (`b`, `a`)
-    let projection = datafusion_physical_expr::projection::ProjectionExprs::from_indices(
-        &[1, 2],
-        &table_schema,
-    );
+    let projection = ProjectionExprs::from_indices(&[1, 2], &table_schema);
    let source = Arc::new(ParquetSource::new(table_schema.clone()))
        .try_pushdown_projection(&projection)
        .unwrap()
@@ -300,7 +319,7 @@ async fn test_parquet_flipped_projection() -> Result<()> {
    let exec = DataSourceExec::from_data_source(config);
    // Collect results
    let task_ctx = ctx.task_ctx();
-    let stream = exec.execute(0, task_ctx)?;
+    let stream = exec.execute(0, task_ctx.clone())?;
    let batches = datafusion::physical_plan::common::collect(stream).await?;
    // There should be one batch
    assert_eq!(batches.len(), 1);
@@ -316,6 +335,27 @@ async fn test_parquet_flipped_projection() -> Result<()> {
    ];
    assert_batches_eq!(expected, &batches);

+    // And now with a filter on `a` and `b`:
+    // a = 1 OR (b != 'foo' AND a = 3) -> matches [{b=x,a=1}, {b=z,a=3}]
+    let filter = col("a")
+        .eq(lit(1))
+        .or(col("b").not_eq(lit("foo")).and(col("a").eq(lit(3))));
+    let exec = push_down_filters(exec, filter).unwrap();
+    let stream = exec.execute(0, task_ctx)?;
+    let batches = datafusion::physical_plan::common::collect(stream).await?;
+    // There should be one batch
+    assert_eq!(batches.len(), 1);
+    #[rustfmt::skip]
+    let expected = [
+        "+---+---+",
+        "| b | a |",
+        "+---+---+",
+        "| x | 1 |",
+        "| z | 3 |",
+        "+---+---+",
+    ];
+    assert_batches_eq!(expected, &batches);
+
    Ok(())
}

@@ -325,8 +365,6 @@ async fn test_parquet_flipped_projection() -> Result<()> {
#[tokio::test]
async fn test_parquet_missing_column() -> Result<()> {
    // Create test data with columns (a, c)
-    use datafusion::assert_batches_eq;
-    use datafusion_physical_expr::projection::ProjectionExprs;
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
@@ -402,7 +440,7 @@ async fn test_parquet_missing_column() -> Result<()> {
    let exec = DataSourceExec::from_data_source(config);
    // Collect results
    let task_ctx = ctx.task_ctx();
-    let stream = exec.execute(0, task_ctx)?;
+    let stream = exec.execute(0, task_ctx.clone())?;
    let batches = datafusion::physical_plan::common::collect(stream).await?;
    // There should be one batch
    assert_eq!(batches.len(), 1);
@@ -418,6 +456,27 @@ async fn test_parquet_missing_column() -> Result<()> {
    ];
    assert_batches_eq!(expected, &batches);

+    // And now with a filter on `a` and `b` (which is missing from the file and filled with nulls):
+    // a = 1 OR (b IS NULL AND a = 3) -> matches the rows with a=1 and a=3
+    let filter = col("a")
+        .eq(lit(1))
+        .or(col("b").is_null().and(col("a").eq(lit(3))));
+    let exec = push_down_filters(exec, filter).unwrap();
+    let stream = exec.execute(0, task_ctx.clone())?;
+    let batches = datafusion::physical_plan::common::collect(stream).await?;
+    // There should be one batch
+    assert_eq!(batches.len(), 1);
+    #[rustfmt::skip]
+    let expected = [
+        "+-----+---+---+",
+        "| c   | a | b |",
+        "+-----+---+---+",
+        "| 1.1 | 1 |   |",
+        "| 3.3 | 3 |   |",
+        "+-----+---+---+",
+    ];
+    assert_batches_eq!(expected, &batches);
+
    Ok(())
}
