Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: teach ALPArray to store validity only in the encoded array #2216

Merged
merged 19 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions encodings/alp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ vortex-scalar = { workspace = true }

[dev-dependencies]
divan = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }
vortex-array = { workspace = true, features = ["test-harness"] }

Expand Down
96 changes: 85 additions & 11 deletions encodings/alp/benches/alp_compress.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,101 @@
#![allow(clippy::unwrap_used)]

use divan::Bencher;
use vortex_alp::{ALPFloat, ALPRDFloat, Exponents, RDEncoder};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng as _};
use vortex_alp::{alp_encode, ALPFloat, ALPRDFloat, RDEncoder};
use vortex_array::array::PrimitiveArray;
use vortex_array::validity::Validity;
use vortex_array::IntoCanonical;
use vortex_buffer::{buffer, Buffer};
use vortex_buffer::buffer;
use vortex_dtype::NativePType;

fn main() {
divan::main();
}

#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])]
fn compress_alp<T: ALPFloat>(n: usize) -> (Exponents, Buffer<T::ALPInt>, Buffer<u64>, Buffer<T>) {
let values: Vec<T> = vec![T::from(1.234).unwrap(); n];
T::encode(values.as_slice(), None)
#[divan::bench(types = [f32, f64], args = [
(100_000, 0.0, 0.25),
(100_000, 0.01, 0.25),
(100_000, 0.1, 0.25),
(10_000_000, 0.0, 0.25),
(10_000_000, 0.01, 0.25),
(10_000_000, 0.1, 0.25),
(100_000, 0.0, 0.95),
(100_000, 0.01, 0.95),
(100_000, 0.1, 0.95),
(10_000_000, 0.0, 0.95),
(10_000_000, 0.01, 0.95),
(10_000_000, 0.1, 0.95),
(100_000, 0.0, 1.0),
(100_000, 0.01, 1.0),
(100_000, 0.1, 1.0),
(10_000_000, 0.0, 1.0),
(10_000_000, 0.01, 1.0),
(10_000_000, 0.1, 1.0),
])]
fn compress_alp<T: ALPFloat + NativePType>(bencher: Bencher, args: (usize, f64, f64)) {
let (n, fraction_patch, fraction_valid) = args;
let mut rng = StdRng::seed_from_u64(0);
let mut values = buffer![T::from(1.234).unwrap(); n].into_mut();
if fraction_patch > 0.0 {
for index in 0..values.len() {
if rng.gen_bool(fraction_patch) {
values[index] = T::from(1000.0).unwrap()
}
}
}
let validity = if fraction_valid < 1.0 {
Validity::from_iter((0..values.len()).map(|_| rng.gen_bool(fraction_valid)))
} else {
Validity::NonNullable
};
let values = values.freeze();

bencher.bench_local(move || {
alp_encode(&PrimitiveArray::new(values.clone(), validity.clone())).unwrap()
})
}

#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])]
fn decompress_alp<T: ALPFloat>(bencher: Bencher, n: usize) {
let values: Vec<T> = vec![T::from(1.234).unwrap(); n];
let (exponents, encoded, ..) = T::encode(values.as_slice(), None);
bencher.bench_local(move || T::decode(&encoded, exponents));
#[divan::bench(types = [f32, f64], args = [
(100_000, 0.0, 0.25),
(100_000, 0.01, 0.25),
(100_000, 0.1, 0.25),
(10_000_000, 0.0, 0.25),
(10_000_000, 0.01, 0.25),
(10_000_000, 0.1, 0.25),
(100_000, 0.0, 0.95),
(100_000, 0.01, 0.95),
(100_000, 0.1, 0.95),
(10_000_000, 0.0, 0.95),
(10_000_000, 0.01, 0.95),
(10_000_000, 0.1, 0.95),
(100_000, 0.0, 1.0),
(100_000, 0.01, 1.0),
(100_000, 0.1, 1.0),
(10_000_000, 0.0, 1.0),
(10_000_000, 0.01, 1.0),
(10_000_000, 0.1, 1.0),
])]
fn decompress_alp<T: ALPFloat + NativePType>(bencher: Bencher, args: (usize, f64, f64)) {
let (n, fraction_patch, fraction_valid) = args;
let mut rng = StdRng::seed_from_u64(0);
let mut values = buffer![T::from(1.234).unwrap(); n].into_mut();
if fraction_patch > 0.0 {
for index in 0..values.len() {
if rng.gen_bool(fraction_patch) {
values[index] = T::from(1000.0).unwrap()
}
}
}
let validity = if fraction_valid < 1.0 {
Validity::from_iter((0..values.len()).map(|_| rng.gen_bool(fraction_valid)))
} else {
Validity::NonNullable
};
let values = values.freeze();
let array = alp_encode(&PrimitiveArray::new(values, validity)).unwrap();
bencher.bench_local(move || array.clone().into_canonical().unwrap());
}

