2121//! respect to the required sort order. See [`MinMaxStatistics`]
2222
2323use futures:: { Stream , StreamExt } ;
24- use std:: mem;
2524use std:: sync:: Arc ;
2625
2726use crate :: file_groups:: FileGroup ;
@@ -34,10 +33,12 @@ use arrow::{
3433 row:: { Row , Rows } ,
3534} ;
3635use datafusion_common:: stats:: Precision ;
37- use datafusion_common:: ScalarValue ;
3836use datafusion_common:: { plan_datafusion_err, plan_err, DataFusionError , Result } ;
3937use datafusion_physical_expr:: { expressions:: Column , PhysicalSortExpr } ;
4038use datafusion_physical_expr_common:: sort_expr:: LexOrdering ;
39+ use datafusion_physical_plan:: statistics:: {
40+ add_row_stats, compute_summary_statistics, set_max_if_greater, set_min_if_lesser,
41+ } ;
4142use datafusion_physical_plan:: { ColumnStatistics , Statistics } ;
4243
4344/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
@@ -409,62 +410,6 @@ pub async fn get_statistics_with_limit(
409410 Ok ( ( result_files, statistics) )
410411}
411412
412- /// Generic function to compute statistics across multiple items that have statistics
413- fn compute_summary_statistics < T , I > (
414- items : I ,
415- file_schema : & SchemaRef ,
416- stats_extractor : impl Fn ( & T ) -> Option < & Statistics > ,
417- ) -> Statistics
418- where
419- I : IntoIterator < Item = T > ,
420- {
421- let size = file_schema. fields ( ) . len ( ) ;
422- let mut col_stats_set = vec ! [ ColumnStatistics :: default ( ) ; size] ;
423- let mut num_rows = Precision :: < usize > :: Absent ;
424- let mut total_byte_size = Precision :: < usize > :: Absent ;
425-
426- for ( idx, item) in items. into_iter ( ) . enumerate ( ) {
427- if let Some ( item_stats) = stats_extractor ( & item) {
428- if idx == 0 {
429- // First item, set values directly
430- num_rows = item_stats. num_rows ;
431- total_byte_size = item_stats. total_byte_size ;
432- for ( index, column_stats) in
433- item_stats. column_statistics . iter ( ) . enumerate ( )
434- {
435- col_stats_set[ index] . null_count = column_stats. null_count ;
436- col_stats_set[ index] . max_value = column_stats. max_value . clone ( ) ;
437- col_stats_set[ index] . min_value = column_stats. min_value . clone ( ) ;
438- col_stats_set[ index] . sum_value = column_stats. sum_value . clone ( ) ;
439- }
440- continue ;
441- }
442-
443- // Accumulate statistics for subsequent items
444- num_rows = add_row_stats ( item_stats. num_rows , num_rows) ;
445- total_byte_size = add_row_stats ( item_stats. total_byte_size , total_byte_size) ;
446-
447- for ( item_col_stats, col_stats) in item_stats
448- . column_statistics
449- . iter ( )
450- . zip ( col_stats_set. iter_mut ( ) )
451- {
452- col_stats. null_count =
453- add_row_stats ( item_col_stats. null_count , col_stats. null_count ) ;
454- set_max_if_greater ( & item_col_stats. max_value , & mut col_stats. max_value ) ;
455- set_min_if_lesser ( & item_col_stats. min_value , & mut col_stats. min_value ) ;
456- col_stats. sum_value = item_col_stats. sum_value . add ( & col_stats. sum_value ) ;
457- }
458- }
459- }
460-
461- Statistics {
462- num_rows,
463- total_byte_size,
464- column_statistics : col_stats_set,
465- }
466- }
467-
468413/// Computes the summary statistics for a group of files(`FileGroup` level's statistics).
469414///
470415/// This function combines statistics from all files in the file group to create
@@ -489,10 +434,11 @@ pub fn compute_file_group_statistics(
489434 return Ok ( file_group) ;
490435 }
491436
492- let statistics =
493- compute_summary_statistics ( file_group. iter ( ) , & file_schema, |file| {
494- file. statistics . as_ref ( ) . map ( |stats| stats. as_ref ( ) )
495- } ) ;
437+ let statistics = compute_summary_statistics (
438+ file_group. iter ( ) ,
439+ file_schema. fields ( ) . len ( ) ,
440+ |file| file. statistics . as_ref ( ) . map ( |stats| stats. as_ref ( ) ) ,
441+ ) ;
496442
497443 Ok ( file_group. with_statistics ( statistics) )
498444}
@@ -532,10 +478,11 @@ pub fn compute_all_files_statistics(
532478 }
533479
534480 // Then summary statistics across all file groups
535- let mut statistics =
536- compute_summary_statistics ( & file_groups_with_stats, & file_schema, |file_group| {
537- file_group. statistics ( )
538- } ) ;
481+ let mut statistics = compute_summary_statistics (
482+ & file_groups_with_stats,
483+ file_schema. fields ( ) . len ( ) ,
484+ |file_group| file_group. statistics ( ) ,
485+ ) ;
539486
540487 if inexact_stats {
541488 statistics = statistics. to_inexact ( )
@@ -544,79 +491,6 @@ pub fn compute_all_files_statistics(
544491 Ok ( ( file_groups_with_stats, statistics) )
545492}
546493
547- pub fn add_row_stats (
548- file_num_rows : Precision < usize > ,
549- num_rows : Precision < usize > ,
550- ) -> Precision < usize > {
551- match ( file_num_rows, & num_rows) {
552- ( Precision :: Absent , _) => num_rows. to_inexact ( ) ,
553- ( lhs, Precision :: Absent ) => lhs. to_inexact ( ) ,
554- ( lhs, rhs) => lhs. add ( rhs) ,
555- }
556- }
557-
558- /// If the given value is numerically greater than the original maximum value,
559- /// return the new maximum value with appropriate exactness information.
560- fn set_max_if_greater (
561- max_nominee : & Precision < ScalarValue > ,
562- max_value : & mut Precision < ScalarValue > ,
563- ) {
564- match ( & max_value, max_nominee) {
565- ( Precision :: Exact ( val1) , Precision :: Exact ( val2) ) if val1 < val2 => {
566- * max_value = max_nominee. clone ( ) ;
567- }
568- ( Precision :: Exact ( val1) , Precision :: Inexact ( val2) )
569- | ( Precision :: Inexact ( val1) , Precision :: Inexact ( val2) )
570- | ( Precision :: Inexact ( val1) , Precision :: Exact ( val2) )
571- if val1 < val2 =>
572- {
573- * max_value = max_nominee. clone ( ) . to_inexact ( ) ;
574- }
575- ( Precision :: Exact ( _) , Precision :: Absent ) => {
576- let exact_max = mem:: take ( max_value) ;
577- * max_value = exact_max. to_inexact ( ) ;
578- }
579- ( Precision :: Absent , Precision :: Exact ( _) ) => {
580- * max_value = max_nominee. clone ( ) . to_inexact ( ) ;
581- }
582- ( Precision :: Absent , Precision :: Inexact ( _) ) => {
583- * max_value = max_nominee. clone ( ) ;
584- }
585- _ => { }
586- }
587- }
588-
589- /// If the given value is numerically lesser than the original minimum value,
590- /// return the new minimum value with appropriate exactness information.
591- fn set_min_if_lesser (
592- min_nominee : & Precision < ScalarValue > ,
593- min_value : & mut Precision < ScalarValue > ,
594- ) {
595- match ( & min_value, min_nominee) {
596- ( Precision :: Exact ( val1) , Precision :: Exact ( val2) ) if val1 > val2 => {
597- * min_value = min_nominee. clone ( ) ;
598- }
599- ( Precision :: Exact ( val1) , Precision :: Inexact ( val2) )
600- | ( Precision :: Inexact ( val1) , Precision :: Inexact ( val2) )
601- | ( Precision :: Inexact ( val1) , Precision :: Exact ( val2) )
602- if val1 > val2 =>
603- {
604- * min_value = min_nominee. clone ( ) . to_inexact ( ) ;
605- }
606- ( Precision :: Exact ( _) , Precision :: Absent ) => {
607- let exact_min = mem:: take ( min_value) ;
608- * min_value = exact_min. to_inexact ( ) ;
609- }
610- ( Precision :: Absent , Precision :: Exact ( _) ) => {
611- * min_value = min_nominee. clone ( ) . to_inexact ( ) ;
612- }
613- ( Precision :: Absent , Precision :: Inexact ( _) ) => {
614- * min_value = min_nominee. clone ( ) ;
615- }
616- _ => { }
617- }
618- }
619-
620494#[ cfg( test) ]
621495mod tests {
622496 use super :: * ;
@@ -679,7 +553,9 @@ mod tests {
679553
680554 // Call compute_summary_statistics
681555 let summary_stats =
682- compute_summary_statistics ( items, & schema, |item| Some ( item. as_ref ( ) ) ) ;
556+ compute_summary_statistics ( items, schema. fields ( ) . len ( ) , |item| {
557+ Some ( item. as_ref ( ) )
558+ } ) ;
683559
684560 // Verify the results
685561 assert_eq ! ( summary_stats. num_rows, Precision :: Exact ( 25 ) ) ; // 10 + 15
@@ -754,7 +630,9 @@ mod tests {
754630 let items = vec ! [ Arc :: new( stats1) , Arc :: new( stats2) ] ;
755631
756632 let summary_stats =
757- compute_summary_statistics ( items, & schema, |item| Some ( item. as_ref ( ) ) ) ;
633+ compute_summary_statistics ( items, schema. fields ( ) . len ( ) , |item| {
634+ Some ( item. as_ref ( ) )
635+ } ) ;
758636
759637 assert_eq ! ( summary_stats. num_rows, Precision :: Inexact ( 25 ) ) ;
760638 assert_eq ! ( summary_stats. total_byte_size, Precision :: Inexact ( 250 ) ) ;
@@ -784,7 +662,9 @@ mod tests {
784662 let items: Vec < Arc < Statistics > > = vec ! [ ] ;
785663
786664 let summary_stats =
787- compute_summary_statistics ( items, & schema, |item| Some ( item. as_ref ( ) ) ) ;
665+ compute_summary_statistics ( items, schema. fields ( ) . len ( ) , |item| {
666+ Some ( item. as_ref ( ) )
667+ } ) ;
788668
789669 // Verify default values for empty collection
790670 assert_eq ! ( summary_stats. num_rows, Precision :: Absent ) ;
0 commit comments