From 1635f5bfdd2a6c53ab52555721dced8457e22ca6 Mon Sep 17 00:00:00 2001 From: Jeffrey <22608443+Jefffrey@users.noreply.github.com> Date: Wed, 8 Nov 2023 22:24:26 +1100 Subject: [PATCH] Remove ByteBufferPtr and replace with Bytes (#5055) * Replace usages of ByteBufferPtr with Bytes * Remove parquet memory.rs module --- parquet/src/arrow/array_reader/byte_array.rs | 22 +- .../array_reader/byte_array_dictionary.rs | 8 +- .../array_reader/fixed_len_byte_array.rs | 10 +- parquet/src/arrow/array_reader/test_util.rs | 8 +- parquet/src/arrow/arrow_writer/byte_array.rs | 6 +- parquet/src/arrow/arrow_writer/mod.rs | 2 +- parquet/src/arrow/decoder/delta_byte_array.rs | 11 +- parquet/src/arrow/decoder/dictionary_index.rs | 7 +- .../arrow/record_reader/definition_levels.rs | 14 +- parquet/src/column/page.rs | 27 ++- parquet/src/column/reader.rs | 32 +-- parquet/src/column/reader/decoder.rs | 27 +-- parquet/src/column/writer/encoder.rs | 7 +- parquet/src/column/writer/mod.rs | 31 ++- parquet/src/data_type.rs | 57 +++-- parquet/src/encodings/decoding.rs | 228 +++++------------- .../src/encodings/encoding/dict_encoder.rs | 11 +- parquet/src/encodings/encoding/mod.rs | 32 ++- parquet/src/encodings/rle.rs | 77 +++--- parquet/src/file/serialized_reader.rs | 10 +- parquet/src/file/writer.rs | 27 +-- parquet/src/lib.rs | 4 - parquet/src/util/bit_util.rs | 41 ++-- parquet/src/util/memory.rs | 149 ------------ parquet/src/util/mod.rs | 1 - parquet/src/util/test_common/page_util.rs | 15 +- parquet/src/util/test_common/rand_gen.rs | 4 +- 27 files changed, 307 insertions(+), 561 deletions(-) delete mode 100644 parquet/src/util/memory.rs diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 4612f816146a..01666c0af4e6 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -29,12 +29,12 @@ use crate::data_type::Int32Type; use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; -use crate::util::memory::ByteBufferPtr; use arrow_array::{ Array, ArrayRef, BinaryArray, Decimal128Array, Decimal256Array, OffsetSizeTrait, }; use arrow_buffer::{i256, Buffer}; use arrow_schema::DataType as ArrowType; +use bytes::Bytes; use std::any::Any; use std::ops::Range; use std::sync::Arc; @@ -189,7 +189,7 @@ impl ColumnValueDecoder fn set_dict( &mut self, - buf: ByteBufferPtr, + buf: Bytes, num_values: u32, encoding: Encoding, _is_sorted: bool, @@ -219,7 +219,7 @@ impl ColumnValueDecoder fn set_data( &mut self, encoding: Encoding, - data: ByteBufferPtr, + data: Bytes, num_levels: usize, num_values: Option, ) -> Result<()> { @@ -263,7 +263,7 @@ pub enum ByteArrayDecoder { impl ByteArrayDecoder { pub fn new( encoding: Encoding, - data: ByteBufferPtr, + data: Bytes, num_levels: usize, num_values: Option, validate_utf8: bool, @@ -339,7 +339,7 @@ impl ByteArrayDecoder { /// Decoder from [`Encoding::PLAIN`] data to [`OffsetBuffer`] pub struct ByteArrayDecoderPlain { - buf: ByteBufferPtr, + buf: Bytes, offset: usize, validate_utf8: bool, @@ -350,7 +350,7 @@ pub struct ByteArrayDecoderPlain { impl ByteArrayDecoderPlain { pub fn new( - buf: ByteBufferPtr, + buf: Bytes, num_levels: usize, num_values: Option, validate_utf8: bool, @@ -438,16 +438,16 @@ impl ByteArrayDecoderPlain { /// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`OffsetBuffer`] pub struct ByteArrayDecoderDeltaLength { lengths: Vec, - data: ByteBufferPtr, + data: 
Bytes, length_offset: usize, data_offset: usize, validate_utf8: bool, } impl ByteArrayDecoderDeltaLength { - fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result { + fn new(data: Bytes, validate_utf8: bool) -> Result { let mut len_decoder = DeltaBitPackDecoder::::new(); - len_decoder.set_data(data.all(), 0)?; + len_decoder.set_data(data.clone(), 0)?; let values = len_decoder.values_left(); let mut lengths = vec![0; values]; @@ -522,7 +522,7 @@ pub struct ByteArrayDecoderDelta { } impl ByteArrayDecoderDelta { - fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result { + fn new(data: Bytes, validate_utf8: bool) -> Result { Ok(Self { decoder: DeltaByteArrayDecoder::new(data)?, validate_utf8, @@ -558,7 +558,7 @@ pub struct ByteArrayDecoderDictionary { } impl ByteArrayDecoderDictionary { - fn new(data: ByteBufferPtr, num_levels: usize, num_values: Option) -> Self { + fn new(data: Bytes, num_levels: usize, num_values: Option) -> Self { Self { decoder: DictIndexDecoder::new(data, num_levels, num_values), } diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index 841f5a95fd4e..0d216fa08327 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use arrow_array::{Array, ArrayRef, OffsetSizeTrait}; use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_schema::DataType as ArrowType; +use bytes::Bytes; use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain}; use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; @@ -39,7 +40,6 @@ use crate::encodings::rle::RleDecoder; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::FromBytes; -use crate::util::memory::ByteBufferPtr; /// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`] macro_rules! 
make_reader { @@ -253,7 +253,7 @@ where fn set_dict( &mut self, - buf: ByteBufferPtr, + buf: Bytes, num_values: u32, encoding: Encoding, _is_sorted: bool, @@ -286,7 +286,7 @@ where fn set_data( &mut self, encoding: Encoding, - data: ByteBufferPtr, + data: Bytes, num_levels: usize, num_values: Option, ) -> Result<()> { @@ -294,7 +294,7 @@ where Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { let bit_width = data[0]; let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(data.start_from(1)); + decoder.set_data(data.slice(1..)); MaybeDictionaryDecoder::Dict { decoder, max_remaining_values: num_values.unwrap_or(num_levels), diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index b06091b6b57a..3b1a50ebcce8 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -26,7 +26,6 @@ use crate::column::page::PageIterator; use crate::column::reader::decoder::{ColumnValueDecoder, ValuesBufferSlice}; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; -use crate::util::memory::ByteBufferPtr; use arrow_array::{ ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, IntervalDayTimeArray, IntervalYearMonthArray, @@ -34,6 +33,7 @@ use arrow_array::{ use arrow_buffer::{i256, Buffer}; use arrow_data::ArrayDataBuilder; use arrow_schema::{DataType as ArrowType, IntervalUnit}; +use bytes::Bytes; use std::any::Any; use std::ops::Range; use std::sync::Arc; @@ -298,7 +298,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { struct ValueDecoder { byte_length: usize, - dict_page: Option, + dict_page: Option, decoder: Option, } @@ -315,7 +315,7 @@ impl ColumnValueDecoder for ValueDecoder { fn set_dict( &mut self, - buf: ByteBufferPtr, + buf: Bytes, num_values: u32, encoding: Encoding, _is_sorted: bool, @@ -345,7 +345,7 @@ impl ColumnValueDecoder for ValueDecoder { fn set_data( &mut self, encoding: Encoding, - data: ByteBufferPtr, + data: Bytes, num_levels: usize, num_values: Option, ) -> Result<()> { @@ -434,7 +434,7 @@ impl ColumnValueDecoder for ValueDecoder { } enum Decoder { - Plain { buf: ByteBufferPtr, offset: usize }, + Plain { buf: Bytes, offset: usize }, Dict { decoder: DictIndexDecoder }, Delta { decoder: DeltaByteArrayDecoder }, } diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs index 7e66efead2e5..05032920139b 100644 --- a/parquet/src/arrow/array_reader/test_util.rs +++ b/parquet/src/arrow/array_reader/test_util.rs @@ -17,6 +17,7 @@ use arrow_array::{Array, ArrayRef}; use arrow_schema::DataType as ArrowType; +use bytes::Bytes; use std::any::Any; use std::sync::Arc; @@ -27,7 +28,6 @@ use crate::data_type::{ByteArray, ByteArrayType}; use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; use crate::errors::Result; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type}; -use crate::util::memory::ByteBufferPtr; /// Returns a descriptor for a UTF-8 column pub fn utf8_column() -> ColumnDescPtr { @@ -45,7 +45,7 @@ pub fn utf8_column() -> ColumnDescPtr { } /// Encode `data` with the provided `encoding` -pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr { +pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> Bytes { let mut encoder = get_encoder::(encoding).unwrap(); encoder.put(data).unwrap(); @@ -53,7 +53,7 @@ pub fn encode_byte_array(encoding: Encoding, data: 
&[ByteArray]) -> ByteBufferPt } /// Returns the encoded dictionary and value data -pub fn encode_dictionary(data: &[ByteArray]) -> (ByteBufferPtr, ByteBufferPtr) { +pub fn encode_dictionary(data: &[ByteArray]) -> (Bytes, Bytes) { let mut dict_encoder = DictEncoder::::new(utf8_column()); dict_encoder.put(data).unwrap(); @@ -68,7 +68,7 @@ pub fn encode_dictionary(data: &[ByteArray]) -> (ByteBufferPtr, ByteBufferPtr) { /// Returns an array of data with its associated encoding, along with an encoded dictionary pub fn byte_array_all_encodings( data: Vec>, -) -> (Vec<(Encoding, ByteBufferPtr)>, ByteBufferPtr) { +) -> (Vec<(Encoding, Bytes)>, Bytes) { let data: Vec<_> = data.into_iter().map(Into::into).collect(); let (encoded_dictionary, encoded_rle) = encode_dictionary(&data); diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 3db2e4a6a063..28c7c3b00540 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -236,7 +236,7 @@ impl FallbackEncoder { let lengths = lengths.flush_buffer()?; let mut out = Vec::with_capacity(lengths.len() + buffer.len()); - out.extend_from_slice(lengths.data()); + out.extend_from_slice(&lengths); out.extend_from_slice(buffer); buffer.clear(); (out, Encoding::DELTA_LENGTH_BYTE_ARRAY) @@ -252,8 +252,8 @@ impl FallbackEncoder { let mut out = Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len()); - out.extend_from_slice(prefix_lengths.data()); - out.extend_from_slice(suffix_lengths.data()); + out.extend_from_slice(&prefix_lengths); + out.extend_from_slice(&suffix_lengths); out.extend_from_slice(buffer); buffer.clear(); last_value.clear(); diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index a9cd1afb2479..eca1dea791be 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -331,7 +331,7 @@ impl PageWriter for ArrowPageWriter { buf.length += compressed_size; buf.data.push(header); - buf.data.push(data.into()); + buf.data.push(data); Ok(spec) } diff --git a/parquet/src/arrow/decoder/delta_byte_array.rs b/parquet/src/arrow/decoder/delta_byte_array.rs index c731cfea97e9..7686a4292c43 100644 --- a/parquet/src/arrow/decoder/delta_byte_array.rs +++ b/parquet/src/arrow/decoder/delta_byte_array.rs @@ -15,16 +15,17 @@ // specific language governing permissions and limitations // under the License. 
+use bytes::Bytes; + use crate::data_type::Int32Type; use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; use crate::errors::{ParquetError, Result}; -use crate::util::memory::ByteBufferPtr; /// Decoder for `Encoding::DELTA_BYTE_ARRAY` pub struct DeltaByteArrayDecoder { prefix_lengths: Vec, suffix_lengths: Vec, - data: ByteBufferPtr, + data: Bytes, length_offset: usize, data_offset: usize, last_value: Vec, @@ -32,16 +33,16 @@ pub struct DeltaByteArrayDecoder { impl DeltaByteArrayDecoder { /// Create a new [`DeltaByteArrayDecoder`] with the provided data page - pub fn new(data: ByteBufferPtr) -> Result { + pub fn new(data: Bytes) -> Result { let mut prefix = DeltaBitPackDecoder::::new(); - prefix.set_data(data.all(), 0)?; + prefix.set_data(data.clone(), 0)?; let num_prefix = prefix.values_left(); let mut prefix_lengths = vec![0; num_prefix]; assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix); let mut suffix = DeltaBitPackDecoder::::new(); - suffix.set_data(data.start_from(prefix.get_offset()), 0)?; + suffix.set_data(data.slice(prefix.get_offset()..), 0)?; let num_suffix = suffix.values_left(); let mut suffix_lengths = vec![0; num_suffix]; diff --git a/parquet/src/arrow/decoder/dictionary_index.rs b/parquet/src/arrow/decoder/dictionary_index.rs index 32efd564dffb..38f2b058360c 100644 --- a/parquet/src/arrow/decoder/dictionary_index.rs +++ b/parquet/src/arrow/decoder/dictionary_index.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. +use bytes::Bytes; + use crate::encodings::rle::RleDecoder; use crate::errors::Result; -use crate::util::memory::ByteBufferPtr; /// Decoder for `Encoding::RLE_DICTIONARY` indices pub struct DictIndexDecoder { @@ -41,10 +42,10 @@ pub struct DictIndexDecoder { impl DictIndexDecoder { /// Create a new [`DictIndexDecoder`] with the provided data page, the number of levels /// associated with this data page, and the number of non-null values (if known) - pub fn new(data: ByteBufferPtr, num_levels: usize, num_values: Option) -> Self { + pub fn new(data: Bytes, num_levels: usize, num_values: Option) -> Self { let bit_width = data[0]; let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(data.start_from(1)); + decoder.set_data(data.slice(1..)); Self { decoder, diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 20cda536ae1c..9009c596c4bf 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -20,6 +20,7 @@ use std::ops::Range; use arrow_array::builder::BooleanBufferBuilder; use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; use arrow_buffer::Buffer; +use bytes::Bytes; use crate::arrow::buffer::bit_util::count_set_bits; use crate::basic::Encoding; @@ -28,7 +29,6 @@ use crate::column::reader::decoder::{ }; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; -use crate::util::memory::ByteBufferPtr; use super::buffer::ScalarBuffer; @@ -152,7 +152,7 @@ impl DefinitionLevelBufferDecoder { impl ColumnLevelDecoder for DefinitionLevelBufferDecoder { type Slice = DefinitionLevelBuffer; - fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) { + fn set_data(&mut self, encoding: Encoding, data: Bytes) { match &mut self.decoder { MaybePacked::Packed(d) => d.set_data(encoding, data), MaybePacked::Fallback(d) => d.set_data(encoding, data), @@ -219,7 +219,7 @@ impl DefinitionLevelDecoder for 
DefinitionLevelBufferDecoder { /// [RLE]: https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3 /// [BIT_PACKED]: https://github.com/apache/parquet-format/blob/master/Encodings.md#bit-packed-deprecated-bit_packed--4 struct PackedDecoder { - data: ByteBufferPtr, + data: Bytes, data_offset: usize, rle_left: usize, rle_value: bool, @@ -278,7 +278,7 @@ impl PackedDecoder { impl PackedDecoder { fn new() -> Self { Self { - data: ByteBufferPtr::new(vec![]), + data: Bytes::from(vec![]), data_offset: 0, rle_left: 0, rle_value: false, @@ -287,7 +287,7 @@ impl PackedDecoder { } } - fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) { + fn set_data(&mut self, encoding: Encoding, data: Bytes) { self.rle_left = 0; self.rle_value = false; self.packed_offset = 0; @@ -385,7 +385,7 @@ mod tests { let encoded = encoder.consume(); let mut decoder = PackedDecoder::new(); - decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded)); + decoder.set_data(Encoding::RLE, encoded.into()); // Decode data in random length intervals let mut decoded = BooleanBufferBuilder::new(len); @@ -424,7 +424,7 @@ mod tests { let encoded = encoder.consume(); let mut decoder = PackedDecoder::new(); - decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded)); + decoder.set_data(Encoding::RLE, encoded.into()); let mut skip_value = 0; let mut read_value = 0; diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index 933e42386272..947a633f48a2 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -17,11 +17,12 @@ //! Contains Parquet Page definitions and page reader interface. +use bytes::Bytes; + use crate::basic::{Encoding, PageType}; use crate::errors::{ParquetError, Result}; use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics}; use crate::format::PageHeader; -use crate::util::memory::ByteBufferPtr; /// Parquet Page definition. /// @@ -31,7 +32,7 @@ use crate::util::memory::ByteBufferPtr; #[derive(Clone)] pub enum Page { DataPage { - buf: ByteBufferPtr, + buf: Bytes, num_values: u32, encoding: Encoding, def_level_encoding: Encoding, @@ -39,7 +40,7 @@ pub enum Page { statistics: Option, }, DataPageV2 { - buf: ByteBufferPtr, + buf: Bytes, num_values: u32, encoding: Encoding, num_nulls: u32, @@ -50,7 +51,7 @@ pub enum Page { statistics: Option, }, DictionaryPage { - buf: ByteBufferPtr, + buf: Bytes, num_values: u32, encoding: Encoding, is_sorted: bool, @@ -68,7 +69,7 @@ impl Page { } /// Returns internal byte buffer reference for this page. - pub fn buffer(&self) -> &ByteBufferPtr { + pub fn buffer(&self) -> &Bytes { match self { Page::DataPage { ref buf, .. } => buf, Page::DataPageV2 { ref buf, .. } => buf, @@ -159,7 +160,7 @@ impl CompressedPage { /// Returns slice of compressed buffer in the page. 
pub fn data(&self) -> &[u8] { - self.compressed_page.buffer().data() + self.compressed_page.buffer() } /// Returns the thrift page header @@ -370,7 +371,7 @@ mod tests { #[test] fn test_page() { let data_page = Page::DataPage { - buf: ByteBufferPtr::new(vec![0, 1, 2]), + buf: Bytes::from(vec![0, 1, 2]), num_values: 10, encoding: Encoding::PLAIN, def_level_encoding: Encoding::RLE, @@ -378,7 +379,7 @@ mod tests { statistics: Some(Statistics::int32(Some(1), Some(2), None, 1, true)), }; assert_eq!(data_page.page_type(), PageType::DATA_PAGE); - assert_eq!(data_page.buffer().data(), vec![0, 1, 2].as_slice()); + assert_eq!(data_page.buffer(), vec![0, 1, 2].as_slice()); assert_eq!(data_page.num_values(), 10); assert_eq!(data_page.encoding(), Encoding::PLAIN); assert_eq!( @@ -387,7 +388,7 @@ mod tests { ); let data_page_v2 = Page::DataPageV2 { - buf: ByteBufferPtr::new(vec![0, 1, 2]), + buf: Bytes::from(vec![0, 1, 2]), num_values: 10, encoding: Encoding::PLAIN, num_nulls: 5, @@ -398,7 +399,7 @@ mod tests { statistics: Some(Statistics::int32(Some(1), Some(2), None, 1, true)), }; assert_eq!(data_page_v2.page_type(), PageType::DATA_PAGE_V2); - assert_eq!(data_page_v2.buffer().data(), vec![0, 1, 2].as_slice()); + assert_eq!(data_page_v2.buffer(), vec![0, 1, 2].as_slice()); assert_eq!(data_page_v2.num_values(), 10); assert_eq!(data_page_v2.encoding(), Encoding::PLAIN); assert_eq!( @@ -407,13 +408,13 @@ mod tests { ); let dict_page = Page::DictionaryPage { - buf: ByteBufferPtr::new(vec![0, 1, 2]), + buf: Bytes::from(vec![0, 1, 2]), num_values: 10, encoding: Encoding::PLAIN, is_sorted: false, }; assert_eq!(dict_page.page_type(), PageType::DICTIONARY_PAGE); - assert_eq!(dict_page.buffer().data(), vec![0, 1, 2].as_slice()); + assert_eq!(dict_page.buffer(), vec![0, 1, 2].as_slice()); assert_eq!(dict_page.num_values(), 10); assert_eq!(dict_page.encoding(), Encoding::PLAIN); assert_eq!(dict_page.statistics(), None); @@ -422,7 +423,7 @@ mod tests { #[test] fn test_compressed_page() { let data_page = Page::DataPage { - buf: ByteBufferPtr::new(vec![0, 1, 2]), + buf: Bytes::from(vec![0, 1, 2]), num_values: 10, encoding: Encoding::PLAIN, def_level_encoding: Encoding::RLE, diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 854e5d994ee8..adfcd6390720 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -17,6 +17,8 @@ //! Contains column reader API. 
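// A standalone sketch (using only the `bytes` crate, nothing from this
// patch) of why the explicit `.data()` calls on page buffers above can
// simply disappear: `Bytes` dereferences to `[u8]`, so a `&Bytes` is usable
// wherever a byte slice is expected and compares directly against plain
// slices, as the `Page` tests do.
use bytes::Bytes;

fn checksum(data: &[u8]) -> u32 {
    data.iter().map(|b| *b as u32).sum()
}

fn main() {
    let buf = Bytes::from(vec![0u8, 1, 2]);

    // Deref coercion: pass `&Bytes` where `&[u8]` is expected,
    // where the old API required `buf.data()`.
    assert_eq!(checksum(&buf), 3);

    // Direct comparison against a plain slice, mirroring the page tests.
    assert_eq!(buf, vec![0u8, 1, 2].as_slice());

    // An explicit `&[u8]` view is still available when one is needed.
    assert_eq!(buf.as_ref(), &[0u8, 1, 2]);
}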
+use bytes::Bytes; + use super::page::{Page, PageReader}; use crate::basic::*; use crate::column::reader::decoder::{ @@ -27,7 +29,6 @@ use crate::data_type::*; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::{ceil, num_required_bits, read_num_bytes}; -use crate::util::memory::ByteBufferPtr; pub(crate) mod decoder; @@ -474,7 +475,7 @@ where max_rep_level, num_values, rep_level_encoding, - buf.start_from(offset), + buf.slice(offset..), )?; offset += bytes_read; @@ -492,7 +493,7 @@ where max_def_level, num_values, def_level_encoding, - buf.start_from(offset), + buf.slice(offset..), )?; offset += bytes_read; @@ -504,7 +505,7 @@ where self.values_decoder.set_data( encoding, - buf.start_from(offset), + buf.slice(offset..), num_values as usize, None, )?; @@ -540,7 +541,7 @@ where self.rep_level_decoder.as_mut().unwrap().set_data( Encoding::RLE, - buf.range(0, rep_levels_byte_len as usize), + buf.slice(..rep_levels_byte_len as usize), ); } @@ -549,18 +550,16 @@ where if self.descr.max_def_level() > 0 { self.def_level_decoder.as_mut().unwrap().set_data( Encoding::RLE, - buf.range( - rep_levels_byte_len as usize, - def_levels_byte_len as usize, + buf.slice( + rep_levels_byte_len as usize + ..(rep_levels_byte_len + def_levels_byte_len) as usize, ), ); } self.values_decoder.set_data( encoding, - buf.start_from( - (rep_levels_byte_len + def_levels_byte_len) as usize, - ), + buf.slice((rep_levels_byte_len + def_levels_byte_len) as usize..), num_values as usize, Some((num_values - num_nulls) as usize), )?; @@ -595,13 +594,16 @@ fn parse_v1_level( max_level: i16, num_buffered_values: u32, encoding: Encoding, - buf: ByteBufferPtr, -) -> Result<(usize, ByteBufferPtr)> { + buf: Bytes, +) -> Result<(usize, Bytes)> { match encoding { Encoding::RLE => { let i32_size = std::mem::size_of::(); let data_size = read_num_bytes::(i32_size, buf.as_ref()) as usize; - Ok((i32_size + data_size, buf.range(i32_size, data_size))) + Ok(( + i32_size + data_size, + buf.slice(i32_size..i32_size + data_size), + )) } Encoding::BIT_PACKED => { let bit_width = num_required_bits(max_level as u64); @@ -609,7 +611,7 @@ fn parse_v1_level( (num_buffered_values as usize * bit_width as usize) as i64, 8, ) as usize; - Ok((num_bytes, buf.range(0, num_bytes))) + Ok((num_bytes, buf.slice(..num_bytes))) } _ => Err(general_err!("invalid level encoding: {}", encoding)), } diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index ec57c4032574..ef62724689a8 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -18,6 +18,8 @@ use std::collections::HashMap; use std::ops::Range; +use bytes::Bytes; + use crate::basic::Encoding; use crate::data_type::DataType; use crate::encodings::{ @@ -26,10 +28,7 @@ use crate::encodings::{ }; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; -use crate::util::{ - bit_util::{num_required_bits, BitReader}, - memory::ByteBufferPtr, -}; +use crate::util::bit_util::{num_required_bits, BitReader}; /// A slice of levels buffer data that is written to by a [`ColumnLevelDecoder`] pub trait LevelsBufferSlice { @@ -67,7 +66,7 @@ pub trait ColumnLevelDecoder { type Slice: LevelsBufferSlice + ?Sized; /// Set data for this [`ColumnLevelDecoder`] - fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr); + fn set_data(&mut self, encoding: Encoding, data: Bytes); } pub trait RepetitionLevelDecoder: ColumnLevelDecoder { @@ -132,7 +131,7 @@ pub trait 
ColumnValueDecoder { /// Set the current dictionary page fn set_dict( &mut self, - buf: ByteBufferPtr, + buf: Bytes, num_values: u32, encoding: Encoding, is_sorted: bool, @@ -152,7 +151,7 @@ pub trait ColumnValueDecoder { fn set_data( &mut self, encoding: Encoding, - data: ByteBufferPtr, + data: Bytes, num_levels: usize, num_values: Option, ) -> Result<()>; @@ -197,7 +196,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { fn set_dict( &mut self, - buf: ByteBufferPtr, + buf: Bytes, num_values: u32, mut encoding: Encoding, _is_sorted: bool, @@ -229,7 +228,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { fn set_data( &mut self, mut encoding: Encoding, - data: ByteBufferPtr, + data: Bytes, num_levels: usize, num_values: Option, ) -> Result<()> { @@ -294,7 +293,7 @@ enum LevelDecoder { } impl LevelDecoder { - fn new(encoding: Encoding, data: ByteBufferPtr, bit_width: u8) -> Self { + fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Self { match encoding { Encoding::RLE => { let mut decoder = RleDecoder::new(bit_width); @@ -335,7 +334,7 @@ impl DefinitionLevelDecoderImpl { impl ColumnLevelDecoder for DefinitionLevelDecoderImpl { type Slice = [i16]; - fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) { + fn set_data(&mut self, encoding: Encoding, data: Bytes) { self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)) } } @@ -426,7 +425,7 @@ impl RepetitionLevelDecoderImpl { impl ColumnLevelDecoder for RepetitionLevelDecoderImpl { type Slice = [i16]; - fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) { + fn set_data(&mut self, encoding: Encoding, data: Bytes) { self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)); self.buffer_len = 0; self.buffer_offset = 0; @@ -511,7 +510,7 @@ mod tests { let mut encoder = RleEncoder::new(1, 1024); encoder.put(0); (0..3).for_each(|_| encoder.put(1)); - let data = ByteBufferPtr::new(encoder.consume()); + let data = Bytes::from(encoder.consume()); let mut decoder = RepetitionLevelDecoderImpl::new(1); decoder.set_data(Encoding::RLE, data.clone()); @@ -537,7 +536,7 @@ mod tests { for v in &encoded { encoder.put(*v as _) } - let data = ByteBufferPtr::new(encoder.consume()); + let data = Bytes::from(encoder.consume()); let mut decoder = RepetitionLevelDecoderImpl::new(5); decoder.set_data(Encoding::RLE, data); diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 5fd0f9e194d2..2273ae777444 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
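// A standalone sketch of the ownership model behind the decoder changes
// above: an encoder's `Vec<u8>` output moves into `Bytes` without copying,
// and cloning the resulting `Bytes` (the replacement for the old
// `ByteBufferPtr::all`) only bumps a reference count, so handing the same
// page data to several decoders stays cheap. The literal vector below is a
// placeholder for something like `encoder.consume()` in the tests above.
use bytes::Bytes;

fn main() {
    // Placeholder for an encoder's consumed output buffer.
    let encoded: Vec<u8> = vec![0x03, 0x88, 0xC6, 0xFA];

    let data = Bytes::from(encoded); // move, no copy
    let for_levels = data.clone(); // shares the same allocation
    let for_values = data.slice(1..); // zero-copy view into the same buffer

    assert_eq!(for_levels.len(), 4);
    assert_eq!(for_values.len(), 3);
    assert_eq!(for_levels[0], 0x03);
    assert_eq!(for_values[0], 0x88);
}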
+use bytes::Bytes; + use crate::basic::{Encoding, Type}; use crate::bloom_filter::Sbbf; use crate::column::writer::{ @@ -26,7 +28,6 @@ use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; use crate::errors::{ParquetError, Result}; use crate::file::properties::{EnabledStatistics, WriterProperties}; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; -use crate::util::memory::ByteBufferPtr; /// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`] pub trait ColumnValues { @@ -49,14 +50,14 @@ impl ColumnValues for [T] { /// The encoded data for a dictionary page pub struct DictionaryPage { - pub buf: ByteBufferPtr, + pub buf: Bytes, pub num_values: usize, pub is_sorted: bool, } /// The encoded values for a data page, with optional statistics pub struct DataPageValues { - pub buf: ByteBufferPtr, + pub buf: Bytes, pub num_values: usize, pub encoding: Encoding, pub min_value: Option, diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 307804e7dc5c..60db90c5d46d 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -17,6 +17,8 @@ //! Contains column writer API. +use bytes::Bytes; + use crate::bloom_filter::Sbbf; use crate::format::{ColumnIndex, OffsetIndex}; use std::collections::{BTreeSet, VecDeque}; @@ -38,7 +40,6 @@ use crate::file::{ properties::{WriterProperties, WriterPropertiesPtr, WriterVersion}, }; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; -use crate::util::memory::ByteBufferPtr; pub(crate) mod encoder; @@ -731,7 +732,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { ); } - buffer.extend_from_slice(values_data.buf.data()); + buffer.extend_from_slice(&values_data.buf); let uncompressed_size = buffer.len(); if let Some(ref mut cmpr) = self.compressor { @@ -741,7 +742,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } let data_page = Page::DataPage { - buf: ByteBufferPtr::new(buffer), + buf: buffer.into(), num_values: self.page_metrics.num_buffered_values, encoding: values_data.encoding, def_level_encoding: Encoding::RLE, @@ -774,13 +775,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { // Data Page v2 compresses values only. match self.compressor { Some(ref mut cmpr) => { - cmpr.compress(values_data.buf.data(), &mut buffer)?; + cmpr.compress(&values_data.buf, &mut buffer)?; } - None => buffer.extend_from_slice(values_data.buf.data()), + None => buffer.extend_from_slice(&values_data.buf), } let data_page = Page::DataPageV2 { - buf: ByteBufferPtr::new(buffer), + buf: buffer.into(), num_values: self.page_metrics.num_buffered_values, encoding: values_data.encoding, num_nulls: self.page_metrics.num_page_nulls as u32, @@ -920,8 +921,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { if let Some(ref mut cmpr) = self.compressor { let mut output_buf = Vec::with_capacity(uncompressed_size); - cmpr.compress(page.buf.data(), &mut output_buf)?; - page.buf = ByteBufferPtr::new(output_buf); + cmpr.compress(&page.buf, &mut output_buf)?; + page.buf = Bytes::from(output_buf); } let dict_page = Page::DictionaryPage { @@ -2350,10 +2351,10 @@ mod tests { let mut data = vec![FixedLenByteArray::default(); 3]; // This is the expected min value - "aaa..." - data[0].set_data(ByteBufferPtr::new(vec![97_u8; 200])); + data[0].set_data(Bytes::from(vec![97_u8; 200])); // This is the expected max value - "ZZZ..." 
- data[1].set_data(ByteBufferPtr::new(vec![112_u8; 200])); - data[2].set_data(ByteBufferPtr::new(vec![98_u8; 200])); + data[1].set_data(Bytes::from(vec![112_u8; 200])); + data[2].set_data(Bytes::from(vec![98_u8; 200])); writer.write_batch(&data, None, None).unwrap(); @@ -2420,9 +2421,7 @@ mod tests { let mut data = vec![FixedLenByteArray::default(); 1]; // This is the expected min value - data[0].set_data(ByteBufferPtr::new( - String::from("Blart Versenwald III").into_bytes(), - )); + data[0].set_data(Bytes::from(String::from("Blart Versenwald III"))); writer.write_batch(&data, None, None).unwrap(); @@ -2493,9 +2492,9 @@ mod tests { // Also show that BinaryArray level comparison works here let mut greater = ByteArray::new(); - greater.set_data(ByteBufferPtr::new(v)); + greater.set_data(Bytes::from(v)); let mut original = ByteArray::new(); - original.set_data(ByteBufferPtr::new("hello".as_bytes().to_vec())); + original.set_data(Bytes::from("hello".as_bytes().to_vec())); assert!(greater > original); // UTF8 string diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index eaf4389d4350..b895c2507018 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -28,7 +28,7 @@ use crate::basic::Type; use crate::column::reader::{ColumnReader, ColumnReaderImpl}; use crate::column::writer::{ColumnWriter, ColumnWriterImpl}; use crate::errors::{ParquetError, Result}; -use crate::util::{bit_util::FromBytes, memory::ByteBufferPtr}; +use crate::util::bit_util::FromBytes; /// Rust representation for logical type INT96, value is backed by an array of `u32`. /// The type only takes 12 bytes, without extra padding. @@ -103,7 +103,7 @@ impl fmt::Display for Int96 { /// Value is backed by a byte buffer. #[derive(Clone, Default)] pub struct ByteArray { - data: Option, + data: Option, } // Special case Debug that prints out byte arrays that are valid utf8 as &str's @@ -130,7 +130,7 @@ impl PartialOrd for ByteArray { (Some(_), None) => Some(Ordering::Greater), (Some(self_data), Some(other_data)) => { // compare slices directly - self_data.data().partial_cmp(other_data.data()) + self_data.partial_cmp(&other_data) } } } @@ -167,7 +167,7 @@ impl ByteArray { /// Set data from another byte buffer. 
#[inline] - pub fn set_data(&mut self, data: ByteBufferPtr) { + pub fn set_data(&mut self, data: Bytes) { self.data = Some(data); } @@ -178,7 +178,7 @@ impl ByteArray { self.data .as_ref() .expect("set_data should have been called") - .range(start, len), + .slice(start..start + len), ) } @@ -194,7 +194,7 @@ impl ByteArray { impl From> for ByteArray { fn from(buf: Vec) -> ByteArray { Self { - data: Some(ByteBufferPtr::new(buf)), + data: Some(buf.into()), } } } @@ -204,7 +204,7 @@ impl<'a> From<&'a [u8]> for ByteArray { let mut v = Vec::new(); v.extend_from_slice(b); Self { - data: Some(ByteBufferPtr::new(v)), + data: Some(v.into()), } } } @@ -214,20 +214,14 @@ impl<'a> From<&'a str> for ByteArray { let mut v = Vec::new(); v.extend_from_slice(s.as_bytes()); Self { - data: Some(ByteBufferPtr::new(v)), + data: Some(v.into()), } } } -impl From for ByteArray { - fn from(ptr: ByteBufferPtr) -> ByteArray { - Self { data: Some(ptr) } - } -} - impl From for ByteArray { fn from(value: Bytes) -> Self { - ByteBufferPtr::from(value).into() + Self { data: Some(value) } } } @@ -580,9 +574,10 @@ impl AsBytes for str { } pub(crate) mod private { + use bytes::Bytes; + use crate::encodings::decoding::PlainDecoderDetails; use crate::util::bit_util::{read_num_bytes, BitReader, BitWriter}; - use crate::util::memory::ByteBufferPtr; use crate::basic::Type; use std::convert::TryInto; @@ -618,7 +613,7 @@ pub(crate) mod private { ) -> Result<()>; /// Establish the data that will be decoded in a buffer - fn set_data(decoder: &mut PlainDecoderDetails, data: ByteBufferPtr, num_values: usize); + fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize); /// Decode the value from a given buffer for a higher level decoder fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result; @@ -671,7 +666,7 @@ pub(crate) mod private { } #[inline] - fn set_data(decoder: &mut PlainDecoderDetails, data: ByteBufferPtr, num_values: usize) { + fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) { decoder.bit_reader.replace(BitReader::new(data)); decoder.num_values = num_values; } @@ -728,7 +723,7 @@ pub(crate) mod private { } #[inline] - fn set_data(decoder: &mut PlainDecoderDetails, data: ByteBufferPtr, num_values: usize) { + fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) { decoder.data.replace(data); decoder.start = 0; decoder.num_values = num_values; @@ -748,7 +743,9 @@ pub(crate) mod private { // SAFETY: Raw types should be as per the standard rust bit-vectors unsafe { let raw_buffer = &mut Self::slice_as_bytes_mut(buffer)[..bytes_to_decode]; - raw_buffer.copy_from_slice(data.range(decoder.start, bytes_to_decode).as_ref()); + raw_buffer.copy_from_slice(data.slice( + decoder.start..decoder.start + bytes_to_decode + ).as_ref()); }; decoder.start += bytes_to_decode; decoder.num_values -= num_values; @@ -815,7 +812,7 @@ pub(crate) mod private { } #[inline] - fn set_data(decoder: &mut PlainDecoderDetails, data: ByteBufferPtr, num_values: usize) { + fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) { decoder.data.replace(data); decoder.start = 0; decoder.num_values = num_values; @@ -836,8 +833,8 @@ pub(crate) mod private { return Err(eof_err!("Not enough bytes to decode")); } - let data_range = data.range(decoder.start, bytes_to_decode); - let bytes: &[u8] = data_range.data(); + let data_range = data.slice(decoder.start..decoder.start + bytes_to_decode); + let bytes: &[u8] = &data_range; decoder.start += 
bytes_to_decode; let mut pos = 0; // position in byte array @@ -902,7 +899,7 @@ pub(crate) mod private { } #[inline] - fn set_data(decoder: &mut PlainDecoderDetails, data: ByteBufferPtr, num_values: usize) { + fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) { decoder.data.replace(data); decoder.start = 0; decoder.num_values = num_values; @@ -917,7 +914,7 @@ pub(crate) mod private { let num_values = std::cmp::min(buffer.len(), decoder.num_values); for val_array in buffer.iter_mut().take(num_values) { let len: usize = - read_num_bytes::(4, data.start_from(decoder.start).as_ref()) as usize; + read_num_bytes::(4, data.slice(decoder.start..).as_ref()) as usize; decoder.start += std::mem::size_of::(); if data.len() < decoder.start + len { @@ -926,7 +923,7 @@ pub(crate) mod private { let val: &mut Self = val_array.as_mut_any().downcast_mut().unwrap(); - val.set_data(data.range(decoder.start, len)); + val.set_data(data.slice(decoder.start..decoder.start + len)); decoder.start += len; } decoder.num_values -= num_values; @@ -943,7 +940,7 @@ pub(crate) mod private { for _ in 0..num_values { let len: usize = - read_num_bytes::(4, data.start_from(decoder.start).as_ref()) as usize; + read_num_bytes::(4, data.slice(decoder.start..).as_ref()) as usize; decoder.start += std::mem::size_of::() + len; } decoder.num_values -= num_values; @@ -984,7 +981,7 @@ pub(crate) mod private { } #[inline] - fn set_data(decoder: &mut PlainDecoderDetails, data: ByteBufferPtr, num_values: usize) { + fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) { decoder.data.replace(data); decoder.start = 0; decoder.num_values = num_values; @@ -1007,7 +1004,7 @@ pub(crate) mod private { return Err(eof_err!("Not enough bytes to decode")); } - item.set_data(data.range(decoder.start, len)); + item.set_data(data.slice(decoder.start..decoder.start + len)); decoder.start += len; } decoder.num_values -= num_values; @@ -1241,7 +1238,7 @@ mod tests { ); assert_eq!(ByteArray::from("ABC").data(), &[b'A', b'B', b'C']); assert_eq!( - ByteArray::from(ByteBufferPtr::new(vec![1u8, 2u8, 3u8, 4u8, 5u8])).data(), + ByteArray::from(Bytes::from(vec![1u8, 2u8, 3u8, 4u8, 5u8])).data(), &[1u8, 2u8, 3u8, 4u8, 5u8] ); let buf = vec![6u8, 7u8, 8u8, 9u8, 10u8]; diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs index 7aed6df419ee..5843acdb6d0f 100644 --- a/parquet/src/encodings/decoding.rs +++ b/parquet/src/encodings/decoding.rs @@ -17,6 +17,7 @@ //! Contains all supported decoders for Parquet. +use bytes::Bytes; use num::traits::WrappingAdd; use num::FromPrimitive; use std::{cmp, marker::PhantomData, mem}; @@ -28,10 +29,7 @@ use crate::data_type::private::ParquetValueType; use crate::data_type::*; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; -use crate::util::{ - bit_util::{self, BitReader}, - memory::ByteBufferPtr, -}; +use crate::util::bit_util::{self, BitReader}; pub(crate) mod private { use super::*; @@ -145,7 +143,7 @@ pub(crate) mod private { pub trait Decoder: Send { /// Sets the data to decode to be `data`, which should contain `num_values` of values /// to decode. - fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()>; + fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()>; /// Consumes values from this decoder and write the results to `buffer`. This will try /// to fill up `buffer`. 
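// The slicing translation applied throughout this patch, shown standalone
// with the `bytes` crate. The left-hand sides in the comments are the
// removed `ByteBufferPtr` calls; the right-hand sides are the `Bytes`
// equivalents the diff substitutes for them.
use bytes::Bytes;

fn main() {
    let buf = Bytes::from(vec![10u8, 11, 12, 13, 14]);

    // buf.all()         -> buf.clone()          (whole buffer, shared)
    let whole = buf.clone();

    // buf.start_from(2) -> buf.slice(2..)       (suffix starting at an offset)
    let tail = buf.slice(2..);

    // buf.range(1, 3)   -> buf.slice(1..1 + 3)  (offset + length becomes a Range)
    let window = buf.slice(1..1 + 3);

    // buf.data()        -> &buf or buf.as_ref() (borrow as &[u8])
    let as_slice: &[u8] = &buf;

    assert_eq!(whole.len(), 5);
    assert_eq!(tail.as_ref(), &[12, 13, 14]);
    assert_eq!(window.as_ref(), &[11, 12, 13]);
    assert_eq!(as_slice, &[10, 11, 12, 13, 14]);
}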
@@ -238,7 +236,7 @@ pub struct PlainDecoderDetails { pub(crate) type_length: i32, // The byte array to decode from. Not set if `T` is bool. - pub(crate) data: Option, + pub(crate) data: Option, // Read `data` bit by bit. Only set if `T` is bool. pub(crate) bit_reader: Option, @@ -275,7 +273,7 @@ impl PlainDecoder { impl Decoder for PlainDecoder { #[inline] - fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { + fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> { T::T::set_data(&mut self.inner, data, num_values); Ok(()) } @@ -350,11 +348,11 @@ impl DictDecoder { } impl Decoder for DictDecoder { - fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { + fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> { // First byte in `data` is bit width let bit_width = data.as_ref()[0]; let mut rle_decoder = RleDecoder::new(bit_width); - rle_decoder.set_data(data.start_from(1)); + rle_decoder.set_data(data.slice(1..)); self.num_values = num_values; self.rle_decoder = Some(rle_decoder); Ok(()) @@ -418,7 +416,7 @@ impl RleValueDecoder { impl Decoder for RleValueDecoder { #[inline] - fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { + fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> { // Only support RLE value reader for boolean values with bit width of 1. ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType"); @@ -426,7 +424,8 @@ impl Decoder for RleValueDecoder { const I32_SIZE: usize = mem::size_of::(); let data_size = bit_util::read_num_bytes::(I32_SIZE, data.as_ref()) as usize; self.decoder = RleDecoder::new(1); - self.decoder.set_data(data.range(I32_SIZE, data_size)); + self.decoder + .set_data(data.slice(I32_SIZE..I32_SIZE + data_size)); self.values_left = num_values; Ok(()) } @@ -604,7 +603,7 @@ where { // # of total values is derived from encoding #[inline] - fn set_data(&mut self, data: ByteBufferPtr, _index: usize) -> Result<()> { + fn set_data(&mut self, data: Bytes, _index: usize) -> Result<()> { self.bit_reader = BitReader::new(data); self.initialized = true; @@ -811,7 +810,7 @@ pub struct DeltaLengthByteArrayDecoder { current_idx: usize, // Concatenated byte array data - data: Option, + data: Option, // Offset into `data`, always point to the beginning of next byte array. 
offset: usize, @@ -844,16 +843,16 @@ impl DeltaLengthByteArrayDecoder { } impl Decoder for DeltaLengthByteArrayDecoder { - fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { + fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> { match T::get_physical_type() { Type::BYTE_ARRAY => { let mut len_decoder = DeltaBitPackDecoder::::new(); - len_decoder.set_data(data.all(), num_values)?; + len_decoder.set_data(data.clone(), num_values)?; let num_lengths = len_decoder.values_left(); self.lengths.resize(num_lengths, 0); len_decoder.get(&mut self.lengths[..])?; - self.data = Some(data.start_from(len_decoder.get_offset())); + self.data = Some(data.slice(len_decoder.get_offset()..)); self.offset = 0; self.current_idx = 0; self.num_values = num_lengths; @@ -879,7 +878,7 @@ impl Decoder for DeltaLengthByteArrayDecoder { item.as_mut_any() .downcast_mut::() .unwrap() - .set_data(data.range(self.offset, len)); + .set_data(data.slice(self.offset..self.offset + len)); self.offset += len; self.current_idx += 1; @@ -977,18 +976,18 @@ impl DeltaByteArrayDecoder { } impl Decoder for DeltaByteArrayDecoder { - fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { + fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> { match T::get_physical_type() { Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => { let mut prefix_len_decoder = DeltaBitPackDecoder::::new(); - prefix_len_decoder.set_data(data.all(), num_values)?; + prefix_len_decoder.set_data(data.clone(), num_values)?; let num_prefixes = prefix_len_decoder.values_left(); self.prefix_lengths.resize(num_prefixes, 0); prefix_len_decoder.get(&mut self.prefix_lengths[..])?; let mut suffix_decoder = DeltaLengthByteArrayDecoder::new(); suffix_decoder - .set_data(data.start_from(prefix_len_decoder.get_offset()), num_values)?; + .set_data(data.slice(prefix_len_decoder.get_offset()..), num_values)?; self.suffix_decoder = Some(suffix_decoder); self.num_values = num_prefixes; self.current_idx = 0; @@ -1023,7 +1022,7 @@ impl Decoder for DeltaByteArrayDecoder { result.extend_from_slice(&self.previous_value[0..prefix_len]); result.extend_from_slice(suffix); - let data = ByteBufferPtr::new(result.clone()); + let data = Bytes::from(result.clone()); match ty { Type::BYTE_ARRAY => item @@ -1131,33 +1130,21 @@ mod tests { let data = [42, 18, 52]; let data_bytes = Int32Type::to_byte_array(&data[..]); let mut buffer = [0; 3]; - test_plain_decode::( - ByteBufferPtr::new(data_bytes), - 3, - -1, - &mut buffer[..], - &data[..], - ); + test_plain_decode::(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]); } #[test] fn test_plain_skip_int32() { let data = [42, 18, 52]; let data_bytes = Int32Type::to_byte_array(&data[..]); - test_plain_skip::( - ByteBufferPtr::new(data_bytes), - 3, - 1, - -1, - &data[1..], - ); + test_plain_skip::(Bytes::from(data_bytes), 3, 1, -1, &data[1..]); } #[test] fn test_plain_skip_all_int32() { let data = [42, 18, 52]; let data_bytes = Int32Type::to_byte_array(&data[..]); - test_plain_skip::(ByteBufferPtr::new(data_bytes), 3, 5, -1, &[]); + test_plain_skip::(Bytes::from(data_bytes), 3, 5, -1, &[]); } #[test] @@ -1169,7 +1156,7 @@ mod tests { let num_nulls = 5; let valid_bits = [0b01001010]; test_plain_decode_spaced::( - ByteBufferPtr::new(data_bytes), + Bytes::from(data_bytes), 3, -1, &mut buffer[..], @@ -1184,33 +1171,21 @@ mod tests { let data = [42, 18, 52]; let data_bytes = Int64Type::to_byte_array(&data[..]); let mut buffer = [0; 3]; - test_plain_decode::( - 
ByteBufferPtr::new(data_bytes), - 3, - -1, - &mut buffer[..], - &data[..], - ); + test_plain_decode::(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]); } #[test] fn test_plain_skip_int64() { let data = [42, 18, 52]; let data_bytes = Int64Type::to_byte_array(&data[..]); - test_plain_skip::( - ByteBufferPtr::new(data_bytes), - 3, - 2, - -1, - &data[2..], - ); + test_plain_skip::(Bytes::from(data_bytes), 3, 2, -1, &data[2..]); } #[test] fn test_plain_skip_all_int64() { let data = [42, 18, 52]; let data_bytes = Int64Type::to_byte_array(&data[..]); - test_plain_skip::(ByteBufferPtr::new(data_bytes), 3, 3, -1, &[]); + test_plain_skip::(Bytes::from(data_bytes), 3, 3, -1, &[]); } #[test] @@ -1218,53 +1193,35 @@ mod tests { let data = [PI_f32, 2.414, 12.51]; let data_bytes = FloatType::to_byte_array(&data[..]); let mut buffer = [0.0; 3]; - test_plain_decode::( - ByteBufferPtr::new(data_bytes), - 3, - -1, - &mut buffer[..], - &data[..], - ); + test_plain_decode::(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]); } #[test] fn test_plain_skip_float() { let data = [PI_f32, 2.414, 12.51]; let data_bytes = FloatType::to_byte_array(&data[..]); - test_plain_skip::( - ByteBufferPtr::new(data_bytes), - 3, - 1, - -1, - &data[1..], - ); + test_plain_skip::(Bytes::from(data_bytes), 3, 1, -1, &data[1..]); } #[test] fn test_plain_skip_all_float() { let data = [PI_f32, 2.414, 12.51]; let data_bytes = FloatType::to_byte_array(&data[..]); - test_plain_skip::(ByteBufferPtr::new(data_bytes), 3, 4, -1, &[]); + test_plain_skip::(Bytes::from(data_bytes), 3, 4, -1, &[]); } #[test] fn test_plain_skip_double() { let data = [PI_f64, 2.414f64, 12.51f64]; let data_bytes = DoubleType::to_byte_array(&data[..]); - test_plain_skip::( - ByteBufferPtr::new(data_bytes), - 3, - 1, - -1, - &data[1..], - ); + test_plain_skip::(Bytes::from(data_bytes), 3, 1, -1, &data[1..]); } #[test] fn test_plain_skip_all_double() { let data = [PI_f64, 2.414f64, 12.51f64]; let data_bytes = DoubleType::to_byte_array(&data[..]); - test_plain_skip::(ByteBufferPtr::new(data_bytes), 3, 5, -1, &[]); + test_plain_skip::(Bytes::from(data_bytes), 3, 5, -1, &[]); } #[test] @@ -1272,13 +1229,7 @@ mod tests { let data = [PI_f64, 2.414f64, 12.51f64]; let data_bytes = DoubleType::to_byte_array(&data[..]); let mut buffer = [0.0f64; 3]; - test_plain_decode::( - ByteBufferPtr::new(data_bytes), - 3, - -1, - &mut buffer[..], - &data[..], - ); + test_plain_decode::(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]); } #[test] @@ -1290,13 +1241,7 @@ mod tests { data[3].set_data(40, 50, 60); let data_bytes = Int96Type::to_byte_array(&data[..]); let mut buffer = [Int96::new(); 4]; - test_plain_decode::( - ByteBufferPtr::new(data_bytes), - 4, - -1, - &mut buffer[..], - &data[..], - ); + test_plain_decode::(Bytes::from(data_bytes), 4, -1, &mut buffer[..], &data[..]); } #[test] @@ -1307,13 +1252,7 @@ mod tests { data[2].set_data(10, 20, 30); data[3].set_data(40, 50, 60); let data_bytes = Int96Type::to_byte_array(&data[..]); - test_plain_skip::( - ByteBufferPtr::new(data_bytes), - 4, - 2, - -1, - &data[2..], - ); + test_plain_skip::(Bytes::from(data_bytes), 4, 2, -1, &data[2..]); } #[test] @@ -1324,7 +1263,7 @@ mod tests { data[2].set_data(10, 20, 30); data[3].set_data(40, 50, 60); let data_bytes = Int96Type::to_byte_array(&data[..]); - test_plain_skip::(ByteBufferPtr::new(data_bytes), 4, 8, -1, &[]); + test_plain_skip::(Bytes::from(data_bytes), 4, 8, -1, &[]); } #[test] @@ -1334,13 +1273,7 @@ mod tests { ]; let data_bytes = 
BoolType::to_byte_array(&data[..]); let mut buffer = [false; 10]; - test_plain_decode::( - ByteBufferPtr::new(data_bytes), - 10, - -1, - &mut buffer[..], - &data[..], - ); + test_plain_decode::(Bytes::from(data_bytes), 10, -1, &mut buffer[..], &data[..]); } #[test] @@ -1349,13 +1282,7 @@ mod tests { false, true, false, false, true, false, true, true, false, true, ]; let data_bytes = BoolType::to_byte_array(&data[..]); - test_plain_skip::( - ByteBufferPtr::new(data_bytes), - 10, - 5, - -1, - &data[5..], - ); + test_plain_skip::(Bytes::from(data_bytes), 10, 5, -1, &data[5..]); } #[test] @@ -1364,18 +1291,18 @@ mod tests { false, true, false, false, true, false, true, true, false, true, ]; let data_bytes = BoolType::to_byte_array(&data[..]); - test_plain_skip::(ByteBufferPtr::new(data_bytes), 10, 20, -1, &[]); + test_plain_skip::(Bytes::from(data_bytes), 10, 20, -1, &[]); } #[test] fn test_plain_decode_byte_array() { let mut data = vec![ByteArray::new(); 2]; - data[0].set_data(ByteBufferPtr::new(String::from("hello").into_bytes())); - data[1].set_data(ByteBufferPtr::new(String::from("parquet").into_bytes())); + data[0].set_data(Bytes::from(String::from("hello"))); + data[1].set_data(Bytes::from(String::from("parquet"))); let data_bytes = ByteArrayType::to_byte_array(&data[..]); let mut buffer = vec![ByteArray::new(); 2]; test_plain_decode::( - ByteBufferPtr::new(data_bytes), + Bytes::from(data_bytes), 2, -1, &mut buffer[..], @@ -1386,37 +1313,31 @@ mod tests { #[test] fn test_plain_skip_byte_array() { let mut data = vec![ByteArray::new(); 2]; - data[0].set_data(ByteBufferPtr::new(String::from("hello").into_bytes())); - data[1].set_data(ByteBufferPtr::new(String::from("parquet").into_bytes())); + data[0].set_data(Bytes::from(String::from("hello"))); + data[1].set_data(Bytes::from(String::from("parquet"))); let data_bytes = ByteArrayType::to_byte_array(&data[..]); - test_plain_skip::( - ByteBufferPtr::new(data_bytes), - 2, - 1, - -1, - &data[1..], - ); + test_plain_skip::(Bytes::from(data_bytes), 2, 1, -1, &data[1..]); } #[test] fn test_plain_skip_all_byte_array() { let mut data = vec![ByteArray::new(); 2]; - data[0].set_data(ByteBufferPtr::new(String::from("hello").into_bytes())); - data[1].set_data(ByteBufferPtr::new(String::from("parquet").into_bytes())); + data[0].set_data(Bytes::from(String::from("hello"))); + data[1].set_data(Bytes::from(String::from("parquet"))); let data_bytes = ByteArrayType::to_byte_array(&data[..]); - test_plain_skip::(ByteBufferPtr::new(data_bytes), 2, 2, -1, &[]); + test_plain_skip::(Bytes::from(data_bytes), 2, 2, -1, &[]); } #[test] fn test_plain_decode_fixed_len_byte_array() { let mut data = vec![FixedLenByteArray::default(); 3]; - data[0].set_data(ByteBufferPtr::new(String::from("bird").into_bytes())); - data[1].set_data(ByteBufferPtr::new(String::from("come").into_bytes())); - data[2].set_data(ByteBufferPtr::new(String::from("flow").into_bytes())); + data[0].set_data(Bytes::from(String::from("bird"))); + data[1].set_data(Bytes::from(String::from("come"))); + data[2].set_data(Bytes::from(String::from("flow"))); let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]); let mut buffer = vec![FixedLenByteArray::default(); 3]; test_plain_decode::( - ByteBufferPtr::new(data_bytes), + Bytes::from(data_bytes), 3, 4, &mut buffer[..], @@ -1427,37 +1348,25 @@ mod tests { #[test] fn test_plain_skip_fixed_len_byte_array() { let mut data = vec![FixedLenByteArray::default(); 3]; - data[0].set_data(ByteBufferPtr::new(String::from("bird").into_bytes())); - 
data[1].set_data(ByteBufferPtr::new(String::from("come").into_bytes())); - data[2].set_data(ByteBufferPtr::new(String::from("flow").into_bytes())); + data[0].set_data(Bytes::from(String::from("bird"))); + data[1].set_data(Bytes::from(String::from("come"))); + data[2].set_data(Bytes::from(String::from("flow"))); let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]); - test_plain_skip::( - ByteBufferPtr::new(data_bytes), - 3, - 1, - 4, - &data[1..], - ); + test_plain_skip::(Bytes::from(data_bytes), 3, 1, 4, &data[1..]); } #[test] fn test_plain_skip_all_fixed_len_byte_array() { let mut data = vec![FixedLenByteArray::default(); 3]; - data[0].set_data(ByteBufferPtr::new(String::from("bird").into_bytes())); - data[1].set_data(ByteBufferPtr::new(String::from("come").into_bytes())); - data[2].set_data(ByteBufferPtr::new(String::from("flow").into_bytes())); + data[0].set_data(Bytes::from(String::from("bird"))); + data[1].set_data(Bytes::from(String::from("come"))); + data[2].set_data(Bytes::from(String::from("flow"))); let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]); - test_plain_skip::( - ByteBufferPtr::new(data_bytes), - 3, - 6, - 4, - &[], - ); + test_plain_skip::(Bytes::from(data_bytes), 3, 6, 4, &[]); } fn test_plain_decode( - data: ByteBufferPtr, + data: Bytes, num_values: usize, type_length: i32, buffer: &mut [T::T], @@ -1473,7 +1382,7 @@ mod tests { } fn test_plain_skip( - data: ByteBufferPtr, + data: Bytes, num_values: usize, skip: usize, type_length: i32, @@ -1501,7 +1410,7 @@ mod tests { } fn test_plain_decode_spaced( - data: ByteBufferPtr, + data: Bytes, num_values: usize, type_length: i32, buffer: &mut [T::T], @@ -1530,9 +1439,7 @@ mod tests { #[should_panic(expected = "RleValueDecoder only supports BoolType")] fn test_rle_value_decode_int32_not_supported() { let mut decoder = RleValueDecoder::::new(); - decoder - .set_data(ByteBufferPtr::new(vec![5, 0, 0, 0]), 1) - .unwrap(); + decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).unwrap(); } #[test] @@ -1730,9 +1637,8 @@ mod tests { 128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ]; - let buffer = ByteBufferPtr::new(data_bytes); let mut decoder: DeltaBitPackDecoder = DeltaBitPackDecoder::new(); - decoder.set_data(buffer, 3).unwrap(); + decoder.set_data(data_bytes.into(), 3).unwrap(); // check exact offsets, because when reading partial values we end up with // some data not being read from bit reader assert_eq!(decoder.get_offset(), 5); @@ -1794,7 +1700,7 @@ mod tests { let length = data.len(); - let ptr = ByteBufferPtr::new(data); + let ptr = Bytes::from(data); let mut reader = BitReader::new(ptr.clone()); assert_eq!(reader.get_vlq_int().unwrap(), 256); assert_eq!(reader.get_vlq_int().unwrap(), 4); @@ -1810,7 +1716,7 @@ mod tests { assert_eq!(decoder.get_offset(), length); // Test with truncated buffer - decoder.set_data(ptr.range(0, 12), 0).unwrap(); + decoder.set_data(ptr.slice(..12), 0).unwrap(); let err = decoder.get(&mut output).unwrap_err().to_string(); assert!( err.contains("Expected to read 64 values from miniblock got 8"), diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs index 4f4a6ab4f55a..dafae064afbf 100644 --- a/parquet/src/encodings/encoding/dict_encoder.rs +++ b/parquet/src/encodings/encoding/dict_encoder.rs @@ -18,6 +18,8 @@ // ---------------------------------------------------------------------- // Dictionary encoding +use bytes::Bytes; + use crate::basic::{Encoding, Type}; 
use crate::data_type::private::ParquetValueType; use crate::data_type::DataType; @@ -27,7 +29,6 @@ use crate::errors::Result; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; use crate::util::interner::{Interner, Storage}; -use crate::util::memory::ByteBufferPtr; #[derive(Debug)] struct KeyStorage { @@ -112,7 +113,7 @@ impl DictEncoder { /// Writes out the dictionary values with PLAIN encoding in a byte buffer, and return /// the result. - pub fn write_dict(&self) -> Result { + pub fn write_dict(&self) -> Result { let mut plain_encoder = PlainEncoder::::new(); plain_encoder.put(&self.interner.storage().uniques)?; plain_encoder.flush_buffer() @@ -120,7 +121,7 @@ impl DictEncoder { /// Writes out the dictionary values with RLE encoding in a byte buffer, and return /// the result. - pub fn write_indices(&mut self) -> Result { + pub fn write_indices(&mut self) -> Result { let buffer_len = self.estimated_data_encoded_size(); let mut buffer = Vec::with_capacity(buffer_len); buffer.push(self.bit_width()); @@ -131,7 +132,7 @@ impl DictEncoder { encoder.put(*index) } self.indices.clear(); - Ok(ByteBufferPtr::new(encoder.consume())) + Ok(encoder.consume().into()) } fn put_one(&mut self, value: &T::T) { @@ -165,7 +166,7 @@ impl Encoder for DictEncoder { RleEncoder::max_buffer_size(bit_width, self.indices.len()) } - fn flush_buffer(&mut self) -> Result { + fn flush_buffer(&mut self) -> Result { self.write_indices() } } diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs index 3088f332183b..89e61ee226ad 100644 --- a/parquet/src/encodings/encoding/mod.rs +++ b/parquet/src/encodings/encoding/mod.rs @@ -24,11 +24,9 @@ use crate::data_type::private::ParquetValueType; use crate::data_type::*; use crate::encodings::rle::RleEncoder; use crate::errors::{ParquetError, Result}; -use crate::util::{ - bit_util::{self, num_required_bits, BitWriter}, - memory::ByteBufferPtr, -}; +use crate::util::bit_util::{self, num_required_bits, BitWriter}; +use bytes::Bytes; pub use dict_encoder::DictEncoder; mod dict_encoder; @@ -70,7 +68,7 @@ pub trait Encoder: Send { /// Flushes the underlying byte buffer that's being processed by this encoder, and /// return the immutable copy of it. This will also reset the internal state. - fn flush_buffer(&mut self) -> Result; + fn flush_buffer(&mut self) -> Result; } /// Gets a encoder for the particular data type `T` and encoding `encoding`. 
Memory usage @@ -143,7 +141,7 @@ impl Encoder for PlainEncoder { } #[inline] - fn flush_buffer(&mut self) -> Result { + fn flush_buffer(&mut self) -> Result { self.buffer .extend_from_slice(self.bit_writer.flush_buffer()); self.bit_writer.clear(); @@ -223,7 +221,7 @@ impl Encoder for RleValueEncoder { } #[inline] - fn flush_buffer(&mut self) -> Result { + fn flush_buffer(&mut self) -> Result { ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType"); let rle_encoder = self .encoder @@ -238,7 +236,7 @@ impl Encoder for RleValueEncoder { let len = (buf.len() - 4) as i32; buf[..4].copy_from_slice(&len.to_le_bytes()); - Ok(ByteBufferPtr::new(buf)) + Ok(buf.into()) } } @@ -456,7 +454,7 @@ impl Encoder for DeltaBitPackEncoder { self.bit_writer.bytes_written() } - fn flush_buffer(&mut self) -> Result { + fn flush_buffer(&mut self) -> Result { // Write remaining values self.flush_block_values()?; // Write page header with total values @@ -597,7 +595,7 @@ impl Encoder for DeltaLengthByteArrayEncoder { self.len_encoder.estimated_data_encoded_size() + self.encoded_size } - fn flush_buffer(&mut self) -> Result { + fn flush_buffer(&mut self) -> Result { ensure_phys_ty!( Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY, "DeltaLengthByteArrayEncoder only supports ByteArrayType" @@ -605,14 +603,14 @@ impl Encoder for DeltaLengthByteArrayEncoder { let mut total_bytes = vec![]; let lengths = self.len_encoder.flush_buffer()?; - total_bytes.extend_from_slice(lengths.data()); + total_bytes.extend_from_slice(&lengths); self.data.iter().for_each(|byte_array| { total_bytes.extend_from_slice(byte_array.data()); }); self.data.clear(); self.encoded_size = 0; - Ok(ByteBufferPtr::new(total_bytes)) + Ok(total_bytes.into()) } } @@ -696,7 +694,7 @@ impl Encoder for DeltaByteArrayEncoder { + self.suffix_writer.estimated_data_encoded_size() } - fn flush_buffer(&mut self) -> Result { + fn flush_buffer(&mut self) -> Result { match T::get_physical_type() { Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => { // TODO: investigate if we can merge lengths and suffixes @@ -704,17 +702,17 @@ impl Encoder for DeltaByteArrayEncoder { let mut total_bytes = vec![]; // Insert lengths ... let lengths = self.prefix_len_encoder.flush_buffer()?; - total_bytes.extend_from_slice(lengths.data()); + total_bytes.extend_from_slice(&lengths); // ... 
followed by suffixes let suffixes = self.suffix_writer.flush_buffer()?; - total_bytes.extend_from_slice(suffixes.data()); + total_bytes.extend_from_slice(&suffixes); self.previous.clear(); - Ok(ByteBufferPtr::new(total_bytes)) + Ok(total_bytes.into()) } _ => panic!( "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType" - ) + ), } } } diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 63ab15c73ead..5807f6b9c527 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -17,12 +17,11 @@ use std::{cmp, mem::size_of}; +use bytes::Bytes; + use crate::errors::{ParquetError, Result}; use crate::util::bit_util::from_le_slice; -use crate::util::{ - bit_util::{self, BitReader, BitWriter, FromBytes}, - memory::ByteBufferPtr, -}; +use crate::util::bit_util::{self, BitReader, BitWriter, FromBytes}; /// Rle/Bit-Packing Hybrid Encoding /// The grammar for this encoding looks like the following (copied verbatim @@ -326,7 +325,7 @@ impl RleDecoder { } #[inline] - pub fn set_data(&mut self, data: ByteBufferPtr) { + pub fn set_data(&mut self, data: Bytes) { if let Some(ref mut bit_reader) = self.bit_reader { bit_reader.reset(data); } else { @@ -543,17 +542,15 @@ mod tests { use crate::util::bit_util::ceil; use rand::{self, distributions::Standard, thread_rng, Rng, SeedableRng}; - use crate::util::memory::ByteBufferPtr; - const MAX_WIDTH: usize = 32; #[test] fn test_rle_decode_int32() { // Test data: 0-7 with bit width 3 // 00000011 10001000 11000110 11111010 - let data = ByteBufferPtr::new(vec![0x03, 0x88, 0xC6, 0xFA]); + let data = vec![0x03, 0x88, 0xC6, 0xFA]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data); + decoder.set_data(data.into()); let mut buffer = vec![0; 8]; let expected = vec![0, 1, 2, 3, 4, 5, 6, 7]; let result = decoder.get_batch::(&mut buffer); @@ -565,9 +562,9 @@ mod tests { fn test_rle_skip_int32() { // Test data: 0-7 with bit width 3 // 00000011 10001000 11000110 11111010 - let data = ByteBufferPtr::new(vec![0x03, 0x88, 0xC6, 0xFA]); + let data = vec![0x03, 0x88, 0xC6, 0xFA]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data); + decoder.set_data(data.into()); let expected = vec![2, 3, 4, 5, 6, 7]; let skipped = decoder.skip(2).expect("skipping values"); assert_eq!(skipped, 2); @@ -598,18 +595,17 @@ mod tests { fn test_rle_decode_bool() { // RLE test data: 50 1s followed by 50 0s // 01100100 00000001 01100100 00000000 - let data1 = ByteBufferPtr::new(vec![0x64, 0x01, 0x64, 0x00]); + let data1 = vec![0x64, 0x01, 0x64, 0x00]; // Bit-packing test data: alternating 1s and 0s, 100 total // 100 / 8 = 13 groups // 00011011 10101010 ... 
00001010 - let data2 = ByteBufferPtr::new(vec![ - 0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, - 0x0A, - ]); + let data2 = vec![ + 0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x0A, + ]; let mut decoder: RleDecoder = RleDecoder::new(1); - decoder.set_data(data1); + decoder.set_data(data1.into()); let mut buffer = vec![false; 100]; let mut expected = vec![]; for i in 0..100 { @@ -623,7 +619,7 @@ mod tests { assert!(result.is_ok()); assert_eq!(buffer, expected); - decoder.set_data(data2); + decoder.set_data(data2.into()); let mut buffer = vec![false; 100]; let mut expected = vec![]; for i in 0..100 { @@ -642,18 +638,17 @@ mod tests { fn test_rle_skip_bool() { // RLE test data: 50 1s followed by 50 0s // 01100100 00000001 01100100 00000000 - let data1 = ByteBufferPtr::new(vec![0x64, 0x01, 0x64, 0x00]); + let data1 = vec![0x64, 0x01, 0x64, 0x00]; // Bit-packing test data: alternating 1s and 0s, 100 total // 100 / 8 = 13 groups // 00011011 10101010 ... 00001010 - let data2 = ByteBufferPtr::new(vec![ - 0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, - 0x0A, - ]); + let data2 = vec![ + 0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x0A, + ]; let mut decoder: RleDecoder = RleDecoder::new(1); - decoder.set_data(data1); + decoder.set_data(data1.into()); let mut buffer = vec![true; 50]; let expected = vec![false; 50]; @@ -665,7 +660,7 @@ mod tests { assert_eq!(remainder, 50); assert_eq!(buffer, expected); - decoder.set_data(data2); + decoder.set_data(data2.into()); let mut buffer = vec![false; 50]; let mut expected = vec![]; for i in 0..50 { @@ -689,9 +684,9 @@ mod tests { // Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s // 00000110 00000000 00001000 00000001 00001010 00000010 let dict = vec![10, 20, 30]; - let data = ByteBufferPtr::new(vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02]); + let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data); + decoder.set_data(data.into()); let mut buffer = vec![0; 12]; let expected = vec![10, 10, 10, 20, 20, 20, 20, 30, 30, 30, 30, 30]; let result = decoder.get_batch_with_dict::(&dict, &mut buffer, 12); @@ -702,9 +697,9 @@ mod tests { // 011 100 101 011 100 101 011 100 101 100 101 101 // 00000011 01100011 11000111 10001110 00000011 01100101 00001011 let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"]; - let data = ByteBufferPtr::new(vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B]); + let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data); + decoder.set_data(data.into()); let mut buffer = vec![""; 12]; let expected = vec![ "ddd", "eee", "fff", "ddd", "eee", "fff", "ddd", "eee", "fff", "eee", "fff", @@ -724,9 +719,9 @@ mod tests { // Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s // 00000110 00000000 00001000 00000001 00001010 00000010 let dict = vec![10, 20, 30]; - let data = ByteBufferPtr::new(vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02]); + let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data); + decoder.set_data(data.into()); let mut buffer = vec![0; 10]; let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30]; let skipped = decoder.skip(2).expect("skipping two values"); @@ -741,9 +736,9 @@ mod tests { // 011 100 101 011 100 101 011 100 101 100 101 101 // 00000011 01100011 11000111 10001110 
00000011 01100101 00001011 let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"]; - let data = ByteBufferPtr::new(vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B]); + let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data); + decoder.set_data(data.into()); let mut buffer = vec![""; 8]; let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"]; let skipped = decoder.skip(4).expect("skipping four values"); @@ -766,7 +761,7 @@ mod tests { for v in values { encoder.put(*v as u64) } - let buffer = ByteBufferPtr::new(encoder.consume()); + let buffer: Bytes = encoder.consume().into(); if expected_len != -1 { assert_eq!(buffer.len(), expected_len as usize); } @@ -776,7 +771,7 @@ mod tests { // Verify read let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(buffer.all()); + decoder.set_data(buffer.clone()); for v in values { let val: i64 = decoder .get() @@ -888,7 +883,7 @@ mod tests { (3 << 1) | 1, // bit-packed run of 3 * 8 ]; data.extend(std::iter::repeat(0xFF).take(20)); - let data = ByteBufferPtr::new(data); + let data: Bytes = data.into(); let mut decoder = RleDecoder::new(8); decoder.set_data(data.clone()); @@ -926,7 +921,7 @@ mod tests { buffer.push(0); let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(ByteBufferPtr::new(buffer)); + decoder.set_data(buffer.into()); // We don't always reliably know how many non-null values are contained in a page // and so the decoder must work correctly without a precise value count @@ -963,7 +958,7 @@ mod tests { for _ in 0..run_bytes { writer.put_aligned(0xFF_u8, 1); } - let buffer = ByteBufferPtr::new(writer.consume()); + let buffer: Bytes = writer.consume().into(); let mut decoder = RleDecoder::new(1); decoder.set_data(buffer.clone()); @@ -992,7 +987,7 @@ mod tests { } let buffer = encoder.consume(); let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(ByteBufferPtr::new(buffer)); + decoder.set_data(Bytes::from(buffer)); let mut actual_values: Vec = vec![0; values.len()]; decoder .get_batch(&mut actual_values) @@ -1007,11 +1002,11 @@ mod tests { encoder.put(*v as u64) } - let buffer = ByteBufferPtr::new(encoder.consume()); + let buffer = Bytes::from(encoder.consume()); // Verify read let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(buffer.all()); + decoder.set_data(buffer.clone()); for v in values { let val = decoder .get::() diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 0d032c27aa06..43e169cd085b 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -40,7 +40,7 @@ use crate::record::reader::RowIter; use crate::record::Row; use crate::schema::types::Type as SchemaType; use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; -use crate::util::memory::ByteBufferPtr; +use bytes::Bytes; use thrift::protocol::TCompactInputProtocol; impl TryFrom for SerializedFileReader { @@ -386,7 +386,7 @@ fn read_page_header_len(input: &mut T) -> Result<(usize, PageHeader)> { /// Decodes a [`Page`] from the provided `buffer` pub(crate) fn decode_page( page_header: PageHeader, - buffer: ByteBufferPtr, + buffer: Bytes, physical_type: Type, decompressor: Option<&mut Box>, ) -> Result { @@ -428,7 +428,7 @@ pub(crate) fn decode_page( )); } - ByteBufferPtr::new(decompressed) + Bytes::from(decompressed) } _ => buffer, }; @@ -627,7 +627,7 @@ impl PageReader for SerializedPageReader { decode_page( header, - 
ByteBufferPtr::new(buffer), + Bytes::from(buffer), self.physical_type, self.decompressor.as_mut(), )? @@ -656,7 +656,7 @@ impl PageReader for SerializedPageReader { let bytes = buffer.slice(offset..); decode_page( header, - bytes.into(), + bytes, self.physical_type, self.decompressor.as_mut(), )? diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index dbbd8b4b99a2..2b9f261d9f42 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -756,7 +756,6 @@ mod tests { use crate::record::{Row, RowAccessor}; use crate::schema::parser::parse_message_type; use crate::schema::types::{ColumnDescriptor, ColumnPath}; - use crate::util::memory::ByteBufferPtr; #[test] fn test_row_group_writer_error_not_all_columns_written() { @@ -1040,7 +1039,7 @@ mod tests { fn test_page_writer_data_pages() { let pages = vec![ Page::DataPage { - buf: ByteBufferPtr::new(vec![1, 2, 3, 4, 5, 6, 7, 8]), + buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]), num_values: 10, encoding: Encoding::DELTA_BINARY_PACKED, def_level_encoding: Encoding::RLE, @@ -1048,7 +1047,7 @@ mod tests { statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true)), }, Page::DataPageV2 { - buf: ByteBufferPtr::new(vec![4; 128]), + buf: Bytes::from(vec![4; 128]), num_values: 10, encoding: Encoding::DELTA_BINARY_PACKED, num_nulls: 2, @@ -1068,13 +1067,13 @@ mod tests { fn test_page_writer_dict_pages() { let pages = vec![ Page::DictionaryPage { - buf: ByteBufferPtr::new(vec![1, 2, 3, 4, 5]), + buf: Bytes::from(vec![1, 2, 3, 4, 5]), num_values: 5, encoding: Encoding::RLE_DICTIONARY, is_sorted: false, }, Page::DataPage { - buf: ByteBufferPtr::new(vec![1, 2, 3, 4, 5, 6, 7, 8]), + buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]), num_values: 10, encoding: Encoding::DELTA_BINARY_PACKED, def_level_encoding: Encoding::RLE, @@ -1082,7 +1081,7 @@ mod tests { statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true)), }, Page::DataPageV2 { - buf: ByteBufferPtr::new(vec![4; 128]), + buf: Bytes::from(vec![4; 128]), num_values: 10, encoding: Encoding::DELTA_BINARY_PACKED, num_nulls: 2, @@ -1122,10 +1121,10 @@ mod tests { ref statistics, } => { total_num_values += num_values as i64; - let output_buf = compress_helper(compressor.as_mut(), buf.data()); + let output_buf = compress_helper(compressor.as_mut(), buf); Page::DataPage { - buf: ByteBufferPtr::new(output_buf), + buf: Bytes::from(output_buf), num_values, encoding, def_level_encoding, @@ -1147,12 +1146,12 @@ mod tests { } => { total_num_values += num_values as i64; let offset = (def_levels_byte_len + rep_levels_byte_len) as usize; - let cmp_buf = compress_helper(compressor.as_mut(), &buf.data()[offset..]); - let mut output_buf = Vec::from(&buf.data()[..offset]); + let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]); + let mut output_buf = Vec::from(&buf[..offset]); output_buf.extend_from_slice(&cmp_buf[..]); Page::DataPageV2 { - buf: ByteBufferPtr::new(output_buf), + buf: Bytes::from(output_buf), num_values, encoding, num_nulls, @@ -1170,10 +1169,10 @@ mod tests { encoding, is_sorted, } => { - let output_buf = compress_helper(compressor.as_mut(), buf.data()); + let output_buf = compress_helper(compressor.as_mut(), buf); Page::DictionaryPage { - buf: ByteBufferPtr::new(output_buf), + buf: Bytes::from(output_buf), num_values, encoding, is_sorted, @@ -1248,7 +1247,7 @@ mod tests { /// Check if pages match. 
fn assert_page(left: &Page, right: &Page) { assert_eq!(left.page_type(), right.page_type()); - assert_eq!(left.buffer().data(), right.buffer().data()); + assert_eq!(&left.buffer(), &right.buffer()); assert_eq!(left.num_values(), right.num_values()); assert_eq!(left.encoding(), right.encoding()); assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics())); diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs index f1612c90cc2a..0279bbc382ea 100644 --- a/parquet/src/lib.rs +++ b/parquet/src/lib.rs @@ -74,10 +74,6 @@ pub mod data_type; #[doc(hidden)] pub use self::encodings::{decoding, encoding}; -#[cfg(feature = "experimental")] -#[doc(hidden)] -pub use self::util::memory; - experimental!(#[macro_use] mod util); #[cfg(feature = "arrow")] pub mod arrow; diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs index 597190a46eff..b1dd23574a19 100644 --- a/parquet/src/util/bit_util.rs +++ b/parquet/src/util/bit_util.rs @@ -17,10 +17,11 @@ use std::{cmp, mem::size_of}; +use bytes::Bytes; + use crate::data_type::{AsBytes, ByteArray, FixedLenByteArray, Int96}; use crate::errors::{ParquetError, Result}; use crate::util::bit_pack::{unpack16, unpack32, unpack64, unpack8}; -use crate::util::memory::ByteBufferPtr; #[inline] pub fn from_le_slice(bs: &[u8]) -> T { @@ -341,7 +342,7 @@ pub const MAX_VLQ_BYTE_LEN: usize = 10; pub struct BitReader { /// The byte buffer to read from, passed in by client - buffer: ByteBufferPtr, + buffer: Bytes, /// Bytes are memcpy'd from `buffer` and values are read from this variable. /// This is faster than reading values byte by byte directly from `buffer` @@ -365,7 +366,7 @@ pub struct BitReader { /// Utility class to read bit/byte stream. This class can read bits or bytes that are /// either byte aligned or not. 
impl BitReader { - pub fn new(buffer: ByteBufferPtr) -> Self { + pub fn new(buffer: Bytes) -> Self { BitReader { buffer, buffered_values: 0, @@ -374,7 +375,7 @@ impl BitReader { } } - pub fn reset(&mut self, buffer: ByteBufferPtr) { + pub fn reset(&mut self, buffer: Bytes) { self.buffer = buffer; self.buffered_values = 0; self.byte_offset = 0; @@ -456,8 +457,6 @@ impl BitReader { } } - let in_buf = self.buffer.data(); - // Read directly into output buffer match size_of::() { 1 => { @@ -465,7 +464,7 @@ impl BitReader { let out = unsafe { std::slice::from_raw_parts_mut(ptr, batch.len()) }; while values_to_read - i >= 8 { let out_slice = (&mut out[i..i + 8]).try_into().unwrap(); - unpack8(&in_buf[self.byte_offset..], out_slice, num_bits); + unpack8(&self.buffer[self.byte_offset..], out_slice, num_bits); self.byte_offset += num_bits; i += 8; } @@ -475,7 +474,7 @@ impl BitReader { let out = unsafe { std::slice::from_raw_parts_mut(ptr, batch.len()) }; while values_to_read - i >= 16 { let out_slice = (&mut out[i..i + 16]).try_into().unwrap(); - unpack16(&in_buf[self.byte_offset..], out_slice, num_bits); + unpack16(&self.buffer[self.byte_offset..], out_slice, num_bits); self.byte_offset += 2 * num_bits; i += 16; } @@ -485,7 +484,7 @@ impl BitReader { let out = unsafe { std::slice::from_raw_parts_mut(ptr, batch.len()) }; while values_to_read - i >= 32 { let out_slice = (&mut out[i..i + 32]).try_into().unwrap(); - unpack32(&in_buf[self.byte_offset..], out_slice, num_bits); + unpack32(&self.buffer[self.byte_offset..], out_slice, num_bits); self.byte_offset += 4 * num_bits; i += 32; } @@ -495,7 +494,7 @@ impl BitReader { let out = unsafe { std::slice::from_raw_parts_mut(ptr, batch.len()) }; while values_to_read - i >= 64 { let out_slice = (&mut out[i..i + 64]).try_into().unwrap(); - unpack64(&in_buf[self.byte_offset..], out_slice, num_bits); + unpack64(&self.buffer[self.byte_offset..], out_slice, num_bits); self.byte_offset += 8 * num_bits; i += 64; } @@ -506,7 +505,7 @@ impl BitReader { // Try to read smaller batches if possible if size_of::() > 4 && values_to_read - i >= 32 && num_bits <= 32 { let mut out_buf = [0_u32; 32]; - unpack32(&in_buf[self.byte_offset..], &mut out_buf, num_bits); + unpack32(&self.buffer[self.byte_offset..], &mut out_buf, num_bits); self.byte_offset += 4 * num_bits; for out in out_buf { @@ -520,7 +519,7 @@ impl BitReader { if size_of::() > 2 && values_to_read - i >= 16 && num_bits <= 16 { let mut out_buf = [0_u16; 16]; - unpack16(&in_buf[self.byte_offset..], &mut out_buf, num_bits); + unpack16(&self.buffer[self.byte_offset..], &mut out_buf, num_bits); self.byte_offset += 2 * num_bits; for out in out_buf { @@ -534,7 +533,7 @@ impl BitReader { if size_of::() > 1 && values_to_read - i >= 8 && num_bits <= 8 { let mut out_buf = [0_u8; 8]; - unpack8(&in_buf[self.byte_offset..], &mut out_buf, num_bits); + unpack8(&self.buffer[self.byte_offset..], &mut out_buf, num_bits); self.byte_offset += num_bits; for out in out_buf { @@ -595,7 +594,7 @@ impl BitReader { self.byte_offset = self.get_byte_offset(); self.bit_offset = 0; - let src = &self.buffer.data()[self.byte_offset..]; + let src = &self.buffer[self.byte_offset..]; let to_read = num_bytes.min(src.len()); buf.extend_from_slice(&src[..to_read]); @@ -620,7 +619,7 @@ impl BitReader { } // Advance byte_offset to next unread byte and read num_bytes - let v = read_num_bytes::(num_bytes, &self.buffer.data()[self.byte_offset..]); + let v = read_num_bytes::(num_bytes, &self.buffer[self.byte_offset..]); self.byte_offset += num_bytes; Some(v) 
@@ -672,14 +671,14 @@ impl BitReader { fn load_buffered_values(&mut self) { let bytes_to_read = cmp::min(self.buffer.len() - self.byte_offset, 8); self.buffered_values = - read_num_bytes::(bytes_to_read, &self.buffer.data()[self.byte_offset..]); + read_num_bytes::(bytes_to_read, &self.buffer[self.byte_offset..]); } } impl From> for BitReader { #[inline] fn from(buffer: Vec) -> Self { - BitReader::new(ByteBufferPtr::new(buffer)) + BitReader::new(buffer.into()) } } @@ -771,12 +770,12 @@ mod tests { #[test] fn test_bit_reader_get_aligned() { // 01110101 11001011 - let buffer = ByteBufferPtr::new(vec![0x75, 0xCB]); - let mut bit_reader = BitReader::new(buffer.all()); + let buffer = Bytes::from(vec![0x75, 0xCB]); + let mut bit_reader = BitReader::new(buffer.clone()); assert_eq!(bit_reader.get_value::(3), Some(5)); assert_eq!(bit_reader.get_aligned::(1), Some(203)); assert_eq!(bit_reader.get_value::(1), None); - bit_reader.reset(buffer.all()); + bit_reader.reset(buffer.clone()); assert_eq!(bit_reader.get_aligned::(3), None); } @@ -1128,7 +1127,7 @@ mod tests { #[test] fn test_get_batch_zero_extend() { let to_read = vec![0xFF; 4]; - let mut reader = BitReader::new(ByteBufferPtr::new(to_read)); + let mut reader = BitReader::from(to_read); // Create a non-zeroed output buffer let mut output = [u64::MAX; 32]; diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs deleted file mode 100644 index 25d15dd4ff73..000000000000 --- a/parquet/src/util/memory.rs +++ /dev/null @@ -1,149 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Utility methods and structs for working with memory. - -use bytes::Bytes; -use std::{ - fmt::{Debug, Display, Formatter, Result as FmtResult}, - ops::Index, -}; - -// ---------------------------------------------------------------------- -// Immutable Buffer (BufferPtr) classes - -/// An representation of a slice on a reference-counting and read-only byte array. -/// Sub-slices can be further created from this. The byte array will be released -/// when all slices are dropped. -/// -/// TODO: Remove and replace with [`bytes::Bytes`] -#[derive(Clone, Debug)] -pub struct ByteBufferPtr { - data: Bytes, -} - -impl ByteBufferPtr { - /// Creates new buffer from a vector. - pub fn new(v: Vec) -> Self { - Self { data: v.into() } - } - - /// Returns slice of data in this buffer. - #[inline] - pub fn data(&self) -> &[u8] { - &self.data - } - - /// Returns length of this buffer - #[inline] - pub fn len(&self) -> usize { - self.data.len() - } - - /// Returns whether this buffer is empty - #[inline] - pub fn is_empty(&self) -> bool { - self.data.is_empty() - } - - /// Returns a shallow copy of the buffer. - /// Reference counted pointer to the data is copied. 
- pub fn all(&self) -> Self { - self.clone() - } - - /// Returns a shallow copy of the buffer that starts with `start` position. - pub fn start_from(&self, start: usize) -> Self { - Self { - data: self.data.slice(start..), - } - } - - /// Returns a shallow copy that is a range slice within this buffer. - pub fn range(&self, start: usize, len: usize) -> Self { - Self { - data: self.data.slice(start..start + len), - } - } -} - -impl Index for ByteBufferPtr { - type Output = u8; - - fn index(&self, index: usize) -> &u8 { - &self.data[index] - } -} - -impl Display for ByteBufferPtr { - fn fmt(&self, f: &mut Formatter) -> FmtResult { - write!(f, "{:?}", self.data) - } -} - -impl AsRef<[u8]> for ByteBufferPtr { - #[inline] - fn as_ref(&self) -> &[u8] { - &self.data - } -} - -impl From> for ByteBufferPtr { - fn from(data: Vec) -> Self { - Self { data: data.into() } - } -} - -impl From for ByteBufferPtr { - fn from(data: Bytes) -> Self { - Self { data } - } -} - -impl From for Bytes { - fn from(value: ByteBufferPtr) -> Self { - value.data - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_byte_ptr() { - let values = (0..50).collect(); - let ptr = ByteBufferPtr::new(values); - assert_eq!(ptr.len(), 50); - assert_eq!(ptr[40], 40); - - let ptr2 = ptr.all(); - assert_eq!(ptr2.len(), 50); - assert_eq!(ptr2[40], 40); - - let ptr3 = ptr.start_from(20); - assert_eq!(ptr3.len(), 30); - assert_eq!(ptr3[0], 20); - - let ptr4 = ptr3.range(10, 10); - assert_eq!(ptr4.len(), 10); - assert_eq!(ptr4[0], 30); - - let expected: Vec = (30..40).collect(); - assert_eq!(ptr4.as_ref(), expected.as_slice()); - } -} diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs index d96a62a9f363..dfa1285afcf2 100644 --- a/parquet/src/util/mod.rs +++ b/parquet/src/util/mod.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -pub mod memory; #[macro_use] pub mod bit_util; mod bit_pack; diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs index c51c5158cd42..b4fed752fdc5 100644 --- a/parquet/src/util/test_common/page_util.rs +++ b/parquet/src/util/test_common/page_util.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
+use bytes::Bytes; + use crate::basic::Encoding; use crate::column::page::{Page, PageIterator}; use crate::column::page::{PageMetadata, PageReader}; @@ -23,7 +25,6 @@ use crate::encodings::encoding::{get_encoder, Encoder}; use crate::encodings::levels::LevelEncoder; use crate::errors::Result; use crate::schema::types::ColumnDescPtr; -use crate::util::memory::ByteBufferPtr; use std::iter::Peekable; use std::mem; @@ -31,7 +32,7 @@ pub trait DataPageBuilder { fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]); fn add_def_levels(&mut self, max_level: i16, def_levels: &[i16]); fn add_values(&mut self, encoding: Encoding, values: &[T::T]); - fn add_indices(&mut self, indices: ByteBufferPtr); + fn add_indices(&mut self, indices: Bytes); fn consume(self) -> Page; } @@ -112,18 +113,18 @@ impl DataPageBuilder for DataPageBuilderImpl { let encoded_values = encoder .flush_buffer() .expect("consume_buffer() should be OK"); - self.buffer.extend_from_slice(encoded_values.data()); + self.buffer.extend_from_slice(&encoded_values); } - fn add_indices(&mut self, indices: ByteBufferPtr) { + fn add_indices(&mut self, indices: Bytes) { self.encoding = Some(Encoding::RLE_DICTIONARY); - self.buffer.extend_from_slice(indices.data()); + self.buffer.extend_from_slice(&indices); } fn consume(self) -> Page { if self.datapage_v2 { Page::DataPageV2 { - buf: ByteBufferPtr::new(self.buffer), + buf: Bytes::from(self.buffer), num_values: self.num_values, encoding: self.encoding.unwrap(), num_nulls: 0, /* set to dummy value - don't need this when reading @@ -137,7 +138,7 @@ impl DataPageBuilder for DataPageBuilderImpl { } } else { Page::DataPage { - buf: ByteBufferPtr::new(self.buffer), + buf: Bytes::from(self.buffer), num_values: self.num_values, encoding: self.encoding.unwrap(), def_level_encoding: Encoding::RLE, diff --git a/parquet/src/util/test_common/rand_gen.rs b/parquet/src/util/test_common/rand_gen.rs index c36b9060ca58..a267c34840c1 100644 --- a/parquet/src/util/test_common/rand_gen.rs +++ b/parquet/src/util/test_common/rand_gen.rs @@ -17,6 +17,7 @@ use crate::basic::Encoding; use crate::column::page::Page; +use bytes::Bytes; use rand::{ distributions::{uniform::SampleUniform, Distribution, Standard}, thread_rng, Rng, @@ -26,7 +27,6 @@ use std::collections::VecDeque; use crate::data_type::*; use crate::encodings::encoding::{DictEncoder, Encoder}; use crate::schema::types::ColumnDescPtr; -use crate::util::memory::ByteBufferPtr; use crate::util::{DataPageBuilder, DataPageBuilderImpl}; /// Random generator of data type `T` values and sequences. @@ -90,7 +90,7 @@ impl RandGen for ByteArrayType { for _ in 0..len { value.push(rng.gen_range(0..255)); } - result.set_data(ByteBufferPtr::new(value)); + result.set_data(Bytes::from(value)); result } }
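
Note for downstream users of the (experimental) ByteBufferPtr type removed above: every substitution in this patch maps onto an existing bytes::Bytes method. The standalone sketch below is not part of the patch; it mirrors the deleted test_byte_ptr test to show the correspondence.

use bytes::Bytes;

fn main() {
    // ByteBufferPtr::new(vec) becomes Bytes::from(vec) (or simply vec.into()).
    let buf = Bytes::from((0u8..50).collect::<Vec<u8>>());
    assert_eq!(buf.len(), 50);
    assert_eq!(buf[40], 40);

    // ptr.all() becomes buf.clone(): a cheap, reference-counted handle, not a deep copy.
    let whole = buf.clone();
    assert_eq!(whole.len(), 50);

    // ptr.start_from(20) becomes buf.slice(20..).
    let tail = buf.slice(20..);
    assert_eq!(tail[0], 20);

    // ptr.range(10, 10) becomes a half-open range: tail.slice(10..20).
    let window = tail.slice(10..20);
    assert_eq!(window.len(), 10);
    assert_eq!(window[0], 30);

    // ptr.data() becomes &buf[..] or buf.as_ref().
    let expected: Vec<u8> = (30..40).collect();
    assert_eq!(window.as_ref(), expected.as_slice());
}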
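
Several hunks simply drop the old .data() accessor, e.g. compress_helper(compressor.as_mut(), buf) and total_bytes.extend_from_slice(&lengths). This works because Bytes derefs to [u8], so &Bytes coerces to &[u8] at any slice-taking call site. A minimal illustration, where checksum is a made-up stand-in for such a helper:

use bytes::Bytes;

// Hypothetical helper standing in for any function that takes &[u8].
fn checksum(data: &[u8]) -> u32 {
    data.iter().map(|&b| u32::from(b)).sum()
}

fn main() {
    let buf = Bytes::from(vec![1u8, 2, 3, 4]);
    // Deref coercion turns &Bytes into &[u8]; no .data() accessor needed.
    assert_eq!(checksum(&buf), 10);
    // Slicing through the Deref'd [u8] also works, as in &buf[offset..] above.
    assert_eq!(checksum(&buf[2..]), 7);
}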
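
Replacing data.all() with data.clone() and ptr.range(0, 12) with ptr.slice(..12) does not introduce any copying: like ByteBufferPtr, Bytes is a reference-counted view, and clone/slice are O(1) handle operations. A small standalone check of that sharing behaviour (assuming bytes 1.x, where non-empty slices point into the parent allocation):

use bytes::Bytes;

fn main() {
    let original = Bytes::from(vec![0u8; 1024]);

    // clone() bumps a reference count; both handles view the same allocation.
    let copy = original.clone();
    assert_eq!(copy.as_ptr(), original.as_ptr());

    // slice() is likewise zero-copy: the sub-slice points into the parent buffer.
    let window = original.slice(16..32);
    assert_eq!(window.as_ptr(), original[16..].as_ptr());
    assert_eq!(window.len(), 16);
}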