#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])]
Expand Down
8 changes: 8 additions & 0 deletions encodings/alp/src/alp/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ impl ALPArray {
let mut children = Vec::with_capacity(2);
children.push(encoded);
if let Some(patches) = &patches {
if patches.dtype() != &dtype {
vortex_bail!(MismatchedTypes: dtype, patches.dtype());
}

if !patches.values().all_valid()? {
vortex_bail!("ALPArray: patches must not contain invalid entries");
}

children.push(patches.indices().clone());
children.push(patches.values().clone());
}
Expand Down
148 changes: 119 additions & 29 deletions encodings/alp/src/alp/compress.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use itertools::Itertools as _;
use vortex_array::array::PrimitiveArray;
use vortex_array::patches::Patches;
use vortex_array::validity::Validity;
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{Array, IntoArray, IntoArrayVariant};
use vortex_buffer::{Buffer, BufferMut};
use vortex_dtype::{NativePType, PType};
use vortex_error::{vortex_bail, VortexResult, VortexUnwrap};
use vortex_error::{vortex_bail, VortexResult};
use vortex_mask::Mask;
use vortex_scalar::ScalarType;

use crate::alp::{ALPArray, ALPFloat};
Expand All @@ -24,39 +28,74 @@ macro_rules! match_each_alp_float_ptype {
})
}

pub fn alp_encode_components<T>(
pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult<ALPArray> {
let (exponents, encoded, patches) = alp_encode_components(parray)?;
ALPArray::try_new(encoded, exponents, patches)
}

pub fn alp_encode_components(
parray: &PrimitiveArray,
) -> VortexResult<(Exponents, Array, Option<Patches>)> {
match parray.ptype() {
PType::F32 => alp_encode_components_typed::<f32>(parray),
PType::F64 => alp_encode_components_typed::<f64>(parray),
_ => vortex_bail!("ALP can only encode f32 and f64"),
}
}

#[allow(clippy::cast_possible_truncation)]
fn alp_encode_components_typed<T>(
values: &PrimitiveArray,
exponents: Option<Exponents>,
) -> (Exponents, Array, Option<Patches>)
) -> VortexResult<(Exponents, Array, Option<Patches>)>
where
T: ALPFloat + NativePType,
T::ALPInt: NativePType,
T: ScalarType,
{
let (exponents, encoded, exc_pos, exc) = T::encode(values.as_slice::<T>(), exponents);
let len = encoded.len();
(
exponents,
PrimitiveArray::new(encoded, values.validity()).into_array(),
(!exc.is_empty()).then(|| {
let position_arr = exc_pos.into_array();
let patch_validity = values.validity().take(&position_arr).vortex_unwrap();
Patches::new(
len,
position_arr,
PrimitiveArray::new(exc, patch_validity).into_array(),
)
}),
)
}
let values_slice = values.as_slice::<T>();

pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult<ALPArray> {
let (exponents, encoded, patches) = match parray.ptype() {
PType::F32 => alp_encode_components::<f32>(parray, None),
PType::F64 => alp_encode_components::<f64>(parray, None),
_ => vortex_bail!("ALP can only encode f32 and f64"),
let (exponents, encoded, exceptional_positions, exceptional_values) =
T::encode(values_slice, None);

let encoded_array = PrimitiveArray::new(encoded, values.validity()).into_array();

let validity = values.validity_mask()?;
// exceptional_positions may contain exceptions at invalid positions (which contain garbage
// data). We remove invalid exceptional positions in order to keep the Patches small.
let (valid_exceptional_positions, valid_exceptional_values): (Buffer<u64>, Buffer<T>) =
match validity {
Mask::AllTrue(_) => (exceptional_positions, exceptional_values),
Mask::AllFalse(_) => {
// no valid positions, ergo nothing worth patching
(Buffer::empty(), Buffer::empty())
}
Mask::Values(is_valid) => {
let (pos, vals): (BufferMut<u64>, BufferMut<T>) = exceptional_positions
.into_iter()
.zip_eq(exceptional_values)
.filter(|(index, _)| is_valid.value(*index as usize))
.unzip();
(pos.freeze(), vals.freeze())
}
};
let patches = if valid_exceptional_positions.is_empty() {
None
} else {
let patches_validity = if values.dtype().is_nullable() {
Validity::AllValid
} else {
Validity::NonNullable
};
let valid_exceptional_values =
PrimitiveArray::new(valid_exceptional_values, patches_validity).into_array();

Some(Patches::new(
values_slice.len(),
valid_exceptional_positions.into_array(),
valid_exceptional_values,
))
};
ALPArray::try_new(encoded, exponents, patches)
Ok((exponents, encoded_array, patches))
}

pub fn decompress(array: ALPArray) -> VortexResult<PrimitiveArray> {
Expand Down Expand Up @@ -85,6 +124,7 @@ mod tests {
use vortex_array::compute::scalar_at;
use vortex_array::validity::Validity;
use vortex_buffer::{buffer, Buffer};
use vortex_scalar::Scalar;

use super::*;

Expand Down Expand Up @@ -128,7 +168,7 @@ mod tests {
}

#[test]
#[allow(clippy::approx_constant)] // ALP doesn't like E
#[allow(clippy::approx_constant)] // Clippy objects to 2.718, an approximation of e, the base of the natural logarithm.
fn test_patched_compress() {
let values = buffer![1.234f64, 2.718, f64::consts::PI, 4.0];
let array = PrimitiveArray::new(values.clone(), Validity::NonNullable);
Expand All @@ -140,14 +180,47 @@ mod tests {
.into_primitive()
.unwrap()
.as_slice::<i64>(),
vec![1234i64, 2718, 1234, 4000] // fill forward
vec![1234i64, 2718, 1234, 4000]
);
assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });

let decoded = decompress(encoded).unwrap();
assert_eq!(values.as_slice(), decoded.as_slice::<f64>());
}

#[test]
#[allow(clippy::approx_constant)] // Clippy objects to 2.718, an approximation of e, the base of the natural logarithm.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's so funny

fn test_compress_ignores_invalid_exceptional_values() {
let values = buffer![1.234f64, 2.718, f64::consts::PI, 4.0];
let array = PrimitiveArray::new(values, Validity::from_iter([true, true, false, true]));
let encoded = alp_encode(&array).unwrap();
assert!(encoded.patches().is_none());
assert_eq!(
encoded
.encoded()
.into_primitive()
.unwrap()
.as_slice::<i64>(),
vec![1234i64, 2718, 1234, 4000]
);
assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });

