Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 199 additions & 4 deletions parquet/benches/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ use arrow::datatypes::DataType;
use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
use num::FromPrimitive;
use num_bigint::BigInt;
use parquet::arrow::array_reader::{
make_byte_array_reader, make_fixed_len_byte_array_reader,
};
use parquet::basic::Type;
use parquet::data_type::FixedLenByteArrayType;
use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
use parquet::{
arrow::array_reader::ArrayReader,
Expand All @@ -47,6 +52,10 @@ fn build_test_schema() -> SchemaDescPtr {
OPTIONAL INT32 optional_decimal1_leaf (DECIMAL(8,2));
REQUIRED INT64 mandatory_decimal2_leaf (DECIMAL(16,2));
OPTIONAL INT64 optional_decimal2_leaf (DECIMAL(16,2));
REQUIRED BYTE_ARRAY mandatory_decimal3_leaf (DECIMAL(16,2));
OPTIONAL BYTE_ARRAY optional_decimal3_leaf (DECIMAL(16,2));
REQUIRED FIXED_LEN_BYTE_ARRAY (16) mandatory_decimal4_leaf (DECIMAL(16,2));
OPTIONAL FIXED_LEN_BYTE_ARRAY (16) optional_decimal4_leaf (DECIMAL(16,2));
}
";
parse_message_type(message_type)
Expand All @@ -65,6 +74,71 @@ pub fn seedable_rng() -> StdRng {
StdRng::seed_from_u64(42)
}

/// Builds an in-memory page iterator filled with random decimal values that are
/// stored as bytes (`BYTE_ARRAY` or `FIXED_LEN_BYTE_ARRAY` physical type).
///
/// `NUM_ROW_GROUPS` column chunks are produced, each holding `PAGES_PER_GROUP`
/// pages of `VALUES_PER_PAGE` slots. A slot is NULL when a random `f32` falls
/// below `null_density`; otherwise a value is drawn from `min..max` (upper bound
/// exclusive) and serialized big-endian. The generator comes from
/// `seedable_rng()`, so the produced data is the same on every call.
///
/// Panics via `unimplemented!()` if a value has to be generated for any other
/// physical type, and asserts that fixed-length columns have a type length of 16.
fn build_encoded_decimal_bytes_page_iterator<T>(
    schema: SchemaDescPtr,
    column_desc: ColumnDescPtr,
    null_density: f32,
    encoding: Encoding,
    min: i128,
    max: i128,
) -> impl PageIterator + Clone
where
    T: parquet::data_type::DataType,
    T::T: From<Vec<u8>>,
{
    let max_def_level = column_desc.max_def_level();
    let max_rep_level = column_desc.max_rep_level();
    let rep_levels = vec![0; VALUES_PER_PAGE];
    let mut rng = seedable_rng();

    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
    for _ in 0..NUM_ROW_GROUPS {
        let mut row_group_pages = Vec::new();
        for _ in 0..PAGES_PER_GROUP {
            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);

            for _ in 0..VALUES_PER_PAGE {
                // NULL slot: record the lower definition level and emit no value.
                let is_null = rng.gen::<f32>() < null_density;
                if is_null {
                    def_levels.push(max_def_level - 1);
                    continue;
                }

                let decimal = rng.gen_range(min..max);
                // Parquet stores decimals as big-endian two's complement bytes.
                let encoded = match column_desc.physical_type() {
                    // Variable length: only as many bytes as the value needs.
                    Type::BYTE_ARRAY => BigInt::from(decimal).to_signed_bytes_be(),
                    // Fixed length: always the full 16 bytes of the i128.
                    Type::FIXED_LEN_BYTE_ARRAY => {
                        assert_eq!(column_desc.type_length(), 16);
                        decimal.to_be_bytes().to_vec()
                    }
                    _ => unimplemented!(),
                };
                values.push(T::T::from(encoded));
                def_levels.push(max_def_level);
            }

            let mut builder =
                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
            builder.add_rep_levels(max_rep_level, &rep_levels);
            builder.add_def_levels(max_def_level, &def_levels);
            builder.add_values::<T>(encoding, &values);
            row_group_pages.push(builder.consume());
        }
        pages.push(row_group_pages);
    }

    InMemoryPageIterator::new(schema, column_desc, pages)
}

fn build_encoded_primitive_page_iterator<T>(
schema: SchemaDescPtr,
column_desc: ColumnDescPtr,
Expand Down Expand Up @@ -326,6 +400,7 @@ fn bench_array_reader_skip(mut array_reader: Box<dyn ArrayReader>) -> usize {
}
total_count
}

fn create_primitive_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
Expand Down Expand Up @@ -354,11 +429,27 @@ fn create_primitive_array_reader(
}
}

/// Creates the array reader for a decimal column that is physically stored as bytes.
///
/// `BYTE_ARRAY` columns use `make_byte_array_reader` and `FIXED_LEN_BYTE_ARRAY`
/// columns use `make_fixed_len_byte_array_reader`. Any other physical type hits
/// `unimplemented!()`, and a construction error panics through `unwrap`.
fn create_decimal_by_bytes_reader(
    page_iterator: impl PageIterator + 'static,
    column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
    let pages: Box<dyn PageIterator> = Box::new(page_iterator);
    let reader = match column_desc.physical_type() {
        Type::BYTE_ARRAY => make_byte_array_reader(pages, column_desc, None),
        Type::FIXED_LEN_BYTE_ARRAY => {
            make_fixed_len_byte_array_reader(pages, column_desc, None)
        }
        _ => unimplemented!(),
    };
    reader.unwrap()
}

fn create_string_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
use parquet::arrow::array_reader::make_byte_array_reader;
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}

Expand All @@ -378,6 +469,80 @@ fn create_string_byte_array_dictionary_reader(
.unwrap()
}

