Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 8 additions & 23 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use crate::arrow::array_reader::{
};
use crate::arrow::buffer::converter::{
DecimalArrayConverter, DecimalFixedLengthByteArrayConverter,
FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter,
Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
FixedLenBinaryConverter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
IntervalYearMonthConverter,
};
use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
Expand Down Expand Up @@ -183,26 +183,11 @@ fn build_primitive_reader(
column_desc,
arrow_type,
)?)),
PhysicalType::INT96 => {
// get the optional timezone information from arrow type
let timezone = arrow_type.as_ref().and_then(|data_type| {
if let DataType::Timestamp(_, tz) = data_type {
tz.clone()
} else {
None
}
});
let converter = Int96Converter::new(Int96ArrayConverter { timezone });
Ok(Box::new(ComplexObjectArrayReader::<
Int96Type,
Int96Converter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
PhysicalType::INT96 => Ok(Box::new(PrimitiveArrayReader::<Int96Type>::new(
page_iterator,
column_desc,
arrow_type,
)?)),
PhysicalType::FLOAT => Ok(Box::new(PrimitiveArrayReader::<FloatType>::new(
page_iterator,
column_desc,
Expand Down
58 changes: 38 additions & 20 deletions parquet/src/arrow/array_reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::Type as PhysicalType;
use crate::column::page::PageIterator;
use crate::data_type::DataType;
use crate::data_type::{DataType, Int96};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow::array::{
ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder, Decimal128Array,
Float32Array, Float64Array, Int32Array, Int64Array,
Float32Array, Float64Array, Int32Array, Int64Array,TimestampNanosecondArray, TimestampNanosecondBufferBuilder,
};
use arrow::buffer::Buffer;
use arrow::datatypes::DataType as ArrowType;
use arrow::datatypes::{DataType as ArrowType, TimeUnit};
use std::any::Any;
use std::sync::Arc;

Expand Down Expand Up @@ -98,7 +98,7 @@ where
}

fn consume_batch(&mut self) -> Result<ArrayRef> {
let target_type = self.get_data_type().clone();
let target_type = &self.data_type;
let arrow_data_type = match T::get_physical_type() {
PhysicalType::BOOLEAN => ArrowType::Boolean,
PhysicalType::INT32 => {
Expand All @@ -123,9 +123,11 @@ where
}
PhysicalType::FLOAT => ArrowType::Float32,
PhysicalType::DOUBLE => ArrowType::Float64,
PhysicalType::INT96
| PhysicalType::BYTE_ARRAY
| PhysicalType::FIXED_LEN_BYTE_ARRAY => {
PhysicalType::INT96 => match target_type {
ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
_ => unreachable!("INT96 must be timestamp nanosecond"),
},
PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
unreachable!(
"PrimitiveArrayReaders don't support complex physical types"
);
Expand All @@ -135,16 +137,31 @@ where
// Convert to arrays by using the Parquet physical type.
// The physical types are then cast to Arrow types if necessary

let mut record_data = self.record_reader.consume_record_data();
let record_data = self.record_reader.consume_record_data();
let record_data = match T::get_physical_type() {
PhysicalType::BOOLEAN => {
let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len());

if T::get_physical_type() == PhysicalType::BOOLEAN {
let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len());
for e in record_data.as_slice() {
boolean_buffer.append(*e > 0);
}
boolean_buffer.finish()
}
PhysicalType::INT96 => {
// SAFETY - record_data is an aligned buffer of Int96
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once ComplexObjectArrayReader is gone, I intend to see if we can rework the parquet decoder plumbing to avoid this hackery

let (prefix, slice, suffix) =
unsafe { record_data.as_slice().align_to::<Int96>() };
assert!(prefix.is_empty() && suffix.is_empty());

let mut builder = TimestampNanosecondBufferBuilder::new(slice.len());
for v in slice {
builder.append(v.to_nanos())
}

for e in record_data.as_slice() {
boolean_buffer.append(*e > 0);
builder.finish()
}
record_data = boolean_buffer.finish();
}
_ => record_data,
};

let array_data = ArrayDataBuilder::new(arrow_data_type)
.len(self.record_reader.num_values())
Expand All @@ -158,9 +175,10 @@ where
PhysicalType::INT64 => Arc::new(Int64Array::from(array_data)) as ArrayRef,
PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)) as ArrayRef,
PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)) as ArrayRef,
PhysicalType::INT96
| PhysicalType::BYTE_ARRAY
| PhysicalType::FIXED_LEN_BYTE_ARRAY => {
PhysicalType::INT96 => {
Arc::new(TimestampNanosecondArray::from(array_data)) as ArrayRef
}
PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
unreachable!(
"PrimitiveArrayReaders don't support complex physical types"
);
Expand All @@ -181,7 +199,7 @@ where
ArrowType::Date64 => {
// this is cheap as it internally reinterprets the data
let a = arrow::compute::cast(&array, &ArrowType::Date32)?;
arrow::compute::cast(&a, &target_type)?
arrow::compute::cast(&a, target_type)?
}
ArrowType::Decimal128(p, s) => {
let array = match array.data_type() {
Expand All @@ -207,11 +225,11 @@ where
));
}
}
.with_precision_and_scale(p, s)?;
.with_precision_and_scale(*p, *s)?;

Arc::new(array) as ArrayRef
}
_ => arrow::compute::cast(&array, &target_type)?,
_ => arrow::compute::cast(&array, target_type)?,
};

