Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 90 additions & 2 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ where
self.last_value = value;
buffer[0] = value;
read += 1;
self.values_left -= 1;
}

while read != to_read {
Expand All @@ -653,6 +654,14 @@ where
.bit_reader
.get_batch(&mut buffer[read..read + batch_to_read], bit_width);

if batch_read != batch_to_read {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prior to this change, the decoder would get stuck in an infinite loop if given a truncated page. I discovered this by accident whilst writing test_delta_bit_packed_padding

return Err(general_err!(
"Expected to read {} values from miniblock got {}",
batch_to_read,
batch_read
));
}

// At this point we have read the deltas to `buffer` we now need to offset
// these to get back to the original values that were encoded
for v in &mut buffer[read..read + batch_read] {
Expand All @@ -668,9 +677,9 @@ where

read += batch_read;
self.mini_block_remaining -= batch_read;
self.values_left -= batch_read;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix for #1417 - the problem was that as self.value_left was only decremented outside the loop, the code in next_block was unable to correctly determine when the bit widths were padding

}

self.values_left -= to_read;
Ok(to_read)
}

Expand Down Expand Up @@ -928,7 +937,9 @@ mod tests {
ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType,
};
use crate::util::{
bit_util::set_array_bit, memory::MemTracker, test_common::RandGen,
bit_util::set_array_bit,
memory::{BufferPtr, MemTracker},
test_common::RandGen,
};

#[test]
Expand Down Expand Up @@ -1326,6 +1337,83 @@ mod tests {
assert_eq!(result, vec![29, 43, 89]);
}

#[test]
fn test_delta_bit_packed_padding() {
// Page header
let header = vec![
// Page Header

// Block Size - 256
128,
2,
// Miniblocks in block,
4,
// Total value count - 419
128 + 35,
3,
// First value - 7
7,
];

// Block Header
let block1_header = vec![
0, // Min delta
0, 1, 0, 0, // Bit widths
];

// Mini-block 1 - bit width 0 => 0 bytes
// Mini-block 2 - bit width 1 => 8 bytes
// Mini-block 3 - bit width 0 => 0 bytes
// Mini-block 4 - bit width 0 => 0 bytes
let block1 = vec![0xFF; 8];

// Block Header
let block2_header = vec![
0, // Min delta
0, 1, 2, 0xFF, // Bit widths, including non-zero padding
];

// Mini-block 1 - bit width 0 => 0 bytes
// Mini-block 2 - bit width 1 => 8 bytes
// Mini-block 3 - bit width 2 => 16 bytes
// Mini-block 4 - padding => no bytes
let block2 = vec![0xFF; 24];

let data: Vec<u8> = header
.into_iter()
.chain(block1_header)
.chain(block1)
.chain(block2_header)
.chain(block2)
.collect();

let length = data.len();

let ptr = BufferPtr::new(data);
let mut reader = BitReader::new(ptr.clone());
assert_eq!(reader.get_vlq_int().unwrap(), 256);
assert_eq!(reader.get_vlq_int().unwrap(), 4);
assert_eq!(reader.get_vlq_int().unwrap(), 419);
assert_eq!(reader.get_vlq_int().unwrap(), 7);

// Test output buffer larger than needed and not exact multiple of block size
let mut output = vec![0_i32; 420];

let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
decoder.set_data(ptr.clone(), 0).unwrap();
assert_eq!(decoder.get(&mut output).unwrap(), 419);
assert_eq!(decoder.get_offset(), length);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the fix for #1417 this would fail


// Test with truncated buffer
decoder.set_data(ptr.range(0, 12), 0).unwrap();
let err = decoder.get(&mut output).unwrap_err().to_string();
assert!(
err.contains("Expected to read 64 values from miniblock got 8"),
"{}",
err
);
}

#[test]
fn test_delta_byte_array_same_arrays() {
let data = vec![
Expand Down
13 changes: 10 additions & 3 deletions parquet/src/encodings/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
return Ok(());
}

let mut min_delta = i64::max_value();
let mut min_delta = i64::MAX;
for i in 0..self.values_in_block {
min_delta = cmp::min(min_delta, self.deltas[i]);
}
Expand All @@ -581,6 +581,13 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
// values left
let n = cmp::min(self.mini_block_size, self.values_in_block);
if n == 0 {
// Decoders should be agnostic to the padding value, we therefore use 0xFF
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix for #1416, I opted to pad with a non-zero value for tests so we have coverage of that scenario, but not produce such data outside of tests for better ecosystem-interop

// when running tests. However, not all implementations may handle this correctly
// so pad with 0 when not running tests
let pad_value = cfg!(test).then(|| 0xFF).unwrap_or(0);
for j in i..self.num_mini_blocks {
self.bit_writer.write_at(offset + j, pad_value);
}
break;
}

Expand Down Expand Up @@ -610,8 +617,8 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
self.values_in_block -= n;
}

assert!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this change -- the older assert seemed to be more specific, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah - I was under the impression assert_eq! logged the textual representation of the arguments, but it does not - will fix

self.values_in_block == 0,
assert_eq!(
self.values_in_block, 0,
"Expected 0 values in block, found {}",
self.values_in_block
);
Expand Down