Support Timestamp for Parquet Statistics
Weijun-H committed Feb 5, 2024
1 parent fd51edc commit 90d162e
Showing 2 changed files with 135 additions and 11 deletions.
143 changes: 134 additions & 9 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -18,17 +18,19 @@
//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions

use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow_array::types::Utf8Type;
use arrow_array::{ArrayRef, GenericByteArray, RecordBatch};
use arrow_ipc::Utf8;
use arrow_array::types::{
ArrowTimestampType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType, Utf8Type,
};
use arrow_array::{GenericByteArray, PrimitiveArray, RecordBatch};
use arrow_schema::TimeUnit;
use async_trait::async_trait;
use datafusion_common::stats::Precision;
use datafusion_physical_plan::metrics::MetricsSet;
use parquet::arrow::arrow_writer::{
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
ArrowLeafColumn,
};
use parquet::data_type::ByteArray;
use parquet::file::writer::SerializedFileWriter;
use std::any::Any;
use std::fmt;
@@ -43,7 +45,9 @@ use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use arrow::datatypes::SchemaRef;
use arrow::datatypes::{Fields, Schema};
use bytes::{BufMut, BytesMut};
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_common::{
exec_datafusion_err, exec_err, not_impl_err, DataFusionError, FileType,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use futures::{StreamExt, TryStreamExt};
@@ -56,7 +60,7 @@ use parquet::arrow::{
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::{Statistics as ParquetStatistics, ValueStatistics};
use parquet::file::statistics::Statistics as ParquetStatistics;

use super::write::demux::start_demuxer_task;
use super::write::{create_writer, AbortableWrite, SharedBuffer};
@@ -374,8 +378,8 @@ fn summarize_min_max(
}
}

ParquetStatistics::ByteArray(s) => {
if let DataType::Utf8 = fields[i].data_type() {
ParquetStatistics::ByteArray(s) => match fields[i].data_type() {
DataType::Utf8 => {
if let Some(max_value) = &mut max_values[i] {
let stat_value = s.max().as_utf8().unwrap_or_default();
let array: GenericByteArray<Utf8Type> = vec![Some(stat_value)].into();
@@ -395,12 +399,133 @@ }
}
}
}
}
DataType::Timestamp(time_unit, time_zone) => match time_unit {
TimeUnit::Nanosecond => {
(max_values[i].as_mut())
.and_then(|max_value| {
handle_timestamp_statistics::<TimestampNanosecondType>(
max_value,
s.max_bytes(),
time_zone,
)
.ok()
})
.unwrap_or_else(|| max_values[i] = None);

(min_values[i].as_mut())
.and_then(|min_value| {
handle_timestamp_statistics::<TimestampNanosecondType>(
min_value,
s.min_bytes(),
time_zone,
)
.ok()
})
.unwrap_or_else(|| min_values[i] = None);
}
TimeUnit::Microsecond => {
(max_values[i].as_mut())
.and_then(|max_value| {
handle_timestamp_statistics::<TimestampMicrosecondType>(
max_value,
s.max_bytes(),
time_zone,
)
.ok()
})
.unwrap_or_else(|| max_values[i] = None);

(min_values[i].as_mut())
.and_then(|min_value| {
handle_timestamp_statistics::<TimestampMicrosecondType>(
min_value,
s.min_bytes(),
time_zone,
)
.ok()
})
.unwrap_or_else(|| min_values[i] = None);
}
TimeUnit::Millisecond => {
(max_values[i].as_mut())
.and_then(|max_value| {
handle_timestamp_statistics::<TimestampMillisecondType>(
max_value,
s.max_bytes(),
time_zone,
)
.ok()
})
.unwrap_or_else(|| max_values[i] = None);

(min_values[i].as_mut())
.and_then(|min_value| {
handle_timestamp_statistics::<TimestampMillisecondType>(
min_value,
s.min_bytes(),
time_zone,
)
.ok()
})
.unwrap_or_else(|| min_values[i] = None);
}
TimeUnit::Second => {
(max_values[i].as_mut())
.and_then(|max_value| {
handle_timestamp_statistics::<TimestampSecondType>(
max_value,
s.max_bytes(),
time_zone,
)
.ok()
})
.unwrap_or_else(|| max_values[i] = None);

(min_values[i].as_mut())
.and_then(|min_value| {
handle_timestamp_statistics::<TimestampSecondType>(
min_value,
s.min_bytes(),
time_zone,
)
.ok()
})
.unwrap_or_else(|| min_values[i] = None);
}
},
_ => {
max_values[i] = None;
min_values[i] = None;
}
},
_ => {
max_values[i] = None;
min_values[i] = None;
}
}
fn handle_timestamp_statistics<T: ArrowTimestampType>(
accumulator: &mut dyn Accumulator,
time_value: &[u8],
time_zone: &Option<Arc<str>>,
) -> Result<()> {
let factor = match T::UNIT {
TimeUnit::Second => 1_000_000_000,
TimeUnit::Millisecond => 1_000_000,
TimeUnit::Microsecond => 1_000,
TimeUnit::Nanosecond => 1,
};

let stat_value = std::str::from_utf8(time_value)
.map_err(|_| exec_datafusion_err!("Failed to parse timestamp statistics"))?;
let ts = string_to_timestamp_nanos(stat_value).map(|ts| ts / factor)?;

let array = PrimitiveArray::<T>::from_iter_values(Some(ts));
let array = array.with_timezone_opt(time_zone.clone());

accumulator.update_batch(&[Arc::new(array)])?;

Ok(())
}
}

/// Fetches parquet metadata from ObjectStore for given object
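
The heart of the new code path is `handle_timestamp_statistics`: Parquet reports these statistics as byte arrays, which the commit interprets as UTF-8 timestamp strings, parses to nanoseconds since the epoch with `string_to_timestamp_nanos`, and then scales down to the column's declared `TimeUnit`. Below is a minimal, self-contained sketch of just that conversion; the helper name `ts_from_stat_bytes` and the `main` driver are illustrative and not part of the commit.

    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
    use arrow_schema::TimeUnit;

    // Illustrative stand-in for the scaling done by `handle_timestamp_statistics`:
    // parse a UTF-8 statistic value to nanoseconds since the epoch, then divide
    // down to the column's declared time unit.
    fn ts_from_stat_bytes(bytes: &[u8], unit: &TimeUnit) -> Option<i64> {
        // Nanoseconds per tick of the target unit.
        let factor = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };
        let s = std::str::from_utf8(bytes).ok()?;
        string_to_timestamp_nanos(s).ok().map(|ns| ns / factor)
    }

    fn main() {
        let stat = b"2024-02-05T00:00:00Z";
        // 1_707_091_200 seconds since the epoch.
        assert_eq!(ts_from_stat_bytes(stat, &TimeUnit::Second), Some(1_707_091_200));
        assert_eq!(
            ts_from_stat_bytes(stat, &TimeUnit::Millisecond),
            Some(1_707_091_200_000)
        );
    }

Note that plain `i64` division truncates toward zero, so pre-epoch (negative) values round toward the epoch; the committed helper uses the same division and behaves the same way.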
@@ -20,8 +20,7 @@
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328

use crate::arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use crate::arrow::datatypes::TimestampNanosecondType;
use crate::arrow::datatypes::{ArrowTimestampType, TimeUnit};
use crate::arrow::datatypes::TimeUnit;
use arrow::{array::ArrayRef, datatypes::DataType};
use arrow_array::new_empty_array;
use arrow_schema::{FieldRef, Schema};
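
Returning to the parquet.rs change: the other half of the helper's job is feeding the parsed value to the min/max accumulators. It wraps the scaled integer in a one-element `PrimitiveArray` and re-attaches the column's optional time zone via `with_timezone_opt`. A short sketch of that construction, assuming only `arrow-array` (the concrete value and time zone are illustrative):

    use std::sync::Arc;
    use arrow_array::{types::TimestampSecondType, Array, PrimitiveArray};

    fn main() {
        let time_zone: Option<Arc<str>> = Some(Arc::from("UTC"));
        // A single statistic value becomes a one-element array so it can be
        // passed to `Accumulator::update_batch`, which takes `&[ArrayRef]`.
        let array =
            PrimitiveArray::<TimestampSecondType>::from_iter_values([1_707_091_200_i64])
                .with_timezone_opt(time_zone.clone());
        assert_eq!(array.len(), 1);
        // The data type carries both the unit and the time zone.
        println!("{:?}", array.data_type()); // Timestamp(Second, Some("UTC"))
    }

Re-attaching the time zone presumably matters because the accumulator only sees the array's data type, and `Timestamp(Second, None)` is a different Arrow type from the field's `Timestamp(Second, Some("UTC"))`.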
