Skip to content

Commit

Permalink
full working poc
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite committed Aug 1, 2024
1 parent f6b1543 commit cd972cd
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 14 deletions.
22 changes: 18 additions & 4 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,6 @@ fn rg_to_dfs_new(

let live_variables = predicate.live_variables()?;

dbg!(&live_variables);

let mut live_indices = Vec::with_capacity(live_variables.len());
let live_variables = live_variables
.iter()
Expand Down Expand Up @@ -298,7 +296,7 @@ fn rg_to_dfs_new(
.collect::<PolarsResult<Vec<_>>>()
})?;

let mut df_columns = vec![Vec::with_capacity(live_indices.len()); num_row_groups];
let mut df_columns = vec![Vec::with_capacity(live_indices.len()); included_row_groups.len()];
for (i, col) in pred_columns.into_iter().enumerate() {
df_columns[i / live_indices.len()].push(col);
}
Expand Down Expand Up @@ -334,11 +332,26 @@ fn rg_to_dfs_new(
let bitmap = bitmap.freeze();

debug_assert_eq!(df.height(), bitmap.set_bits());
debug_assert_eq!(md.num_rows(), bitmap.len());
Ok((bitmap, df))
})
.filter(|v| v.as_ref().is_ok_and(|(bm, _)| bm.set_bits() > 0))
.collect::<PolarsResult<Vec<(Bitmap, DataFrame)>>>()?;

let mut num_removed = 0;
let dfs: Vec<(Bitmap, DataFrame)> = dfs
.into_iter()
.enumerate()
.filter_map(|(i, (mask, df))| {
if mask.set_bits() == 0 {
included_row_groups.remove(i - num_removed);
num_removed += 1;
return None;
}

Some((mask, df))
})
.collect();

// @TODO
// *previous_row_count += df.height() as IdxSize;

