From a9139196f0f1498959a804d533fe2397cd36fb2a Mon Sep 17 00:00:00 2001
From: Jay Chia <17691182+jaychia@users.noreply.github.com>
Date: Sat, 2 Sep 2023 17:10:03 -0700
Subject: [PATCH] Correctly coerce Parquet Int96 timestamps into requested TimeUnits (#1532)

* Add correct coercion logic when reading int96 timestamps into specified timeunits

* Refactor to better inline nested function call in Iter::new

* Fix static check issues - immediately wrap and return an ArrayIter object

* Fix lints

---------

Co-authored-by: Jay Chia
---
 src/io/parquet/read/deserialize/simple.rs | 77 ++++++++++++++++++++---
 tests/it/io/parquet/read.rs               | 58 +++++++++++++++++
 2 files changed, 125 insertions(+), 10 deletions(-)

diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs
index b4b614980e..d19296a4b7 100644
--- a/src/io/parquet/read/deserialize/simple.rs
+++ b/src/io/parquet/read/deserialize/simple.rs
@@ -391,6 +391,44 @@ fn unifiy_timestmap_unit(
     }
 }
 
+#[inline]
+pub fn int96_to_i64_us(value: [u32; 3]) -> i64 {
+    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
+    const SECONDS_PER_DAY: i64 = 86_400;
+    const MICROS_PER_SECOND: i64 = 1_000_000;
+
+    let day = value[2] as i64;
+    let microseconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000;
+    let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
+
+    seconds * MICROS_PER_SECOND + microseconds
+}
+
+#[inline]
+pub fn int96_to_i64_ms(value: [u32; 3]) -> i64 {
+    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
+    const SECONDS_PER_DAY: i64 = 86_400;
+    const MILLIS_PER_SECOND: i64 = 1_000;
+
+    let day = value[2] as i64;
+    let milliseconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000_000;
+    let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
+
+    seconds * MILLIS_PER_SECOND + milliseconds
+}
+
+#[inline]
+pub fn int96_to_i64_s(value: [u32; 3]) -> i64 {
+    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
+    const SECONDS_PER_DAY: i64 = 86_400;
+
+    let day = value[2] as i64;
+    let seconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000_000_000;
+    let day_seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
+
+    day_seconds + seconds
+}
+
 fn timestamp<'a, I: Pages + 'a>(
     pages: I,
     physical_type: &PhysicalType,
@@ -401,16 +439,35 @@ fn timestamp<'a, I: Pages + 'a>(
     time_unit: TimeUnit,
 ) -> Result<ArrayIter<'a>> {
     if physical_type == &PhysicalType::Int96 {
-        let iter = primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ns);
-        let logical_type = PrimitiveLogicalType::Timestamp {
-            unit: ParquetTimeUnit::Nanoseconds,
-            is_adjusted_to_utc: false,
-        };
-        let (factor, is_multiplier) = unifiy_timestmap_unit(&Some(logical_type), time_unit);
-        return match (factor, is_multiplier) {
-            (1, _) => Ok(dyn_iter(iden(iter))),
-            (a, true) => Ok(dyn_iter(op(iter, move |x| x * a))),
-            (a, false) => Ok(dyn_iter(op(iter, move |x| x / a))),
+        return match time_unit {
+            TimeUnit::Nanosecond => Ok(dyn_iter(iden(primitive::Iter::new(
+                pages,
+                data_type,
+                num_rows,
+                chunk_size,
+                int96_to_i64_ns,
+            )))),
+            TimeUnit::Microsecond => Ok(dyn_iter(iden(primitive::Iter::new(
+                pages,
+                data_type,
+                num_rows,
+                chunk_size,
+                int96_to_i64_us,
+            )))),
+            TimeUnit::Millisecond => Ok(dyn_iter(iden(primitive::Iter::new(
+                pages,
+                data_type,
+                num_rows,
+                chunk_size,
+                int96_to_i64_ms,
+            )))),
+            TimeUnit::Second => Ok(dyn_iter(iden(primitive::Iter::new(
+                pages,
+                data_type,
+                num_rows,
+                chunk_size,
+                int96_to_i64_s,
+            )))),
         };
     };
 
diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs
index a2237b4926..8f45eb874d 100644
--- a/tests/it/io/parquet/read.rs
+++ b/tests/it/io/parquet/read.rs
@@ -782,3 +782,61 @@ fn invalid_utf8() -> Result<()> {
     );
     Ok(())
 }
