From 90d162ef85ffce9ca9602409af8e1eb23501addc Mon Sep 17 00:00:00 2001
From: Weijun-H
Date: Mon, 5 Feb 2024 14:57:18 +0800
Subject: [PATCH] Support Timestamp for Parquet Statistics

---
 .../src/datasource/file_format/parquet.rs     | 143 ++++++++++++++++--
 .../physical_plan/parquet/statistics.rs       |   3 +-
 2 files changed, 135 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 6b03ee80fadb2..77bae93198dfe 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -18,9 +18,12 @@
 //! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
 
 use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
-use arrow_array::types::Utf8Type;
-use arrow_array::{ArrayRef, GenericByteArray, RecordBatch};
-use arrow_ipc::Utf8;
+use arrow_array::types::{
+    ArrowTimestampType, TimestampMicrosecondType, TimestampMillisecondType,
+    TimestampNanosecondType, TimestampSecondType, Utf8Type,
+};
+use arrow_array::{GenericByteArray, PrimitiveArray, RecordBatch};
+use arrow_schema::TimeUnit;
 use async_trait::async_trait;
 use datafusion_common::stats::Precision;
 use datafusion_physical_plan::metrics::MetricsSet;
@@ -28,7 +31,6 @@ use parquet::arrow::arrow_writer::{
     compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
     ArrowLeafColumn,
 };
-use parquet::data_type::ByteArray;
 use parquet::file::writer::SerializedFileWriter;
 use std::any::Any;
 use std::fmt;
@@ -43,7 +45,9 @@ use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use arrow::datatypes::SchemaRef;
 use arrow::datatypes::{Fields, Schema};
 use bytes::{BufMut, BytesMut};
-use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, not_impl_err, DataFusionError, FileType,
+};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use futures::{StreamExt, TryStreamExt};
@@ -56,7 +60,7 @@ use parquet::arrow::{
 use parquet::file::footer::{decode_footer, decode_metadata};
 use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
-use parquet::file::statistics::{Statistics as ParquetStatistics, ValueStatistics};
+use parquet::file::statistics::Statistics as ParquetStatistics;
 
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, AbortableWrite, SharedBuffer};
@@ -374,8 +378,8 @@ fn summarize_min_max(
             }
         }
-        ParquetStatistics::ByteArray(s) => {
-            if let DataType::Utf8 = fields[i].data_type() {
+        ParquetStatistics::ByteArray(s) => match fields[i].data_type() {
+            DataType::Utf8 => {
                 if let Some(max_value) = &mut max_values[i] {
                     let stat_value = s.max().as_utf8().unwrap_or_default();
                     let array: GenericByteArray<Utf8Type> =
                         vec![Some(stat_value)].into();
@@ -395,12 +399,133 @@ fn summarize_min_max(
                     }
                 }
             }
-        }
+            DataType::Timestamp(time_unit, time_zone) => match time_unit {
+                TimeUnit::Nanosecond => {
+                    (max_values[i].as_mut())
+                        .and_then(|max_value| {
+                            handle_timestamp_statistics::<TimestampNanosecondType>(
+                                max_value,
+                                s.max_bytes(),
+                                time_zone,
+                            )
+                            .ok()
+                        })
+                        .unwrap_or_else(|| max_values[i] = None);
+
+                    (min_values[i].as_mut())
+                        .and_then(|min_value| {
+                            handle_timestamp_statistics::<TimestampNanosecondType>(
+                                min_value,
+                                s.min_bytes(),
+                                time_zone,
+                            )
+                            .ok()
+                        })
+                        .unwrap_or_else(|| min_values[i] = None);
+                }
+                TimeUnit::Microsecond => {
+                    (max_values[i].as_mut())
+                        .and_then(|max_value| {
+                            handle_timestamp_statistics::<TimestampMicrosecondType>(
+                                max_value,
+                                s.max_bytes(),
+                                time_zone,
+                            )
+                            .ok()
+                        })
+                        .unwrap_or_else(|| max_values[i] = None);
+
+                    (min_values[i].as_mut())
+                        .and_then(|min_value| {
+                            handle_timestamp_statistics::<TimestampMicrosecondType>(
+                                min_value,
+                                s.min_bytes(),
+                                time_zone,
+                            )
+                            .ok()
+                        })
+                        .unwrap_or_else(|| min_values[i] = None);
+                }
+                TimeUnit::Millisecond => {
+                    (max_values[i].as_mut())
+                        .and_then(|max_value| {
+                            handle_timestamp_statistics::<TimestampMillisecondType>(
+                                max_value,
+                                s.max_bytes(),
+                                time_zone,
+                            )
+                            .ok()
+                        })
+                        .unwrap_or_else(|| max_values[i] = None);
+
+                    (min_values[i].as_mut())
+                        .and_then(|min_value| {
+                            handle_timestamp_statistics::<TimestampMillisecondType>(
+                                min_value,
+                                s.min_bytes(),
+                                time_zone,
+                            )
+                            .ok()
+                        })
+                        .unwrap_or_else(|| min_values[i] = None);
+                }
+                TimeUnit::Second => {
+                    (max_values[i].as_mut())
+                        .and_then(|max_value| {
+                            handle_timestamp_statistics::<TimestampSecondType>(
+                                max_value,
+                                s.max_bytes(),
+                                time_zone,
+                            )
+                            .ok()
+                        })
+                        .unwrap_or_else(|| max_values[i] = None);
+
+                    (min_values[i].as_mut())
+                        .and_then(|min_value| {
+                            handle_timestamp_statistics::<TimestampSecondType>(
+                                min_value,
+                                s.min_bytes(),
+                                time_zone,
+                            )
+                            .ok()
+                        })
+                        .unwrap_or_else(|| min_values[i] = None);
+                }
+            },
+            _ => {
+                max_values[i] = None;
+                min_values[i] = None;
+            }
+        },
         _ => {
             max_values[i] = None;
             min_values[i] = None;
         }
     }
+    fn handle_timestamp_statistics<T: ArrowTimestampType>(
+        accumulator: &mut dyn Accumulator,
+        time_value: &[u8],
+        time_zone: &Option<Arc<str>>,
+    ) -> Result<()> {
+        let factor = match T::UNIT {
+            TimeUnit::Second => 1_000_000_000,
+            TimeUnit::Millisecond => 1_000_000,
+            TimeUnit::Microsecond => 1_000,
+            TimeUnit::Nanosecond => 1,
+        };
+
+        let stat_value = std::str::from_utf8(time_value)
+            .map_err(|_| exec_datafusion_err!("Failed to parse timestamp statistics"))?;
+        let ts = string_to_timestamp_nanos(stat_value).map(|ts| ts / factor)?;
+
+        let array = PrimitiveArray::<T>::from_iter_values(Some(ts));
+        let array = array.with_timezone_opt(time_zone.clone());
+
+        accumulator.update_batch(&[Arc::new(array)])?;
+
+        Ok(())
+    }
 }
 
 /// Fetches parquet metadata from ObjectStore for given object
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 55bb0a314cc8a..7b282452b75d9 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -20,8 +20,7 @@
 // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328
 
 use crate::arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
-use crate::arrow::datatypes::TimestampNanosecondType;
-use crate::arrow::datatypes::{ArrowTimestampType, TimeUnit};
+use crate::arrow::datatypes::TimeUnit;
 use arrow::{array::ArrayRef, datatypes::DataType};
 use arrow_array::new_empty_array;
 use arrow_schema::{FieldRef, Schema};
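
The core of this change is the nested `handle_timestamp_statistics` helper: it parses the UTF-8 timestamp string stored in the Parquet byte-array statistic with `string_to_timestamp_nanos`, rescales the resulting nanosecond value to the column's `TimeUnit`, and hands a one-element timestamp array to the min/max accumulator. Below is a minimal, self-contained sketch of just that conversion step, built only on public `arrow` APIs; the helper name `to_timestamp_array`, the `main` driver, and the sample timestamp/assertion are illustrative and not part of the patch.

// A hedged sketch of the conversion performed by `handle_timestamp_statistics`:
// parse the timestamp string kept in the statistic, rescale from nanoseconds to
// the column's time unit, and build a one-element Arrow timestamp array that a
// min/max accumulator could consume. `to_timestamp_array` is an illustrative
// name, not an API from the patch.
use std::sync::Arc;

use arrow::array::PrimitiveArray;
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::{ArrowTimestampType, TimeUnit, TimestampMillisecondType};
use arrow::error::ArrowError;

fn to_timestamp_array<T: ArrowTimestampType>(
    stat_value: &str,
    time_zone: &Option<Arc<str>>,
) -> Result<PrimitiveArray<T>, ArrowError> {
    // Divisor that rescales nanoseconds down to the target unit.
    let factor = match T::UNIT {
        TimeUnit::Second => 1_000_000_000,
        TimeUnit::Millisecond => 1_000_000,
        TimeUnit::Microsecond => 1_000,
        TimeUnit::Nanosecond => 1,
    };
    let ts = string_to_timestamp_nanos(stat_value)? / factor;
    // Carry the column's optional timezone onto the array, as the patch does.
    Ok(PrimitiveArray::<T>::from_iter_values([ts]).with_timezone_opt(time_zone.clone()))
}

fn main() -> Result<(), ArrowError> {
    // A millisecond-granularity column with an explicit UTC offset.
    let tz: Option<Arc<str>> = Some("+00:00".into());
    let array = to_timestamp_array::<TimestampMillisecondType>("2024-02-05T14:57:18Z", &tz)?;
    assert_eq!(array.value(0), 1_707_145_038_000);
    println!("{array:?}");
    Ok(())
}

Because the rescaling uses integer division, precision finer than the target unit is truncated: a statistic of 2024-02-05T14:57:18.123Z on a second-granularity column becomes 1_707_145_038 seconds.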