
Commit 231a6fa
fix parquet datatype conversion from arrow (#1570)
sundy-li authored Sep 22, 2023
1 parent 7c93e35 commit 231a6fa
Showing 5 changed files with 26 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/io/csv/read_utils.rs
@@ -136,7 +136,7 @@ fn deserialize_datetime<T: chrono::TimeZone>(string: &str, tz: &T) -> Option<i64
         .to_datetime()
         .map(|x| x.naive_utc())
         .map(|x| tz.from_utc_datetime(&x))
-        .map(|x| x.timestamp_nanos())
+        .map(|x| x.timestamp_nanos_opt().unwrap())
         .ok()
     } else {
         None
@@ -228,7 +228,7 @@ pub(crate) fn deserialize_column<B: ByteRecordGeneric>(
         Timestamp(time_unit, None) => deserialize_primitive(rows, column, datatype, |bytes| {
             to_utf8(bytes)
                 .and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
-                .map(|x| x.timestamp_nanos())
+                .map(|x| x.timestamp_nanos_opt().unwrap())
                 .map(|x| match time_unit {
                     TimeUnit::Second => x / 1_000_000_000,
                     TimeUnit::Millisecond => x / 1_000_000,
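Note on the `timestamp_nanos` swaps throughout this commit: chrono 0.4.31 deprecated `NaiveDateTime::timestamp_nanos` / `DateTime::timestamp_nanos` in favour of `timestamp_nanos_opt`, which returns `Option<i64>` (None when the value does not fit in an i64). The `.unwrap()` keeps the previous panicking behaviour while silencing the deprecation warning. A minimal sketch, assuming chrono 0.4.31+ (the date below is only illustrative):

use chrono::{NaiveDate, NaiveDateTime};

// Equivalent to the deprecated `x.timestamp_nanos()`: nanoseconds since the
// Unix epoch, panicking when the value does not fit in an i64.
fn to_nanos(x: NaiveDateTime) -> i64 {
    x.timestamp_nanos_opt().unwrap()
}

fn main() {
    let dt = NaiveDate::from_ymd_opt(2023, 9, 22)
        .unwrap()
        .and_hms_opt(12, 0, 0)
        .unwrap();
    assert_eq!(to_nanos(dt), 1_695_384_000_000_000_000);
}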
4 changes: 2 additions & 2 deletions src/io/odbc/read/deserialize.rs
@@ -264,12 +264,12 @@ fn timestamp_ms(timestamp: &odbc_api::sys::Timestamp) -> i64 {
 
 fn timestamp_us(timestamp: &odbc_api::sys::Timestamp) -> i64 {
     timestamp_to_naive(timestamp)
-        .map(|x| x.timestamp_nanos() / 1000)
+        .map(|x| x.timestamp_nanos_opt().unwrap() / 1000)
        .unwrap_or(0)
 }
 
 fn timestamp_ns(timestamp: &odbc_api::sys::Timestamp) -> i64 {
     timestamp_to_naive(timestamp)
-        .map(|x| x.timestamp_nanos())
+        .map(|x| x.timestamp_nanos_opt().unwrap())
        .unwrap_or(0)
 }
16 changes: 14 additions & 2 deletions src/io/parquet/read/schema/convert.rs
@@ -165,10 +165,18 @@ fn from_fixed_len_byte_array(
 ) -> DataType {
     match (logical_type, converted_type) {
         (Some(PrimitiveLogicalType::Decimal(precision, scale)), _) => {
-            DataType::Decimal(precision, scale)
+            if length < 32 {
+                DataType::Decimal(precision, scale)
+            } else {
+                DataType::Decimal256(precision, scale)
+            }
         }
         (None, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
-            DataType::Decimal(precision, scale)
+            if length < 32 {
+                DataType::Decimal(precision, scale)
+            } else {
+                DataType::Decimal256(precision, scale)
+            }
         }
         (None, Some(PrimitiveConvertedType::Interval)) => {
             // There is currently no reliable way of determining which IntervalUnit
@@ -451,11 +459,15 @@ mod tests {
         message test_schema {
             REQUIRED BYTE_ARRAY binary;
             REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
+            REQUIRED FIXED_LEN_BYTE_ARRAY (7) decimal_128 (Decimal(16, 2)) ;
+            REQUIRED FIXED_LEN_BYTE_ARRAY (32) decimal_256 (Decimal(44, 2)) ;
         }
         ";
         let expected = vec![
             Field::new("binary", DataType::Binary, false),
             Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
+            Field::new("decimal_128", DataType::Decimal(16, 2), false),
+            Field::new("decimal_256", DataType::Decimal256(44, 2), false),
         ];
 
         let parquet_schema = SchemaDescriptor::try_from_message(message)?;
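The read-side rule above: a FIXED_LEN_BYTE_ARRAY decimal shorter than 32 bytes keeps mapping to the i128-backed `DataType::Decimal`, while a 32-byte array (the storage width of a 256-bit decimal) maps to `DataType::Decimal256`. A standalone sketch of that rule; the helper `decimal_type_for` is illustrative, not arrow2 API:

use arrow2::datatypes::DataType;

// Mirrors the match arms above: pick the arrow decimal type from the
// fixed-length byte array width declared in the parquet schema.
fn decimal_type_for(length: usize, precision: usize, scale: usize) -> DataType {
    if length < 32 {
        DataType::Decimal(precision, scale)
    } else {
        DataType::Decimal256(precision, scale)
    }
}

fn main() {
    // Matches the new test cases: 7 bytes -> 128-bit, 32 bytes -> 256-bit.
    assert_eq!(decimal_type_for(7, 16, 2), DataType::Decimal(16, 2));
    assert_eq!(decimal_type_for(32, 44, 2), DataType::Decimal256(44, 2));
}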
4 changes: 2 additions & 2 deletions src/io/parquet/write/schema.rs
@@ -333,8 +333,8 @@ pub fn to_parquet_type(field: &Field) -> Result<ParquetType> {
             name,
             PhysicalType::FixedLenByteArray(32),
             repetition,
-            None,
-            None,
+            Some(PrimitiveConvertedType::Decimal(precision, scale)),
+            logical_type,
             None,
         )?)
     }
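On the write side, the branch above builds the FIXED_LEN_BYTE_ARRAY(32) column for wide decimals; passing the Decimal converted type and the logical type instead of `None, None` keeps the annotation in the parquet schema, so the read path above can recover `Decimal256` rather than a plain fixed-size binary. A hypothetical check, assuming `to_parquet_type` is re-exported from `arrow2::io::parquet::write` (it is declared `pub` in this file):

use arrow2::datatypes::{DataType, Field};
use arrow2::error::Result;
use arrow2::io::parquet::write::to_parquet_type;

fn main() -> Result<()> {
    let field = Field::new("d", DataType::Decimal256(44, 2), false);
    // Expected after this commit: a FIXED_LEN_BYTE_ARRAY(32) primitive that
    // carries Decimal(44, 2) as its converted/logical type.
    let parquet_type = to_parquet_type(&field)?;
    println!("{parquet_type:?}");
    Ok(())
}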
12 changes: 6 additions & 6 deletions src/temporal_conversions.rs
@@ -363,7 +363,7 @@ pub fn utf8_to_timestamp_scalar<T: chrono::TimeZone>(
             TimeUnit::Second => x.timestamp(),
             TimeUnit::Millisecond => x.timestamp_millis(),
             TimeUnit::Microsecond => x.timestamp_micros(),
-            TimeUnit::Nanosecond => x.timestamp_nanos(),
+            TimeUnit::Nanosecond => x.timestamp_nanos_opt().unwrap(),
         })
         .ok()
     } else {
@@ -390,7 +390,7 @@ pub fn utf8_to_naive_timestamp_scalar(value: &str, fmt: &str, tu: &TimeUnit) ->
             TimeUnit::Second => x.timestamp(),
             TimeUnit::Millisecond => x.timestamp_millis(),
             TimeUnit::Microsecond => x.timestamp_micros(),
-            TimeUnit::Nanosecond => x.timestamp_nanos(),
+            TimeUnit::Nanosecond => x.timestamp_nanos_opt().unwrap(),
         })
         .ok()
 }
@@ -515,8 +515,8 @@ pub fn add_naive_interval(timestamp: i64, time_unit: TimeUnit, interval: months_
     match time_unit {
         TimeUnit::Second => new_datetime_tz.timestamp_millis() / 1000,
         TimeUnit::Millisecond => new_datetime_tz.timestamp_millis(),
-        TimeUnit::Microsecond => new_datetime_tz.timestamp_nanos() / 1000,
-        TimeUnit::Nanosecond => new_datetime_tz.timestamp_nanos(),
+        TimeUnit::Microsecond => new_datetime_tz.timestamp_nanos_opt().unwrap() / 1000,
+        TimeUnit::Nanosecond => new_datetime_tz.timestamp_nanos_opt().unwrap(),
     }
 }
 
@@ -544,7 +544,7 @@ pub fn add_interval<T: chrono::TimeZone>(
     match time_unit {
         TimeUnit::Second => new_datetime_tz.timestamp_millis() / 1000,
         TimeUnit::Millisecond => new_datetime_tz.timestamp_millis(),
-        TimeUnit::Microsecond => new_datetime_tz.timestamp_nanos() / 1000,
-        TimeUnit::Nanosecond => new_datetime_tz.timestamp_nanos(),
+        TimeUnit::Microsecond => new_datetime_tz.timestamp_nanos_opt().unwrap() / 1000,
+        TimeUnit::Nanosecond => new_datetime_tz.timestamp_nanos_opt().unwrap(),
     }
 }
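Behavioural note on the `unwrap()` in the Microsecond/Nanosecond arms: an i64 nanosecond timestamp only covers roughly 1677-09-21 through 2262-04-11, so `timestamp_nanos_opt()` returns None (and the unwrap panics) outside that window, while the second/millisecond arms keep working. A minimal sketch of that limit (the dates are illustrative):

use chrono::NaiveDate;

fn main() {
    let far_future = NaiveDate::from_ymd_opt(2300, 1, 1)
        .unwrap()
        .and_hms_opt(0, 0, 0)
        .unwrap();
    // Past 2262-04-11 the nanosecond representation overflows an i64 ...
    assert!(far_future.timestamp_nanos_opt().is_none());
    // ... while millisecond/microsecond precision still fits.
    assert!(far_future.timestamp_millis() > 0);
    assert!(far_future.timestamp_micros() > 0);
}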