let decoded = decompress(encoded).unwrap();
assert_eq!(
scalar_at(&decoded, 0).unwrap(),
scalar_at(&array, 0).unwrap()
);
assert_eq!(
scalar_at(&decoded, 1).unwrap(),
scalar_at(&array, 1).unwrap()
);
assert!(!decoded.is_valid(2).unwrap());
assert_eq!(
scalar_at(&decoded, 3).unwrap(),
scalar_at(&array, 3).unwrap()
);
}

#[test]
#[allow(clippy::approx_constant)] // ALP doesn't like E
fn test_nullable_patched_scalar_at() {
Expand All @@ -168,6 +241,7 @@ mod tests {
assert!(s.is_valid());
}

assert!(!encoded.is_valid(4).unwrap());
let s = scalar_at(encoded.as_ref(), 4).unwrap();
assert!(s.is_null());

Expand All @@ -190,7 +264,23 @@ mod tests {
);
let alp_arr = alp_encode(&original).unwrap();
let decompressed = alp_arr.into_primitive().unwrap();
assert_eq!(original.as_slice::<f64>(), decompressed.as_slice::<f64>());
assert_eq!(
// The second and third values become exceptions and are replaced
[195.26274, 195.26274, 195.26274],
decompressed.as_slice::<f64>()
);
assert_eq!(original.validity(), decompressed.validity());
assert_eq!(
scalar_at(&original, 0).unwrap(),
Scalar::null_typed::<f64>()
);
assert_eq!(
scalar_at(&original, 1).unwrap(),
Scalar::null_typed::<f64>()
);
assert_eq!(
scalar_at(&original, 2).unwrap(),
Scalar::null_typed::<f64>()
);
}
}
6 changes: 5 additions & 1 deletion encodings/alp/src/alp/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ impl ComputeVTable for ALPEncoding {

impl ScalarAtFn<ALPArray> for ALPEncoding {
fn scalar_at(&self, array: &ALPArray, index: usize) -> VortexResult<Scalar> {
if !array.encoded().is_valid(index)? {
return Ok(Scalar::null(array.dtype().clone()));
}

if let Some(patches) = array.patches() {
if let Some(patch) = patches.get_patched(index)? {
return Ok(patch);
return patch.cast(array.dtype());
}
}

Expand Down
12 changes: 3 additions & 9 deletions vortex-sampling-compressor/src/compressors/alp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use vortex_alp::{
alp_encode_components, match_each_alp_float_ptype, ALPArray, ALPEncoding, ALPRDEncoding,
};
use vortex_alp::{alp_encode_components, ALPArray, ALPEncoding, ALPRDEncoding};
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::PrimitiveArray;
use vortex_array::variants::PrimitiveArrayTrait;
Expand Down Expand Up @@ -43,12 +41,8 @@ impl EncodingCompressor for ALPCompressor {
like: Option<CompressionTree<'a>>,
ctx: SamplingCompressor<'a>,
) -> VortexResult<CompressedArray<'a>> {
let parray = array.clone().into_primitive()?;

let (exponents, encoded, patches) = match_each_alp_float_ptype!(
parray.ptype(), |$T| {
alp_encode_components::<$T>(&parray, None)
});
let (exponents, encoded, patches) =
alp_encode_components(&array.clone().into_primitive()?)?;

let compressed_encoded = ctx
.named("packed")
Expand Down
Loading