Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

117 changes: 63 additions & 54 deletions encodings/alp/src/alp/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use vortex_array::ExecutionCtx;
use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::SerializeMetadata;
use vortex_array::VectorExecutor;
use vortex_array::buffer::BufferHandle;
use vortex_array::patches::Patches;
use vortex_array::patches::PatchesMetadata;
Expand All @@ -43,14 +42,11 @@ use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_vector::Vector;

use crate::ALPFloat;
use crate::alp::Exponents;
use crate::alp::alp_encode;
use crate::alp::decompress::decompress_into_array;
use crate::alp::decompress::decompress_into_vector;
use crate::match_each_alp_float_ptype;

vtable!(ALP);

Expand Down Expand Up @@ -187,28 +183,9 @@ impl VTable for ALPVTable {
Ok(())
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<Vector> {
let encoded = array.encoded().execute(ctx)?;
let patches = if let Some(patches) = array.patches() {
Some((
patches.indices().execute(ctx)?,
patches.values().execute(ctx)?,
patches
.chunk_offsets()
.as_ref()
.map(|co| co.execute(ctx))
.transpose()?,
))
} else {
None
};

let patches_offset = array.patches().map(|p| p.offset()).unwrap_or(0);
let exponents = array.exponents();

match_each_alp_float_ptype!(array.dtype().as_ptype(), |T| {
decompress_into_vector::<T>(encoded, exponents, patches, patches_offset)
})
fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<Canonical> {
// TODO(joe): take by value
Ok(decompress_into_array(array.clone()).to_canonical())
}
}

Expand Down Expand Up @@ -504,12 +481,11 @@ mod tests {
use rstest::rstest;
use vortex_array::ToCanonical;
use vortex_array::VectorExecutor;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::session::ArraySession;
use vortex_array::vtable::ValidityHelper;
use vortex_dtype::PTypeDowncast;
use vortex_session::VortexSession;
use vortex_vector::VectorOps;

use super::*;

Expand All @@ -530,14 +506,20 @@ mod tests {
let values = PrimitiveArray::from_iter((0..size).map(|i| i as f32));
let encoded = alp_encode(&values, None).unwrap();

let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
let result_canonical = {
let mut ctx = SESSION.create_execution_ctx();
encoded.to_array().execute(&mut ctx).unwrap()
};
// Compare against the traditional array-based decompress path
let expected = decompress_into_array(encoded);

assert_eq!(result_vector.len(), size);
assert_eq!(result_canonical.len(), size);

let result_primitive = result_vector.into_primitive().into_f32();
assert_eq!(result_primitive.as_ref(), expected.as_slice::<f32>());
let result_primitive = result_canonical.into_primitive();
assert_eq!(
result_primitive.as_slice::<f32>(),
expected.as_slice::<f32>()
);
}

#[rstest]
Expand All @@ -554,14 +536,20 @@ mod tests {
let values = PrimitiveArray::from_iter((0..size).map(|i| i as f64));
let encoded = alp_encode(&values, None).unwrap();

let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
let result_canonical = {
let mut ctx = SESSION.create_execution_ctx();
encoded.to_array().execute(&mut ctx).unwrap()
};
// Compare against the traditional array-based decompress path
let expected = decompress_into_array(encoded);

assert_eq!(result_vector.len(), size);
assert_eq!(result_canonical.len(), size);

let result_primitive = result_vector.into_primitive().into_f64();
assert_eq!(result_primitive.as_ref(), expected.as_slice::<f64>());
let result_primitive = result_canonical.into_primitive();
assert_eq!(
result_primitive.as_slice::<f64>(),
expected.as_slice::<f64>()
);
}

#[rstest]
Expand All @@ -584,14 +572,20 @@ mod tests {
let encoded = alp_encode(&array, None).unwrap();
assert!(encoded.patches().unwrap().array_len() > 0);

let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
let result_canonical = {
let mut ctx = SESSION.create_execution_ctx();
encoded.to_array().execute(&mut ctx).unwrap()
};
// Compare against the traditional array-based decompress path
let expected = decompress_into_array(encoded);

assert_eq!(result_vector.len(), size);
assert_eq!(result_canonical.len(), size);

let result_primitive = result_vector.into_primitive().into_f64();
assert_eq!(result_primitive.as_ref(), expected.as_slice::<f64>());
let result_primitive = result_canonical.into_primitive();
assert_eq!(
result_primitive.as_slice::<f64>(),
expected.as_slice::<f64>()
);
}

#[rstest]
Expand All @@ -612,19 +606,25 @@ mod tests {
let array = PrimitiveArray::from_option_iter(values);
let encoded = alp_encode(&array, None).unwrap();

let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
let result_canonical = {
let mut ctx = SESSION.create_execution_ctx();
encoded.to_array().execute(&mut ctx).unwrap()
};
// Compare against the traditional array-based decompress path
let expected = decompress_into_array(encoded);

assert_eq!(result_vector.len(), size);
assert_eq!(result_canonical.len(), size);

let result_primitive = result_vector.into_primitive().into_f32();
assert_eq!(result_primitive.as_ref(), expected.as_slice::<f32>());
let result_primitive = result_canonical.into_primitive();
assert_eq!(
result_primitive.as_slice::<f32>(),
expected.as_slice::<f32>()
);

// Test validity masks match
for idx in 0..size {
assert_eq!(
result_primitive.validity().value(idx),
result_primitive.validity().is_valid(idx),
expected.validity().is_valid(idx)
);
}
Expand All @@ -651,19 +651,25 @@ mod tests {
let encoded = alp_encode(&array, None).unwrap();
assert!(encoded.patches().unwrap().array_len() > 0);

let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
let result_canonical = {
let mut ctx = SESSION.create_execution_ctx();
encoded.to_array().execute(&mut ctx).unwrap()
};
// Compare against the traditional array-based decompress path
let expected = decompress_into_array(encoded);

assert_eq!(result_vector.len(), size);
assert_eq!(result_canonical.len(), size);

let result_primitive = result_vector.into_primitive().into_f64();
assert_eq!(result_primitive.as_ref(), expected.as_slice::<f64>());
let result_primitive = result_canonical.into_primitive();
assert_eq!(
result_primitive.as_slice::<f64>(),
expected.as_slice::<f64>()
);

// Test validity masks match
for idx in 0..size {
assert_eq!(
result_primitive.validity().value(idx),
result_primitive.validity().is_valid(idx),
expected.validity().is_valid(idx)
);
}
Expand Down Expand Up @@ -693,21 +699,24 @@ mod tests {
let slice_len = slice_end - slice_start;
let sliced_encoded = encoded.slice(slice_start..slice_end);

let result_vector = sliced_encoded.execute_vector(&SESSION).unwrap();
let result_primitive = result_vector.into_primitive().into_f64();
let result_canonical = {
let mut ctx = SESSION.create_execution_ctx();
sliced_encoded.execute(&mut ctx).unwrap()
};
let result_primitive = result_canonical.into_primitive();

for idx in 0..slice_len {
let expected_value = values[slice_start + idx];

let result_valid = result_primitive.validity().value(idx);
let result_valid = result_primitive.validity().is_valid(idx);
assert_eq!(
result_valid,
expected_value.is_some(),
"Validity mismatch at idx={idx}",
);

if let Some(expected_val) = expected_value {
let result_val = result_primitive.as_ref()[idx];
let result_val = result_primitive.as_slice::<f64>()[idx];
assert_eq!(result_val, expected_val, "Value mismatch at idx={idx}",);
}
}
Expand Down
47 changes: 0 additions & 47 deletions encodings/alp/src/alp/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use std::mem::transmute;

use num_traits::AsPrimitive;
use vortex_array::ArrayRef;
use vortex_array::ToCanonical;
use vortex_array::arrays::PrimitiveArray;
Expand All @@ -13,13 +12,7 @@ use vortex_array::patches::Patches;
use vortex_array::vtable::ValidityHelper;
use vortex_buffer::BufferMut;
use vortex_dtype::DType;
use vortex_dtype::NativePType;
use vortex_dtype::match_each_unsigned_integer_ptype;
use vortex_error::VortexResult;
use vortex_vector::Vector;
use vortex_vector::VectorMutOps;
use vortex_vector::VectorOps;
use vortex_vector::primitive::PVectorMut;

use crate::ALPArray;
use crate::ALPFloat;
Expand Down Expand Up @@ -48,46 +41,6 @@ pub fn decompress_into_array(array: ALPArray) -> PrimitiveArray {
}
}

/// Decompresses an ALP-encoded array.
///
/// # Returns
///
/// A `Vector` containing the decompressed floating-point values with all patches applied.
pub fn decompress_into_vector<T: ALPFloat>(
encoded_vector: Vector,
exponents: Exponents,
patches_vectors: Option<(Vector, Vector, Option<Vector>)>,
patches_offset: usize,
) -> VortexResult<Vector> {
let encoded_primitive = encoded_vector.into_primitive().into_mut();
let (mut alp_buffer, mask) = T::ALPInt::downcast(encoded_primitive).into_parts();
<T>::decode_slice_inplace(alp_buffer.as_mut_slice(), exponents);

// SAFETY: `Buffer<T::ALPInt> and `BufferMut<T>` have the same layout.
let mut decoded_buffer: BufferMut<T> = unsafe { transmute(alp_buffer) };

// Apply patches if they exist.
if let Some((patches_indices, patches_values, _)) = patches_vectors {
let patches_indices = patches_indices.into_primitive();
let patches_values = patches_values.into_primitive();

let values_buffer = T::downcast(patches_values.into_mut()).into_parts().0;
let values_slice = values_buffer.as_slice();
let decoded_slice = decoded_buffer.as_mut_slice();

match_each_unsigned_integer_ptype!(patches_indices.ptype(), |I| {
let indices_buffer = I::downcast(patches_indices.into_mut()).into_parts().0;
let indices_slice = indices_buffer.as_slice();

for (&idx, &value) in indices_slice.iter().zip(values_slice.iter()) {
decoded_slice[AsPrimitive::<usize>::as_(idx) - patches_offset] = value;
}
});
}

Ok(PVectorMut::<T>::new(decoded_buffer, mask).freeze().into())
}

/// Decompresses an ALP-encoded array in 1024-element chunks.
///
/// # Returns
Expand Down
Loading
Loading