+
+#[test]
+fn read_int96_timestamps() -> Result<()> {
+    use std::collections::BTreeMap;
+
+    let timestamp_data = &[
+        0x50, 0x41, 0x52, 0x31, 0x15, 0x04, 0x15, 0x48, 0x15, 0x3c, 0x4c, 0x15, 0x06, 0x15, 0x00,
+        0x12, 0x00, 0x00, 0x24, 0x00, 0x00, 0x0d, 0x01, 0x08, 0x9f, 0xd5, 0x1f, 0x0d, 0x0a, 0x44,
+        0x00, 0x00, 0x59, 0x68, 0x25, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x14,
+        0xfb, 0x2a, 0x00, 0x15, 0x00, 0x15, 0x14, 0x15, 0x18, 0x2c, 0x15, 0x06, 0x15, 0x10, 0x15,
+        0x06, 0x15, 0x06, 0x1c, 0x00, 0x00, 0x00, 0x0a, 0x24, 0x02, 0x00, 0x00, 0x00, 0x06, 0x01,
+        0x02, 0x03, 0x24, 0x00, 0x26, 0x9e, 0x01, 0x1c, 0x15, 0x06, 0x19, 0x35, 0x10, 0x00, 0x06,
+        0x19, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x73, 0x15, 0x02,
+        0x16, 0x06, 0x16, 0x9e, 0x01, 0x16, 0x96, 0x01, 0x26, 0x60, 0x26, 0x08, 0x29, 0x2c, 0x15,
+        0x04, 0x15, 0x00, 0x15, 0x02, 0x00, 0x15, 0x00, 0x15, 0x10, 0x15, 0x02, 0x00, 0x00, 0x00,
+        0x15, 0x04, 0x19, 0x2c, 0x35, 0x00, 0x18, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x15,
+        0x02, 0x00, 0x15, 0x06, 0x25, 0x02, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
+        0x6d, 0x70, 0x73, 0x00, 0x16, 0x06, 0x19, 0x1c, 0x19, 0x1c, 0x26, 0x9e, 0x01, 0x1c, 0x15,
+        0x06, 0x19, 0x35, 0x10, 0x00, 0x06, 0x19, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74,
+        0x61, 0x6d, 0x70, 0x73, 0x15, 0x02, 0x16, 0x06, 0x16, 0x9e, 0x01, 0x16, 0x96, 0x01, 0x26,
+        0x60, 0x26, 0x08, 0x29, 0x2c, 0x15, 0x04, 0x15, 0x00, 0x15, 0x02, 0x00, 0x15, 0x00, 0x15,
+        0x10, 0x15, 0x02, 0x00, 0x00, 0x00, 0x16, 0x9e, 0x01, 0x16, 0x06, 0x26, 0x08, 0x16, 0x96,
+        0x01, 0x14, 0x00, 0x00, 0x28, 0x20, 0x70, 0x61, 0x72, 0x71, 0x75, 0x65, 0x74, 0x2d, 0x63,
+        0x70, 0x70, 0x2d, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f,
+        0x6e, 0x20, 0x31, 0x32, 0x2e, 0x30, 0x2e, 0x30, 0x19, 0x1c, 0x1c, 0x00, 0x00, 0x00, 0x95,
+        0x00, 0x00, 0x00, 0x50, 0x41, 0x52, 0x31,
+    ];
+
+    let parse = |time_unit: TimeUnit| {
+        let mut reader = Cursor::new(timestamp_data);
+        let metadata = read_metadata(&mut reader)?;
+        let schema = arrow2::datatypes::Schema {
+            fields: vec![arrow2::datatypes::Field::new(
+                "timestamps",
+                arrow2::datatypes::DataType::Timestamp(time_unit, None),
+                false,
+            )],
+            metadata: BTreeMap::new(),
+        };
+        let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None, None);
+        reader.collect::<Result<Vec<_>>>()
+    };
+
+    // This data contains int96 timestamps in the year 1000 and 3000, which are out of range for
+    // Timestamp(TimeUnit::Nanoseconds) and will cause a panic in dev builds/overflow in release builds
+    // However, the code should work for the Microsecond/Millisecond time units
+    for time_unit in [
+        arrow2::datatypes::TimeUnit::Microsecond,
+        arrow2::datatypes::TimeUnit::Millisecond,
+        arrow2::datatypes::TimeUnit::Second,
+    ] {
+        parse(time_unit).expect("Should not error");
+    }
+    std::panic::catch_unwind(|| parse(arrow2::datatypes::TimeUnit::Nanosecond))
+        .expect_err("Should be a panic error");
+
+    Ok(())
+}