Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

faster ALP encode #924

Merged
merged 17 commits into from
Sep 25, 2024
171 changes: 135 additions & 36 deletions encodings/alp/src/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::{Display, Formatter};
use std::mem::size_of;

use itertools::Itertools;
use num_traits::{CheckedSub, Float, NumCast, PrimInt, ToPrimitive, Zero};
use num_traits::{CheckedSub, Float, PrimInt, ToPrimitive};
use serde::{Deserialize, Serialize};
use vortex_error::vortex_panic;

Expand Down Expand Up @@ -35,10 +35,11 @@ pub trait ALPFloat: Float + Display + 'static {
(self + Self::SWEET) - Self::SWEET
}

#[inline]
fn as_int(self) -> Option<Self::ALPInt> {
<Self::ALPInt as NumCast>::from(self)
}
/// Equivalent to calling `as` to cast the primitive float to the target integer type.
fn as_int(self) -> Self::ALPInt;

/// Convert from the integer type back to the float type using `as`.
fn from_int(n: Self::ALPInt) -> Self;
lwwmanning marked this conversation as resolved.
Show resolved Hide resolved

fn find_best_exponents(values: &[Self]) -> Exponents {
let mut best_exp = Exponents { e: 0, f: 0 };
Expand Down Expand Up @@ -72,7 +73,7 @@ pub trait ALPFloat: Float + Display + 'static {
best_exp
}

#[inline(always)]
#[inline]
fn estimate_encoded_size(encoded: &[Self::ALPInt], patches: &[Self]) -> usize {
let bits_per_encoded = encoded
.iter()
Expand Down Expand Up @@ -103,42 +104,35 @@ pub trait ALPFloat: Float + Display + 'static {
) -> (Exponents, Vec<Self::ALPInt>, Vec<u64>, Vec<Self>) {
let exp = exponents.unwrap_or_else(|| Self::find_best_exponents(values));

let mut exc_pos = Vec::new();
let mut exc_value = Vec::new();
let mut prev = Self::ALPInt::zero();
let encoded = values
.iter()
.enumerate()
.map(|(i, v)| {
match Self::encode_single(*v, exp) {
Ok(fi) => {
prev = fi;
fi
}
Err(exc) => {
exc_pos.push(i as u64);
exc_value.push(exc);
// Emit the last known good value. This helps with run-end encoding.
prev
}
}
})
.collect_vec();
let mut encoded_output = Vec::with_capacity(values.len());
let mut patch_indices = Vec::new();
let mut patch_values = Vec::new();
let mut fill_value: Option<Self::ALPInt> = None;

(exp, encoded, exc_pos, exc_value)
// this is intentionally branchless
// we batch this into 32KB of values at a time to make it more L1 cache friendly
let encode_chunk_size: usize = (32 << 10) / size_of::<Self::ALPInt>();
for chunk in values.chunks(encode_chunk_size) {
encode_chunk_unchecked(
chunk,
exp,
&mut encoded_output,
&mut patch_indices,
&mut patch_values,
&mut fill_value,
);
}

(exp, encoded_output, patch_indices, patch_values)
}

#[inline]
fn encode_single(value: Self, exponents: Exponents) -> Result<Self::ALPInt, Self> {
let encoded = (value * Self::F10[exponents.e as usize] * Self::IF10[exponents.f as usize])
.fast_round();
if let Some(e) = encoded.as_int() {
let decoded = Self::decode_single(e, exponents);
if decoded == value {
return Ok(e);
}
let encoded = unsafe { Self::encode_single_unchecked(value, exponents) };
let decoded = Self::decode_single(encoded, exponents);
if decoded == value {
return Ok(encoded);
}

Err(value)
}

Expand All @@ -154,6 +148,91 @@ pub trait ALPFloat: Float + Display + 'static {
});
encoded_float * Self::F10[exponents.f as usize] * Self::IF10[exponents.e as usize]
}

/// # Safety
///
/// The returned value may not decode back to the original value.
#[inline(always)]
unsafe fn encode_single_unchecked(value: Self, exponents: Exponents) -> Self::ALPInt {
(value * Self::F10[exponents.e as usize] * Self::IF10[exponents.f as usize])
.fast_round()
.as_int()
}
}

