16
16
// under the License.
17
17
18
18
//! Functions that are query-able and searchable via the `\h` command
19
- use arrow:: array:: Int32Array ;
20
19
use arrow:: array:: { Int64Array , StringArray } ;
21
20
use arrow:: datatypes:: { DataType , Field , Schema , SchemaRef } ;
22
21
use arrow:: record_batch:: RecordBatch ;
@@ -33,6 +32,7 @@ use datafusion::physical_plan::memory::MemoryExec;
33
32
use datafusion:: physical_plan:: ExecutionPlan ;
34
33
use parquet:: file:: reader:: FileReader ;
35
34
use parquet:: file:: serialized_reader:: SerializedFileReader ;
35
+ use parquet:: file:: statistics:: Statistics ;
36
36
use std:: fmt;
37
37
use std:: fs:: File ;
38
38
use std:: str:: FromStr ;
@@ -253,62 +253,149 @@ impl TableFunctionImpl for ParquetMetadataFunc {
253
253
return plan_err ! ( "read_csv requires at least one string argument" ) ;
254
254
} ;
255
255
256
- let file = File :: open ( name) ?;
256
+ let file = File :: open ( name. clone ( ) ) ?;
257
257
let reader = SerializedFileReader :: new ( file) ?;
258
258
let metadata = reader. metadata ( ) ;
259
259
260
260
let schema = Arc :: new ( Schema :: new ( vec ! [
261
- Field :: new( "version" , DataType :: Int32 , false ) ,
262
- Field :: new( "num_rows" , DataType :: Int64 , false ) ,
263
- Field :: new( "created_by" , DataType :: Utf8 , false ) ,
264
- Field :: new( "columns_order" , DataType :: Utf8 , false ) ,
265
- Field :: new( "num_row_groups" , DataType :: Int64 , false ) ,
266
- Field :: new( "row_groups" , DataType :: Utf8 , false ) ,
261
+ Field :: new( "filename" , DataType :: Utf8 , true ) ,
262
+ Field :: new( "row_group_id" , DataType :: Int64 , true ) ,
263
+ Field :: new( "row_group_num_rows" , DataType :: Int64 , true ) ,
264
+ Field :: new( "row_group_num_columns" , DataType :: Int64 , true ) ,
265
+ Field :: new( "row_group_bytes" , DataType :: Int64 , true ) ,
266
+ Field :: new( "column_id" , DataType :: Int64 , true ) ,
267
+ Field :: new( "file_offset" , DataType :: Int64 , true ) ,
268
+ Field :: new( "num_values" , DataType :: Int64 , true ) ,
269
+ Field :: new( "path_in_schema" , DataType :: Utf8 , true ) ,
270
+ Field :: new( "type" , DataType :: Utf8 , true ) ,
271
+ Field :: new( "stats_min" , DataType :: Utf8 , true ) ,
272
+ Field :: new( "stats_max" , DataType :: Utf8 , true ) ,
273
+ Field :: new( "stats_null_count" , DataType :: Int64 , true ) ,
274
+ Field :: new( "stats_distinct_count" , DataType :: Int64 , true ) ,
275
+ Field :: new( "stats_min_value" , DataType :: Utf8 , true ) ,
276
+ Field :: new( "stats_max_value" , DataType :: Utf8 , true ) ,
277
+ Field :: new( "compression" , DataType :: Utf8 , true ) ,
278
+ Field :: new( "encodings" , DataType :: Utf8 , true ) ,
279
+ Field :: new( "index_page_offset" , DataType :: Int64 , true ) ,
280
+ Field :: new( "dictionary_page_offset" , DataType :: Int64 , true ) ,
281
+ Field :: new( "data_page_offset" , DataType :: Int64 , true ) ,
282
+ Field :: new( "total_compressed_size" , DataType :: Int64 , true ) ,
283
+ Field :: new( "total_uncompressed_size" , DataType :: Int64 , true ) ,
267
284
] ) ) ;
268
285
269
286
// construct recordbatch from metadata
270
- let num_groups = metadata. num_row_groups ( ) ;
271
- let row_groups = metadata
272
- . row_groups ( )
273
- . iter ( )
274
- . map ( |rg| {
275
- format ! (
276
- "num_columns: {}, num_rows: {}, total_byte_size: {}, sorting_columns: {:?}" ,
277
- rg. num_columns( ) ,
278
- rg. num_rows( ) ,
279
- rg. total_byte_size( ) ,
280
- rg. sorting_columns( )
281
- )
282
- } )
283
- . collect :: < Vec < String > > ( ) ;
287
+ let mut filename_arr = vec ! [ ] ;
288
+ let mut row_group_id_arr = vec ! [ ] ;
289
+ let mut row_group_num_rows_arr = vec ! [ ] ;
290
+ let mut row_group_num_columns_arr = vec ! [ ] ;
291
+ let mut row_group_bytes_arr = vec ! [ ] ;
292
+ let mut column_id_arr = vec ! [ ] ;
293
+ let mut file_offset_arr = vec ! [ ] ;
294
+ let mut num_values_arr = vec ! [ ] ;
295
+ let mut path_in_schema_arr = vec ! [ ] ;
296
+ let mut type_arr = vec ! [ ] ;
297
+ let mut stats_min_arr = vec ! [ ] ;
298
+ let mut stats_max_arr = vec ! [ ] ;
299
+ let mut stats_null_count_arr = vec ! [ ] ;
300
+ let mut stats_distinct_count_arr = vec ! [ ] ;
301
+ let mut stats_min_value_arr = vec ! [ ] ;
302
+ let mut stats_max_value_arr = vec ! [ ] ;
303
+ let mut compression_arr = vec ! [ ] ;
304
+ let mut encodings_arr = vec ! [ ] ;
305
+ let mut index_page_offset_arr = vec ! [ ] ;
306
+ let mut dictionary_page_offset_arr = vec ! [ ] ;
307
+ let mut data_page_offset_arr = vec ! [ ] ;
308
+ let mut total_compressed_size_arr = vec ! [ ] ;
309
+ let mut total_uncompressed_size_arr = vec ! [ ] ;
310
+ for ( rg_idx, row_group) in metadata. row_groups ( ) . iter ( ) . enumerate ( ) {
311
+ for ( col_idx, column) in row_group. columns ( ) . iter ( ) . enumerate ( ) {
312
+ filename_arr. push ( name. clone ( ) ) ;
313
+ row_group_id_arr. push ( rg_idx as i64 ) ;
314
+ row_group_num_rows_arr. push ( row_group. num_rows ( ) ) ;
315
+ row_group_num_columns_arr. push ( row_group. num_columns ( ) as i64 ) ;
316
+ row_group_bytes_arr. push ( row_group. total_byte_size ( ) ) ;
317
+ column_id_arr. push ( col_idx as i64 ) ;
318
+ file_offset_arr. push ( column. file_offset ( ) ) ;
319
+ num_values_arr. push ( column. num_values ( ) ) ;
320
+ path_in_schema_arr. push ( column. column_path ( ) . to_string ( ) ) ;
321
+ type_arr. push ( column. column_type ( ) . to_string ( ) ) ;
322
+ if let Some ( s) = column. statistics ( ) {
323
+ let ( min_val, max_val) = match s {
324
+ Statistics :: Boolean ( val) => {
325
+ ( val. min ( ) . to_string ( ) , val. max ( ) . to_string ( ) )
326
+ }
327
+ Statistics :: Int32 ( val) => {
328
+ ( val. min ( ) . to_string ( ) , val. max ( ) . to_string ( ) )
329
+ }
330
+ Statistics :: Int64 ( val) => {
331
+ ( val. min ( ) . to_string ( ) , val. max ( ) . to_string ( ) )
332
+ }
333
+ Statistics :: Int96 ( val) => {
334
+ ( val. min ( ) . to_string ( ) , val. max ( ) . to_string ( ) )
335
+ }
336
+ Statistics :: Float ( val) => {
337
+ ( val. min ( ) . to_string ( ) , val. max ( ) . to_string ( ) )
338
+ }
339
+ Statistics :: Double ( val) => {
340
+ ( val. min ( ) . to_string ( ) , val. max ( ) . to_string ( ) )
341
+ }
342
+ Statistics :: ByteArray ( val) => {
343
+ ( val. min ( ) . to_string ( ) , val. max ( ) . to_string ( ) )
344
+ }
345
+ Statistics :: FixedLenByteArray ( val) => {
346
+ ( val. min ( ) . to_string ( ) , val. max ( ) . to_string ( ) )
347
+ }
348
+ } ;
349
+ stats_min_arr. push ( Some ( min_val. clone ( ) ) ) ;
350
+ stats_max_arr. push ( Some ( max_val. clone ( ) ) ) ;
351
+ stats_null_count_arr. push ( Some ( s. null_count ( ) as i64 ) ) ;
352
+ stats_distinct_count_arr. push ( s. distinct_count ( ) . map ( |c| c as i64 ) ) ;
353
+ stats_min_value_arr. push ( Some ( min_val) ) ;
354
+ stats_max_value_arr. push ( Some ( max_val) ) ;
355
+ } else {
356
+ stats_min_arr. push ( None ) ;
357
+ stats_max_arr. push ( None ) ;
358
+ stats_null_count_arr. push ( None ) ;
359
+ stats_distinct_count_arr. push ( None ) ;
360
+ stats_min_value_arr. push ( None ) ;
361
+ stats_max_value_arr. push ( None ) ;
362
+ } ;
363
+ compression_arr. push ( format ! ( "{:?}" , column. compression( ) ) ) ;
364
+ encodings_arr. push ( format ! ( "{:?}" , column. encodings( ) ) ) ;
365
+ index_page_offset_arr. push ( column. index_page_offset ( ) ) ;
366
+ dictionary_page_offset_arr. push ( column. dictionary_page_offset ( ) ) ;
367
+ data_page_offset_arr. push ( column. data_page_offset ( ) ) ;
368
+ total_compressed_size_arr. push ( column. compressed_size ( ) ) ;
369
+ total_uncompressed_size_arr. push ( column. uncompressed_size ( ) ) ;
370
+ }
371
+ }
284
372
285
373
let rb = RecordBatch :: try_new (
286
374
schema. clone ( ) ,
287
375
vec ! [
288
- Arc :: new( Int32Array :: from( vec![
289
- metadata. file_metadata( ) . version( ) ;
290
- num_groups
291
- ] ) ) ,
292
- Arc :: new( Int64Array :: from( vec![
293
- metadata. file_metadata( ) . num_rows( ) ;
294
- num_groups
295
- ] ) ) ,
296
- Arc :: new( StringArray :: from( vec![
297
- format!(
298
- "{:?}" ,
299
- metadata. file_metadata( ) . created_by( )
300
- ) ;
301
- num_groups
302
- ] ) ) ,
303
- Arc :: new( StringArray :: from( vec![
304
- format!(
305
- "{:?}" ,
306
- metadata. file_metadata( ) . column_orders( )
307
- ) ;
308
- num_groups
309
- ] ) ) ,
310
- Arc :: new( Int64Array :: from( vec![ num_groups as i64 ; num_groups] ) ) ,
311
- Arc :: new( StringArray :: from( row_groups) ) ,
376
+ Arc :: new( StringArray :: from( filename_arr) ) ,
377
+ Arc :: new( Int64Array :: from( row_group_id_arr) ) ,
378
+ Arc :: new( Int64Array :: from( row_group_num_rows_arr) ) ,
379
+ Arc :: new( Int64Array :: from( row_group_num_columns_arr) ) ,
380
+ Arc :: new( Int64Array :: from( row_group_bytes_arr) ) ,
381
+ Arc :: new( Int64Array :: from( column_id_arr) ) ,
382
+ Arc :: new( Int64Array :: from( file_offset_arr) ) ,
383
+ Arc :: new( Int64Array :: from( num_values_arr) ) ,
384
+ Arc :: new( StringArray :: from( path_in_schema_arr) ) ,
385
+ Arc :: new( StringArray :: from( type_arr) ) ,
386
+ Arc :: new( StringArray :: from( stats_min_arr) ) ,
387
+ Arc :: new( StringArray :: from( stats_max_arr) ) ,
388
+ Arc :: new( Int64Array :: from( stats_null_count_arr) ) ,
389
+ Arc :: new( Int64Array :: from( stats_distinct_count_arr) ) ,
390
+ Arc :: new( StringArray :: from( stats_min_value_arr) ) ,
391
+ Arc :: new( StringArray :: from( stats_max_value_arr) ) ,
392
+ Arc :: new( StringArray :: from( compression_arr) ) ,
393
+ Arc :: new( StringArray :: from( encodings_arr) ) ,
394
+ Arc :: new( Int64Array :: from( index_page_offset_arr) ) ,
395
+ Arc :: new( Int64Array :: from( dictionary_page_offset_arr) ) ,
396
+ Arc :: new( Int64Array :: from( data_page_offset_arr) ) ,
397
+ Arc :: new( Int64Array :: from( total_compressed_size_arr) ) ,
398
+ Arc :: new( Int64Array :: from( total_uncompressed_size_arr) ) ,
312
399
] ,
313
400
) ?;
314
401
0 commit comments