Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Collect Parquet dictionary binary as view #17475

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::array::iterator::NonNullValuesIter;
use crate::bitmap::utils::{BitmapIter, ZipValidity};
pub type BinaryViewArray = BinaryViewArrayGeneric<[u8]>;
pub type Utf8ViewArray = BinaryViewArrayGeneric<str>;
pub use view::{View, INLINE_VIEW_SIZE};
pub use view::View;

use super::Splitable;

Expand Down
12 changes: 12 additions & 0 deletions crates/polars-arrow/src/array/binview/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,18 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
self.views.push(value);
}

/// Appends `buffer` to the list of completed buffers and returns the index it was stored at.
///
/// If the in-progress buffer holds any bytes, it is first moved into the completed buffers so
/// that the returned index refers to `buffer` itself.
#[inline]
pub fn push_buffer(&mut self, buffer: Buffer<u8>) -> u32 {
    // Flush pending bytes first; an empty in-progress buffer is left untouched.
    if !self.in_progress_buffer.is_empty() {
        let finished = std::mem::take(&mut self.in_progress_buffer);
        self.completed_buffers.push(Buffer::from(finished));
    }

    // The new buffer lands at the current end of the completed list.
    let idx = self.completed_buffers.len() as u32;
    self.completed_buffers.push(buffer);
    idx
}

