51 changes: 51 additions & 0 deletions arrow-buffer/src/builder/null.rs
@@ -193,6 +193,57 @@ impl NullBufferBuilder {
}
}

/// Extends this builder with validity values from a trusted length iterator.
///
/// This is more efficient than calling `append` in a loop as it reserves
/// space for all bits up front and then only sets the `true` bits.
///
/// # Safety
///
/// The iterator must report its length correctly via `size_hint()`.
/// Using an iterator that reports an incorrect length is undefined behavior.
///
/// # Example
/// ```
/// # use arrow_buffer::NullBufferBuilder;
/// let mut builder = NullBufferBuilder::new(8);
/// let validities = [true, false, true, true];
/// // SAFETY: slice iterator reports correct length
/// unsafe { builder.extend_from_trusted_len_iter(validities.iter().copied()) };
/// assert_eq!(builder.len(), 4);
/// ```
pub unsafe fn extend_from_trusted_len_iter<I: Iterator<Item = bool>>(&mut self, iter: I) {
let (_, upper) = iter.size_hint();
let len = upper.expect("extend_from_trusted_len_iter requires an upper limit");

if len == 0 {
return;
}

// Materialize since we're about to append bits
self.materialize_if_needed();

let buf = self.bitmap_builder.as_mut().unwrap();
let start_len = buf.len();
// Advance to allocate space, initializing new bits to 0
buf.advance(len);

let slice = buf.as_slice_mut();
let mut bit_idx = start_len;

// Process bits - set bit if true (buffer initialized to 0, so false bits are already correct)
for valid in iter {
if valid {
let byte_idx = bit_idx / 8;
let bit_offset = bit_idx % 8;
slice[byte_idx] |= 1 << bit_offset;
}
bit_idx += 1;
}

debug_assert_eq!(bit_idx, start_len + len);
}
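
As a usage sketch beyond the doc example above, a caller might derive the validity mask from optional values and hand it to the builder in one call (the helper below is hypothetical, not part of this change):

```rust
use arrow_buffer::NullBufferBuilder;

/// Hypothetical helper: record the validity of a slice of optional values in one call.
fn append_validity(builder: &mut NullBufferBuilder, values: &[Option<i32>]) {
    // SAFETY: a slice iterator (and a `map` over it) reports its exact
    // length via `size_hint()`, satisfying the trusted-length contract.
    unsafe { builder.extend_from_trusted_len_iter(values.iter().map(|v| v.is_some())) };
}

fn main() {
    let mut builder = NullBufferBuilder::new(4);
    append_validity(&mut builder, &[Some(1), None, Some(3), None]);
    assert_eq!(builder.len(), 4);
    // Two nulls were recorded, so `finish` returns an actual null buffer
    let nulls = builder.finish().expect("nulls were recorded");
    assert_eq!(nulls.null_count(), 2);
}
```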

/// Builds the null buffer and resets the builder.
/// Returns `None` if the builder only contains `true`s.
pub fn finish(&mut self) -> Option<NullBuffer> {
239 changes: 233 additions & 6 deletions arrow-select/src/coalesce.rs
@@ -20,7 +20,10 @@
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::filter::filter_record_batch;
use crate::filter::{
FilterBuilder, FilterPredicate, IndexIterator, filter_record_batch,
is_optimize_beneficial_record_batch,
};
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
use arrow_schema::{ArrowError, DataType, SchemaRef};
@@ -211,7 +214,10 @@ impl BatchCoalescer {
/// Push a batch into the Coalescer after applying a filter
///
/// This is semantically equivalent to calling [`Self::push_batch`]
/// with the results from [`filter_record_batch`]
/// with the results from [`filter_record_batch`], but avoids
/// materializing the intermediate filtered batch.
///
/// [`filter_record_batch`]: crate::filter::filter_record_batch
///
/// # Example
/// ```
@@ -237,10 +243,109 @@ impl BatchCoalescer {
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
// TODO: optimize this to avoid materializing (copying the results
// of filter to a new batch)
let filtered_batch = filter_record_batch(&batch, filter)?;
self.push_batch(filtered_batch)
// We only support primitive types for now; fall back to filter_record_batch for other types.
// Also fall back when the filter selects more rows than the biggest coalesce batch size.
Review comment (author): Not sure if it is always better to take biggest_coalesce_batch_size into account.

if batch
.schema()
.fields()
.iter()
.any(|field| !field.data_type().is_primitive())
|| self
.biggest_coalesce_batch_size
.map(|biggest_size| filter.true_count() > biggest_size)
.unwrap_or(false)
{
let batch = filter_record_batch(&batch, filter)?;

self.push_batch(batch)?;
return Ok(());
}

// Determine whether building an optimized filter predicate is worthwhile for this batch
let is_optimize_beneficial = is_optimize_beneficial_record_batch(&batch);
let selected_count = filter.true_count();

// Fast path: skip if no rows selected
if selected_count == 0 {
return Ok(());
}

// Fast path: if all rows selected, just push the batch
if selected_count == batch.num_rows() {
return self.push_batch(batch);
}

let (_schema, arrays, _num_rows) = batch.into_parts();

// Setup input arrays as sources
assert_eq!(arrays.len(), self.in_progress_arrays.len());
self.in_progress_arrays
.iter_mut()
.zip(&arrays)
.for_each(|(in_progress, array)| {
in_progress.set_source(Some(Arc::clone(array)));
});

// Copy the selected rows chunk by chunk, letting the helper pick the iteration strategy
self.copy_from_filter(filter, is_optimize_beneficial, selected_count)?;

// Clear sources to allow memory to be freed
for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.set_source(None);
}

Ok(())
}
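
For context, a minimal sketch of driving this new path through the public API (all-primitive schema, partially selective filter; names and values are made up for illustration):

```rust
use std::sync::Arc;
use arrow_array::{BooleanArray, Int32Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::coalesce::BatchCoalescer;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4]))],
    )?;
    // Keep rows 0 and 2; an all-primitive schema takes the copy-by-filter path above
    let filter = BooleanArray::from(vec![true, false, true, false]);

    let mut coalescer = BatchCoalescer::new(schema, 1024);
    coalescer.push_batch_with_filter(batch, &filter)?;
    coalescer.finish_buffered_batch()?;
    let out = coalescer.next_completed_batch().expect("one completed batch");
    assert_eq!(out.num_rows(), 2);
    Ok(())
}
```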

/// Helper to copy the rows selected by `filter` into the in-progress arrays,
/// handling batch boundaries efficiently.
///
/// The filter is processed in chunks sized to the remaining space in the
/// target batch, avoiding per-row batch boundary checks.
fn copy_from_filter(
&mut self,
filter: &BooleanArray,
is_optimize_beneficial: bool,
count: usize,
) -> Result<(), ArrowError> {
let mut remaining = count;
let mut filter_pos = 0; // Position in the filter array

// We need to process the filter in chunks that fit the target batch size
while remaining > 0 {
let space_in_batch = self.target_batch_size - self.buffered_rows;
let to_copy = remaining.min(space_in_batch);

// Find how many filter positions we need to cover `to_copy` set bits
// Skip the expensive search if all remaining rows fit in the current batch
let chunk_len = if remaining <= space_in_batch {
filter.len() - filter_pos
} else {
find_nth_set_bit_position(filter, filter_pos, to_copy) - filter_pos
};

let chunk_filter = filter.slice(filter_pos, chunk_len);
let mut filter_builder = FilterBuilder::new(&chunk_filter);

if is_optimize_beneficial {
filter_builder = filter_builder.optimize();
}
let chunk_predicate = filter_builder.build();

// Copy the rows selected by this chunk's predicate, one call per array
for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.copy_rows_by_filter(&chunk_predicate)?;
}

self.buffered_rows += to_copy;
filter_pos += chunk_len;
remaining -= to_copy;

// If we've filled the batch, finish it
if self.buffered_rows >= self.target_batch_size {
self.finish_buffered_batch()?;
}
}

Ok(())
}
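
To make the chunking concrete, here is a standalone sketch (not the actual implementation) that mirrors how the filter is split so each chunk selects at most the remaining batch capacity, assuming the output buffer starts empty and `space` stays constant:

```rust
// Split a boolean filter into consecutive (start, len) slices, each selecting
// at most `space` rows, mirroring how copy_from_filter fills the target batch.
fn chunk_boundaries(filter: &[bool], space: usize) -> Vec<(usize, usize)> {
    let mut chunks = Vec::new();
    let mut start = 0;
    let mut selected_in_chunk = 0;
    for (i, &keep) in filter.iter().enumerate() {
        if keep {
            selected_in_chunk += 1;
            if selected_in_chunk == space {
                // The chunk ends right after the bit that filled the batch
                chunks.push((start, i + 1 - start));
                start = i + 1;
                selected_in_chunk = 0;
            }
        }
    }
    if start < filter.len() {
        // The final chunk extends to the end of the filter
        chunks.push((start, filter.len() - start));
    }
    chunks
}

fn main() {
    // 5 selected rows, batch space of 3 -> two chunks: [0..4) with 3 hits, [4..8) with 2 hits
    let filter = [true, false, true, true, false, true, true, false];
    assert_eq!(chunk_boundaries(&filter, 3), vec![(0, 4), (4, 4)]);
}
```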

/// Push all the rows from `batch` into the Coalescer
@@ -526,6 +631,118 @@ impl BatchCoalescer {
}
}

/// Find the position after the n-th set bit in a boolean array starting from `start`.
/// Returns the position after the n-th set bit, or the end of the array if fewer than n bits are set.
fn find_nth_set_bit_position(filter: &BooleanArray, start: usize, n: usize) -> usize {
if n == 0 {
return start;
}

let values = filter.values();
let mut remaining = n;

// Get the underlying buffer and interpret as u64 chunks for fast iteration
let inner = values.inner();
let (prefix, chunks, suffix) = unsafe { inner.as_slice().align_to::<u64>() };

// Handle prefix bytes (before alignment)
let prefix_bits = prefix.len() * 8;
if start < prefix_bits {
for (byte_idx, &byte) in prefix.iter().enumerate().skip(start / 8) {
let masked_byte = if byte_idx == start / 8 {
byte & (!0u8 << (start % 8))
} else {
byte
};
let ones = masked_byte.count_ones() as usize;
if ones >= remaining {
let mut bits = masked_byte;
for bit_pos in 0..8 {
if bits & 1 != 0 {
remaining -= 1;
if remaining == 0 {
return (byte_idx * 8 + bit_pos + 1).min(filter.len());
}
}
bits >>= 1;
}
} else {
remaining -= ones;
}
}
}

// Handle aligned u64 chunks
let chunk_start = if start <= prefix_bits {
0
} else {
(start - prefix_bits) / 64
};

for (chunk_idx, &chunk) in chunks.iter().enumerate().skip(chunk_start) {
let global_bit_pos = prefix_bits + chunk_idx * 64;

// Mask off bits before our start position in the first relevant chunk
let masked_chunk = if global_bit_pos < start && start < global_bit_pos + 64 {
chunk & (!0u64 << (start - global_bit_pos))
} else if global_bit_pos < start {
continue;
} else {
chunk
};

let ones = masked_chunk.count_ones() as usize;

if ones >= remaining {
// The n-th bit is in this chunk - find exact position
let mut bits = masked_chunk;
for bit_pos in 0..64 {
if bits & 1 != 0 {
remaining -= 1;
if remaining == 0 {
return (global_bit_pos + bit_pos + 1).min(filter.len());
}
}
bits >>= 1;
}
} else {
remaining -= ones;
}
}

// Handle suffix bytes (after alignment)
let suffix_start = prefix_bits + chunks.len() * 64;
for (byte_idx, &byte) in suffix.iter().enumerate() {
let global_byte_pos = suffix_start + byte_idx * 8;
if global_byte_pos + 8 <= start {
continue;
}
let masked_byte = if global_byte_pos < start {
byte & (!0u8 << (start - global_byte_pos))
} else {
byte
};
let ones = masked_byte.count_ones() as usize;
if ones >= remaining {
let mut bits = masked_byte;
for bit_pos in 0..8 {
if bits & 1 != 0 {
remaining -= 1;
if remaining == 0 {
return (global_byte_pos + bit_pos + 1).min(filter.len());
}
}
bits >>= 1;
}
} else {
remaining -= ones;
}
}

// Fewer than n set bits found, return end of array
filter.len()
}
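
To illustrate this contract, a small test sketch (hypothetical; relies on the `use super::*` already present in the tests module below):

```rust
#[test]
fn test_find_nth_set_bit_position_contract() {
    // indices:                          0     1      2     3     4      5
    let filter = BooleanArray::from(vec![true, false, true, true, false, true]);
    // 1st set bit from index 0 is at 0 -> position after it is 1
    assert_eq!(find_nth_set_bit_position(&filter, 0, 1), 1);
    // 2nd set bit from index 0 is at 2 -> position after it is 3
    assert_eq!(find_nth_set_bit_position(&filter, 0, 2), 3);
    // Starting at index 3, the 2nd set bit is at 5 -> position after it is 6
    assert_eq!(find_nth_set_bit_position(&filter, 3, 2), 6);
    // Fewer than n set bits: the array length is returned
    assert_eq!(find_nth_set_bit_position(&filter, 0, 10), 6);
}
```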

/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
macro_rules! instantiate_primitive {
@@ -571,6 +788,15 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
/// Return an error if the source array is not set
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

/// Copy the rows selected by the given filter predicate from the current source array into the in-progress array
fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
// Default implementation: iterate over indices from the filter
for idx in IndexIterator::new(filter.filter_array(), filter.count()) {
self.copy_rows(idx, 1)?;
}
Ok(())
}

/// Finish the currently in-progress array and return it as an `ArrayRef`
fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
@@ -579,6 +805,7 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
mod tests {
use super::*;
use crate::concat::concat_batches;
use crate::filter::filter_record_batch;
use arrow_array::builder::StringViewBuilder;
use arrow_array::cast::AsArray;
use arrow_array::{