Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Batch parquet primitive decoding #17462

Merged
merged 1 commit into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 152 additions & 15 deletions crates/polars-arrow/src/bitmap/utils/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,42 +46,123 @@ impl<'a> BitmapIter<'a> {
rest_len,
}
}

/// Consumes the run of `1` / `true` values at the front of the iterator and returns its length.
///
/// The returned count equals `(&mut iter).take_while(|&b| b).count()`. Unlike `take_while`,
/// the first `0` bit that ends the run is left in the iterator instead of being consumed.
///
/// Whole words are inspected at once, which is a lot cheaper than polling the iterator bit by
/// bit, so prefer this method whenever the use-case allows for it.
pub fn take_leading_ones(&mut self) -> usize {
    let mut total = 0;

    loop {
        // Run of set bits at the low end of the buffered word, capped at the number of bits
        // of that word that are still valid.
        let run = self.word_len.min(self.word.trailing_ones() as usize);
        self.word_len -= run;
        // `wrapping_shr` masks the shift amount, so `run == 64` leaves `word` unchanged. In
        // that case `word_len` is 0, which caps any later count on the stale bits at 0.
        self.word = self.word.wrapping_shr(run as u32);
        total += run;

        // Either a `0` bit was reached inside the word, or there is nothing left to load.
        if self.word_len != 0 || self.rest_len == 0 {
            return total;
        }

        // The buffered word is exhausted: pull in the next (up to) 64 bits.
        self.word_len = self.rest_len.min(64);
        self.rest_len -= self.word_len;

        // NOTE(review): the unchecked 8-byte read assumes `bytes` holds at least 8 bytes
        // whenever `rest_len != 0` -- confirm against the constructor.
        unsafe {
            let chunk = self.bytes.get_unchecked(..8).try_into().unwrap();
            self.word = u64::from_le_bytes(chunk);
            self.bytes = self.bytes.get_unchecked(8..);
        }
    }
}

/// Consumes the run of `0` / `false` values at the front of the iterator and returns its length.
///
/// The returned count equals `(&mut iter).take_while(|&b| !b).count()`. Unlike `take_while`,
/// the first `1` bit that ends the run is left in the iterator instead of being consumed.
///
/// Whole words are inspected at once, which is a lot cheaper than polling the iterator bit by
/// bit, so prefer this method whenever the use-case allows for it.
pub fn take_leading_zeros(&mut self) -> usize {
    let mut total = 0;

    loop {
        // Run of cleared bits at the low end of the buffered word, capped at the number of
        // bits of that word that are still valid.
        let run = self.word_len.min(self.word.trailing_zeros() as usize);
        self.word_len -= run;
        // `wrapping_shr` masks the shift amount, so `run == 64` leaves `word` unchanged. In
        // that case `word_len` is 0, which caps any later count on the stale bits at 0.
        self.word = self.word.wrapping_shr(run as u32);
        total += run;

        // Either a `1` bit was reached inside the word, or there is nothing left to load.
        if self.word_len != 0 || self.rest_len == 0 {
            return total;
        }

        // The buffered word is exhausted: pull in the next (up to) 64 bits.
        self.word_len = self.rest_len.min(64);
        self.rest_len -= self.word_len;

        // NOTE(review): the unchecked 8-byte read assumes `bytes` holds at least 8 bytes
        // whenever `rest_len != 0` -- confirm against the constructor.
        unsafe {
            let chunk = self.bytes.get_unchecked(..8).try_into().unwrap();
            self.word = u64::from_le_bytes(chunk);
            self.bytes = self.bytes.get_unchecked(8..);
        }
    }
}

/// Returns how many elements the iterator has yet to yield.
///
/// This is the number of bits still buffered in the current word (`word_len`) plus the number
/// of bits that have not been loaded from the byte slice yet (`rest_len`).
#[inline]
pub fn num_remaining(&self) -> usize {
    let buffered = self.word_len;
    let pending = self.rest_len;
    buffered + pending
}
}

impl<'a> Iterator for BitmapIter<'a> {
type Item = bool;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.word_len != 0 {
let ret = self.word & 1 != 0;
self.word >>= 1;
self.word_len -= 1;
return Some(ret);
}
if self.word_len == 0 {
if self.rest_len == 0 {
return None;
}

if self.rest_len != 0 {
self.word_len = self.rest_len.min(64);
self.rest_len -= self.word_len;

unsafe {
let chunk = self.bytes.get_unchecked(..8).try_into().unwrap();
self.word = u64::from_le_bytes(chunk);
self.bytes = self.bytes.get_unchecked(8..);
}

let ret = self.word & 1 != 0;
self.word >>= 1;
self.word_len -= 1;
return Some(ret);
}

None
let ret = self.word & 1 != 0;
self.word >>= 1;
self.word_len -= 1;
Some(ret)
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
let exact = self.word_len + self.rest_len;
(exact, Some(exact))
let num_remaining = self.num_remaining();
(num_remaining, Some(num_remaining))
}
}