#[inline]
pub fn push_value<V: AsRef<T>>(&mut self, value: V) {
if let Some(validity) = &mut self.validity {
Expand Down
86 changes: 68 additions & 18 deletions crates/polars-arrow/src/array/binview/view.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::fmt::{self, Display, Formatter};
use std::ops::Add;

use bytemuck::{Pod, Zeroable};
Expand All @@ -13,10 +13,12 @@ use crate::buffer::Buffer;
use crate::datatypes::PrimitiveType;
use crate::types::NativeType;

pub const INLINE_VIEW_SIZE: u32 = 12;

// We use this instead of u128 because we want alignment of <= 8 bytes.
#[derive(Debug, Copy, Clone, Default)]
/// A reference to a set of bytes.
///
/// If `length <= 12`, these bytes are inlined over the `prefix`, `buffer_idx` and `offset` fields.
/// If `length > 12`, these fields specify a slice of a buffer.
#[derive(Copy, Clone, Default)]
#[repr(C)]
pub struct View {
/// The length of the string/bytes.
Expand All @@ -29,29 +31,77 @@ pub struct View {
pub offset: u32,
}

impl fmt::Debug for View {
    /// Formats the view according to its representation.
    ///
    /// Views with `length <= MAX_INLINE_SIZE` print their `length` and the inlined bytes as
    /// `content`; longer views print `length`, `prefix` (as big-endian bytes), `buffer_idx` and
    /// `offset`.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = fmt.debug_struct("View");
        out.field("length", &self.length);

        if self.length <= Self::MAX_INLINE_SIZE {
            // SAFETY: `View` is `repr(C)` and the inlined bytes start right after the 4-byte
            // `length` field. `self.length` is at most `MAX_INLINE_SIZE` in this branch, so the
            // slice stays within the remaining bytes of `self`.
            let content: &[u8] = unsafe {
                let start = (self as *const Self as *const u8).add(4);
                std::slice::from_raw_parts(start, self.length as usize)
            };
            out.field("content", &content);
        } else {
            out.field("prefix", &self.prefix.to_be_bytes())
                .field("buffer_idx", &self.buffer_idx)
                .field("offset", &self.offset);
        }

        out.finish()
    }
}

impl View {
pub const MAX_INLINE_SIZE: u32 = 12;

#[inline(always)]
pub fn as_u128(self) -> u128 {
unsafe { std::mem::transmute(self) }
}

/// Create a new inline view.
///
/// The bytes are copied into the view itself, directly after the `length` field.
///
/// # Panics
///
/// Panics if `bytes.len() > View::MAX_INLINE_SIZE`.
#[inline]
pub fn new_inline(bytes: &[u8]) -> Self {
    // Compare in `usize`: casting `bytes.len()` to `u32` first would truncate lengths above
    // `u32::MAX`, letting an oversized slice pass the check in release builds and overflow the
    // view in the copy below.
    assert!(bytes.len() <= Self::MAX_INLINE_SIZE as usize);

    let mut view = Self {
        // Lossless: the assert above bounds the length by `MAX_INLINE_SIZE`.
        length: bytes.len() as u32,
        ..Default::default()
    };

    let view_ptr = &mut view as *mut _ as *mut u8;

    // SAFETY:
    // - bytes length <= 12,
    // - size_of::<View> == 16
    // - View is laid out as [length, prefix, buffer_idx, offset] (using repr(C))
    // - By grabbing the view_ptr and adding 4, we have provenance over prefix, buffer_idx and
    //   offset. (i.e. the same could not be achieved with &mut self.prefix as *mut _ as *mut u8)
    unsafe {
        let inline_data_ptr = view_ptr.add(4);
        core::ptr::copy_nonoverlapping(bytes.as_ptr(), inline_data_ptr, bytes.len());
    }
    view
}

#[inline]
pub fn new_from_bytes(bytes: &[u8], buffer_idx: u32, offset: u32) -> Self {
if bytes.len() <= 12 {
let mut ret = Self {
length: bytes.len() as u32,
..Default::default()
};
let ret_ptr = &mut ret as *mut _ as *mut u8;
unsafe {
core::ptr::copy_nonoverlapping(bytes.as_ptr(), ret_ptr.add(4), bytes.len());
}
ret
debug_assert!(bytes.len() <= u32::MAX as usize);

if bytes.len() as u32 <= Self::MAX_INLINE_SIZE {
Self::new_inline(bytes)
} else {
let prefix_buf: [u8; 4] = std::array::from_fn(|i| *bytes.get(i).unwrap_or(&0));
Self {
length: bytes.len() as u32,
prefix: u32::from_le_bytes(prefix_buf),
prefix: u32::from_le_bytes(bytes[0..4].try_into().unwrap()),
buffer_idx,
offset,
}
Expand Down Expand Up @@ -190,8 +240,8 @@ where
{
for view in views {
let len = view.length;
if len <= INLINE_VIEW_SIZE {
if len < INLINE_VIEW_SIZE && view.as_u128() >> (32 + len * 8) != 0 {
if len <= View::MAX_INLINE_SIZE {
if len < View::MAX_INLINE_SIZE && view.as_u128() >> (32 + len * 8) != 0 {
polars_bail!(ComputeError: "view contained non-zero padding in prefix");
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ mod values;
pub use binary::{BinaryArray, BinaryValueIter, MutableBinaryArray, MutableBinaryValuesArray};
pub use binview::{
BinaryViewArray, BinaryViewArrayGeneric, MutableBinaryViewArray, MutablePlBinary,
MutablePlString, Utf8ViewArray, View, ViewType, INLINE_VIEW_SIZE,
MutablePlString, Utf8ViewArray, View, ViewType,
};
pub use boolean::{BooleanArray, MutableBooleanArray};
pub use dictionary::{DictionaryArray, DictionaryKey, MutableDictionaryArray};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ pub(crate) fn deserialize_plain(values: &[u8], num_values: usize) -> BinaryDict
for v in all {
dict_values.push(v)
}

dict_values.into()
}

Expand Down
92 changes: 65 additions & 27 deletions crates/polars-parquet/src/arrow/read/deserialize/binview/basic.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use std::cell::Cell;
use std::collections::VecDeque;

use arrow::array::{Array, ArrayRef, BinaryViewArray, MutableBinaryViewArray, Utf8ViewArray};
use arrow::array::{Array, ArrayRef, BinaryViewArray, MutableBinaryViewArray, Utf8ViewArray, View};
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::{ArrowDataType, PhysicalType};
use polars_error::PolarsResult;
use polars_utils::iter::FallibleIterator;

use super::super::binary::decoders::*;
use crate::parquet::encoding::hybrid_rle::BinaryDictionaryTranslator;
use crate::parquet::error::ParquetError;
use crate::parquet::page::{DataPage, DictPage};
use crate::read::deserialize::utils;
use crate::read::deserialize::utils::{extend_from_decoder, next, DecodedState, MaybeNext};
use crate::read::deserialize::utils::{
self, extend_from_decoder, next, DecodedState, MaybeNext, TranslatedHybridRle,
};
use crate::read::{PagesIter, PrimitiveLogicalType};

type DecodedStateTuple = (MutableBinaryViewArray<[u8]>, MutableBitmap);
Expand Down Expand Up @@ -102,33 +105,78 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder {
BinaryState::OptionalDictionary(page_validity, page_values) => {
// Already done on the dict.
validate_utf8 = false;

let page_dict = &page_values.dict;
let offsets = page_dict.offsets();

// @NOTE: If there are no lengths (i.e. 0-1 offsets), then we will have only nulls.
let max_length = offsets.lengths().max().unwrap_or(0);

// We do not have to push the buffer if all elements fit as inline views.
let buffer_idx = if max_length <= View::MAX_INLINE_SIZE as usize {
0
} else {
values.push_buffer(page_dict.values().clone())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nice. So we don't have to go through the builder, but immediately use the buffer. 👍

};

// @NOTE: we could potentially use the View::new_inline function here, but that
// would require two collectors & two translators. So I don't think it is worth
// it.
let translator = BinaryDictionaryTranslator {
dictionary: page_dict,
buffer_idx,
};
let collector = TranslatedHybridRle::new(&mut page_values.values, &translator);

utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
&mut page_values
.values
.by_ref()
.map(|index| page_dict.value(index as usize)),
collector,
)?;
page_values.values.get_result()?;
},
BinaryState::RequiredDictionary(page) => {
// Already done on the dict.
validate_utf8 = false;

let page_dict = &page.dict;
let offsets = page_dict.offsets();

for x in page
.values
.by_ref()
.map(|index| page_dict.value(index as usize))
.take(additional)
{
values.push_value_ignore_validity(x)
if let Some(max_length) = offsets.lengths().max() {
// We do not have to push the buffer if all elements fit as inline views.
let buffer_idx = if max_length <= View::MAX_INLINE_SIZE as usize {
0
} else {
values.push_buffer(page_dict.values().clone())
};

// @NOTE: we could potentially use the View::new_inline function here, but that
// would require two collectors & two translators. So I don't think it is worth
// it.
let translator = BinaryDictionaryTranslator {
dictionary: page_dict,
buffer_idx,
};

page.values.translate_and_collect_n_into(
values.views_mut(),
additional,
&translator,
)?;
if let Some(validity) = values.validity() {
validity.extend_constant(additional, true);
}
} else {
// @NOTE: If there are no dictionary items, there is no way we can look up
// items.
if additional != 0 {
return Err(ParquetError::oos(
"Attempt to search items with empty dictionary",
)
.into());
}
}
page.values.get_result()?;
},
BinaryState::FilteredOptional(page_validity, page_values) => {
extend_from_decoder(
Expand Down Expand Up @@ -273,17 +321,7 @@ pub(super) fn finish(
}

match data_type.to_physical_type() {
PhysicalType::BinaryView => unsafe {
Ok(BinaryViewArray::new_unchecked(
data_type.clone(),
array.views().clone(),
array.data_buffers().clone(),
array.validity().cloned(),
array.total_bytes_len(),
array.total_buffer_len(),
)
.boxed())
},
PhysicalType::BinaryView => Ok(array.boxed()),
PhysicalType::Utf8View => {
// SAFETY: we already checked utf8
unsafe {
Expand Down
41 changes: 40 additions & 1 deletion crates/polars-parquet/src/arrow/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::VecDeque;

use arrow::array::{MutableBinaryViewArray, View};
use arrow::bitmap::utils::BitmapIter;
use arrow::bitmap::MutableBitmap;
use arrow::pushable::Pushable;
Expand Down Expand Up @@ -322,7 +323,7 @@ fn reserve_pushable_and_validity<'a, I, T, C: BatchableCollector<I, T>>(
}

/// Extends a [`Pushable`] from an iterator of non-null values and a hybrid-rle decoder
pub(super) fn extend_from_decoder<I, T: std::fmt::Debug, C: BatchableCollector<I, T>>(
pub(super) fn extend_from_decoder<I, T, C: BatchableCollector<I, T>>(
validity: &mut MutableBitmap,
page_validity: &mut dyn PageValidity,
limit: Option<usize>,
Expand Down Expand Up @@ -431,6 +432,44 @@ where
}
}

// Collector that feeds a `TranslatedHybridRle` producing `View`s into a
// `MutableBinaryViewArray<[u8]>`.
impl<'a, 'b, 'c, T> BatchableCollector<u32, MutableBinaryViewArray<[u8]>>
    for TranslatedHybridRle<'a, 'b, 'c, View, T>
where
    T: Translator<View>,
{
    /// Reserves room for `n` more entries in `target`.
    #[inline]
    fn reserve(target: &mut MutableBinaryViewArray<[u8]>, n: usize) {
        target.reserve(n);
    }

    /// Translates the next `n` decoder values and collects them into the views of `target`.
    ///
    /// When `target` has a validity bitmap, the `n` new entries are marked as valid.
    #[inline]
    fn push_n(&mut self, target: &mut MutableBinaryViewArray<[u8]>, n: usize) -> ParquetResult<()> {
        let views = target.views_mut();
        self.decoder
            .translate_and_collect_n_into(views, n, self.translator)?;

        // Keep the validity bitmap (if any) in step with the views just added.
        if let Some(bitmap) = target.validity() {
            bitmap.extend_constant(n, true);
        }

        Ok(())
    }

    /// Appends `n` nulls to `target`; the decoder is not touched.
    #[inline]
    fn push_n_nulls(
        &mut self,
        target: &mut MutableBinaryViewArray<[u8]>,
        n: usize,
    ) -> ParquetResult<()> {
        target.extend_null(n);
        Ok(())
    }

    /// Skips `n` values in the underlying decoder.
    #[inline]
    fn skip_n(&mut self, n: usize) -> ParquetResult<()> {
        self.decoder.skip_in_place(n)
    }
}

impl<T, P: Pushable<T>, I: Iterator<Item = T>> BatchableCollector<T, P> for I {
#[inline]
fn reserve(target: &mut P, n: usize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ pub use decoder::Decoder;
pub use encoder::encode;
use polars_utils::iter::FallibleIterator;
use polars_utils::slice::GetSaferUnchecked;
pub use translator::{DictionaryTranslator, Translator, UnitTranslator};
pub use translator::{
BinaryDictionaryTranslator, DictionaryTranslator, FnTranslator, Translator, UnitTranslator,
};

use self::buffered::HybridRleBuffered;
use super::{bitpacked, ceil8, uleb128};
Expand Down
Loading