48 changes: 18 additions & 30 deletions encodings/fastlanes/benches/pipeline_bitpacking.rs
@@ -66,33 +66,21 @@ pub fn decompress_bitpacking_late_filter<T: NativePType>(bencher: Bencher, fract
.bench_values(|mask| filter(array.to_canonical().as_ref(), &mask).unwrap());
}

// TODO(ngates): bring back benchmarks once operator API is stable.
// #[divan::bench(types = [i8, i16, i32, i64], args = TRUE_COUNT)]
// pub fn decompress_bitpacking_pipeline_filter<T: Element + NativePType>(
// bencher: Bencher,
// fraction_kept: f64,
// ) {
// let mut rng = StdRng::seed_from_u64(0);
// let values = (0..LENGTH)
// .map(|_| T::from(rng.random_range(0..100)).unwrap())
// .collect::<BufferMut<T>>()
// .into_array()
// .to_primitive();
// let array = bitpack_to_best_bit_width(&values).unwrap();
//
// let mask = (0..LENGTH)
// .map(|_| rng.random_bool(fraction_kept))
// .collect::<BooleanBuffer>();
//
// bencher
// .with_inputs(|| Mask::from_buffer(mask.clone()))
// .bench_local_values(|mask| {
// export_canonical_pipeline_expr(
// array.dtype(),
// array.len(),
// array.to_operator().unwrap().unwrap().as_ref(),
// &mask,
// )
// .unwrap()
// });
// }
#[divan::bench(types = [i8, i16, i32, i64], args = TRUE_COUNT)]
pub fn decompress_bitpacking_pipeline_filter<T: NativePType>(bencher: Bencher, fraction_kept: f64) {
let mut rng = StdRng::seed_from_u64(0);
let values = (0..LENGTH)
.map(|_| T::from(rng.random_range(0..100)).unwrap())
.collect::<BufferMut<T>>()
.into_array()
.to_primitive();
let array = bitpack_to_best_bit_width(&values).unwrap();

let mask = (0..LENGTH)
.map(|_| rng.random_bool(fraction_kept))
.collect::<BitBuffer>();

bencher
.with_inputs(|| Mask::from(mask.clone()))
.bench_local_values(|mask| array.execute_with_selection(&mask).unwrap());
}
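
The benchmark above uses divan's input-cloning pattern: the closure passed to `with_inputs` runs outside the timed region, so the per-iteration `Mask` clone is excluded from the measurement. A minimal self-contained sketch of the same pattern (the workload here is hypothetical; only `with_inputs` and `bench_local_values` come from divan):

use divan::Bencher;

fn main() {
    divan::main();
}

// Clones the input outside the timed region, then consumes it by value in
// the timed closure -- the same shape as `Mask::from(mask.clone())` above.
#[divan::bench]
fn count_evens(bencher: Bencher) {
    let data: Vec<u64> = (0..1024).collect();
    bencher
        .with_inputs(|| data.clone())
        .bench_local_values(|v: Vec<u64>| v.into_iter().filter(|x| x % 2 == 0).count());
}
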
4 changes: 0 additions & 4 deletions encodings/fastlanes/benches/pipeline_v2_bitpacking_basic.rs
@@ -26,10 +26,6 @@ const BENCH_PARAMS: &[(usize, f64)] = &[
(10_000, 1.0),
(100_000, 0.5),
(100_000, 1.0),
(1_000_000, 0.5),
(1_000_000, 1.0),
(10_000_000, 0.5),
(10_000_000, 1.0),
];

#[divan::bench(args = BENCH_PARAMS)]
119 changes: 50 additions & 69 deletions encodings/fastlanes/src/bitpacking/array/bitpack_pipeline.rs
@@ -1,6 +1,8 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::mem::{transmute, transmute_copy};

use fastlanes::{BitPacking, FastLanes};
use static_assertions::const_assert_eq;
use vortex_array::pipeline::{
@@ -9,7 +11,7 @@ use vortex_array::pipeline::{
use vortex_buffer::Buffer;
use vortex_dtype::{PTypeDowncastExt, PhysicalPType, match_each_integer_ptype};
use vortex_error::VortexResult;
use vortex_mask::Mask;
use vortex_mask::MaskMut;
use vortex_vector::primitive::PVectorMut;
use vortex_vector::{VectorMut, VectorMutOps};

@@ -56,7 +58,9 @@ impl PipelinedNode for BitPackedArray {
Ok(Box::new(AlignedBitPackedKernel::<T>::new(
packed_bit_width,
packed_buffer,
self.validity.to_mask(self.len()),
// FIXME(ngates): if we make sure the mask has offset zero, we know that split_off
// inside the kernel is free.
self.validity.to_mask(self.len()).into_mut(),
)) as Box<dyn Kernel>)
})
}
@@ -85,7 +89,7 @@ pub struct AlignedBitPackedKernel<BP: PhysicalPType<Physical: BitPacking>> {
packed_buffer: Buffer<BP::Physical>,

/// The validity mask for the bitpacked array.
validity: Mask,
validity: MaskMut,

/// The total number of bitpacked chunks we have unpacked.
num_chunks_unpacked: usize,
@@ -94,8 +98,9 @@ impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> {
impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> {
pub fn new(
packed_bit_width: usize,
// TODO(ngates): hold an iterator over chunks instead of the full buffer?
packed_buffer: Buffer<BP::Physical>,
validity: Mask,
validity: MaskMut,
) -> Self {
let packed_stride =
packed_bit_width * <<BP as PhysicalPType>::Physical as FastLanes>::LANES;
@@ -119,30 +124,28 @@ impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> {
impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<BP> {
fn step(
&mut self,
_ctx: &KernelCtx,
_ctx: &mut KernelCtx,
selection: &BitView,
out: &mut VectorMut,
) -> VortexResult<()> {
let output_vector: &mut PVectorMut<BP::Physical> = out.as_primitive_mut().downcast();
debug_assert!(output_vector.is_empty());
out: VectorMut,
) -> VortexResult<VectorMut> {
if selection.true_count() == 0 {
debug_assert!(out.is_empty());
return Ok(out);
}

let packed_offset = self.num_chunks_unpacked * self.packed_stride;
let not_yet_unpacked_values = &self.packed_buffer.as_slice()[packed_offset..];
let mut output: PVectorMut<BP> = out.into_primitive().downcast();
debug_assert!(output.is_empty());

let true_count = selection.true_count();
let chunk_offset = self.num_chunks_unpacked * N;
let array_len = self.validity.len();
debug_assert!(chunk_offset < array_len);
let packed_offset = self.num_chunks_unpacked * self.packed_stride;
let packed_bytes = &self.packed_buffer[packed_offset..][..self.packed_stride];

// If the true count is very small (the selection is sparse), we can unpack individual
// elements directly into the output vector.
if true_count < SCALAR_UNPACK_THRESHOLD {
output_vector.reserve(true_count);
debug_assert!(true_count <= output_vector.capacity());
if selection.true_count() < SCALAR_UNPACK_THRESHOLD {
output.reserve(selection.true_count());

selection.iter_ones(|idx| {
let absolute_idx = chunk_offset + idx;
if self.validity.value(absolute_idx) {
if self.validity.value(idx) {
// SAFETY:
// - The documentation for `packed_bit_width` explains that the size is valid.
// - We know that the size of the `next_packed_chunk` we provide is equal to
@@ -151,72 +154,50 @@ impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<
let unpacked_value = unsafe {
BitPacking::unchecked_unpack_single(
self.packed_bit_width,
not_yet_unpacked_values,
packed_bytes,
idx,
)
};

// SAFETY: We just reserved enough capacity to push these values.
unsafe { output_vector.push_unchecked(unpacked_value) };
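// Note: `unpacked_value` is a `BP::Physical`, while the output vector holds
// `BP`; the `transmute_copy` reinterprets one as the other, assuming the two
// types share an identical layout (the premise of `PhysicalPType`).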
unsafe { output.push_unchecked(transmute_copy(&unpacked_value)) };
} else {
output_vector.append_nulls(1);
output.append_nulls(1);
}
});
} else {
// Otherwise if the mask is dense, it is faster to fully unpack the entire 1024
// element lane with SIMD / FastLanes and let other nodes in the pipeline decide if they
// want to perform the selection filter themselves.
output_vector.reserve(N);
debug_assert!(N <= output_vector.capacity());

let next_packed_chunk = &not_yet_unpacked_values[..self.packed_stride];
debug_assert_eq!(
next_packed_chunk.len(),
FL_VECTOR_SIZE * self.packed_bit_width / BP::Physical::T
);

// SAFETY: We have just reserved enough capacity for the elements buffer to set the
// length, and we are about to initialize all of the values **without** reading the
// memory.
unsafe { output_vector.elements_mut().set_len(N) };

// SAFETY:
// - The documentation for `packed_bit_width` explains that the size is valid.
// - We know that the size of the `next_packed_chunk` we provide is equal to
// `self.packed_stride`, and we explain why this is correct in its documentation.
// - It is clear that the output buffer has length 1024.
unsafe {
BitPacking::unchecked_unpack(
self.packed_bit_width,
next_packed_chunk,
output_vector.as_mut(),
);
}
self.num_chunks_unpacked += 1;
return Ok(output.into());
}

if array_len < chunk_offset + N {
let vector_len = array_len - chunk_offset;
debug_assert!(vector_len < N, "math is broken");
// Otherwise, if the mask is dense, it is faster to fully unpack the entire
// 1024-element lane with SIMD / FastLanes and let other nodes in the pipeline
// decide whether they want to perform the selection filter themselves.
let (mut elements, _validity) = output.into_parts();

// SAFETY: This must be less than `N` so this is just a truncate.
unsafe { output_vector.elements_mut().set_len(vector_len) };
elements.reserve(N);
// SAFETY: we just reserved enough capacity.
unsafe { elements.set_len(N) };

let chunk_mask = self.validity.slice(chunk_offset..array_len);
unsafe {
BitPacking::unchecked_unpack(
self.packed_bit_width,
packed_bytes,
transmute::<&mut [BP], &mut [BP::Physical]>(elements.as_mut()),
);
}

// SAFETY: We have just set the elements length to N, and the validity buffer has
// capacity for N elements.
unsafe { output_vector.validity_mut() }.append_mask(&chunk_mask);
} else {
let chunk_mask = self.validity.slice(chunk_offset..chunk_offset + N);
// Prepare the output validity mask for this chunk.
let mut chunk_validity = self.validity.split_off(N.min(self.validity.capacity()));
std::mem::swap(&mut self.validity, &mut chunk_validity);

// SAFETY: We have just set the elements length to N, and the validity buffer has
// capacity for N elements.
unsafe { output_vector.validity_mut() }.append_mask(&chunk_mask);
}
}
// For the final chunk, we may have fewer than N elements to unpack.
// So we just set the length of the output to the correct value.
unsafe { elements.set_len(chunk_validity.len()) };

self.num_chunks_unpacked += 1;

Ok(())
Ok(PVectorMut::new(elements, chunk_validity).into())
}
}

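The stride arithmetic in the kernel above is easier to check with concrete numbers. A standalone sketch, assuming the fastlanes convention that `LANES = 1024 / T` for a physical type `T` bits wide, so that `packed_bit_width * LANES` equals `FL_VECTOR_SIZE * packed_bit_width / T`, the form asserted in the kernel:

// Words of the physical type (t_bits wide) needed to store one
// 1024-element FastLanes chunk packed at bit_width bits per element.
fn packed_stride(bit_width: usize, t_bits: usize) -> usize {
    let lanes = 1024 / t_bits; // fastlanes: LANES = 1024 / T
    bit_width * lanes
}

fn main() {
    // i32 values drawn from 0..100 fit in 7 bits, so each 1024-element
    // chunk packs into 7 * (1024 / 32) = 224 u32 words.
    assert_eq!(packed_stride(7, 32), 224);
    // Equivalent to the debug_assert form: FL_VECTOR_SIZE * bit_width / T.
    assert_eq!(packed_stride(7, 32), 1024 * 7 / 32);
}
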
10 changes: 5 additions & 5 deletions vortex-array/src/pipeline/driver/input.rs
@@ -22,10 +22,10 @@ impl InputKernel {
impl Kernel for InputKernel {
fn step(
&mut self,
_ctx: &KernelCtx,
_ctx: &mut KernelCtx,
selection: &BitView,
out: &mut VectorMut,
) -> VortexResult<()> {
mut out: VectorMut,
) -> VortexResult<VectorMut> {
let mut batch = self
.batch
.take()
@@ -47,7 +47,7 @@ impl Kernel for InputKernel {
selection.iter_ones(|idx| {
out.extend_from_vector(&immutable.slice(idx..idx + 1));
});
return Ok(());
return Ok(out);
}

// We split off from our owned batch vector in chunks of size N, and then unsplit onto the
@@ -66,7 +66,7 @@

self.batch = Some(batch);

Ok(())
Ok(out)
}
}

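The `step` signature change in this file mirrors the bitpacking kernel above: `out` is now taken by value and returned on success, rather than mutated through `&mut`. A minimal sketch of why the owned shape helps, with hypothetical types standing in for the real vortex_array API:

// A sketch of the owned-buffer step pattern (types hypothetical).
struct Buf(Vec<u8>);

trait Step {
    // Taking `out` by value lets an implementation dismantle it into parts
    // (as the bitpacking kernel does with `into_parts`) and hand back a
    // rebuilt buffer, instead of patching it through a mutable reference.
    fn step(&mut self, input: &[u8], out: Buf) -> Result<Buf, String>;
}

struct Passthrough;

impl Step for Passthrough {
    fn step(&mut self, input: &[u8], out: Buf) -> Result<Buf, String> {
        let Buf(mut bytes) = out; // take ownership of the backing storage
        bytes.extend_from_slice(input);
        Ok(Buf(bytes)) // return it, so capacity can be reused across steps
    }
}

fn main() {
    let mut k = Passthrough;
    let out = k.step(b"abc", Buf(Vec::new())).unwrap();
    assert_eq!(out.0, b"abc");
}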