Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ vortex = { workspace = true, features = [
"files",
"tokio",
"zstd",
"unstable_encodings",
] }
vortex-datafusion = { workspace = true }
vortex-duckdb = { workspace = true }
Expand Down
40 changes: 19 additions & 21 deletions encodings/fastlanes/src/delta/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
use arrayref::{array_mut_ref, array_ref};
use fastlanes::{Delta, FastLanes, Transpose};
use num_traits::{WrappingAdd, WrappingSub};
use vortex_array::ToCanonical;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::validity::Validity;
use vortex_array::vtable::ValidityHelper;
use vortex_array::{Array, ToCanonical};
use vortex_buffer::{Buffer, BufferMut};
use vortex_dtype::{NativePType, Nullability, match_each_unsigned_integer_ptype};
use vortex_dtype::{NativePType, match_each_unsigned_integer_ptype};
use vortex_error::VortexResult;

use crate::DeltaArray;
Expand All @@ -22,16 +22,10 @@ pub fn delta_compress(array: &PrimitiveArray) -> VortexResult<(PrimitiveArray, P
let (bases, deltas) = match_each_unsigned_integer_ptype!(array.ptype(), |T| {
const LANES: usize = T::LANES;
let (bases, deltas) = compress_primitive::<T, LANES>(array.as_slice::<T>());
let (base_validity, delta_validity) =
if array.validity().nullability() != Nullability::NonNullable {
(Validity::AllValid, Validity::AllValid)
} else {
(Validity::NonNullable, Validity::NonNullable)
};
(
// To preserve nullability, we include Validity
PrimitiveArray::new(bases, base_validity),
PrimitiveArray::new(deltas, delta_validity),
PrimitiveArray::new(bases, array.dtype().nullability().into()),
PrimitiveArray::new(deltas, array.validity().clone()),
)
});

Expand Down Expand Up @@ -103,7 +97,7 @@ pub fn delta_decompress(array: &DeltaArray) -> PrimitiveArray {

PrimitiveArray::new(
decompress_primitive::<T, LANES>(bases.as_slice(), deltas.as_slice()),
array.validity().clone(),
Validity::from_mask(array.deltas().validity_mask(), array.dtype().nullability()),
)
});

Expand Down Expand Up @@ -168,26 +162,30 @@ mod test {

#[test]
fn test_compress() {
do_roundtrip_test((0u32..10_000).collect::<Vec<_>>());
do_roundtrip_test::<u32>((0u32..10_000).collect());
}

#[test]
fn test_compress_nullable() {
do_roundtrip_test::<u32>(PrimitiveArray::from_option_iter(
(0u32..10_000).map(|i| (i % 2 == 0).then_some(i)),
));
}

#[test]
fn test_compress_overflow() {
do_roundtrip_test(
(0..10_000)
.map(|i| (i % (u8::MAX as i32)) as u8)
.collect::<Vec<_>>(),
);
do_roundtrip_test::<u8>((0..10_000).map(|i| (i % (u8::MAX as i32)) as u8).collect());
}

fn do_roundtrip_test<T: NativePType>(input: Vec<T>) {
let delta = DeltaArray::try_from_vec(input.clone()).unwrap();
fn do_roundtrip_test<T: NativePType>(input: PrimitiveArray) {
let delta = DeltaArray::try_from_primitive_array(&input).unwrap();
assert_eq!(delta.len(), input.len());
let decompressed = delta_decompress(&delta);
let decompressed_slice = decompressed.as_slice::<T>();
assert_eq!(decompressed_slice.len(), input.len());
for (actual, expected) in decompressed_slice.iter().zip(input) {
assert_eq!(actual, &expected);
for (actual, expected) in decompressed_slice.iter().zip(input.as_slice()) {
assert_eq!(actual, expected);
}
assert_eq!(decompressed.validity(), input.validity());
}
}
14 changes: 3 additions & 11 deletions encodings/fastlanes/src/delta/compute/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_array::compute::{CastKernel, CastKernelAdapter, cast};
use vortex_array::vtable::ValidityHelper;
use vortex_array::{ArrayRef, IntoArray, register_kernel};
use vortex_dtype::DType;
use vortex_dtype::Nullability::NonNullable;
use vortex_error::VortexResult;

use crate::delta::{DeltaArray, DeltaVTable};
Expand All @@ -20,20 +20,12 @@ impl CastKernel for DeltaVTable {
}

// Cast both bases and deltas to the target type
let casted_bases = cast(array.bases(), dtype)?;
let casted_bases = cast(array.bases(), &dtype.with_nullability(NonNullable))?;
let casted_deltas = cast(array.deltas(), dtype)?;

// Create a new DeltaArray with the casted components
Ok(Some(
DeltaArray::try_from_delta_compress_parts(
casted_bases,
casted_deltas,
array
.validity()
.clone()
.cast_nullability(dtype.nullability(), array.len())?,
)?
.into_array(),
DeltaArray::try_from_delta_compress_parts(casted_bases, casted_deltas)?.into_array(),
))
}
}
Expand Down
77 changes: 30 additions & 47 deletions encodings/fastlanes/src/delta/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use vortex_array::arrays::PrimitiveArray;
use vortex_array::stats::{ArrayStats, StatsSetRef};
use vortex_array::validity::Validity;
use vortex_array::vtable::{
ArrayVTable, CanonicalVTable, NotSupported, VTable, ValidityHelper,
ValidityVTableFromValidityHelper,
ArrayVTable, CanonicalVTable, NotSupported, VTable, ValidityChildSliceHelper,
ValidityVTableFromChildSliceHelper,
};
use vortex_array::{Array, ArrayRef, Canonical, EncodingId, EncodingRef, IntoArray, vtable};
use vortex_buffer::Buffer;
Expand All @@ -31,7 +31,7 @@ impl VTable for DeltaVTable {
type ArrayVTable = Self;
type CanonicalVTable = Self;
type OperationsVTable = Self;
type ValidityVTable = ValidityVTableFromValidityHelper;
type ValidityVTable = ValidityVTableFromChildSliceHelper;
type VisitorVTable = Self;
type ComputeVTable = NotSupported;
type EncodeVTable = NotSupported;
Expand All @@ -47,20 +47,6 @@ impl VTable for DeltaVTable {
}
}

