Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ pub(crate) mod private {
+ super::SliceAsBytes
+ PartialOrd
+ Send
+ crate::encodings::decoding::private::GetDecoder
{
/// Encode the value directly from a higher level encoder
fn encode<W: std::io::Write>(
Expand Down
142 changes: 125 additions & 17 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{cmp, marker::PhantomData, mem};
use super::rle::RleDecoder;

use crate::basic::*;
use crate::data_type::private::*;
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
Expand All @@ -31,6 +31,111 @@ use crate::util::{
memory::{ByteBuffer, ByteBufferPtr},
};

pub(crate) mod private {
use super::*;

/// A trait that allows getting a [`Decoder`] implementation for a [`DataType`] with
/// the corresponding [`ParquetValueType`]. This is necessary to support
/// [`Decoder`] implementations that may not be applicable for all [`DataType`]
/// and by extension all [`ParquetValueType`]
pub trait GetDecoder {
fn get_decoder<T: DataType<T = Self>>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
get_decoder_default(descr, encoding)
}
}

fn get_decoder_default<T: DataType>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
Encoding::PLAIN => Ok(Box::new(PlainDecoder::new(descr.type_length()))),
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Err(general_err!(
"Cannot initialize this encoding through this function"
)),
Encoding::RLE
| Encoding::DELTA_BINARY_PACKED
| Encoding::DELTA_BYTE_ARRAY
| Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!(
"Encoding {} is not supported for type",
encoding
)),
e => Err(nyi_err!("Encoding {} is not supported", e)),
}
}

impl GetDecoder for bool {
fn get_decoder<T: DataType<T = Self>>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
Encoding::RLE => Ok(Box::new(RleValueDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
}
}

impl GetDecoder for i32 {
fn get_decoder<T: DataType<T = Self>>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
}
}

impl GetDecoder for i64 {
fn get_decoder<T: DataType<T = Self>>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
}
}

impl GetDecoder for f32 {}
impl GetDecoder for f64 {}

impl GetDecoder for ByteArray {
fn get_decoder<T: DataType<T = Self>>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
Encoding::DELTA_LENGTH_BYTE_ARRAY => {
Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
}
_ => get_decoder_default(descr, encoding),
}
}
}

impl GetDecoder for FixedLenByteArray {
fn get_decoder<T: DataType<T = Self>>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
}
}

impl GetDecoder for Int96 {}
}

// ----------------------------------------------------------------------
// Decoders

Expand Down Expand Up @@ -109,20 +214,8 @@ pub fn get_decoder<T: DataType>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
let decoder: Box<dyn Decoder<T>> = match encoding {
Encoding::PLAIN => Box::new(PlainDecoder::new(descr.type_length())),
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
return Err(general_err!(
"Cannot initialize this encoding through this function"
));
}
Encoding::RLE => Box::new(RleValueDecoder::new()),
Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
e => return Err(nyi_err!("Encoding {} is not supported", e)),
};
Ok(decoder)
use self::private::GetDecoder;
T::T::get_decoder(descr, encoding)
}

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -817,8 +910,11 @@ mod tests {
// supported encodings
create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
create_and_check_decoder::<Int32Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
create_and_check_decoder::<Int32Type>(Encoding::DELTA_BYTE_ARRAY, None);
create_and_check_decoder::<ByteArrayType>(
Encoding::DELTA_LENGTH_BYTE_ARRAY,
None,
);
create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
create_and_check_decoder::<BoolType>(Encoding::RLE, None);

// error when initializing
Expand All @@ -834,6 +930,18 @@ mod tests {
"Cannot initialize this encoding through this function"
)),
);
create_and_check_decoder::<Int32Type>(
Encoding::DELTA_LENGTH_BYTE_ARRAY,
Some(general_err!(
"Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"
)),
);
create_and_check_decoder::<Int32Type>(
Encoding::DELTA_BYTE_ARRAY,
Some(general_err!(
"Encoding DELTA_BYTE_ARRAY is not supported for type"
)),
);

// unsupported
create_and_check_decoder::<Int32Type>(
Expand Down