@@ -133,8 +133,7 @@ mod tests {
133133 use datafusion_datasource:: file_sink_config:: { FileSink , FileSinkConfig } ;
134134 use datafusion_datasource:: { ListingTableUrl , PartitionedFile } ;
135135 use datafusion_datasource_parquet:: {
136- fetch_parquet_metadata, fetch_statistics, statistics_from_parquet_meta_calc,
137- ObjectStoreFetch , ParquetFormat , ParquetFormatFactory , ParquetSink ,
136+ ParquetFormat , ParquetFormatFactory , ParquetSink ,
138137 } ;
139138 use datafusion_execution:: object_store:: ObjectStoreUrl ;
140139 use datafusion_execution:: runtime_env:: RuntimeEnv ;
@@ -143,13 +142,15 @@ mod tests {
143142 use datafusion_physical_plan:: stream:: RecordBatchStreamAdapter ;
144143 use datafusion_physical_plan:: { collect, ExecutionPlan } ;
145144
145+ use crate :: test_util:: bounded_stream;
146146 use arrow:: array:: {
147147 types:: Int32Type , Array , ArrayRef , DictionaryArray , Int32Array , Int64Array ,
148148 StringArray ,
149149 } ;
150150 use arrow:: datatypes:: { DataType , Field } ;
151151 use async_trait:: async_trait;
152152 use datafusion_datasource:: file_groups:: FileGroup ;
153+ use datafusion_datasource_parquet:: metadata:: DFParquetMetadata ;
153154 use futures:: stream:: BoxStream ;
154155 use futures:: StreamExt ;
155156 use insta:: assert_snapshot;
@@ -167,8 +168,6 @@ mod tests {
167168 use parquet:: format:: FileMetaData ;
168169 use tokio:: fs:: File ;
169170
170- use crate :: test_util:: bounded_stream;
171-
172171 enum ForceViews {
173172 Yes ,
174173 No ,
@@ -195,31 +194,24 @@ mod tests {
195194 let format = ParquetFormat :: default ( ) . with_force_view_types ( force_views) ;
196195 let schema = format. infer_schema ( & ctx, & store, & meta) . await ?;
197196
198- let stats = fetch_statistics (
199- store. as_ref ( ) ,
200- schema. clone ( ) ,
201- & meta[ 0 ] ,
202- None ,
203- None ,
204- Some ( ctx. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ) ,
205- )
206- . await ?;
197+ let file_metadata_cache =
198+ ctx. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ;
199+ let stats = DFParquetMetadata :: new ( & store, & meta[ 0 ] )
200+ . with_file_metadata_cache ( Some ( Arc :: clone ( & file_metadata_cache) ) )
201+ . fetch_statistics ( & schema)
202+ . await ?;
207203
208204 assert_eq ! ( stats. num_rows, Precision :: Exact ( 3 ) ) ;
209205 let c1_stats = & stats. column_statistics [ 0 ] ;
210206 let c2_stats = & stats. column_statistics [ 1 ] ;
211207 assert_eq ! ( c1_stats. null_count, Precision :: Exact ( 1 ) ) ;
212208 assert_eq ! ( c2_stats. null_count, Precision :: Exact ( 3 ) ) ;
213209
214- let stats = fetch_statistics (
215- store. as_ref ( ) ,
216- schema,
217- & meta[ 1 ] ,
218- None ,
219- None ,
220- Some ( ctx. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ) ,
221- )
222- . await ?;
210+ let stats = DFParquetMetadata :: new ( & store, & meta[ 1 ] )
211+ . with_file_metadata_cache ( Some ( Arc :: clone ( & file_metadata_cache) ) )
212+ . fetch_statistics ( & schema)
213+ . await ?;
214+
223215 assert_eq ! ( stats. num_rows, Precision :: Exact ( 3 ) ) ;
224216 let c1_stats = & stats. column_statistics [ 0 ] ;
225217 let c2_stats = & stats. column_statistics [ 1 ] ;
@@ -392,51 +384,27 @@ mod tests {
392384
393385 // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
394386 // for the remaining metadata
395- fetch_parquet_metadata (
396- ObjectStoreFetch :: new ( store. as_ref ( ) as & dyn ObjectStore , & meta[ 0 ] ) ,
397- & meta[ 0 ] ,
398- Some ( 9 ) ,
399- None ,
400- None ,
401- )
402- . await
403- . expect ( "error reading metadata with hint" ) ;
387+ let file_metadata_cache =
388+ ctx. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ;
389+ let df_meta = DFParquetMetadata :: new ( store. as_ref ( ) , & meta[ 0 ] )
390+ . with_metadata_size_hint ( Some ( 9 ) ) ;
391+ df_meta. fetch_metadata ( ) . await ?;
404392 assert_eq ! ( store. request_count( ) , 2 ) ;
405393
394+ let df_meta =
395+ df_meta. with_file_metadata_cache ( Some ( Arc :: clone ( & file_metadata_cache) ) ) ;
396+
406397 // Increases by 3 because cache has no entries yet
407- fetch_parquet_metadata (
408- ObjectStoreFetch :: new ( store. as_ref ( ) as & dyn ObjectStore , & meta[ 0 ] ) ,
409- & meta[ 0 ] ,
410- Some ( 9 ) ,
411- None ,
412- Some ( ctx. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ) ,
413- )
414- . await
415- . expect ( "error reading metadata with hint" ) ;
398+ df_meta. fetch_metadata ( ) . await ?;
416399 assert_eq ! ( store. request_count( ) , 5 ) ;
417400
418401 // No increase because cache has an entry
419- fetch_parquet_metadata (
420- ObjectStoreFetch :: new ( store. as_ref ( ) as & dyn ObjectStore , & meta[ 0 ] ) ,
421- & meta[ 0 ] ,
422- Some ( 9 ) ,
423- None ,
424- Some ( ctx. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ) ,
425- )
426- . await
427- . expect ( "error reading metadata with hint" ) ;
402+ df_meta. fetch_metadata ( ) . await ?;
428403 assert_eq ! ( store. request_count( ) , 5 ) ;
429404
430405 // Increases by 2 because the file metadata cache is disabled (set to `None`), so nothing is served from the cache
431- fetch_parquet_metadata (
432- ObjectStoreFetch :: new ( store. as_ref ( ) as & dyn ObjectStore , & meta[ 0 ] ) ,
433- & meta[ 0 ] ,
434- Some ( 9 ) ,
435- None ,
436- None ,
437- )
438- . await
439- . expect ( "error reading metadata with hint" ) ;
406+ let df_meta = df_meta. with_file_metadata_cache ( None ) ;
407+ df_meta. fetch_metadata ( ) . await ?;
440408 assert_eq ! ( store. request_count( ) , 7 ) ;
441409
442410 let force_views = match force_views {
@@ -454,15 +422,9 @@ mod tests {
454422 assert_eq ! ( store. request_count( ) , 10 ) ;
455423
456424 // No increase, cache being used
457- let stats = fetch_statistics (
458- store. upcast ( ) . as_ref ( ) ,
459- schema. clone ( ) ,
460- & meta[ 0 ] ,
461- Some ( 9 ) ,
462- None ,
463- Some ( ctx. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ) ,
464- )
465- . await ?;
425+ let df_meta =
426+ df_meta. with_file_metadata_cache ( Some ( Arc :: clone ( & file_metadata_cache) ) ) ;
427+ let stats = df_meta. fetch_statistics ( & schema) . await ?;
466428 assert_eq ! ( store. request_count( ) , 10 ) ;
467429
468430 assert_eq ! ( stats. num_rows, Precision :: Exact ( 3 ) ) ;
@@ -477,55 +439,30 @@ mod tests {
477439
478440 // Use the file size as the hint so we can get the full metadata from the first fetch
479441 let size_hint = meta[ 0 ] . size as usize ;
442+ let df_meta = DFParquetMetadata :: new ( store. as_ref ( ) , & meta[ 0 ] )
443+ . with_metadata_size_hint ( Some ( size_hint) ) ;
480444
481- fetch_parquet_metadata (
482- ObjectStoreFetch :: new ( store. upcast ( ) . as_ref ( ) , & meta[ 0 ] ) ,
483- & meta[ 0 ] ,
484- Some ( size_hint) ,
485- None ,
486- None ,
487- )
488- . await
489- . expect ( "error reading metadata with hint" ) ;
445+ df_meta. fetch_metadata ( ) . await ?;
490446 // ensure the requests were coalesced into a single request
491447 assert_eq ! ( store. request_count( ) , 1 ) ;
492448
493449 let session = SessionContext :: new ( ) ;
494450 let ctx = session. state ( ) ;
451+ let file_metadata_cache =
452+ ctx. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ;
453+ let df_meta =
454+ df_meta. with_file_metadata_cache ( Some ( Arc :: clone ( & file_metadata_cache) ) ) ;
495455 // Increases by 1 because cache has no entries yet and new session context
496- fetch_parquet_metadata (
497- ObjectStoreFetch :: new ( store. upcast ( ) . as_ref ( ) , & meta[ 0 ] ) ,
498- & meta[ 0 ] ,
499- Some ( size_hint) ,
500- None ,
501- Some ( ctx. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ) ,
502- )
503- . await
504- . expect ( "error reading metadata with hint" ) ;
456+ df_meta. fetch_metadata ( ) . await ?;
505457 assert_eq ! ( store. request_count( ) , 2 ) ;
506458
507459 // No increase because cache has an entry
508- fetch_parquet_metadata (
509- ObjectStoreFetch :: new ( store. upcast ( ) . as_ref ( ) , & meta[ 0 ] ) ,
510- & meta[ 0 ] ,
511- Some ( size_hint) ,
512- None ,
513- Some ( ctx. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ) ,
514- )
515- . await
516- . expect ( "error reading metadata with hint" ) ;
460+ df_meta. fetch_metadata ( ) . await ?;
517461 assert_eq ! ( store. request_count( ) , 2 ) ;
518462
519463 // Increases by 1 because the file metadata cache is disabled (set to `None`), so nothing is served from the cache
520- fetch_parquet_metadata (
521- ObjectStoreFetch :: new ( store. upcast ( ) . as_ref ( ) , & meta[ 0 ] ) ,
522- & meta[ 0 ] ,
523- Some ( size_hint) ,
524- None ,
525- None ,
526- )
527- . await
528- . expect ( "error reading metadata with hint" ) ;
464+ let df_meta = df_meta. with_file_metadata_cache ( None ) ;
465+ df_meta. fetch_metadata ( ) . await ?;
529466 assert_eq ! ( store. request_count( ) , 3 ) ;
530467
531468 let format = ParquetFormat :: default ( )
@@ -538,15 +475,9 @@ mod tests {
538475 let schema = format. infer_schema ( & ctx, & store. upcast ( ) , & meta) . await ?;
539476 assert_eq ! ( store. request_count( ) , 4 ) ;
540477 // No increase, cache being used
541- let stats = fetch_statistics (
542- store. upcast ( ) . as_ref ( ) ,
543- schema. clone ( ) ,
544- & meta[ 0 ] ,
545- Some ( size_hint) ,
546- None ,
547- Some ( ctx. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ) ,
548- )
549- . await ?;
478+ let df_meta =
479+ df_meta. with_file_metadata_cache ( Some ( Arc :: clone ( & file_metadata_cache) ) ) ;
480+ let stats = df_meta. fetch_statistics ( & schema) . await ?;
550481 assert_eq ! ( store. request_count( ) , 4 ) ;
551482
552483 assert_eq ! ( stats. num_rows, Precision :: Exact ( 3 ) ) ;
@@ -559,29 +490,18 @@ mod tests {
559490 LocalFileSystem :: new ( ) ,
560491 ) ) ) ;
561492
562- // Use the a size hint larger than the file size to make sure we don't panic
493+ // Use a size hint larger than the file size to make sure we don't panic
563494 let size_hint = ( meta[ 0 ] . size + 100 ) as usize ;
564- fetch_parquet_metadata (
565- ObjectStoreFetch :: new ( store. upcast ( ) . as_ref ( ) , & meta[ 0 ] ) ,
566- & meta[ 0 ] ,
567- Some ( size_hint) ,
568- None ,
569- None ,
570- )
571- . await
572- . expect ( "error reading metadata with hint" ) ;
495+ let df_meta = DFParquetMetadata :: new ( store. as_ref ( ) , & meta[ 0 ] )
496+ . with_metadata_size_hint ( Some ( size_hint) ) ;
497+
498+ df_meta. fetch_metadata ( ) . await ?;
573499 assert_eq ! ( store. request_count( ) , 1 ) ;
574500
575501 // No increase because cache has an entry
576- fetch_parquet_metadata (
577- ObjectStoreFetch :: new ( store. upcast ( ) . as_ref ( ) , & meta[ 0 ] ) ,
578- & meta[ 0 ] ,
579- Some ( size_hint) ,
580- None ,
581- Some ( ctx. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ) ,
582- )
583- . await
584- . expect ( "error reading metadata with hint" ) ;
502+ let df_meta =
503+ df_meta. with_file_metadata_cache ( Some ( Arc :: clone ( & file_metadata_cache) ) ) ;
504+ df_meta. fetch_metadata ( ) . await ?;
585505 assert_eq ! ( store. request_count( ) , 1 ) ;
586506
587507 Ok ( ( ) )
@@ -622,16 +542,12 @@ mod tests {
622542 assert_eq ! ( store. request_count( ) , 3 ) ;
623543
624544 // No increase in request count because cache is not empty
625- let pq_meta = fetch_parquet_metadata (
626- ObjectStoreFetch :: new ( store. as_ref ( ) , & files[ 0 ] ) ,
627- & files[ 0 ] ,
628- None ,
629- None ,
630- Some ( state. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ) ,
631- )
632- . await ?;
633- assert_eq ! ( store. request_count( ) , 3 ) ;
634- let stats = statistics_from_parquet_meta_calc ( & pq_meta, schema. clone ( ) ) ?;
545+ let file_metadata_cache =
546+ state. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ;
547+ let stats = DFParquetMetadata :: new ( store. as_ref ( ) , & files[ 0 ] )
548+ . with_file_metadata_cache ( Some ( Arc :: clone ( & file_metadata_cache) ) )
549+ . fetch_statistics ( & schema)
550+ . await ?;
635551 assert_eq ! ( stats. num_rows, Precision :: Exact ( 4 ) ) ;
636552
637553 // column c_dic
@@ -691,16 +607,13 @@ mod tests {
691607 } ;
692608
693609 // No increase in request count because cache is not empty
694- let pq_meta = fetch_parquet_metadata (
695- ObjectStoreFetch :: new ( store. as_ref ( ) , & files[ 0 ] ) ,
696- & files[ 0 ] ,
697- None ,
698- None ,
699- Some ( state. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ) ,
700- )
701- . await ?;
610+ let file_metadata_cache =
611+ state. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ;
612+ let stats = DFParquetMetadata :: new ( store. as_ref ( ) , & files[ 0 ] )
613+ . with_file_metadata_cache ( Some ( Arc :: clone ( & file_metadata_cache) ) )
614+ . fetch_statistics ( & schema)
615+ . await ?;
702616 assert_eq ! ( store. request_count( ) , 6 ) ;
703- let stats = statistics_from_parquet_meta_calc ( & pq_meta, schema. clone ( ) ) ?;
704617 assert_eq ! ( stats. num_rows, Precision :: Exact ( 3 ) ) ;
705618 // column c1
706619 let c1_stats = & stats. column_statistics [ 0 ] ;
@@ -725,16 +638,11 @@ mod tests {
725638 assert_eq ! ( c2_stats. min_value, Precision :: Exact ( null_i64. clone( ) ) ) ;
726639
727640 // No increase in request count because cache is not empty
728- let pq_meta = fetch_parquet_metadata (
729- ObjectStoreFetch :: new ( store. as_ref ( ) , & files[ 1 ] ) ,
730- & files[ 1 ] ,
731- None ,
732- None ,
733- Some ( state. runtime_env ( ) . cache_manager . get_file_metadata_cache ( ) ) ,
734- )
735- . await ?;
641+ let stats = DFParquetMetadata :: new ( store. as_ref ( ) , & files[ 1 ] )
642+ . with_file_metadata_cache ( Some ( Arc :: clone ( & file_metadata_cache) ) )
643+ . fetch_statistics ( & schema)
644+ . await ?;
736645 assert_eq ! ( store. request_count( ) , 6 ) ;
737- let stats = statistics_from_parquet_meta_calc ( & pq_meta, schema. clone ( ) ) ?;
738646 assert_eq ! ( stats. num_rows, Precision :: Exact ( 3 ) ) ;
739647 // column c1: missing from the file so the table treats all 3 rows as null
740648 let c1_stats = & stats. column_statistics [ 0 ] ;
0 commit comments