fix: Several parquet reader/writer regressions (#17941)
coastalwhite authored Jul 31, 2024
1 parent 4275808 commit 880e2e6
Showing 15 changed files with 272 additions and 107 deletions.
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/array/fixed_size_binary/mod.rs
@@ -211,8 +211,8 @@ impl FixedSizeBinaryArray {
polars_ensure!(*size != 0, ComputeError: "FixedSizeBinaryArray expects a positive size");
Ok(*size)
},
- _ => {
- polars_bail!(ComputeError: "FixedSizeBinaryArray expects DataType::FixedSizeBinary")
+ other => {
+ polars_bail!(ComputeError: "FixedSizeBinaryArray expects DataType::FixedSizeBinary. found {other:?}")
},
}
}
10 changes: 3 additions & 7 deletions crates/polars-parquet/src/arrow/read/deserialize/binary/mod.rs
@@ -9,21 +9,17 @@ use arrow::types::Offset;
pub(crate) use basic::BinaryDecoder;

use self::utils::Binary;
+ use super::utils::freeze_validity;
use super::ParquetResult;

fn finalize<O: Offset>(
data_type: ArrowDataType,
mut values: Binary<O>,
- mut validity: MutableBitmap,
+ validity: MutableBitmap,
) -> ParquetResult<Box<dyn Array>> {
values.offsets.shrink_to_fit();
values.values.shrink_to_fit();
- let validity = if validity.is_empty() {
- None
- } else {
- validity.shrink_to_fit();
- Some(validity.freeze())
- };
+ let validity = freeze_validity(validity);

match data_type.to_physical_type() {
PhysicalType::Binary | PhysicalType::LargeBinary => unsafe {
9 changes: 4 additions & 5 deletions crates/polars-parquet/src/arrow/read/deserialize/binview.rs
@@ -4,11 +4,12 @@ use arrow::array::{
Array, BinaryViewArray, DictionaryArray, DictionaryKey, MutableBinaryViewArray, PrimitiveArray,
Utf8ViewArray, View,
};
- use arrow::bitmap::{Bitmap, MutableBitmap};
+ use arrow::bitmap::MutableBitmap;
use arrow::datatypes::{ArrowDataType, PhysicalType};
use polars_error::PolarsResult;

use super::binary::decoders::*;
+ use super::utils::freeze_validity;
use crate::parquet::encoding::hybrid_rle::{self, DictionaryTranslator};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{DataPage, DictPage};
@@ -235,11 +236,9 @@ impl utils::Decoder for BinViewDecoder {
(values, validity): Self::DecodedState,
) -> ParquetResult<Box<dyn Array>> {
let mut array: BinaryViewArray = values.freeze();
- let validity: Bitmap = validity.freeze();

- if validity.unset_bits() != validity.len() {
- array = array.with_validity(Some(validity))
- }
+ let validity = freeze_validity(validity);
+ array = array.with_validity(validity);

match data_type.to_physical_type() {
PhysicalType::BinaryView => Ok(array.boxed()),
8 changes: 4 additions & 4 deletions crates/polars-parquet/src/arrow/read/deserialize/boolean.rs
@@ -4,8 +4,7 @@ use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowDataType;
use polars_error::PolarsResult;

- use super::utils;
- use super::utils::{extend_from_decoder, Decoder, ExactSize};
+ use super::utils::{self, extend_from_decoder, freeze_validity, Decoder, ExactSize};
use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer;
use crate::parquet::encoding::hybrid_rle::HybridRleDecoder;
use crate::parquet::encoding::Encoding;
@@ -50,7 +49,7 @@ impl<'a> utils::StateTranslation<'a, BooleanDecoder> for StateTranslation<'a> {
},
Encoding::Rle => {
// @NOTE: For a nullable list, we might very well overestimate the amount of
- // values, but we never collect those items. We don't really have a way to now the
+ // values, but we never collect those items. We don't really have a way to know the
// number of valid items in the V1 specification.

// For RLE boolean values the length in bytes is pre-pended.
@@ -232,7 +231,8 @@ impl Decoder for BooleanDecoder {
_dict: Option<Self::Dict>,
(values, validity): Self::DecodedState,
) -> ParquetResult<Self::Output> {
- Ok(BooleanArray::new(data_type, values.into(), validity.into()))
+ let validity = freeze_validity(validity);
+ Ok(BooleanArray::new(data_type, values.into(), validity))
}
}

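A context comment in the hunk above notes that for RLE-encoded boolean values the length in bytes is pre-pended. For illustration only, and not part of this commit: in the Parquet format that prefix is a 4-byte little-endian length, so a reader splits it off before handing the remainder to the hybrid RLE/bit-packed decoder. The helper name below is hypothetical.

// Sketch: peel the 4-byte little-endian length prefix off an RLE-encoded
// boolean data page; the remainder (up to `encoded_len` bytes) is the
// hybrid RLE/bit-packed stream.
fn split_rle_prefix(data: &[u8]) -> Option<(usize, &[u8])> {
    if data.len() < 4 {
        return None;
    }
    let encoded_len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
    Some((encoded_len, &data[4..]))
}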
11 changes: 3 additions & 8 deletions crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs
@@ -6,8 +6,8 @@ use arrow::datatypes::ArrowDataType;
use polars_error::PolarsResult;

use super::utils::{
- self, dict_indices_decoder, extend_from_decoder, BatchableCollector, Decoder, DictDecodable,
- ExactSize, PageValidity, StateTranslation,
+ self, dict_indices_decoder, extend_from_decoder, freeze_validity, BatchableCollector, Decoder,
+ DictDecodable, ExactSize, PageValidity, StateTranslation,
};
use super::ParquetError;
use crate::parquet::encoding::hybrid_rle::{self, HybridRleDecoder, Translator};
@@ -118,12 +118,7 @@ impl<K: DictionaryKey, D: utils::DictDecodable> utils::Decoder for DictionaryDec
dict: Option<Self::Dict>,
(values, validity): Self::DecodedState,
) -> ParquetResult<DictionaryArray<K>> {
- let validity = if validity.is_empty() || validity.unset_bits() == 0 {
- None
- } else {
- Some(validity.freeze())
- };
-
+ let validity = freeze_validity(validity);
let dict = dict.unwrap();
let keys = PrimitiveArray::new(K::PRIMITIVE.into(), values.into(), validity);

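Most hunks in this commit replace hand-rolled validity handling with the new `freeze_validity` helper. Reconstructed from the inline logic it replaces in the hunk above, the helper presumably behaves roughly like the sketch below; this is an assumption, not the actual polars-parquet source. The later `mod.rs` hunks apply the same helper to nested validity (an `Option<MutableBitmap>`) via `.and_then(freeze_validity)`.

use arrow::bitmap::{Bitmap, MutableBitmap};

// Assumed behaviour: an empty bitmap, or one with no unset bits (no nulls),
// collapses to `None`, so fully-valid arrays carry no validity buffer.
fn freeze_validity(validity: MutableBitmap) -> Option<Bitmap> {
    if validity.is_empty() {
        return None;
    }
    let validity = validity.freeze();
    if validity.unset_bits() == 0 {
        return None;
    }
    Some(validity)
}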
@@ -3,7 +3,9 @@ use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowDataType;
use polars_error::PolarsResult;

- use super::utils::{dict_indices_decoder, extend_from_decoder, not_implemented, Decoder};
+ use super::utils::{
+ dict_indices_decoder, extend_from_decoder, freeze_validity, not_implemented, Decoder,
+ };
use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer;
use crate::parquet::encoding::{hybrid_rle, Encoding};
use crate::parquet::error::{ParquetError, ParquetResult};
@@ -224,6 +226,12 @@ impl Decoder for BinaryDecoder {
}

fn gather_one(&self, target: &mut Self::Target, value: &'a [u8]) -> ParquetResult<()> {
+ // We make the null value length 0, which allows us to do this.
+ if value.is_empty() {
+ target.resize(target.len() + self.size, 0);
+ return Ok(());
+ }
+
target.extend_from_slice(value);
Ok(())
}
@@ -234,9 +242,17 @@
value: &'a [u8],
n: usize,
) -> ParquetResult<()> {
+ // We make the null value length 0, which allows us to do this.
+ if value.is_empty() {
+ target.resize(target.len() + n * self.size, 0);
+ return Ok(());
+ }
+
+ debug_assert_eq!(value.len(), self.size);
for _ in 0..n {
target.extend(value);
}

Ok(())
}
}
@@ -246,7 +262,10 @@ impl Decoder for BinaryDecoder {
size: self.size,
};

- let null_value = &dict[..self.size];
+ // @NOTE:
+ // This is a special case in our gatherer. If the length of the value is 0, then we just
+ // resize with the appropriate size. Important is that this also works for FSL with size=0.
+ let null_value = &[];

match page_validity {
None => {
@@ -274,10 +293,11 @@
_dict: Option<Self::Dict>,
(values, validity): Self::DecodedState,
) -> ParquetResult<Self::Output> {
+ let validity = freeze_validity(validity);
Ok(FixedSizeBinaryArray::new(
data_type,
values.values.into(),
- validity.into(),
+ validity,
))
}
}
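To summarize the fixed-size binary change above: the dictionary gatherer now uses a zero-length slice as its null sentinel and pads `size` zero bytes for each null, so every slot in the flat buffer keeps the same width (and the scheme degenerates cleanly when `size == 0`). A standalone sketch of that convention follows; the free function is hypothetical, not the decoder's actual method.

// Sketch: nulls arrive as empty slices and are expanded to `size` zero bytes,
// keeping the fixed-size buffer aligned; non-null values must be exactly
// `size` bytes long.
fn gather_fixed_size(target: &mut Vec<u8>, value: &[u8], size: usize) {
    if value.is_empty() {
        target.resize(target.len() + size, 0);
    } else {
        debug_assert_eq!(value.len(), size);
        target.extend_from_slice(value);
    }
}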
16 changes: 8 additions & 8 deletions crates/polars-parquet/src/arrow/read/deserialize/mod.rs
@@ -20,6 +20,7 @@ use simple::page_iter_to_array;

pub use self::nested_utils::{init_nested, InitNested, NestedState};
pub use self::utils::filter::Filter;
+ use self::utils::freeze_validity;
use super::*;
use crate::parquet::read::get_page_iterator as _get_page_iterator;
use crate::parquet::schema::types::PrimitiveType;
@@ -48,6 +49,7 @@ pub fn create_list(
values: Box<dyn Array>,
) -> Box<dyn Array> {
let (mut offsets, validity) = nested.pop().unwrap();
+ let validity = validity.and_then(freeze_validity);
match data_type.to_logical_type() {
ArrowDataType::List(_) => {
offsets.push(values.len() as i64);
@@ -62,7 +64,7 @@
data_type,
offsets.into(),
values,
- validity.and_then(|x| x.into()),
+ validity,
))
},
ArrowDataType::LargeList(_) => {
@@ -72,14 +74,12 @@
data_type,
offsets.try_into().expect("List too large"),
values,
- validity.and_then(|x| x.into()),
+ validity,
))
},
- ArrowDataType::FixedSizeList(_, _) => Box::new(FixedSizeListArray::new(
- data_type,
- values,
- validity.and_then(|x| x.into()),
- )),
+ ArrowDataType::FixedSizeList(_, _) => {
+ Box::new(FixedSizeListArray::new(data_type, values, validity))
+ },
_ => unreachable!(),
}
}
@@ -104,7 +104,7 @@ pub fn create_map(
data_type,
offsets.into(),
values,
- validity.and_then(|x| x.into()),
+ validity.and_then(freeze_validity),
))
},
_ => unreachable!(),