@@ -43,6 +43,7 @@ mod tests {
     };
     use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
    use arrow::record_batch::RecordBatch;
+    use arrow::util::pretty::pretty_format_batches;
     use arrow_schema::SchemaRef;
     use bytes::{BufMut, BytesMut};
     use datafusion_common::config::TableParquetOptions;
@@ -61,8 +62,9 @@ mod tests {
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_expr::{col, lit, when, Expr};
     use datafusion_physical_expr::planner::logical2physical;
+    use datafusion_physical_plan::analyze::AnalyzeExec;
+    use datafusion_physical_plan::collect;
     use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
-    use datafusion_physical_plan::{collect, displayable};
     use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
     use chrono::{TimeZone, Utc};
@@ -81,10 +83,10 @@ mod tests {
     struct RoundTripResult {
         /// Data that was read back from ParquetFiles
         batches: Result<Vec<RecordBatch>>,
+        /// The EXPLAIN ANALYZE output
+        explain: Result<String>,
         /// The physical plan that was created (that has statistics, etc)
         parquet_exec: Arc<DataSourceExec>,
-        /// The ParquetSource that is used in plan
-        parquet_source: ParquetSource,
     }
 
     /// round-trip record batches by writing each individual RecordBatch to
@@ -137,71 +139,109 @@ mod tests {
             self.round_trip(batches).await.batches
         }
 
-        /// run the test, returning the `RoundTripResult`
-        async fn round_trip(self, batches: Vec<RecordBatch>) -> RoundTripResult {
-            let Self {
-                projection,
-                schema,
-                predicate,
-                pushdown_predicate,
-                page_index_predicate,
-            } = self;
-
-            let file_schema = match schema {
-                Some(schema) => schema,
-                None => Arc::new(
-                    Schema::try_merge(
-                        batches.iter().map(|b| b.schema().as_ref().clone()),
-                    )
-                    .unwrap(),
-                ),
-            };
-            // If testing with page_index_predicate, write parquet
-            // files with multiple pages
-            let multi_page = page_index_predicate;
-            let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
-            let file_group = meta.into_iter().map(Into::into).collect();
-
+        fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
             // set up predicate (this is normally done by a layer higher up)
-            let predicate = predicate.map(|p| logical2physical(&p, &file_schema));
+            let predicate = self
+                .predicate
+                .as_ref()
+                .map(|p| logical2physical(p, &file_schema));
 
             let mut source = ParquetSource::default();
             if let Some(predicate) = predicate {
                 source = source.with_predicate(Arc::clone(&file_schema), predicate);
             }
 
-            if pushdown_predicate {
+            if self.pushdown_predicate {
                 source = source
                     .with_pushdown_filters(true)
                     .with_reorder_filters(true);
             }
 
-            if page_index_predicate {
+            if self.page_index_predicate {
                 source = source.with_enable_page_index(true);
             }
 
+            Arc::new(source)
+        }
+
+        fn build_parquet_exec(
+            &self,
+            file_schema: SchemaRef,
+            file_group: FileGroup,
+            source: Arc<ParquetSource>,
+        ) -> Arc<DataSourceExec> {
             let base_config = FileScanConfigBuilder::new(
                 ObjectStoreUrl::local_filesystem(),
                 file_schema,
-                Arc::new(source.clone()),
+                source,
             )
             .with_file_group(file_group)
-            .with_projection(projection)
+            .with_projection(self.projection.clone())
             .build();
+            DataSourceExec::from_data_source(base_config)
+        }
+
+        /// run the test, returning the `RoundTripResult`
+        async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
+            let file_schema = match &self.schema {
+                Some(schema) => schema,
+                None => &Arc::new(
+                    Schema::try_merge(
+                        batches.iter().map(|b| b.schema().as_ref().clone()),
+                    )
+                    .unwrap(),
+                ),
+            };
+            let file_schema = Arc::clone(file_schema);
+            // If testing with page_index_predicate, write parquet
+            // files with multiple pages
+            let multi_page = self.page_index_predicate;
+            let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
+            let file_group: FileGroup = meta.into_iter().map(Into::into).collect();
+
+            // build a ParquetExec to return the results
+            let parquet_source = self.build_file_source(file_schema.clone());
+            let parquet_exec = self.build_parquet_exec(
+                file_schema.clone(),
+                file_group.clone(),
+                Arc::clone(&parquet_source),
+            );
+
+            let analyze_exec = Arc::new(AnalyzeExec::new(
+                false,
+                false,
+                // use a new ParquetSource to avoid sharing execution metrics
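+                // (ParquetSource records its execution metrics internally, so
+                // reusing `parquet_source` here would conflate this run's metrics
+                // with the EXPLAIN ANALYZE run's)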
+                self.build_parquet_exec(
+                    file_schema.clone(),
+                    file_group.clone(),
+                    self.build_file_source(file_schema.clone()),
+                ),
+                Arc::new(Schema::new(vec![
+                    Field::new("plan_type", DataType::Utf8, true),
+                    Field::new("plan", DataType::Utf8, true),
+                ])),
+            ));
 
             let session_ctx = SessionContext::new();
             let task_ctx = session_ctx.task_ctx();
 
-            let parquet_exec = DataSourceExec::from_data_source(base_config.clone());
+            let batches = collect(
+                Arc::clone(&parquet_exec) as Arc<dyn ExecutionPlan>,
+                task_ctx.clone(),
+            )
+            .await;
+
+            let explain = collect(analyze_exec, task_ctx.clone())
+                .await
+                .map(|batches| {
+                    let batches = pretty_format_batches(&batches).unwrap();
+                    format!("{batches}")
+                });
+
             RoundTripResult {
-                batches: collect(parquet_exec.clone(), task_ctx).await,
+                batches,
+                explain,
                 parquet_exec,
-                parquet_source: base_config
-                    .file_source()
-                    .as_any()
-                    .downcast_ref::<ParquetSource>()
-                    .unwrap()
-                    .clone(),
             }
         }
     }
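For orientation, a minimal sketch of how the new `explain` field is consumed (it mirrors the tests further down in this diff; `RoundTrip`, `assert_contains!`, and the metric names come from this module, while the predicate and `batch` are illustrative):

    let rt = RoundTrip::new()
        .with_predicate(col("c1").not_eq(lit("bar")))
        .with_pushdown_predicate()
        .round_trip(vec![batch])
        .await;
    // the rendered EXPLAIN ANALYZE text includes the scan's metrics,
    // so assertions run against it instead of against ParquetSource internals
    let explain = rt.explain.unwrap();
    assert_contains!(&explain, "row_groups_matched_statistics=1");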
@@ -1375,26 +1415,6 @@ mod tests {
         create_batch(vec![("c1", c1.clone())])
     }
 
-    /// Returns a int64 array with contents:
-    /// "[-1, 1, null, 2, 3, null, null]"
-    fn int64_batch() -> RecordBatch {
-        let contents: ArrayRef = Arc::new(Int64Array::from(vec![
-            Some(-1),
-            Some(1),
-            None,
-            Some(2),
-            Some(3),
-            None,
-            None,
-        ]));
-
-        create_batch(vec![
-            ("a", contents.clone()),
-            ("b", contents.clone()),
-            ("c", contents.clone()),
-        ])
-    }
-
     #[tokio::test]
     async fn parquet_exec_metrics() {
         // batch1: c1(string)
@@ -1454,110 +1474,17 @@ mod tests {
             .round_trip(vec![batch1])
             .await;
 
-        // should have a pruning predicate
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
-
-        // convert to explain plan form
-        let display = displayable(rt.parquet_exec.as_ref())
-            .indent(true)
-            .to_string();
-
-        assert_contains!(
-            &display,
-            "pruning_predicate=c1_null_count@2 != row_count@3 AND (c1_min@0 != bar OR bar != c1_max@1)"
-        );
-
-        assert_contains!(&display, r#"predicate=c1@0 != bar"#);
-
-        assert_contains!(&display, "projection=[c1]");
-    }
-
-    #[tokio::test]
-    async fn parquet_exec_display_deterministic() {
-        // batches: a(int64), b(int64), c(int64)
-        let batches = int64_batch();
-
-        fn extract_required_guarantees(s: &str) -> Option<&str> {
-            s.split("required_guarantees=").nth(1)
-        }
-
-        // Ensuring that the required_guarantees remain consistent across every display plan of the filter conditions
-        for _ in 0..100 {
-            // c = 1 AND b = 1 AND a = 1
-            let filter0 = col("c")
-                .eq(lit(1))
-                .and(col("b").eq(lit(1)))
-                .and(col("a").eq(lit(1)));
-
-            let rt0 = RoundTrip::new()
-                .with_predicate(filter0)
-                .with_pushdown_predicate()
-                .round_trip(vec![batches.clone()])
-                .await;
-
-            let pruning_predicate = rt0.parquet_source.pruning_predicate();
-            assert!(pruning_predicate.is_some());
-
-            let display0 = displayable(rt0.parquet_exec.as_ref())
-                .indent(true)
-                .to_string();
-
-            let guarantees0: &str = extract_required_guarantees(&display0)
-                .expect("Failed to extract required_guarantees");
-            // Compare only the required_guarantees part (Because the file_groups part will not be the same)
-            assert_eq!(
-                guarantees0.trim(),
-                "[a in (1), b in (1), c in (1)]",
-                "required_guarantees don't match"
-            );
-        }
+        let explain = rt.explain.unwrap();
 
-        // c = 1 AND a = 1 AND b = 1
-        let filter1 = col("c")
-            .eq(lit(1))
-            .and(col("a").eq(lit(1)))
-            .and(col("b").eq(lit(1)));
+        // check that there was a pruning predicate -> row groups got pruned
+        assert_contains!(&explain, "predicate=c1@0 != bar");
 
-        let rt1 = RoundTrip::new()
-            .with_predicate(filter1)
-            .with_pushdown_predicate()
-            .round_trip(vec![batches.clone()])
-            .await;
-
-        // b = 1 AND a = 1 AND c = 1
-        let filter2 = col("b")
-            .eq(lit(1))
-            .and(col("a").eq(lit(1)))
-            .and(col("c").eq(lit(1)));
+        // there's a single row group, but we can check that it matched
+        // if no pruning was done this would be 0 instead of 1
+        assert_contains!(&explain, "row_groups_matched_statistics=1");
 
-        let rt2 = RoundTrip::new()
-            .with_predicate(filter2)
-            .with_pushdown_predicate()
-            .round_trip(vec![batches])
-            .await;
-
-        // should have a pruning predicate
-        let pruning_predicate = rt1.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
-        let pruning_predicate = rt2.parquet_source.predicate();
-        assert!(pruning_predicate.is_some());
-
-        // convert to explain plan form
-        let display1 = displayable(rt1.parquet_exec.as_ref())
-            .indent(true)
-            .to_string();
-        let display2 = displayable(rt2.parquet_exec.as_ref())
-            .indent(true)
-            .to_string();
-
-        let guarantees1 = extract_required_guarantees(&display1)
-            .expect("Failed to extract required_guarantees");
-        let guarantees2 = extract_required_guarantees(&display2)
-            .expect("Failed to extract required_guarantees");
-
-        // Compare only the required_guarantees part (Because the predicate part will not be the same)
-        assert_eq!(guarantees1, guarantees2, "required_guarantees don't match");
+        // check the projection
+        assert_contains!(&explain, "projection=[c1]");
     }
 
     #[tokio::test]
@@ -1581,16 +1508,19 @@ mod tests {
             .await;
 
         // Should not contain a pruning predicate (since nothing can be pruned)
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(
-            pruning_predicate.is_none(),
-            "Still had pruning predicate: {pruning_predicate:?}"
-        );
+        let explain = rt.explain.unwrap();
 
-        // but does still has a pushdown down predicate
-        let predicate = rt.parquet_source.predicate();
-        let filter_phys = logical2physical(&filter, rt.parquet_exec.schema().as_ref());
-        assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
+        // When both matched and pruned are 0, it means that the pruning predicate
+        // was not used at all.
+        assert_contains!(&explain, "row_groups_matched_statistics=0");
+        assert_contains!(&explain, "row_groups_pruned_statistics=0");
+
+        // But the pushdown predicate should still be present
+        assert_contains!(
+            &explain,
+            "predicate=CASE WHEN c1@0 != bar THEN true ELSE false END"
+        );
+        assert_contains!(&explain, "pushdown_rows_pruned=5");
     }
 
     #[tokio::test]
@@ -1616,8 +1546,14 @@ mod tests {
             .await;
 
         // Should have a pruning predicate
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
+        let explain = rt.explain.unwrap();
+        assert_contains!(
+            &explain,
+            "predicate=c1@0 = foo AND CASE WHEN c1@0 != bar THEN true ELSE false END"
+        );
+
+        // And bloom filters should have been evaluated
+        assert_contains!(&explain, "row_groups_pruned_bloom_filter=1");
     }
 
     /// Returns the sum of all the metrics with the specified name
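The helper under that doc comment is cut off by the diff context. A hedged sketch of what such a metric-summing helper typically looks like (the name `metric_sum` and the exact body are assumptions, not taken from this commit; it relies on `MetricsSet::sum` and `MetricValue::as_usize` from `datafusion_physical_plan::metrics`, which this module already imports):

    // Hypothetical sketch: sum every metric in the set whose name matches,
    // returning 0 if no such metric was recorded.
    fn metric_sum(metrics: &MetricsSet, name: &str) -> usize {
        metrics
            .sum(|m| m.value().name() == name)
            .map(|v| v.as_usize())
            .unwrap_or(0)
    }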