fn encode_chunk_unchecked<T: ALPFloat>(
chunk: &[T],
exp: Exponents,
encoded_output: &mut Vec<T::ALPInt>,
patch_indices: &mut Vec<u64>,
patch_values: &mut Vec<T>,
fill_value: &mut Option<T::ALPInt>,
) {
let num_prev_encoded = encoded_output.len();
let num_prev_patches = patch_indices.len();
assert_eq!(patch_indices.len(), patch_values.len());
let has_filled = fill_value.is_some();

// encode the chunk, counting the number of patches
let mut chunk_patch_count = 0;
encoded_output.extend(chunk.iter().map(|v| {
let encoded = unsafe { T::encode_single_unchecked(*v, exp) };
let decoded = T::decode_single(encoded, exp);
let neq = (decoded != *v) as usize;
chunk_patch_count += neq;
encoded
}));
let chunk_patch_count = chunk_patch_count; // immutable hereafter
assert_eq!(encoded_output.len(), num_prev_encoded + chunk.len());

// find the first successfully encoded value (i.e., not patched)
// this is our fill value for missing values
if fill_value.is_none() && (num_prev_encoded + chunk_patch_count < encoded_output.len()) {
assert_eq!(num_prev_encoded, num_prev_patches);
for i in num_prev_encoded..encoded_output.len() {
if i >= patch_indices.len() || patch_indices[i] != i as u64 {
*fill_value = Some(encoded_output[i]);
break;
}
}
}

// if there are no patches, we are done
if chunk_patch_count == 0 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to handle the edge case of 2 chunks where chunk 0 is all patches, chunk 1 has 0 patches... which won't fill

return;
}

// we need to gather the patches for this chunk
// preallocate space for the patches (plus one because our loop may attempt to write one past the end)
patch_indices.reserve(chunk_patch_count + 1);
patch_values.reserve(chunk_patch_count + 1);

// record the patches in this chunk
let patch_indices_mut = patch_indices.spare_capacity_mut();
let patch_values_mut = patch_values.spare_capacity_mut();
let mut chunk_patch_index = 0;
for i in num_prev_encoded..encoded_output.len() {
let decoded = T::decode_single(encoded_output[i], exp);
// write() is only safe to call more than once because the values are primitive (i.e., Drop is a no-op)
patch_indices_mut[chunk_patch_index].write(i as u64);
patch_values_mut[chunk_patch_index].write(chunk[i - num_prev_encoded]);
chunk_patch_index += (decoded != chunk[i - num_prev_encoded]) as usize;
}
assert_eq!(chunk_patch_index, chunk_patch_count);
unsafe {
patch_indices.set_len(num_prev_patches + chunk_patch_count);
patch_values.set_len(num_prev_patches + chunk_patch_count);
}

// replace the patched values in the encoded array with the fill value
// for better downstream compression
if let Some(fill_value) = fill_value {
// handle the edge case where the first N >= 1 chunks are all patches
let start_patch = if !has_filled { 0 } else { num_prev_patches };
for patch_idx in &patch_indices[start_patch..] {
encoded_output[*patch_idx as usize] = *fill_value;
}
}
}

impl ALPFloat for f32 {
Expand Down Expand Up @@ -189,6 +268,16 @@ impl ALPFloat for f32 {
0.000000001,
0.0000000001, // 10^-10
];

#[inline(always)]
fn as_int(self) -> Self::ALPInt {
self as _
}

#[inline(always)]
fn from_int(n: Self::ALPInt) -> Self {
n as _
}
}

impl ALPFloat for f64 {
Expand Down Expand Up @@ -250,4 +339,14 @@ impl ALPFloat for f64 {
0.0000000000000000000001,
0.00000000000000000000001, // 10^-23
];

#[inline(always)]
fn as_int(self) -> Self::ALPInt {
self as _
}

#[inline(always)]
fn from_int(n: Self::ALPInt) -> Self {
n as _
}
}
2 changes: 1 addition & 1 deletion encodings/alp/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod tests {
assert!(encoded.patches().is_some());
assert_eq!(
encoded.encoded().as_primitive().maybe_null_slice::<i64>(),
vec![1234i64, 2718, 2718, 4000] // fill forward
vec![1234i64, 2718, 1234, 4000] // fill forward
);
assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });

Expand Down
Loading