@@ -133,8 +133,7 @@ mod tests {
     use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
     use datafusion_datasource::{ListingTableUrl, PartitionedFile};
     use datafusion_datasource_parquet::{
-        fetch_parquet_metadata, fetch_statistics, statistics_from_parquet_meta_calc,
-        ObjectStoreFetch, ParquetFormat, ParquetFormatFactory, ParquetSink,
+        ParquetFormat, ParquetFormatFactory, ParquetSink,
     };
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_execution::runtime_env::RuntimeEnv;
@@ -143,13 +142,15 @@ mod tests {
     use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
     use datafusion_physical_plan::{collect, ExecutionPlan};
 
+    use crate::test_util::bounded_stream;
     use arrow::array::{
         types::Int32Type, Array, ArrayRef, DictionaryArray, Int32Array, Int64Array,
         StringArray,
     };
     use arrow::datatypes::{DataType, Field};
     use async_trait::async_trait;
     use datafusion_datasource::file_groups::FileGroup;
+    use datafusion_datasource_parquet::metadata::DFParquetMetadata;
     use futures::stream::BoxStream;
     use futures::StreamExt;
     use insta::assert_snapshot;
@@ -167,8 +168,6 @@ mod tests {
     use parquet::format::FileMetaData;
     use tokio::fs::File;
 
-    use crate::test_util::bounded_stream;
-
     enum ForceViews {
         Yes,
         No,
@@ -195,31 +194,24 @@ mod tests {
         let format = ParquetFormat::default().with_force_view_types(force_views);
         let schema = format.infer_schema(&ctx, &store, &meta).await?;
 
-        let stats = fetch_statistics(
-            store.as_ref(),
-            schema.clone(),
-            &meta[0],
-            None,
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
+        let file_metadata_cache =
+            ctx.runtime_env().cache_manager.get_file_metadata_cache();
+        let stats = DFParquetMetadata::new(&store, &meta[0])
+            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .fetch_statistics(&schema)
+            .await?;
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
         let c2_stats = &stats.column_statistics[1];
         assert_eq!(c1_stats.null_count, Precision::Exact(1));
         assert_eq!(c2_stats.null_count, Precision::Exact(3));
 
-        let stats = fetch_statistics(
-            store.as_ref(),
-            schema,
-            &meta[1],
-            None,
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
+        let stats = DFParquetMetadata::new(&store, &meta[1])
+            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .fetch_statistics(&schema)
+            .await?;
+
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
         let c2_stats = &stats.column_statistics[1];
@@ -392,51 +384,25 @@ mod tests {
 
         // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
         // for the remaining metadata
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
-            &meta[0],
-            Some(9),
-            None,
-            None,
-        )
-        .await
-        .expect("error reading metadata with hint");
+        let file_metadata_cache =
+            ctx.runtime_env().cache_manager.get_file_metadata_cache();
+        let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
+            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .with_metadata_size_hint(Some(9));
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 2);
 
         // Increases by 3 because cache has no entries yet
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
-            &meta[0],
-            Some(9),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await
-        .expect("error reading metadata with hint");
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 5);
 
         // No increase because cache has an entry
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
-            &meta[0],
-            Some(9),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await
-        .expect("error reading metadata with hint");
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 5);
 
         // Increase by 2 because `get_file_metadata_cache()` is None
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
-            &meta[0],
-            Some(9),
-            None,
-            None,
-        )
-        .await
-        .expect("error reading metadata with hint");
+        let df_meta = df_meta.with_file_metadata_cache(None);
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 7);
 
         let force_views = match force_views {
@@ -454,15 +420,9 @@ mod tests {
         assert_eq!(store.request_count(), 10);
 
         // No increase, cache being used
-        let stats = fetch_statistics(
-            store.upcast().as_ref(),
-            schema.clone(),
-            &meta[0],
-            Some(9),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
+        let df_meta =
+            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
+        let stats = df_meta.fetch_statistics(&schema).await?;
         assert_eq!(store.request_count(), 10);
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
@@ -477,55 +437,28 @@ mod tests {
 
         // Use the file size as the hint so we can get the full metadata from the first fetch
         let size_hint = meta[0].size as usize;
+        let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
+            .with_metadata_size_hint(Some(size_hint));
 
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
-            &meta[0],
-            Some(size_hint),
-            None,
-            None,
-        )
-        .await
-        .expect("error reading metadata with hint");
+        df_meta.fetch_metadata().await?;
         // ensure the requests were coalesced into a single request
         assert_eq!(store.request_count(), 1);
 
         let session = SessionContext::new();
         let ctx = session.state();
+        let df_meta =
+            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
         // Increases by 1 because cache has no entries yet and new session context
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
-            &meta[0],
-            Some(size_hint),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await
-        .expect("error reading metadata with hint");
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 2);
 
         // No increase because cache has an entry
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
-            &meta[0],
-            Some(size_hint),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await
-        .expect("error reading metadata with hint");
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 2);
 
         // Increase by 1 because `get_file_metadata_cache` is None
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
-            &meta[0],
-            Some(size_hint),
-            None,
-            None,
-        )
-        .await
-        .expect("error reading metadata with hint");
+        let df_meta = df_meta.with_file_metadata_cache(None);
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 3);
 
         let format = ParquetFormat::default()
@@ -538,15 +471,9 @@ mod tests {
         let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
         assert_eq!(store.request_count(), 4);
         // No increase, cache being used
-        let stats = fetch_statistics(
-            store.upcast().as_ref(),
-            schema.clone(),
-            &meta[0],
-            Some(size_hint),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
+        let df_meta =
+            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
+        let stats = df_meta.fetch_statistics(&schema).await?;
         assert_eq!(store.request_count(), 4);
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
@@ -559,29 +486,18 @@ mod tests {
             LocalFileSystem::new(),
         )));
 
-        // Use the a size hint larger than the file size to make sure we don't panic
+        // Use a size hint larger than the file size to make sure we don't panic
         let size_hint = (meta[0].size + 100) as usize;
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
-            &meta[0],
-            Some(size_hint),
-            None,
-            None,
-        )
-        .await
-        .expect("error reading metadata with hint");
+        let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
+            .with_metadata_size_hint(Some(size_hint));
+
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 1);
 
         // No increase because cache has an entry
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
-            &meta[0],
-            Some(size_hint),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await
-        .expect("error reading metadata with hint");
+        let df_meta =
+            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 1);
 
         Ok(())
@@ -622,16 +538,12 @@ mod tests {
         assert_eq!(store.request_count(), 3);
 
         // No increase in request count because cache is not empty
-        let pq_meta = fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref(), &files[0]),
-            &files[0],
-            None,
-            None,
-            Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
-        assert_eq!(store.request_count(), 3);
-        let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
+        let file_metadata_cache =
+            state.runtime_env().cache_manager.get_file_metadata_cache();
+        let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
+            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .fetch_statistics(&schema)
+            .await?;
         assert_eq!(stats.num_rows, Precision::Exact(4));
 
         // column c_dic
@@ -691,16 +603,13 @@ mod tests {
         };
 
         // No increase in request count because cache is not empty
-        let pq_meta = fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref(), &files[0]),
-            &files[0],
-            None,
-            None,
-            Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
+        let file_metadata_cache =
+            state.runtime_env().cache_manager.get_file_metadata_cache();
+        let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
+            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .fetch_statistics(&schema)
+            .await?;
         assert_eq!(store.request_count(), 6);
-        let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1
         let c1_stats = &stats.column_statistics[0];
@@ -725,16 +634,11 @@ mod tests {
         assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
 
         // No increase in request count because cache is not empty
-        let pq_meta = fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref(), &files[1]),
-            &files[1],
-            None,
-            None,
-            Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
+        let stats = DFParquetMetadata::new(store.as_ref(), &files[1])
+            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .fetch_statistics(&schema)
+            .await?;
         assert_eq!(store.request_count(), 6);
-        let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1: missing from the file so the table treats all 3 rows as null
         let c1_stats = &stats.column_statistics[0];
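The pattern the updated tests rely on is the `DFParquetMetadata` builder from `datafusion_datasource_parquet::metadata`: construct it from an object store and a file's `ObjectMeta`, optionally attach a metadata size hint and a file metadata cache, then call `fetch_metadata` or `fetch_statistics`. Below is a minimal sketch of that flow; it is not a self-contained test, and it assumes the same `store`, `meta`, `schema`, `size_hint`, and `file_metadata_cache` bindings that the tests above set up.

```rust
// Sketch only: `store`, `meta`, `schema`, `size_hint`, and `file_metadata_cache`
// are assumed to be set up exactly as in the tests in this diff.
use datafusion_datasource_parquet::metadata::DFParquetMetadata;

let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
    // Hint how many trailing bytes to read; a hint smaller than the actual
    // metadata forces a second fetch (see the request-count assertions above).
    .with_metadata_size_hint(Some(size_hint))
    // With a cache attached, repeated fetches of the same file's metadata
    // do not issue additional object store requests.
    .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));

// Fetch (and, with the cache attached, reuse) the parquet metadata.
df_meta.fetch_metadata().await?;

// Derive DataFusion statistics for the table schema from the same metadata.
let stats = df_meta.fetch_statistics(&schema).await?;
assert_eq!(stats.num_rows, Precision::Exact(3));
```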