Expand All @@ -102,3 +183,59 @@ impl<'a> DoubleEndedIterator for BitmapIter<'a> {

unsafe impl TrustedLen for BitmapIter<'_> {}
impl ExactSizeIterator for BitmapIter<'_> {}

#[cfg(test)]
mod tests {
use super::*;

// Randomised check that `take_leading_ones` / `take_leading_zeros` return the same counts as
// `take_while(..).count()` does when started from the same iterator state.
// Ignored by default (see the `ignore` reason below).
#[test]
#[ignore = "Fuzz test. Too slow"]
fn test_leading_ops() {
for _ in 0..10_000 {
// Selects how the 64-bit words are filled: 0 => all zeros, 1 => all ones, 2 | 3 => random.
let bs = rand::random::<u8>() % 4;

// `length` counts the bits that `pattern` is meant to provide.
let mut length = 0;
let mut pattern = Vec::new();
// Between 0 and 1023 full 64-bit words.
for _ in 0..rand::random::<usize>() % 1024 {
let word = match bs {
0 => u64::MIN,
1 => u64::MAX,
2 | 3 => rand::random(),
_ => unreachable!(),
};

pattern.extend_from_slice(&word.to_le_bytes());
length += 64;
}

// Between 0 and 6 additional full random bytes.
for _ in 0..rand::random::<usize>() % 7 {
pattern.push(rand::random::<u8>());
length += 8;
}

// Optionally one final byte of which only the first 1..=7 bits count towards `length`.
let last_length = rand::random::<usize>() % 8;
if last_length != 0 {
pattern.push(rand::random::<u8>());
length += last_length;
}

let mut iter = BitmapIter::new(&pattern, 0, length);

let mut prev_remaining = iter.num_remaining();
while iter.num_remaining() != 0 {
// The batched method runs on a clone, so only the `take_while` reference advances `iter`.
// Note that `take_while` also consumes the element that ends the run.
let num_ones = iter.clone().take_leading_ones();
assert_eq!(num_ones, (&mut iter).take_while(|&b| b).count());

let num_zeros = iter.clone().take_leading_zeros();
assert_eq!(num_zeros, (&mut iter).take_while(|&b| !b).count());

// Ensure that we are making progress
assert!(iter.num_remaining() < prev_remaining);
prev_remaining = iter.num_remaining();
}

// An exhausted iterator must report empty runs.
assert_eq!(iter.take_leading_zeros(), 0);
assert_eq!(iter.take_leading_ones(), 0);
}
}
}
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/legacy/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub trait CustomIterTools: Iterator {
where
Self: Sized,
{
TrustMyLength::new(self, length)
unsafe { TrustMyLength::new(self, length) }
}

fn collect_trusted<T: FromTrustedLenIterator<Self::Item>>(self) -> T
Expand Down
12 changes: 12 additions & 0 deletions crates/polars-arrow/src/pushable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ pub trait Pushable<T>: Sized + Default {
fn push(&mut self, value: T);
fn len(&self) -> usize;
fn push_null(&mut self);
/// Pushes at most `n` items drawn from `iter`, calling `push` once per item.
///
/// Fewer than `n` items are pushed when `iter` is exhausted first.
#[inline]
fn extend_n(&mut self, n: usize, iter: impl Iterator<Item = T>) {
    iter.take(n).for_each(|item| self.push(item));
}
fn extend_constant(&mut self, additional: usize, value: T);
fn extend_null_constant(&mut self, additional: usize);
fn freeze(self) -> Self::Freeze;
Expand All @@ -31,6 +37,7 @@ impl Pushable<bool> for MutableBitmap {
fn reserve(&mut self, additional: usize) {
MutableBitmap::reserve(self, additional)
}

#[inline]
fn len(&self) -> usize {
self.len()
Expand Down Expand Up @@ -82,6 +89,11 @@ impl<T: Copy + Default> Pushable<T> for Vec<T> {
self.push(value)
}

/// Appends at most `n` items drawn from `iter` using a single `extend` call.
#[inline]
fn extend_n(&mut self, n: usize, iter: impl Iterator<Item = T>) {
    let bounded = iter.take(n);
    self.extend(bounded);
}

#[inline]
fn extend_constant(&mut self, additional: usize, value: T) {
self.resize(self.len() + additional, value);
Expand Down
8 changes: 7 additions & 1 deletion crates/polars-arrow/src/trusted_len.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,13 @@ impl<I, J> TrustMyLength<I, J>
where
I: Iterator<Item = J>,
{
/// Create a new `TrustMyLength` iterator
///
/// # Safety
///
/// This is safe if the iterator always has the exact length given by `len`.
#[inline]
pub fn new(iter: I, len: usize) -> Self {
pub unsafe fn new(iter: I, len: usize) -> Self {
Self { iter, len }
}
}
Expand All @@ -104,6 +109,7 @@ where
self.iter.next()
}

/// Reports `self.len` as both the lower and the upper bound, without consulting the wrapped
/// iterator.
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
    let len = self.len;
    (len, Some(len))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
Some(additional),
values,
page_values,
),
)?,
BinaryState::Required(page) => {
for x in page.values.by_ref().take(additional) {
values.push(x)
Expand All @@ -92,7 +92,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
Some(additional),
offsets,
page_values.lengths.by_ref(),
);
)?;

let length = *offsets.last() - last_offset;

Expand Down Expand Up @@ -123,7 +123,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
.values
.by_ref()
.map(|index| page_dict.value(index as usize)),
);
)?;
page_values.values.get_result()?;
},
BinaryState::RequiredDictionary(page) => {
Expand All @@ -148,7 +148,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
Some(additional),
values,
page_values.by_ref(),
);
)?;
},
BinaryState::FilteredOptionalDelta(page_validity, page_values) => {
extend_from_decoder(
Expand All @@ -157,7 +157,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
Some(additional),
values,
page_values.by_ref(),
);
)?;
},
BinaryState::FilteredRequiredDictionary(page) => {
// Already done on the dict.
Expand Down Expand Up @@ -186,7 +186,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
.values
.by_ref()
.map(|index| page_dict.value(index as usize)),
);
)?;
page_values.values.get_result()?;
},
BinaryState::OptionalDeltaByteArray(page_validity, page_values) => extend_from_decoder(
Expand All @@ -195,7 +195,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
Some(additional),
values,
page_values,
),
)?,
BinaryState::DeltaByteArray(page_values) => {
for x in page_values.take(additional) {
values.push(x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,13 @@ impl<'a> FilteredDelta<'a> {

#[derive(Debug)]
pub(crate) struct RequiredDictionary<'a> {
pub values: hybrid_rle::BufferedHybridRleDecoderIter<'a>,
pub values: hybrid_rle::HybridRleDecoder<'a>,
pub dict: &'a BinaryDict,
}

impl<'a> RequiredDictionary<'a> {
pub fn try_new(page: &'a DataPage, dict: &'a BinaryDict) -> PolarsResult<Self> {
let values = utils::dict_indices_decoder(page)?.into_iter();
let values = utils::dict_indices_decoder(page)?;

Ok(Self { dict, values })
}
Expand All @@ -198,13 +198,13 @@ impl<'a> RequiredDictionary<'a> {

#[derive(Debug)]
pub(crate) struct FilteredRequiredDictionary<'a> {
pub values: SliceFilteredIter<hybrid_rle::BufferedHybridRleDecoderIter<'a>>,
pub values: SliceFilteredIter<hybrid_rle::HybridRleDecoder<'a>>,
pub dict: &'a BinaryDict,
}

impl<'a> FilteredRequiredDictionary<'a> {
pub fn try_new(page: &'a DataPage, dict: &'a BinaryDict) -> PolarsResult<Self> {
let values = utils::dict_indices_decoder(page)?.into_iter();
let values = utils::dict_indices_decoder(page)?;

let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);
Expand All @@ -220,13 +220,13 @@ impl<'a> FilteredRequiredDictionary<'a> {

#[derive(Debug)]
pub(crate) struct ValuesDictionary<'a> {
pub values: hybrid_rle::BufferedHybridRleDecoderIter<'a>,
pub values: hybrid_rle::HybridRleDecoder<'a>,
pub dict: &'a BinaryDict,
}

impl<'a> ValuesDictionary<'a> {
pub fn try_new(page: &'a DataPage, dict: &'a BinaryDict) -> PolarsResult<Self> {
let values = utils::dict_indices_decoder(page)?.into_iter();
let values = utils::dict_indices_decoder(page)?;

Ok(Self { dict, values })
}
Expand Down
Loading
Loading