Remove ByteBufferPtr and replace with Bytes (#5055)
* Replace usages of ByteBufferPtr with Bytes

* Remove parquet memory.rs module
Jefffrey authored Nov 8, 2023
1 parent 747dcbf commit 1635f5b
Showing 27 changed files with 307 additions and 561 deletions.
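For readers following the migration, the sketch below is not part of the commit; it summarizes how the `ByteBufferPtr` operations that appear in the diffs map onto `bytes::Bytes`, based only on the replacements visible in the hunks that follow (construction, `all()`, `start_from()`, `data()`).

```rust
use bytes::Bytes;

fn main() {
    // Construction: ByteBufferPtr::new(vec) becomes Bytes::from(vec) (or `vec.into()`).
    let data: Bytes = Bytes::from(vec![3u8, 0xAA, 0xBB, 0xCC]);

    // ByteBufferPtr::all() becomes Bytes::clone(), which only bumps a reference
    // count on the shared allocation rather than copying the payload.
    let whole: Bytes = data.clone();

    // ByteBufferPtr::start_from(n) becomes Bytes::slice(n..), a zero-copy sub-slice
    // that shares the same underlying allocation.
    let tail: Bytes = data.slice(1..);

    // ByteBufferPtr::data() becomes a plain deref: Bytes implements Deref<Target = [u8]>.
    let as_slice: &[u8] = &whole;

    assert_eq!(as_slice.len(), 4);
    assert_eq!(&tail[..], &[0xAAu8, 0xBB, 0xCC][..]);
}
```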
22 changes: 11 additions & 11 deletions parquet/src/arrow/array_reader/byte_array.rs
@@ -29,12 +29,12 @@ use crate::data_type::Int32Type;
 use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
-use crate::util::memory::ByteBufferPtr;
 use arrow_array::{
     Array, ArrayRef, BinaryArray, Decimal128Array, Decimal256Array, OffsetSizeTrait,
 };
 use arrow_buffer::{i256, Buffer};
 use arrow_schema::DataType as ArrowType;
+use bytes::Bytes;
 use std::any::Any;
 use std::ops::Range;
 use std::sync::Arc;
@@ -189,7 +189,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
 
     fn set_dict(
         &mut self,
-        buf: ByteBufferPtr,
+        buf: Bytes,
         num_values: u32,
         encoding: Encoding,
         _is_sorted: bool,
@@ -219,7 +219,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
     fn set_data(
         &mut self,
         encoding: Encoding,
-        data: ByteBufferPtr,
+        data: Bytes,
         num_levels: usize,
         num_values: Option<usize>,
     ) -> Result<()> {
@@ -263,7 +263,7 @@ pub enum ByteArrayDecoder {
 impl ByteArrayDecoder {
     pub fn new(
         encoding: Encoding,
-        data: ByteBufferPtr,
+        data: Bytes,
         num_levels: usize,
         num_values: Option<usize>,
         validate_utf8: bool,
@@ -339,7 +339,7 @@ impl ByteArrayDecoder {
 
 /// Decoder from [`Encoding::PLAIN`] data to [`OffsetBuffer`]
 pub struct ByteArrayDecoderPlain {
-    buf: ByteBufferPtr,
+    buf: Bytes,
     offset: usize,
     validate_utf8: bool,
 
@@ -350,7 +350,7 @@ pub struct ByteArrayDecoderPlain {
 
 impl ByteArrayDecoderPlain {
     pub fn new(
-        buf: ByteBufferPtr,
+        buf: Bytes,
         num_levels: usize,
         num_values: Option<usize>,
         validate_utf8: bool,
@@ -438,16 +438,16 @@ impl ByteArrayDecoderPlain {
 /// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`OffsetBuffer`]
 pub struct ByteArrayDecoderDeltaLength {
     lengths: Vec<i32>,
-    data: ByteBufferPtr,
+    data: Bytes,
     length_offset: usize,
     data_offset: usize,
     validate_utf8: bool,
 }
 
 impl ByteArrayDecoderDeltaLength {
-    fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result<Self> {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
         let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
-        len_decoder.set_data(data.all(), 0)?;
+        len_decoder.set_data(data.clone(), 0)?;
         let values = len_decoder.values_left();
 
         let mut lengths = vec![0; values];
@@ -522,7 +522,7 @@ pub struct ByteArrayDecoderDelta {
 }
 
 impl ByteArrayDecoderDelta {
-    fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result<Self> {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
         Ok(Self {
             decoder: DeltaByteArrayDecoder::new(data)?,
             validate_utf8,
@@ -558,7 +558,7 @@ pub struct ByteArrayDecoderDictionary {
 }
 
 impl ByteArrayDecoderDictionary {
-    fn new(data: ByteBufferPtr, num_levels: usize, num_values: Option<usize>) -> Self {
+    fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
         Self {
             decoder: DictIndexDecoder::new(data, num_levels, num_values),
         }
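In `ByteArrayDecoderDeltaLength::new` above, `data.all()` becomes `data.clone()`. The sketch below, illustrative only and about `Bytes` itself rather than parquet internals, shows why that is a cheap, safe swap.

```rust
use bytes::Bytes;

fn main() {
    let page = Bytes::from(vec![1u8, 2, 3, 4]);

    // Cloning a Bytes handle does not copy the payload; both handles refer to the
    // same reference-counted allocation, which is the sharing ByteBufferPtr::all()
    // used to provide.
    let for_length_decoder = page.clone();

    assert_eq!(for_length_decoder, page);
    // Same underlying bytes: the deref'd slices start at the same address.
    assert_eq!(for_length_decoder.as_ptr(), page.as_ptr());
}
```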
8 changes: 4 additions & 4 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
 use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
 use arrow_buffer::{ArrowNativeType, Buffer};
 use arrow_schema::DataType as ArrowType;
+use bytes::Bytes;
 
 use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
@@ -39,7 +40,6 @@ use crate::encodings::rle::RleDecoder;
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::FromBytes;
-use crate::util::memory::ByteBufferPtr;
 
 /// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
 macro_rules! make_reader {
@@ -253,7 +253,7 @@ where
 
     fn set_dict(
         &mut self,
-        buf: ByteBufferPtr,
+        buf: Bytes,
         num_values: u32,
         encoding: Encoding,
         _is_sorted: bool,
@@ -286,15 +286,15 @@ where
     fn set_data(
         &mut self,
         encoding: Encoding,
-        data: ByteBufferPtr,
+        data: Bytes,
         num_levels: usize,
         num_values: Option<usize>,
     ) -> Result<()> {
         let decoder = match encoding {
             Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
                 let bit_width = data[0];
                 let mut decoder = RleDecoder::new(bit_width);
-                decoder.set_data(data.start_from(1));
+                decoder.set_data(data.slice(1..));
                 MaybeDictionaryDecoder::Dict {
                     decoder,
                     max_remaining_values: num_values.unwrap_or(num_levels),
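In the dictionary path above, the first byte of the page is the RLE bit width and the rest is the index data, so `data.slice(1..)` replaces `data.start_from(1)` as the zero-copy way to skip that header byte. A small sketch under that assumption follows; `split_dict_page` is a hypothetical helper for illustration, not a function in the parquet crate.

```rust
use bytes::Bytes;

/// Split an RLE/bit-packed dictionary-index page into (bit_width, payload).
/// Hypothetical helper; the real logic lives in the decoders shown in the diff.
fn split_dict_page(page: Bytes) -> (u8, Bytes) {
    let bit_width = page[0]; // Bytes indexes like a byte slice via Deref
    let payload = page.slice(1..); // zero-copy view over the same allocation
    (bit_width, payload)
}

fn main() {
    let page = Bytes::from(vec![3u8, 0b1010_1010, 0b0101_0101]);
    let (bit_width, payload) = split_dict_page(page);
    assert_eq!(bit_width, 3);
    assert_eq!(payload.len(), 2);
}
```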
10 changes: 5 additions & 5 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -26,14 +26,14 @@ use crate::column::page::PageIterator;
 use crate::column::reader::decoder::{ColumnValueDecoder, ValuesBufferSlice};
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
-use crate::util::memory::ByteBufferPtr;
 use arrow_array::{
     ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray,
     IntervalDayTimeArray, IntervalYearMonthArray,
 };
 use arrow_buffer::{i256, Buffer};
 use arrow_data::ArrayDataBuilder;
 use arrow_schema::{DataType as ArrowType, IntervalUnit};
+use bytes::Bytes;
 use std::any::Any;
 use std::ops::Range;
 use std::sync::Arc;
@@ -298,7 +298,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
 
 struct ValueDecoder {
     byte_length: usize,
-    dict_page: Option<ByteBufferPtr>,
+    dict_page: Option<Bytes>,
     decoder: Option<Decoder>,
 }
 
@@ -315,7 +315,7 @@ impl ColumnValueDecoder for ValueDecoder {
 
     fn set_dict(
         &mut self,
-        buf: ByteBufferPtr,
+        buf: Bytes,
         num_values: u32,
         encoding: Encoding,
         _is_sorted: bool,
@@ -345,7 +345,7 @@ impl ColumnValueDecoder for ValueDecoder {
     fn set_data(
         &mut self,
         encoding: Encoding,
-        data: ByteBufferPtr,
+        data: Bytes,
         num_levels: usize,
         num_values: Option<usize>,
     ) -> Result<()> {
@@ -434,7 +434,7 @@ impl ColumnValueDecoder for ValueDecoder {
 }
 
 enum Decoder {
-    Plain { buf: ByteBufferPtr, offset: usize },
+    Plain { buf: Bytes, offset: usize },
     Dict { decoder: DictIndexDecoder },
     Delta { decoder: DeltaByteArrayDecoder },
 }
8 changes: 4 additions & 4 deletions parquet/src/arrow/array_reader/test_util.rs
@@ -17,6 +17,7 @@
 
 use arrow_array::{Array, ArrayRef};
 use arrow_schema::DataType as ArrowType;
+use bytes::Bytes;
 use std::any::Any;
 use std::sync::Arc;
 
@@ -27,7 +28,6 @@ use crate::data_type::{ByteArray, ByteArrayType};
 use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
 use crate::errors::Result;
 use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type};
-use crate::util::memory::ByteBufferPtr;
 
 /// Returns a descriptor for a UTF-8 column
 pub fn utf8_column() -> ColumnDescPtr {
@@ -45,15 +45,15 @@ pub fn utf8_column() -> ColumnDescPtr {
 }
 
 /// Encode `data` with the provided `encoding`
-pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
+pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> Bytes {
     let mut encoder = get_encoder::<ByteArrayType>(encoding).unwrap();
 
     encoder.put(data).unwrap();
     encoder.flush_buffer().unwrap()
 }
 
 /// Returns the encoded dictionary and value data
-pub fn encode_dictionary(data: &[ByteArray]) -> (ByteBufferPtr, ByteBufferPtr) {
+pub fn encode_dictionary(data: &[ByteArray]) -> (Bytes, Bytes) {
     let mut dict_encoder = DictEncoder::<ByteArrayType>::new(utf8_column());
 
     dict_encoder.put(data).unwrap();
@@ -68,7 +68,7 @@ pub fn encode_dictionary(data: &[ByteArray]) -> (ByteBufferPtr, ByteBufferPtr) {
 /// Returns an array of data with its associated encoding, along with an encoded dictionary
 pub fn byte_array_all_encodings(
     data: Vec<impl Into<ByteArray>>,
-) -> (Vec<(Encoding, ByteBufferPtr)>, ByteBufferPtr) {
+) -> (Vec<(Encoding, Bytes)>, Bytes) {
     let data: Vec<_> = data.into_iter().map(Into::into).collect();
     let (encoded_dictionary, encoded_rle) = encode_dictionary(&data);
 
6 changes: 3 additions & 3 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -236,7 +236,7 @@ impl FallbackEncoder {
                 let lengths = lengths.flush_buffer()?;
 
                 let mut out = Vec::with_capacity(lengths.len() + buffer.len());
-                out.extend_from_slice(lengths.data());
+                out.extend_from_slice(&lengths);
                 out.extend_from_slice(buffer);
                 buffer.clear();
                 (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
@@ -252,8 +252,8 @@ impl FallbackEncoder {
 
                 let mut out =
                     Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len());
-                out.extend_from_slice(prefix_lengths.data());
-                out.extend_from_slice(suffix_lengths.data());
+                out.extend_from_slice(&prefix_lengths);
+                out.extend_from_slice(&suffix_lengths);
                 out.extend_from_slice(buffer);
                 buffer.clear();
                 last_value.clear();
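The writer-side change above leans on `Bytes` dereferencing to `&[u8]`, so `out.extend_from_slice(&lengths)` replaces the old `lengths.data()` accessor. A minimal sketch of that deref follows; the `lengths` and `buffer` values here are stand-ins for the encoder outputs, not the real writer state.

```rust
use bytes::Bytes;

fn main() {
    // Stand-in for the length encoder's flush_buffer() output, which now returns Bytes.
    let lengths: Bytes = Bytes::from(vec![4u8, 0, 0, 0]);
    let buffer: Vec<u8> = b"data".to_vec();

    let mut out = Vec::with_capacity(lengths.len() + buffer.len());
    // &Bytes coerces to &[u8] via Deref<Target = [u8]>, so no .data() accessor is needed.
    out.extend_from_slice(&lengths);
    out.extend_from_slice(&buffer);

    assert_eq!(out.len(), 8);
}
```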
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -331,7 +331,7 @@ impl PageWriter for ArrowPageWriter {
 
         buf.length += compressed_size;
         buf.data.push(header);
-        buf.data.push(data.into());
+        buf.data.push(data);
 
         Ok(spec)
     }
11 changes: 6 additions & 5 deletions parquet/src/arrow/decoder/delta_byte_array.rs
@@ -15,33 +15,34 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use bytes::Bytes;
+
 use crate::data_type::Int32Type;
 use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
 use crate::errors::{ParquetError, Result};
-use crate::util::memory::ByteBufferPtr;
 
 /// Decoder for `Encoding::DELTA_BYTE_ARRAY`
 pub struct DeltaByteArrayDecoder {
     prefix_lengths: Vec<i32>,
     suffix_lengths: Vec<i32>,
-    data: ByteBufferPtr,
+    data: Bytes,
     length_offset: usize,
     data_offset: usize,
     last_value: Vec<u8>,
 }
 
 impl DeltaByteArrayDecoder {
     /// Create a new [`DeltaByteArrayDecoder`] with the provided data page
-    pub fn new(data: ByteBufferPtr) -> Result<Self> {
+    pub fn new(data: Bytes) -> Result<Self> {
         let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
-        prefix.set_data(data.all(), 0)?;
+        prefix.set_data(data.clone(), 0)?;
 
         let num_prefix = prefix.values_left();
         let mut prefix_lengths = vec![0; num_prefix];
         assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix);
 
         let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
-        suffix.set_data(data.start_from(prefix.get_offset()), 0)?;
+        suffix.set_data(data.slice(prefix.get_offset()..), 0)?;
 
         let num_suffix = suffix.values_left();
         let mut suffix_lengths = vec![0; num_suffix];
7 changes: 4 additions & 3 deletions parquet/src/arrow/decoder/dictionary_index.rs
@@ -15,9 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use bytes::Bytes;
+
 use crate::encodings::rle::RleDecoder;
 use crate::errors::Result;
-use crate::util::memory::ByteBufferPtr;
 
 /// Decoder for `Encoding::RLE_DICTIONARY` indices
 pub struct DictIndexDecoder {
@@ -41,10 +42,10 @@ pub struct DictIndexDecoder {
 impl DictIndexDecoder {
     /// Create a new [`DictIndexDecoder`] with the provided data page, the number of levels
     /// associated with this data page, and the number of non-null values (if known)
-    pub fn new(data: ByteBufferPtr, num_levels: usize, num_values: Option<usize>) -> Self {
+    pub fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
         let bit_width = data[0];
         let mut decoder = RleDecoder::new(bit_width);
-        decoder.set_data(data.start_from(1));
+        decoder.set_data(data.slice(1..));
 
         Self {
             decoder,
14 changes: 7 additions & 7 deletions parquet/src/arrow/record_reader/definition_levels.rs
@@ -20,6 +20,7 @@ use std::ops::Range;
 use arrow_array::builder::BooleanBufferBuilder;
 use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
 use arrow_buffer::Buffer;
+use bytes::Bytes;
 
 use crate::arrow::buffer::bit_util::count_set_bits;
 use crate::basic::Encoding;
@@ -28,7 +29,6 @@ use crate::column::reader::decoder::{
 };
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
-use crate::util::memory::ByteBufferPtr;
 
 use super::buffer::ScalarBuffer;
 
@@ -152,7 +152,7 @@ impl DefinitionLevelBufferDecoder {
 impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
     type Slice = DefinitionLevelBuffer;
 
-    fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+    fn set_data(&mut self, encoding: Encoding, data: Bytes) {
         match &mut self.decoder {
             MaybePacked::Packed(d) => d.set_data(encoding, data),
             MaybePacked::Fallback(d) => d.set_data(encoding, data),
@@ -219,7 +219,7 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
 /// [RLE]: https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3
 /// [BIT_PACKED]: https://github.com/apache/parquet-format/blob/master/Encodings.md#bit-packed-deprecated-bit_packed--4
 struct PackedDecoder {
-    data: ByteBufferPtr,
+    data: Bytes,
     data_offset: usize,
     rle_left: usize,
     rle_value: bool,
@@ -278,7 +278,7 @@ impl PackedDecoder {
 impl PackedDecoder {
     fn new() -> Self {
         Self {
-            data: ByteBufferPtr::new(vec![]),
+            data: Bytes::from(vec![]),
             data_offset: 0,
             rle_left: 0,
             rle_value: false,
@@ -287,7 +287,7 @@ impl PackedDecoder {
         }
     }
 
-    fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+    fn set_data(&mut self, encoding: Encoding, data: Bytes) {
         self.rle_left = 0;
         self.rle_value = false;
         self.packed_offset = 0;
@@ -385,7 +385,7 @@ mod tests {
 
         let encoded = encoder.consume();
         let mut decoder = PackedDecoder::new();
-        decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));
+        decoder.set_data(Encoding::RLE, encoded.into());
 
         // Decode data in random length intervals
         let mut decoded = BooleanBufferBuilder::new(len);
@@ -424,7 +424,7 @@ mod tests {
 
        let encoded = encoder.consume();
        let mut decoder = PackedDecoder::new();
-        decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));
+        decoder.set_data(Encoding::RLE, encoded.into());
 
        let mut skip_value = 0;
        let mut read_value = 0;
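`PackedDecoder` above now defaults to an empty `Bytes`, and the tests hand their encoded vectors over with `.into()`. The short sketch below shows both constructions; it is illustrative only and does not touch the decoder itself.

```rust
use bytes::Bytes;

fn main() {
    // Empty default, as in PackedDecoder::new(); Bytes::new() would also work.
    let empty = Bytes::from(vec![]);
    assert!(empty.is_empty());

    // Handing an owned Vec<u8> to an API that takes Bytes: From/Into consumes the
    // Vec without copying it, which is what `encoded.into()` does in the tests above.
    let encoded: Vec<u8> = vec![0x03, 0x88];
    let data: Bytes = encoded.into();
    assert_eq!(data.len(), 2);
}
```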
(Diffs for the remaining changed files not shown.)