#[derive(Clone, Debug)]
pub struct DeltaArray {
offset: usize,
len: usize,
dtype: DType,
bases: ArrayRef,
deltas: ArrayRef,
validity: Validity,
stats_set: ArrayStats,
}

#[derive(Clone, Debug)]
pub struct DeltaEncoding;

/// A FastLanes-style delta-encoded array of primitive values.
///
/// A [`DeltaArray`] comprises a sequence of _chunks_ each representing 1,024 delta-encoded values,
Expand Down Expand Up @@ -93,6 +79,21 @@ pub struct DeltaEncoding;
///
/// If the chunk physically has fewer than 1,024 values, then it is stored as a traditional,
/// non-SIMD-amenable, delta-encoded vector.
///
/// Note the validity is stored in the deltas array.
#[derive(Clone, Debug)]
pub struct DeltaArray {
offset: usize,
len: usize,
dtype: DType,
bases: ArrayRef,
deltas: ArrayRef,
stats_set: ArrayStats,
}

#[derive(Clone, Debug)]
pub struct DeltaEncoding;

impl DeltaArray {
// TODO(ngates): remove constructing from vec
pub fn try_from_vec<T: NativePType>(vec: Vec<T>) -> VortexResult<Self> {
Expand All @@ -105,26 +106,19 @@ impl DeltaArray {
pub fn try_from_primitive_array(array: &PrimitiveArray) -> VortexResult<Self> {
let (bases, deltas) = delta_compress(array)?;

Self::try_from_delta_compress_parts(
bases.into_array(),
deltas.into_array(),
Validity::NonNullable,
)
Self::try_from_delta_compress_parts(bases.into_array(), deltas.into_array())
}

pub fn try_from_delta_compress_parts(
bases: ArrayRef,
deltas: ArrayRef,
validity: Validity,
) -> VortexResult<Self> {
/// Create a [`DeltaArray`] from the given `bases` and `deltas` arrays.
/// Note the `deltas` might be nullable
pub fn try_from_delta_compress_parts(bases: ArrayRef, deltas: ArrayRef) -> VortexResult<Self> {
let logical_len = deltas.len();
Self::try_new(bases, deltas, validity, 0, logical_len)
Self::try_new(bases, deltas, 0, logical_len)
}

pub fn try_new(
bases: ArrayRef,
deltas: ArrayRef,
validity: Validity,
offset: usize,
logical_len: usize,
) -> VortexResult<Self> {
Expand All @@ -139,7 +133,7 @@ impl DeltaArray {
deltas.len()
)
}
if bases.dtype() != deltas.dtype() {
if !bases.dtype().eq_ignore_nullability(deltas.dtype()) {
vortex_bail!(
"DeltaArray: bases and deltas must have the same dtype, got {:?} and {:?}",
bases.dtype(),
Expand All @@ -157,16 +151,6 @@ impl DeltaArray {
vortex_bail!("DeltaArray: ptype must be an integer, got {}", ptype);
}

if let Some(vlen) = validity.maybe_len()
&& vlen != logical_len
{
vortex_bail!(
"DeltaArray: validity length ({}) must match logical_len ({})",
vlen,
logical_len
);
}

let lanes = lane_count(ptype);

if (deltas.len() % 1024 == 0) != (bases.len() % lanes == 0) {
Expand All @@ -179,23 +163,21 @@ impl DeltaArray {
}

// SAFETY: validation done above
Ok(unsafe { Self::new_unchecked(bases, deltas, validity, offset, logical_len) })
Ok(unsafe { Self::new_unchecked(bases, deltas, offset, logical_len) })
}

pub(crate) unsafe fn new_unchecked(
bases: ArrayRef,
deltas: ArrayRef,
validity: Validity,
offset: usize,
logical_len: usize,
) -> Self {
Self {
offset,
len: logical_len,
dtype: bases.dtype().clone(),
dtype: bases.dtype().with_nullability(deltas.dtype().nullability()),
bases,
deltas,
validity,
stats_set: Default::default(),
}
}
Expand Down Expand Up @@ -236,9 +218,10 @@ pub(crate) fn lane_count(ptype: PType) -> usize {
match_each_unsigned_integer_ptype!(ptype, |T| { T::LANES })
}

impl ValidityHelper for DeltaArray {
fn validity(&self) -> &Validity {
&self.validity
impl ValidityChildSliceHelper for DeltaArray {
fn unsliced_child_and_slice(&self) -> (&ArrayRef, usize, usize) {
let (start, len) = (self.offset(), self.len());
(self.deltas(), start, start + len)
}
}

Expand Down
15 changes: 3 additions & 12 deletions encodings/fastlanes/src/delta/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::cmp::min;
use std::ops::Range;

use vortex_array::vtable::{OperationsVTable, ValidityHelper};
use vortex_array::vtable::OperationsVTable;
use vortex_array::{Array, ArrayRef, IntoArray, ToCanonical};
use vortex_scalar::Scalar;

Expand All @@ -20,7 +20,6 @@ impl OperationsVTable<DeltaVTable> for DeltaVTable {

let bases = array.bases();
let deltas = array.deltas();
let validity = array.validity();
let lanes = array.lanes();

let new_bases = bases.slice(
Expand All @@ -31,18 +30,10 @@ impl OperationsVTable<DeltaVTable> for DeltaVTable {
min(start_chunk * 1024, array.deltas_len())..min(stop_chunk * 1024, array.deltas_len()),
);

let new_validity = validity.slice(range.clone());

// SAFETY: slicing valid bases/deltas preserves correctness
unsafe {
DeltaArray::new_unchecked(
new_bases,
new_deltas,
new_validity,
physical_start % 1024,
range.len(),
)
.into_array()
DeltaArray::new_unchecked(new_bases, new_deltas, physical_start % 1024, range.len())
.into_array()
}
}

Expand Down
21 changes: 4 additions & 17 deletions encodings/fastlanes/src/delta/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_array::serde::ArrayChildren;
use vortex_array::validity::Validity;
use vortex_array::vtable::{SerdeVTable, ValidityHelper, VisitorVTable};
use vortex_array::vtable::{SerdeVTable, VisitorVTable};
use vortex_array::{
Array, ArrayBufferVisitor, ArrayChildVisitor, DeserializeMetadata, ProstMetadata,
};
use vortex_buffer::ByteBuffer;
use vortex_dtype::{DType, PType, match_each_unsigned_integer_ptype};
use vortex_error::{VortexResult, vortex_bail, vortex_err};
use vortex_error::{VortexResult, vortex_err};

use super::DeltaEncoding;
use crate::{DeltaArray, DeltaVTable};
Expand Down Expand Up @@ -41,18 +40,7 @@ impl SerdeVTable<DeltaVTable> for DeltaVTable {
_buffers: &[ByteBuffer],
children: &dyn ArrayChildren,
) -> VortexResult<DeltaArray> {
let validity = if children.len() == 2 {
Validity::from(dtype.nullability())
} else if children.len() == 3 {
let validity = children.get(2, &Validity::DTYPE, len)?;
Validity::Array(validity)
} else {
vortex_bail!(
"DeltaArray: expected 2 or 3 children, got {}",
children.len()
);
};

assert_eq!(children.len(), 2);
let ptype = PType::try_from(dtype)?;
let lanes =
match_each_unsigned_integer_ptype!(ptype, |T| { <T as fastlanes::FastLanes>::LANES });
Expand All @@ -67,7 +55,7 @@ impl SerdeVTable<DeltaVTable> for DeltaVTable {
let bases = children.get(0, dtype, bases_len)?;
let deltas = children.get(1, dtype, deltas_len)?;

DeltaArray::try_new(bases, deltas, validity, metadata.offset as usize, len)
DeltaArray::try_new(bases, deltas, metadata.offset as usize, len)
}
}

Expand All @@ -77,7 +65,6 @@ impl VisitorVTable<DeltaVTable> for DeltaVTable {
fn visit_children(array: &DeltaArray, visitor: &mut dyn ArrayChildVisitor) {
visitor.visit_child("bases", array.bases());
visitor.visit_child("deltas", array.deltas());
visitor.visit_validity(array.validity(), array.len());
}
}

Expand Down
Loading
Loading