// save definition and repetition buffers
Expand Down
18 changes: 17 additions & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ mod tests {
use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
use crate::data_type::{
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
FixedLenByteArrayType, Int32Type, Int64Type,
FixedLenByteArrayType, Int32Type, Int64Type, Int96Type,
};
use crate::errors::Result;
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
Expand Down Expand Up @@ -858,6 +858,22 @@ mod tests {
);
}

#[test]
fn test_int96_single_column_reader_test() {
let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
run_single_column_reader_tests::<Int96Type, _, Int96Type>(
2,
ConvertedType::NONE,
None,
|vals| {
Arc::new(TimestampNanosecondArray::from_iter(
vals.iter().map(|x| x.map(|x| x.to_nanos())),
)) as _
},
encodings,
);
}

struct RandUtf8Gen {}

impl RandGen<ByteArrayType> for RandUtf8Gen {
Expand Down
23 changes: 2 additions & 21 deletions parquet/src/arrow/buffer/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::data_type::{FixedLenByteArray, Int96};
use crate::data_type::FixedLenByteArray;
use arrow::array::{
Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, FixedSizeBinaryBuilder,
IntervalDayTimeArray, IntervalDayTimeBuilder, IntervalYearMonthArray,
IntervalYearMonthBuilder, TimestampNanosecondArray,
IntervalYearMonthBuilder,
};
use std::sync::Arc;

Expand Down Expand Up @@ -156,22 +156,6 @@ impl Converter<Vec<Option<FixedLenByteArray>>, IntervalDayTimeArray>
}
}

pub struct Int96ArrayConverter {
pub timezone: Option<String>,
}

impl Converter<Vec<Option<Int96>>, TimestampNanosecondArray> for Int96ArrayConverter {
fn convert(&self, source: Vec<Option<Int96>>) -> Result<TimestampNanosecondArray> {
Ok(TimestampNanosecondArray::from_opt_vec(
source
.into_iter()
.map(|int96| int96.map(|val| val.to_i64() * 1_000_000))
.collect(),
self.timezone.clone(),
))
}
}

#[cfg(test)]
pub struct Utf8ArrayConverter {}

Expand Down Expand Up @@ -199,9 +183,6 @@ impl Converter<Vec<Option<ByteArray>>, StringArray> for Utf8ArrayConverter {
pub type Utf8Converter =
ArrayRefConverter<Vec<Option<ByteArray>>, StringArray, Utf8ArrayConverter>;

pub type Int96Converter =
ArrayRefConverter<Vec<Option<Int96>>, TimestampNanosecondArray, Int96ArrayConverter>;

pub type FixedLenBinaryConverter = ArrayRefConverter<
Vec<Option<FixedLenByteArray>>,
FixedSizeBinaryArray,
Expand Down
2 changes: 2 additions & 0 deletions parquet/src/arrow/record_reader/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::marker::PhantomData;

use crate::arrow::buffer::bit_util::iter_set_bits_rev;
use crate::data_type::Int96;
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::ArrowNativeType;

Expand Down Expand Up @@ -85,6 +86,7 @@ impl ScalarValue for u64 {}
impl ScalarValue for i64 {}
impl ScalarValue for f32 {}
impl ScalarValue for f64 {}
impl ScalarValue for Int96 {}

/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
#[derive(Debug)]
Expand Down
22 changes: 18 additions & 4 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::util::{

/// Rust representation for logical type INT96, value is backed by an array of `u32`.
/// The type only takes 12 bytes, without extra padding.
#[derive(Clone, Debug, PartialOrd, Default, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, PartialOrd, Default, PartialEq, Eq)]
pub struct Int96 {
value: [u32; 3],
}
Expand All @@ -59,15 +59,29 @@ impl Int96 {

/// Converts this INT96 into an i64 representing the number of MILLISECONDS since Epoch
pub fn to_i64(&self) -> i64 {
let (seconds, nanoseconds) = self.to_seconds_and_nanos();
seconds * 1_000 + nanoseconds / 1_000_000
}

/// Converts this INT96 into an i64 representing the number of NANOSECONDS since EPOCH
///
/// Will wrap around on overflow
pub fn to_nanos(&self) -> i64 {
let (seconds, nanoseconds) = self.to_seconds_and_nanos();
seconds
.wrapping_mul(1_000_000_000)
.wrapping_add(nanoseconds)
}

/// Converts this INT96 to a number of seconds and nanoseconds since EPOCH
pub fn to_seconds_and_nanos(&self) -> (i64, i64) {
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
const SECONDS_PER_DAY: i64 = 86_400;
const MILLIS_PER_SECOND: i64 = 1_000;

let day = self.data()[2] as i64;
let nanoseconds = ((self.data()[1] as i64) << 32) + self.data()[0] as i64;
let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;

seconds * MILLIS_PER_SECOND + nanoseconds / 1_000_000
(seconds, nanoseconds)
}
}

Expand Down
2 changes: 1 addition & 1 deletion parquet/src/record/triplet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl TripletIter {
Field::convert_int64(typed.column_descr(), *typed.current_value())
}
TripletIter::Int96TripletIter(ref typed) => {
Field::convert_int96(typed.column_descr(), typed.current_value().clone())
Field::convert_int96(typed.column_descr(), *typed.current_value())
}
TripletIter::FloatTripletIter(ref typed) => {
Field::convert_float(typed.column_descr(), *typed.current_value())
Expand Down