1717
1818#[ cfg( test) ]
1919mod test {
20+ use arrow:: array:: { Int32Array , RecordBatch } ;
2021 use arrow_schema:: { DataType , Field , Schema , SortOptions } ;
2122 use datafusion:: datasource:: listing:: ListingTable ;
2223 use datafusion:: prelude:: SessionContext ;
2324 use datafusion_catalog:: TableProvider ;
2425 use datafusion_common:: stats:: Precision ;
26+ use datafusion_common:: Result ;
2527 use datafusion_common:: { ColumnStatistics , ScalarValue , Statistics } ;
2628 use datafusion_execution:: config:: SessionConfig ;
29+ use datafusion_execution:: TaskContext ;
2730 use datafusion_expr_common:: operator:: Operator ;
2831 use datafusion_physical_expr:: expressions:: { binary, lit, Column } ;
2932 use datafusion_physical_expr_common:: physical_expr:: PhysicalExpr ;
3033 use datafusion_physical_expr_common:: sort_expr:: { LexOrdering , PhysicalSortExpr } ;
34+ use datafusion_physical_plan:: coalesce_batches:: CoalesceBatchesExec ;
35+ use datafusion_physical_plan:: coalesce_partitions:: CoalescePartitionsExec ;
3136 use datafusion_physical_plan:: filter:: FilterExec ;
3237 use datafusion_physical_plan:: joins:: CrossJoinExec ;
38+ use datafusion_physical_plan:: limit:: { GlobalLimitExec , LocalLimitExec } ;
3339 use datafusion_physical_plan:: projection:: ProjectionExec ;
3440 use datafusion_physical_plan:: sorts:: sort:: SortExec ;
3541 use datafusion_physical_plan:: union:: UnionExec ;
36- use datafusion_physical_plan:: ExecutionPlan ;
42+ use datafusion_physical_plan:: { execute_stream_partitioned, ExecutionPlan } ;
43+ use futures:: TryStreamExt ;
3744 use std:: sync:: Arc ;
3845
3946 /// Creates a test table with statistics from the test data directory.
@@ -121,9 +128,63 @@ mod test {
121128 }
122129 }
123130
131+ /// Helper function to validate that statistics from statistics_by_partition match the actual data
132+ async fn validate_statistics_with_data (
133+ plan : Arc < dyn ExecutionPlan > ,
134+ expected_stats : Vec < ( i32 , i32 , usize ) > , // (min_id, max_id, row_count)
135+ id_column_index : usize ,
136+ ) -> Result < ( ) > {
137+ let ctx = TaskContext :: default ( ) ;
138+ let partitions = execute_stream_partitioned ( plan, Arc :: new ( ctx) ) ?;
139+
140+ let mut actual_stats = Vec :: new ( ) ;
141+ for partition_stream in partitions. into_iter ( ) {
142+ let result: Vec < RecordBatch > = partition_stream. try_collect ( ) . await ?;
143+
144+ let mut min_id = i32:: MAX ;
145+ let mut max_id = i32:: MIN ;
146+ let mut row_count = 0 ;
147+
148+ for batch in result {
149+ if batch. num_columns ( ) > id_column_index {
150+ let id_array = batch
151+ . column ( id_column_index)
152+ . as_any ( )
153+ . downcast_ref :: < Int32Array > ( )
154+ . unwrap ( ) ;
155+ for i in 0 ..batch. num_rows ( ) {
156+ let id_value = id_array. value ( i) ;
157+ min_id = min_id. min ( id_value) ;
158+ max_id = max_id. max ( id_value) ;
159+ row_count += 1 ;
160+ }
161+ }
162+ }
163+
164+ if row_count > 0 {
165+ actual_stats. push ( ( min_id, max_id, row_count) ) ;
166+ }
167+ }
168+
169+ // Compare actual data with expected statistics
170+ assert_eq ! (
171+ actual_stats. len( ) ,
172+ expected_stats. len( ) ,
173+ "Number of partitions with data doesn't match expected"
174+ ) ;
175+ for i in 0 ..actual_stats. len ( ) {
176+ assert_eq ! (
177+ actual_stats[ i] , expected_stats[ i] ,
178+ "Partition {} data doesn't match statistics" ,
179+ i
180+ ) ;
181+ }
182+
183+ Ok ( ( ) )
184+ }
185+
124186 #[ tokio:: test]
125- async fn test_statistics_by_partition_of_data_source ( ) -> datafusion_common:: Result < ( ) >
126- {
187+ async fn test_statistics_by_partition_of_data_source ( ) -> Result < ( ) > {
127188 let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
128189 let statistics = scan. statistics_by_partition ( ) ?;
129190 let expected_statistic_partition_1 =
@@ -134,12 +195,19 @@ mod test {
134195 assert_eq ! ( statistics. len( ) , 2 ) ;
135196 assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
136197 assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
198+
199+ // Check the statistics_by_partition with real results
200+ let expected_stats = vec ! [
201+ ( 3 , 4 , 2 ) , // (min_id, max_id, row_count) for first partition
202+ ( 1 , 2 , 2 ) , // (min_id, max_id, row_count) for second partition
203+ ] ;
204+ validate_statistics_with_data ( scan, expected_stats, 0 ) . await ?;
205+
137206 Ok ( ( ) )
138207 }
139208
140209 #[ tokio:: test]
141- async fn test_statistics_by_partition_of_projection ( ) -> datafusion_common:: Result < ( ) >
142- {
210+ async fn test_statistics_by_partition_of_projection ( ) -> Result < ( ) > {
143211 let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
144212 // Add projection execution plan
145213 let exprs: Vec < ( Arc < dyn PhysicalExpr > , String ) > =
@@ -154,12 +222,16 @@ mod test {
154222 assert_eq ! ( statistics. len( ) , 2 ) ;
155223 assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
156224 assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
225+
226+ // Check the statistics_by_partition with real results
227+ let expected_stats = vec ! [ ( 3 , 4 , 2 ) , ( 1 , 2 , 2 ) ] ;
228+ validate_statistics_with_data ( Arc :: new ( projection) , expected_stats, 0 ) . await ?;
157229 Ok ( ( ) )
158230 }
159231
160232 #[ tokio:: test]
161- async fn test_statistics_by_partition_of_sort ( ) -> datafusion_common :: Result < ( ) > {
162- let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
233+ async fn test_statistics_by_partition_of_sort ( ) -> Result < ( ) > {
234+ let scan_1 = create_scan_exec_with_statistics ( None , Some ( 1 ) ) . await ;
163235 // Add sort execution plan
164236 let sort = SortExec :: new (
165237 LexOrdering :: new ( vec ! [ PhysicalSortExpr {
@@ -169,16 +241,34 @@ mod test {
169241 nulls_first: false ,
170242 } ,
171243 } ] ) ,
172- scan ,
244+ scan_1 ,
173245 ) ;
174- let mut sort_exec = Arc :: new ( sort. clone ( ) ) ;
246+ let sort_exec = Arc :: new ( sort. clone ( ) ) ;
175247 let statistics = sort_exec. statistics_by_partition ( ) ?;
176248 let expected_statistic_partition =
177249 create_partition_statistics ( 4 , 220 , 1 , 4 , true ) ;
178250 assert_eq ! ( statistics. len( ) , 1 ) ;
179251 assert_eq ! ( statistics[ 0 ] , expected_statistic_partition) ;
252+ // Check the statistics_by_partition with real results
253+ let expected_stats = vec ! [ ( 1 , 4 , 4 ) ] ;
254+ validate_statistics_with_data ( sort_exec. clone ( ) , expected_stats, 0 ) . await ?;
180255
181- sort_exec = Arc :: new ( sort. with_preserve_partitioning ( true ) ) ;
256+ // Sort with preserve_partitioning
257+ let scan_2 = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
258+ // Add sort execution plan
259+ let sort_exec = Arc :: new (
260+ SortExec :: new (
261+ LexOrdering :: new ( vec ! [ PhysicalSortExpr {
262+ expr: Arc :: new( Column :: new( "id" , 0 ) ) ,
263+ options: SortOptions {
264+ descending: false ,
265+ nulls_first: false ,
266+ } ,
267+ } ] ) ,
268+ scan_2,
269+ )
270+ . with_preserve_partitioning ( true ) ,
271+ ) ;
182272 let expected_statistic_partition_1 =
183273 create_partition_statistics ( 2 , 110 , 3 , 4 , true ) ;
184274 let expected_statistic_partition_2 =
@@ -187,11 +277,15 @@ mod test {
187277 assert_eq ! ( statistics. len( ) , 2 ) ;
188278 assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
189279 assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
280+
281+ // Check the statistics_by_partition with real results
282+ let expected_stats = vec ! [ ( 3 , 4 , 2 ) , ( 1 , 2 , 2 ) ] ;
283+ validate_statistics_with_data ( sort_exec, expected_stats, 0 ) . await ?;
190284 Ok ( ( ) )
191285 }
192286
193287 #[ tokio:: test]
194- async fn test_statistics_by_partition_of_filter ( ) -> datafusion_common :: Result < ( ) > {
288+ async fn test_statistics_by_partition_of_filter ( ) -> Result < ( ) > {
195289 let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
196290 let schema = Schema :: new ( vec ! [ Field :: new( "id" , DataType :: Int32 , false ) ] ) ;
197291 let predicate = binary (
@@ -233,7 +327,7 @@ mod test {
233327 }
234328
235329 #[ tokio:: test]
236- async fn test_statistic_by_partition_of_union ( ) -> datafusion_common :: Result < ( ) > {
330+ async fn test_statistic_by_partition_of_union ( ) -> Result < ( ) > {
237331 let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
238332 let union_exec = Arc :: new ( UnionExec :: new ( vec ! [ scan. clone( ) , scan] ) ) ;
239333 let statistics = union_exec. statistics_by_partition ( ) ?;
@@ -252,12 +346,14 @@ mod test {
252346 // Verify fourth partition (from second scan - same as second partition)
253347 assert_eq ! ( statistics[ 3 ] , expected_statistic_partition_2) ;
254348
349+ // Check the statistics_by_partition with real results
350+ let expected_stats = vec ! [ ( 3 , 4 , 2 ) , ( 1 , 2 , 2 ) , ( 3 , 4 , 2 ) , ( 1 , 2 , 2 ) ] ;
351+ validate_statistics_with_data ( union_exec, expected_stats, 0 ) . await ?;
255352 Ok ( ( ) )
256353 }
257354
258355 #[ tokio:: test]
259- async fn test_statistic_by_partition_of_cross_join ( ) -> datafusion_common:: Result < ( ) >
260- {
356+ async fn test_statistic_by_partition_of_cross_join ( ) -> Result < ( ) > {
261357 let left_scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
262358 let right_create_table_sql = "CREATE EXTERNAL TABLE t2 (id INT NOT NULL) \
263359 STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition'\
@@ -292,6 +388,73 @@ mod test {
292388 } ) ;
293389 assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
294390 assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
391+
392+ // Check the statistics_by_partition with real results
393+ let expected_stats = vec ! [ ( 1 , 4 , 8 ) , ( 1 , 4 , 8 ) ] ;
394+ validate_statistics_with_data ( Arc :: new ( cross_join) , expected_stats, 0 ) . await ?;
395+ Ok ( ( ) )
396+ }
397+
398+ #[ tokio:: test]
399+ async fn test_statistic_by_partition_of_coalesce_batches ( ) -> Result < ( ) > {
400+ let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
401+ let coalesce_batches = CoalesceBatchesExec :: new ( scan, 2 ) ;
402+ let expected_statistic_partition_1 =
403+ create_partition_statistics ( 2 , 110 , 3 , 4 , true ) ;
404+ let expected_statistic_partition_2 =
405+ create_partition_statistics ( 2 , 110 , 1 , 2 , true ) ;
406+ let statistics = coalesce_batches. statistics_by_partition ( ) ?;
407+ assert_eq ! ( statistics. len( ) , 2 ) ;
408+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
409+ assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
410+
411+ // Check the statistics_by_partition with real results
412+ let expected_stats = vec ! [ ( 3 , 4 , 2 ) , ( 1 , 2 , 2 ) ] ;
413+ validate_statistics_with_data ( Arc :: new ( coalesce_batches) , expected_stats, 0 )
414+ . await ?;
415+ Ok ( ( ) )
416+ }
417+
418+ #[ tokio:: test]
419+ async fn test_statistic_by_partition_of_coalesce_partitions ( ) -> Result < ( ) > {
420+ let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
421+ let coalesce_partitions = CoalescePartitionsExec :: new ( scan) ;
422+ let expected_statistic_partition =
423+ create_partition_statistics ( 4 , 220 , 1 , 4 , true ) ;
424+ let statistics = coalesce_partitions. statistics_by_partition ( ) ?;
425+ assert_eq ! ( statistics. len( ) , 1 ) ;
426+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition) ;
427+
428+ // Check the statistics_by_partition with real results
429+ let expected_stats = vec ! [ ( 1 , 4 , 4 ) ] ;
430+ validate_statistics_with_data ( Arc :: new ( coalesce_partitions) , expected_stats, 0 )
431+ . await ?;
432+ Ok ( ( ) )
433+ }
434+
435+ #[ tokio:: test]
436+ async fn test_statistic_by_partition_of_local_limit ( ) -> Result < ( ) > {
437+ let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
438+ let local_limit = LocalLimitExec :: new ( scan. clone ( ) , 1 ) ;
439+ let statistics = local_limit. statistics_by_partition ( ) ?;
440+ assert_eq ! ( statistics. len( ) , 2 ) ;
441+ let schema = scan. schema ( ) ;
442+ let mut expected_statistic_partition = Statistics :: new_unknown ( & schema) ;
443+ expected_statistic_partition. num_rows = Precision :: Exact ( 1 ) ;
444+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition) ;
445+ assert_eq ! ( statistics[ 1 ] , expected_statistic_partition) ;
446+ Ok ( ( ) )
447+ }
448+
449+ #[ tokio:: test]
450+ async fn test_statistic_by_partition_of_global_limit_partitions ( ) -> Result < ( ) > {
451+ let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
452+ let global_limit = GlobalLimitExec :: new ( scan. clone ( ) , 0 , Some ( 2 ) ) ;
453+ let statistics = global_limit. statistics_by_partition ( ) ?;
454+ assert_eq ! ( statistics. len( ) , 1 ) ;
455+ let mut expected_statistic_partition = Statistics :: new_unknown ( & scan. schema ( ) ) ;
456+ expected_statistic_partition. num_rows = Precision :: Exact ( 2 ) ;
457+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition) ;
295458 Ok ( ( ) )
296459 }
297460}
0 commit comments