/// Registers the byte-backed decimal read benchmarks on `group`.
///
/// Three PLAIN-encoded scenarios are measured, in this order: a mandatory column
/// without NULLs, an optional column without NULLs, and an optional column with
/// a NULL density of 0.5. Each scenario builds its pages once with values drawn
/// from `min..max`, then times creating a reader and draining it with
/// `bench_array_reader`. After the timed iterations the last observed count is
/// checked against `EXPECTED_VALUE_COUNT`.
fn bench_byte_decimal<T>(
    group: &mut BenchmarkGroup<WallTime>,
    schema: &SchemaDescPtr,
    mandatory_column_desc: &ColumnDescPtr,
    optional_column_desc: &ColumnDescPtr,
    min: i128,
    max: i128,
) where
    T: parquet::data_type::DataType,
    T::T: From<Vec<u8>>,
{
    // (benchmark name, column under test, fraction of NULL slots)
    let scenarios = [
        (
            "plain encoded, mandatory, no NULLs",
            mandatory_column_desc,
            0.0_f32,
        ),
        (
            "plain encoded, optional, no NULLs",
            optional_column_desc,
            0.0_f32,
        ),
        (
            "plain encoded, optional, half NULLs",
            optional_column_desc,
            0.5_f32,
        ),
    ];

    let mut count: usize = 0;
    for &(bench_name, column_desc, null_density) in scenarios.iter() {
        // Pages are generated outside the timed closure; only reading is measured.
        let pages = build_encoded_decimal_bytes_page_iterator::<T>(
            schema.clone(),
            column_desc.clone(),
            null_density,
            Encoding::PLAIN,
            min,
            max,
        );
        group.bench_function(bench_name, |b| {
            b.iter(|| {
                let array_reader =
                    create_decimal_by_bytes_reader(pages.clone(), column_desc.clone());
                count = bench_array_reader(array_reader);
            });
            assert_eq!(count, EXPECTED_VALUE_COUNT);
        });
    }
}

fn bench_primitive<T>(
group: &mut BenchmarkGroup<WallTime>,
schema: &SchemaDescPtr,
Expand Down Expand Up @@ -611,9 +776,39 @@ fn decimal_benches(c: &mut Criterion) {
&schema,
&mandatory_decimal2_leaf_desc,
&optional_decimal2_leaf_desc,
// precision is 18: the max is 999999999999999999
999999999999000,
999999999999999,
// precision is 16: the max is 9999999999999999
9999999999999000,
9999999999999999,
);
group.finish();

// parquet BYTE_ARRAY, logical type decimal(16,2)
let mut group = c.benchmark_group("arrow_array_reader/BYTE_ARRAY/Decimal128Array");
let mandatory_decimal3_leaf_desc = schema.column(10);
let optional_decimal3_leaf_desc = schema.column(11);
bench_byte_decimal::<ByteArrayType>(
&mut group,
&schema,
&mandatory_decimal3_leaf_desc,
&optional_decimal3_leaf_desc,
// precision is 16: the max is 9999999999999999
9999999999999000,
9999999999999999,
);
group.finish();

let mut group =
c.benchmark_group("arrow_array_reader/FIXED_LENGTH_BYTE_ARRAY/Decimal128Array");
let mandatory_decimal4_leaf_desc = schema.column(12);
let optional_decimal4_leaf_desc = schema.column(13);
bench_byte_decimal::<FixedLenByteArrayType>(
&mut group,
&schema,
&mandatory_decimal4_leaf_desc,
&optional_decimal4_leaf_desc,
// precision is 16: the max is 9999999999999999
9999999999999000,
9999999999999999,
);
group.finish();
}
Expand Down
22 changes: 14 additions & 8 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ impl From<ByteArray> for FixedLenByteArray {
}
}

impl From<Vec<u8>> for FixedLenByteArray {
fn from(buf: Vec<u8>) -> FixedLenByteArray {
FixedLenByteArray(ByteArray::from(buf))
}
}

impl From<FixedLenByteArray> for ByteArray {
fn from(other: FixedLenByteArray) -> Self {
other.0
Expand Down Expand Up @@ -1141,9 +1147,9 @@ macro_rules! make_type {
}

fn get_column_reader(
column_writer: ColumnReader,
column_reader: ColumnReader,
) -> Option<ColumnReaderImpl<Self>> {
match column_writer {
match column_reader {
ColumnReader::$reader_ident(w) => Some(w),
_ => None,
}
Expand Down Expand Up @@ -1248,29 +1254,29 @@ impl FromBytes for Int96 {
// FIXME Needed to satisfy the constraint of many decoding functions but ByteArray does not
// appear to actually be converted directly from bytes
impl FromBytes for ByteArray {
type Buffer = [u8; 8];
type Buffer = Vec<u8>;
fn from_le_bytes(bs: Self::Buffer) -> Self {
ByteArray::from(bs.to_vec())
ByteArray::from(bs)
}
fn from_be_bytes(_bs: Self::Buffer) -> Self {
unreachable!()
}
fn from_ne_bytes(bs: Self::Buffer) -> Self {
ByteArray::from(bs.to_vec())
ByteArray::from(bs)
}
}

impl FromBytes for FixedLenByteArray {
type Buffer = [u8; 8];
type Buffer = Vec<u8>;

fn from_le_bytes(bs: Self::Buffer) -> Self {
Self(ByteArray::from(bs.to_vec()))
Self(ByteArray::from(bs))
}
fn from_be_bytes(_bs: Self::Buffer) -> Self {
unreachable!()
}
fn from_ne_bytes(bs: Self::Buffer) -> Self {
Self(ByteArray::from(bs.to_vec()))
Self(ByteArray::from(bs))
}
}

Expand Down