Expand All @@ -357,6 +370,7 @@ fn rg_to_dfs_new(
let (mask, _) = &dfs[i / num_unloaded_columns];

let md = &file_metadata.row_groups[rg_idx];
debug_assert_eq!(md.num_rows(), mask.len());
column_idx_to_series(
col_idx,
md,
Expand Down
17 changes: 9 additions & 8 deletions crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ impl<'a, T: Unpackable> Decoder<'a, T> {
DecoderIter { buffer, idx: 0 }
}

/// Returns the value of this decoder's `num_bits` field.
pub fn num_bits(&self) -> usize {
self.num_bits
}

/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.
pub fn try_new(packed: &'a [u8], num_bits: usize, length: usize) -> Result<Self, ParquetError> {
let block_size = std::mem::size_of::<T>() * num_bits;
Expand Down Expand Up @@ -113,12 +117,8 @@ impl<'a, 'b, T: Unpackable> Iterator for ChunkedDecoder<'a, 'b, T> {
}

/// Returns an exact size hint for the chunked iterator.
///
/// The count is `self.decoder.len() / T::Unpacked::LENGTH` (integer division) and is
/// reported as both the lower and the upper bound.
fn size_hint(&self) -> (usize, Option<usize>) {
    // Derive the count from the decoder's length alone. Adjusting the underlying
    // `packed.size_hint()` by one instead would underflow whenever that hint's lower
    // bound is 0 while the length is not a multiple of the chunk length.
    let len = self.decoder.len() / T::Unpacked::LENGTH;
    (len, Some(len))
}
}

Expand Down Expand Up @@ -160,7 +160,10 @@ impl<'a, T: Unpackable> Decoder<'a, T> {
}

/// Discards up to `n` items from the `packed` iterator and lowers `length` by
/// `n * T::Unpacked::LENGTH`.
///
/// Debug builds assert that this amount does not exceed the current `length`.
pub fn skip_chunks(&mut self, n: usize) {
    let num_values = n * T::Unpacked::LENGTH;
    debug_assert!(num_values <= self.length);

    // Pull the items through a mutable borrow so `self.packed` stays usable afterwards.
    self.packed.by_ref().take(n).for_each(drop);
    self.length -= num_values;
}

pub fn take(&mut self) -> Self {
Expand All @@ -169,8 +172,6 @@ impl<'a, T: Unpackable> Decoder<'a, T> {
let length = self.length;
self.length = 0;

debug_assert_eq!(self.len(), 0);

Self {
packed,
num_bits: self.num_bits,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ impl<'a> BufferedBitpacked<'a> {
let unpacked_offset = n % <u32 as Unpackable>::Unpacked::LENGTH;
self.decoder.skip_chunks(num_chunks);
let (unpacked, unpacked_length) = self.decoder.chunked().next_inexact().unwrap();
debug_assert!(unpacked_offset < unpacked_length);

self.unpacked = unpacked;
self.unpacked_start = unpacked_offset;
Expand Down Expand Up @@ -267,7 +268,12 @@ impl<'a> HybridRleBuffered<'a> {
};

debug_assert!(num_skipped <= n);
debug_assert_eq!(num_skipped, start_length - self.len());
debug_assert_eq!(
num_skipped,
start_length - self.len(),
"{self:?}: {num_skipped} != {start_length} - {}",
self.len()
);

num_skipped
}
Expand Down
60 changes: 60 additions & 0 deletions crates/polars-parquet/src/parquet/encoding/hybrid_rle/fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/// complex to facilitate performance. We create this small fuzzer
use std::collections::VecDeque;

use arrow::pushable::Pushable;
use rand::Rng;

use super::*;
Expand Down Expand Up @@ -329,3 +330,62 @@ fn small_fuzz() -> ParquetResult<()> {
fn large_fuzz() -> ParquetResult<()> {
fuzz_loops(1_000_000)
}

/// Fuzzes `HybridRleDecoder::skip_in_place` with random data and random skip sizes.
///
/// Every iteration builds a random `u32` buffer masked to a random bit width (mixing runs
/// of one repeated value with single values), encodes it with `encoder::encode`, and then
/// walks through the whole encoded stream in randomly sized skips. The test fails if any
/// skip returns an error or trips an assertion.
#[test]
fn skip_fuzz() -> ParquetResult<()> {
    let mut rng = rand::thread_rng();

    const MAX_LENGTH: usize = 10_000;

    let mut encoded = Vec::with_capacity(10000);

    let mut bs: Vec<u32> = Vec::with_capacity(MAX_LENGTH);
    let mut skips: VecDeque<usize> = VecDeque::with_capacity(2000);

    let num_loops = 100_000;

    for _ in 0..num_loops {
        // The buffers are reused across iterations. `encoded` must be cleared as well: a
        // `Vec` writer appends, so otherwise the decoder below would keep reading the bytes
        // of an earlier iteration from the front of the buffer.
        encoded.clear();
        skips.clear();
        bs.clear();

        let num_bits = rng.gen_range(0..=32);
        // NOTE: `wrapping_shl` masks the shift amount, so `num_bits == 32` produces a mask
        // of 0 (every value becomes 0) rather than `u32::MAX`.
        let mask = 1u32.wrapping_shl(num_bits).wrapping_sub(1);

        let length = rng.gen_range(1..=MAX_LENGTH);

        // `bs` was just cleared, so this zero-initializes exactly `length` elements inside
        // the preallocated capacity; all of them are overwritten with random data next.
        bs.resize(length, 0);
        rng.fill(&mut bs[..]);

        // Mask every value to `num_bits`, randomly turning stretches into runs of a single
        // repeated value so both repeated and non-repeated input is produced.
        let mut filled = 0;
        while filled < bs.len() {
            if rng.gen() {
                let num_repeats = rng.gen_range(0..=(bs.len() - filled));
                let value = bs[filled] & mask;
                bs[filled..filled + num_repeats].fill(value);
                filled += num_repeats;
            } else {
                bs[filled] &= mask;
                filled += 1;
            }
        }

        // Split the `filled` values into a random sequence of non-empty skips that
        // together cover the whole stream.
        let mut num_done = 0;
        while num_done < filled {
            let num_skip = rng.gen_range(1..=filled - num_done);
            num_done += num_skip;
            skips.push_back(num_skip);
        }

        encoder::encode(&mut encoded, bs.iter().copied(), num_bits).unwrap();
        let mut decoder = HybridRleDecoder::new(&encoded, num_bits, filled);

        for &num_skip in &skips {
            decoder.skip_in_place(num_skip).unwrap();
        }
    }

    Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ impl<'a> HybridRleDecoder<'a> {

debug_assert_eq!(num_skipped, start_num_values - self.num_values);
debug_assert!(num_skipped <= n, "{num_skipped} <= {n}");
debug_assert!(num_skipped > 0, "{num_skipped} > 0");
debug_assert!(indicator >> 1 == 0 || num_skipped > 0);

n -= num_skipped;
}
Expand Down

0 comments on commit cd972cd

Please sign in to comment.