Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Correctly coerce Parquet Int96 timestamps into requested TimeUnits (#…
Browse files Browse the repository at this point in the history
…1532)

* Add correct coercion logic when reading int96 timestamps into specified timeunits

* Refactor to better inline nested function call in Iter::new

* Fix static check issues - immediately wrap and return an ArrayIter object

* Fix lints

---------

Co-authored-by: Jay Chia <jaychia94@gmail.com@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Sep 3, 2023
1 parent f609d0c commit a913919
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 10 deletions.
77 changes: 67 additions & 10 deletions src/io/parquet/read/deserialize/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,44 @@ fn unifiy_timestmap_unit(
}
}

#[inline]
pub fn int96_to_i64_us(value: [u32; 3]) -> i64 {
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
const SECONDS_PER_DAY: i64 = 86_400;
const MICROS_PER_SECOND: i64 = 1_000_000;

let day = value[2] as i64;
let microseconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000;
let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;

seconds * MICROS_PER_SECOND + microseconds
}

#[inline]
pub fn int96_to_i64_ms(value: [u32; 3]) -> i64 {
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
const SECONDS_PER_DAY: i64 = 86_400;
const MILLIS_PER_SECOND: i64 = 1_000;

let day = value[2] as i64;
let milliseconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000_000;
let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;

seconds * MILLIS_PER_SECOND + milliseconds
}

#[inline]
pub fn int96_to_i64_s(value: [u32; 3]) -> i64 {
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
const SECONDS_PER_DAY: i64 = 86_400;

let day = value[2] as i64;
let seconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000_000_000;
let day_seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;

day_seconds + seconds
}

fn timestamp<'a, I: Pages + 'a>(
pages: I,
physical_type: &PhysicalType,
Expand All @@ -401,16 +439,35 @@ fn timestamp<'a, I: Pages + 'a>(
time_unit: TimeUnit,
) -> Result<ArrayIter<'a>> {
if physical_type == &PhysicalType::Int96 {
let iter = primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ns);
let logical_type = PrimitiveLogicalType::Timestamp {
unit: ParquetTimeUnit::Nanoseconds,
is_adjusted_to_utc: false,
};
let (factor, is_multiplier) = unifiy_timestmap_unit(&Some(logical_type), time_unit);
return match (factor, is_multiplier) {
(1, _) => Ok(dyn_iter(iden(iter))),
(a, true) => Ok(dyn_iter(op(iter, move |x| x * a))),
(a, false) => Ok(dyn_iter(op(iter, move |x| x / a))),
return match time_unit {
TimeUnit::Nanosecond => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
num_rows,
chunk_size,
int96_to_i64_ns,
)))),
TimeUnit::Microsecond => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
num_rows,
chunk_size,
int96_to_i64_us,
)))),
TimeUnit::Millisecond => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
num_rows,
chunk_size,
int96_to_i64_ms,
)))),
TimeUnit::Second => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
num_rows,
chunk_size,
int96_to_i64_s,
)))),
};
};

Expand Down
58 changes: 58 additions & 0 deletions tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,3 +782,61 @@ fn invalid_utf8() -> Result<()> {
);
Ok(())
}

#[test]
fn read_int96_timestamps() -> Result<()> {
use std::collections::BTreeMap;

let timestamp_data = &[
0x50, 0x41, 0x52, 0x31, 0x15, 0x04, 0x15, 0x48, 0x15, 0x3c, 0x4c, 0x15, 0x06, 0x15, 0x00,
0x12, 0x00, 0x00, 0x24, 0x00, 0x00, 0x0d, 0x01, 0x08, 0x9f, 0xd5, 0x1f, 0x0d, 0x0a, 0x44,
0x00, 0x00, 0x59, 0x68, 0x25, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x14,
0xfb, 0x2a, 0x00, 0x15, 0x00, 0x15, 0x14, 0x15, 0x18, 0x2c, 0x15, 0x06, 0x15, 0x10, 0x15,
0x06, 0x15, 0x06, 0x1c, 0x00, 0x00, 0x00, 0x0a, 0x24, 0x02, 0x00, 0x00, 0x00, 0x06, 0x01,
0x02, 0x03, 0x24, 0x00, 0x26, 0x9e, 0x01, 0x1c, 0x15, 0x06, 0x19, 0x35, 0x10, 0x00, 0x06,
0x19, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x73, 0x15, 0x02,
0x16, 0x06, 0x16, 0x9e, 0x01, 0x16, 0x96, 0x01, 0x26, 0x60, 0x26, 0x08, 0x29, 0x2c, 0x15,
0x04, 0x15, 0x00, 0x15, 0x02, 0x00, 0x15, 0x00, 0x15, 0x10, 0x15, 0x02, 0x00, 0x00, 0x00,
0x15, 0x04, 0x19, 0x2c, 0x35, 0x00, 0x18, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x15,
0x02, 0x00, 0x15, 0x06, 0x25, 0x02, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x73, 0x00, 0x16, 0x06, 0x19, 0x1c, 0x19, 0x1c, 0x26, 0x9e, 0x01, 0x1c, 0x15,
0x06, 0x19, 0x35, 0x10, 0x00, 0x06, 0x19, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74,
0x61, 0x6d, 0x70, 0x73, 0x15, 0x02, 0x16, 0x06, 0x16, 0x9e, 0x01, 0x16, 0x96, 0x01, 0x26,
0x60, 0x26, 0x08, 0x29, 0x2c, 0x15, 0x04, 0x15, 0x00, 0x15, 0x02, 0x00, 0x15, 0x00, 0x15,
0x10, 0x15, 0x02, 0x00, 0x00, 0x00, 0x16, 0x9e, 0x01, 0x16, 0x06, 0x26, 0x08, 0x16, 0x96,
0x01, 0x14, 0x00, 0x00, 0x28, 0x20, 0x70, 0x61, 0x72, 0x71, 0x75, 0x65, 0x74, 0x2d, 0x63,
0x70, 0x70, 0x2d, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f,
0x6e, 0x20, 0x31, 0x32, 0x2e, 0x30, 0x2e, 0x30, 0x19, 0x1c, 0x1c, 0x00, 0x00, 0x00, 0x95,
0x00, 0x00, 0x00, 0x50, 0x41, 0x52, 0x31,
];

let parse = |time_unit: TimeUnit| {
let mut reader = Cursor::new(timestamp_data);
let metadata = read_metadata(&mut reader)?;
let schema = arrow2::datatypes::Schema {
fields: vec![arrow2::datatypes::Field::new(
"timestamps",
arrow2::datatypes::DataType::Timestamp(time_unit, None),
false,
)],
metadata: BTreeMap::new(),
};
let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None, None);
reader.collect::<Result<Vec<_>>>()
};

// This data contains int96 timestamps in the year 1000 and 3000, which are out of range for
// Timestamp(TimeUnit::Nanoseconds) and will cause a panic in dev builds/overflow in release builds
// However, the code should work for the Microsecond/Millisecond time units
for time_unit in [
arrow2::datatypes::TimeUnit::Microsecond,
arrow2::datatypes::TimeUnit::Millisecond,
arrow2::datatypes::TimeUnit::Second,
] {
parse(time_unit).expect("Should not error");
}
std::panic::catch_unwind(|| parse(arrow2::datatypes::TimeUnit::Nanosecond))
.expect_err("Should be a panic error");

Ok(())
}

0 comments on commit a913919

Please sign in to comment.