Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding compact check functionality to GenericByteViewArray #5720

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 195 additions & 0 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder, ByteView};
use arrow_schema::{ArrowError, DataType};
use std::any::Any;

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
Expand Down Expand Up @@ -265,6 +267,30 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
phantom: Default::default(),
}
}

/// check if the array is a compact view
pub fn is_compact(&self) -> bool {
self.compact_check().iter().all(|&x| x)
}

/// Returns whether the buffers are compact
pub(self) fn compact_check(&self) -> Vec<bool> {
let mut checkers: Vec<_> = self
.buffers
.iter()
.map(|b| CompactChecker::new(b.len()))
.collect();

for (i, view) in self.views.iter().enumerate() {
let view = ByteView::from(*view);
if self.is_null(i) || view.length <= 12 {
continue;
}
checkers[view.buffer_index as usize]
.accumulate(view.offset as usize, view.length as usize);
}
checkers.into_iter().map(|c| c.finish()).collect()
}
}

impl<T: ByteViewType + ?Sized> Debug for GenericByteViewArray<T> {
Expand Down Expand Up @@ -482,6 +508,67 @@ impl From<Vec<Option<String>>> for StringViewArray {
}
}

/// A helper struct that used to check if the array is compact view
///
/// The checker is lazy and will not check the array until `finish` is called.
///
/// This is based on the assumption that the array will most likely to be not compact,
/// so it is likely to scan the entire array.
///
/// Then it is better to do the check at once, rather than doing it for each accumulate operation.
struct CompactChecker {
length: usize,
intervals: BTreeMap<usize, usize>,
}

impl CompactChecker {
/// Create a new checker with the expected length of the buffer
pub fn new(length: usize) -> Self {
Self {
length,
intervals: BTreeMap::new(),
}
}

/// Accumulate a new covered interval to the checker
pub fn accumulate(&mut self, offset: usize, length: usize) {
if length == 0 {
return;
}
let end = offset + length;
if end > self.length {
panic!(
"Invalid interval: offset {} length {} is out of bound of length {}",
offset, length, self.length
);
}
if let Some(val) = self.intervals.get_mut(&offset) {
if *val < end {
*val = end;
}
} else {
self.intervals.insert(offset, end);
}
}

/// Check if the checker is fully covered
pub fn finish(self) -> bool {
// check if the coverage is continuous and full
let mut last_end = 0;

for (start, end) in self.intervals.iter() {
if *start > last_end {
return false;
}
if *end > last_end {
last_end = *end;
}
}

last_end == self.length
}
}

#[cfg(test)]
mod tests {
use crate::builder::{BinaryViewBuilder, StringViewBuilder};
Expand Down Expand Up @@ -645,4 +732,112 @@ mod tests {

StringViewArray::new(views, buffers, None);
}

#[test]
#[should_panic(expected = "Invalid interval: offset 0 length 13 is out of bound of length 12")]
fn test_compact_checker() {
use super::CompactChecker;

// single coverage, full
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 10);
assert!(checker.finish());

// single coverage, partial
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
assert!(!checker.finish());

// multiple coverage, no overlapping, partial
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 4);
assert!(!checker.finish());

//multiple coverage, no overlapping, full
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 5);
assert!(checker.finish());

//multiple coverage, overlapping, partial
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(4, 5);
assert!(!checker.finish());

//multiple coverage, overlapping, full
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(4, 6);
assert!(checker.finish());

//mutiple coverage, no overlapping, full, out of order
let mut checker = CompactChecker::new(10);
checker.accumulate(4, 6);
checker.accumulate(0, 4);
assert!(checker.finish());

// multiple coverage, overlapping, full, out of order
let mut checker = CompactChecker::new(10);
checker.accumulate(4, 6);
checker.accumulate(0, 4);
assert!(checker.finish());

// multiple coverage, overlapping, full, containing null
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 0);
checker.accumulate(5, 5);
assert!(checker.finish());

// multiple coverage, overlapping, full, containing null
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 0);
checker.accumulate(4, 6);
checker.accumulate(5, 5);
assert!(checker.finish());

// multiple coverage, overlapping, full, containing null
//
// this case is for attacking those implementation that only check
// the lower-bound of the interval
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 0);
checker.accumulate(1, 9);
checker.accumulate(2, 3);
checker.accumulate(3, 1);
checker.accumulate(9, 1);
assert!(checker.finish());

// panic case, out of bound
let mut checker = CompactChecker::new(12);
checker.accumulate(0, 13);
checker.finish();
}

#[test]
fn test_compact_check() {
let mut builder = StringViewBuilder::new().with_block_size(30);
builder.append_value("0".repeat(13));
builder.append_value("0".repeat(11));
builder.append_option(Some("0".repeat(13)));
builder.append_null();

// array should have only one buffer
let array = builder.finish();
let compact_check = array.compact_check();
assert!(array.is_compact());
assert_eq!(compact_check.len(), 1);
assert!(compact_check.iter().all(|&x| x));

// construct a non-compact array
let sliced = array.slice(0, 1);
let compact_check = sliced.compact_check();
assert_eq!(compact_check.len(), 1);
assert!(!compact_check.iter().all(|&x| x));
assert!(!sliced.is_compact());
}
}
Loading