Skip to content

Commit

Permalink
Support Parquet Byte Stream Split Encoding (#5293)
Browse files Browse the repository at this point in the history
* wip byte-stream-split

* decoding works

* impl split

* clean up

* whitespace

* remove println

* get compiling after rebase

* integration test, as one might call it

* update parquet-testing revision

* encoding bench

* improve performance

* test fix

* add apache headers

* one more test and readme update

---------

Co-authored-by: Simon Vandel Sillesen <simon.vandel@gmail.com>
  • Loading branch information
mwlon and simonvandel authored Jan 12, 2024
1 parent 4a6ae68 commit 4c3e9be
Show file tree
Hide file tree
Showing 10 changed files with 499 additions and 92 deletions.
5 changes: 5 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ name = "compression"
required-features = ["experimental", "default"]
harness = false

[[bench]]
name = "encoding"
required-features = ["experimental", "default"]
harness = false


[[bench]]
name = "metadata"
Expand Down
2 changes: 1 addition & 1 deletion parquet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ The `parquet` crate provides the following features which may be enabled in your

## Parquet Feature Status

- [x] All encodings supported, except for BYTE_STREAM_SPLIT ([#4102](https://github.com/apache/arrow-rs/issues/4102))
- [x] All encodings supported
- [x] All compression codecs supported
- [x] Read support
- [x] Primitive column value readers
Expand Down
83 changes: 83 additions & 0 deletions parquet/benches/encoding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use criterion::*;
use parquet::basic::Encoding;
use parquet::data_type::{DataType, DoubleType, FloatType};
use parquet::decoding::{get_decoder, Decoder};
use parquet::encoding::get_encoder;
use parquet::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type};
use rand::prelude::*;
use std::sync::Arc;

fn bench_typed<T: DataType>(c: &mut Criterion, values: &[T::T], encoding: Encoding) {
let name = format!(
"dtype={}, encoding={:?}",
std::any::type_name::<T::T>(),
encoding
);
c.bench_function(&format!("encoding: {}", name), |b| {
b.iter(|| {
let mut encoder = get_encoder::<T>(encoding).unwrap();
encoder.put(values).unwrap();
encoder.flush_buffer().unwrap();
});
});

let mut encoder = get_encoder::<T>(encoding).unwrap();
encoder.put(values).unwrap();
let encoded = encoder.flush_buffer().unwrap();
println!("{} encoded as {} bytes", name, encoded.len(),);

let mut buffer = vec![T::T::default(); values.len()];
let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
Arc::new(
Type::primitive_type_builder("", T::get_physical_type())
.build()
.unwrap(),
),
0,
0,
ColumnPath::new(vec![]),
));
c.bench_function(&format!("decoding: {}", name), |b| {
b.iter(|| {
let mut decoder: Box<dyn Decoder<T>> =
get_decoder(column_desc_ptr.clone(), encoding).unwrap();
decoder.set_data(encoded.clone(), values.len()).unwrap();
decoder.get(&mut buffer).unwrap();
});
});
}

fn criterion_benchmark(c: &mut Criterion) {
let mut rng = StdRng::seed_from_u64(0);
let n = 16 * 1024;

let mut f32s = Vec::new();
let mut f64s = Vec::new();
for _ in 0..n {
f32s.push(rng.gen::<f32>());
f64s.push(rng.gen::<f64>());
}

bench_typed::<FloatType>(c, &f32s, Encoding::BYTE_STREAM_SPLIT);
bench_typed::<DoubleType>(c, &f64s, Encoding::BYTE_STREAM_SPLIT);
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
42 changes: 40 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,9 @@ mod tests {

use arrow_array::builder::*;
use arrow_array::cast::AsArray;
use arrow_array::types::{Decimal128Type, Decimal256Type, DecimalType, Float16Type};
use arrow_array::types::{
Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
};
use arrow_array::*;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_buffer::{i256, ArrowNativeType, Buffer};
Expand All @@ -755,7 +757,7 @@ mod tests {
use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
use crate::data_type::{
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
Int32Type, Int64Type, Int96Type,
FloatType, Int32Type, Int64Type, Int96Type,
};
use crate::errors::Result;
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
Expand Down Expand Up @@ -861,6 +863,13 @@ mod tests {
Encoding::DELTA_BINARY_PACKED,
],
);
run_single_column_reader_tests::<FloatType, _, FloatType>(
2,
ConvertedType::NONE,
None,
|vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
&[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
);
}

#[test]
Expand Down Expand Up @@ -1390,6 +1399,35 @@ mod tests {
assert!(col.value(2).is_nan());
}

#[test]
fn test_read_float32_float64_byte_stream_split() {
let path = format!(
"{}/byte_stream_split.zstd.parquet",
arrow::util::test_util::parquet_test_data(),
);
let file = File::open(path).unwrap();
let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

let mut row_count = 0;
for batch in record_reader {
let batch = batch.unwrap();
row_count += batch.num_rows();
let f32_col = batch.column(0).as_primitive::<Float32Type>();
let f64_col = batch.column(1).as_primitive::<Float64Type>();

// This file contains floats from a standard normal distribution
for &x in f32_col.values() {
assert!(x > -10.0);
assert!(x < 10.0);
}
for &x in f64_col.values() {
assert!(x > -10.0);
assert!(x < 10.0);
}
}
assert_eq!(row_count, 300);
}

/// Parameters for single_column_reader_test
#[derive(Clone)]
struct TestOptions {
Expand Down
3 changes: 3 additions & 0 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1579,6 +1579,9 @@ mod tests {
| DataType::UInt32
| DataType::UInt16
| DataType::UInt8 => vec![Encoding::PLAIN, Encoding::DELTA_BINARY_PACKED],
DataType::Float32 | DataType::Float64 => {
vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
}
_ => vec![Encoding::PLAIN],
};

Expand Down
Loading

0 comments on commit 4c3e9be

Please sign in to comment.