@@ -17,18 +17,16 @@
 
 //! The table implementation.
 
-use std::collections::HashMap;
-use std::{any::Any, str::FromStr, sync::Arc};
-
 use super::helpers::{expr_applicable_for_cols, pruned_partition_list};
 use super::{ListingTableUrl, PartitionedFile};
+use std::collections::HashMap;
+use std::{any::Any, str::FromStr, sync::Arc};
 
 use crate::datasource::{
     create_ordering,
     file_format::{
         file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
     },
-    get_statistics_with_limit,
     physical_plan::FileSinkConfig,
 };
 use crate::execution::context::SessionState;
@@ -55,9 +53,12 @@ use datafusion_physical_expr::{
 
 use async_trait::async_trait;
 use datafusion_catalog::Session;
+use datafusion_common::stats::Precision;
+use datafusion_datasource::add_row_stats;
+use datafusion_datasource::compute_all_files_statistics;
 use datafusion_datasource::file_groups::FileGroup;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
-use futures::{future, stream, StreamExt, TryStreamExt};
+use futures::{future, stream, Stream, StreamExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::ObjectStore;
 
@@ -1115,32 +1116,26 @@ impl ListingTable {
         let files = file_list
             .map(|part_file| async {
                 let part_file = part_file?;
-                if self.options.collect_stat {
-                    let statistics =
-                        self.do_collect_statistics(ctx, &store, &part_file).await?;
-                    Ok((part_file, statistics))
+                let statistics = if self.options.collect_stat {
+                    self.do_collect_statistics(ctx, &store, &part_file).await?
                 } else {
-                    Ok((
-                        part_file,
-                        Arc::new(Statistics::new_unknown(&self.file_schema)),
-                    ))
-                }
+                    Arc::new(Statistics::new_unknown(&self.file_schema))
+                };
+                Ok(part_file.with_statistics(statistics))
             })
             .boxed()
             .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
 
-        let (files, statistics) = get_statistics_with_limit(
-            files,
+        let (file_group, inexact_stats) =
+            get_files_with_limit(files, limit, self.options.collect_stat).await?;
+
+        let file_groups = file_group.split_files(self.options.target_partitions);
+        compute_all_files_statistics(
+            file_groups,
             self.schema(),
-            limit,
             self.options.collect_stat,
+            inexact_stats,
         )
-        .await?;
-
-        Ok((
-            files.split_files(self.options.target_partitions),
-            statistics,
-        ))
     }
 
     /// Collects statistics for a given partitioned file.
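To make the reshaped scan flow concrete, here is a minimal sketch of the `FileGroup` plumbing the new code relies on. It is not part of the commit: the schema, file names, sizes, and partition count are all hypothetical, and the exact import paths can differ between DataFusion versions.

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::Statistics;
use datafusion::datasource::listing::PartitionedFile;
use datafusion_datasource::file_groups::FileGroup;

fn main() {
    // Stand-in for `self.file_schema`.
    let schema = Schema::new(vec![Field::new("c1", DataType::Int64, false)]);

    // Each listed file now carries its statistics directly via
    // `with_statistics`, instead of travelling in a `(file, stats)` tuple.
    let mut file_group = FileGroup::default();
    for i in 0..4 {
        let file = PartitionedFile::new(format!("data/part-{i}.parquet"), 1024)
            .with_statistics(Arc::new(Statistics::new_unknown(&schema)));
        file_group.push(file);
    }

    // The collected group is then split into `target_partitions` groups
    // (2 here) before per-group statistics are merged.
    let groups = file_group.split_files(2);
    println!("{} file groups", groups.len());
}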
@@ -1182,6 +1177,86 @@ impl ListingTable {
     }
 }
 
+/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
+///
+/// This function collects files from the provided stream until either:
+/// 1. The stream is exhausted
+/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
+///
+/// # Arguments
+/// * `files` - A stream of `Result<PartitionedFile>` items to process
+/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
+///             once the accumulated number of rows exceeds this limit
+/// * `collect_stats` - Whether to collect and accumulate statistics from the files
+///
+/// # Returns
+/// A `Result` containing a `FileGroup` with the collected files
+/// and a boolean indicating whether the statistics are inexact.
+///
+/// # Note
+/// The function will continue processing files if statistics are not available or if the
+/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
+/// but files will still be collected.
+async fn get_files_with_limit(
+    files: impl Stream<Item = Result<PartitionedFile>>,
+    limit: Option<usize>,
+    collect_stats: bool,
+) -> Result<(FileGroup, bool)> {
+    let mut file_group = FileGroup::default();
+    // Fusing the stream allows us to call next safely even once it is finished.
+    let mut all_files = Box::pin(files.fuse());
+    let mut num_rows = Precision::<usize>::Absent;
+    if let Some(first_file) = all_files.next().await {
+        let file = first_file?;
+        if let Some(file_statistic) = &file.statistics {
+            num_rows = file_statistic.num_rows;
+        }
+        file_group.push(file);
+
+        // If the number of rows exceeds the limit, we can stop processing
+        // files. This only applies when we know the number of rows. It also
+        // currently ignores tables that have no statistics regarding the
+        // number of rows.
+        let conservative_num_rows = match num_rows {
+            Precision::Exact(nr) => nr,
+            _ => usize::MIN,
+        };
+        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
+            while let Some(current) = all_files.next().await {
+                let file = current?;
+                if !collect_stats {
+                    file_group.push(file);
+                    continue;
+                }
+
+                // We accumulate the number of rows across the files in
+                // question; if any file does not provide a row count, or
+                // provides only an inexact one, we demote the accumulated
+                // count to inexact. (Byte sizes and null counts are merged
+                // later, in `compute_all_files_statistics`.)
+                if let Some(file_stats) = &file.statistics {
+                    num_rows = add_row_stats(num_rows, file_stats.num_rows);
+                }
+                file_group.push(file);
+
+                // If the number of rows exceeds the limit, we can stop processing
+                // files. This only applies when we know the number of rows. It also
+                // currently ignores tables that have no statistics regarding the
+                // number of rows.
+                if num_rows.get_value().unwrap_or(&usize::MIN)
+                    > &limit.unwrap_or(usize::MAX)
+                {
+                    break;
+                }
+            }
+        }
+    }
+    // If we still have files in the stream, it means that the limit kicked
+    // in, and the statistic could have been different had we processed the
+    // files in a different order.
+    let inexact_stats = all_files.next().await.is_some();
+    Ok((file_group, inexact_stats))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
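The limit handling above can be exercised in isolation. The following is a minimal, self-contained sketch, not part of the commit: the per-file row counts and the limit are made up, while `Precision` and `add_row_stats` are the same items the diff imports.

use datafusion_common::stats::Precision;
use datafusion_datasource::add_row_stats;

fn main() {
    // Hypothetical exact per-file row counts, e.g. read from Parquet footers.
    let file_rows = [500_usize, 700, 900].map(Precision::Exact);
    let limit = Some(1000_usize);

    let mut files = file_rows.into_iter();
    // The first file's count is taken as-is, mirroring `get_files_with_limit`.
    let mut num_rows = files.next().unwrap();
    let mut files_read = 1;

    for file_num_rows in files {
        // Exact + Exact stays exact; an absent or inexact count demotes
        // the running total to inexact.
        num_rows = add_row_stats(num_rows, file_num_rows);
        files_read += 1;
        // The same early-exit test used in the inner loop of the function.
        if num_rows.get_value().unwrap_or(&usize::MIN) > &limit.unwrap_or(usize::MAX) {
            break;
        }
    }

    // 500 + 700 = 1200 > 1000: the third file is never read, and the file
    // left unconsumed in the stream is what flips `inexact_stats` to true.
    assert_eq!(num_rows, Precision::Exact(1200));
    assert_eq!(files_read, 2);
}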