Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add union_extract kernel #6387

Merged
merged 4 commits into from
Sep 25, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add tests, improve docs, simplify code
  • Loading branch information
gstvg committed Sep 22, 2024
commit c01d50acc670508d238e294ae844cd401f245ec4
25 changes: 20 additions & 5 deletions arrow-select/src/union_extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,13 @@ fn extract_dense_all_selected(

/// Chunk size, in number of type_ids, used by the scalar-equality check.
// NOTE(review): presumably passed as `chunk_size` to `eq_scalar_inner` — confirm against the caller.
const EQ_SCALAR_CHUNK_SIZE: usize = 512;

/// The result of checking which type_ids match the target type_id.
///
/// Either a single answer that holds for every type_id, or a per-element mask.
#[derive(Debug, PartialEq)]
enum BoolValue {
/// If true, all type_ids match the target type_id.
/// If false, none of the type_ids match the target type_id.
Scalar(bool),
/// A mask representing which type_ids match the target type_id.
Buffer(BooleanBuffer),
}

Expand Down Expand Up @@ -329,7 +333,7 @@ fn eq_scalar_inner(chunk_size: usize, type_ids: &[i8], target: i8) -> BoolValue
let set_bits = set_bits - set_bits % 64;

let mut buffer =
MutableBuffer::new(bit_util::ceil(type_ids.len(), 64) * 8).with_bitset(set_bits / 8, val);
MutableBuffer::new(bit_util::ceil(type_ids.len(), 8)).with_bitset(set_bits / 8, val);

buffer.extend(type_ids[set_bits..].chunks(64).map(|chunk| {
chunk
Expand All @@ -341,8 +345,6 @@ fn eq_scalar_inner(chunk_size: usize, type_ids: &[i8], target: i8) -> BoolValue
})
}));

buffer.truncate(bit_util::ceil(type_ids.len(), 8));

BoolValue::Buffer(BooleanBuffer::new(buffer.into(), 0, type_ids.len()))
}

Expand All @@ -357,8 +359,14 @@ fn is_sequential_generic<const N: usize>(offsets: &[i32]) -> bool {
return true;
}

// checks a common form of non sequential offsets, when sequential nulls reuses the same value,
// pointed by the same offset, while valid values offsets increases one by one
// fast check this common combination:
// 1: sequential nulls are represented as a single null value on the values array, pointed by the same offset multiple times
// 2: valid values offsets increase one by one.
// example for a union with a single field A with type_id 0:
// union = A=7 A=NULL A=NULL A=5 A=9
// a values = 7 NULL 5 9
// offsets = 0 1 1 2 3
// type_ids = 0 0 0 0 0
// this also checks if the last chunk/remainder is sequential relative to the first offset
if offsets[0] + offsets.len() as i32 - 1 != offsets[offsets.len() - 1] {
return false;
Expand Down Expand Up @@ -525,6 +533,13 @@ mod tests {
assert!(!is_sequential(&[1, 2, 3, 4, 5, 0, 7, 8]));
assert!(!is_sequential(&[1, 2, 3, 4, 5, 6, 0, 8]));
assert!(!is_sequential(&[1, 2, 3, 4, 5, 6, 7, 0]));

// checks increments at the chunk boundary
assert!(!is_sequential(&[1, 2, 3, 5]));
assert!(!is_sequential(&[1, 2, 3, 5, 6]));
assert!(!is_sequential(&[1, 2, 3, 5, 6, 7]));
assert!(!is_sequential(&[1, 2, 3, 4, 5, 6, 8]));
assert!(!is_sequential(&[1, 2, 3, 4, 5, 6, 8, 9]));
}

fn str1() -> UnionFields {
Expand Down
Loading