Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

faster ALP encode #924

Merged
merged 17 commits into from
Sep 25, 2024
Prev Previous commit
Next Next commit
wip
  • Loading branch information
lwwmanning committed Sep 25, 2024
commit b3f4d183be3c4378537a4b3fdf8e7660a2a5af76
86 changes: 81 additions & 5 deletions encodings/alp/src/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ impl Display for Exponents {
}
}

const ENCODE_CHUNK_SIZE: usize = 1024;

pub trait ALPFloat: Float + Display + 'static {
type ALPInt: PrimInt + Bounded + Display + ToPrimitive;

Expand Down Expand Up @@ -102,7 +104,7 @@ pub trait ALPFloat: Float + Display + 'static {
exponents: Option<Exponents>,
) -> (Exponents, Vec<Self::ALPInt>, Vec<u64>, Vec<Self>) {
let exp = exponents.unwrap_or_else(|| Self::find_best_exponents(values));

let mut encoded_output = Vec::with_capacity(values.len());
let mut patch_indices = Vec::new();
let mut patch_values = Vec::new();
Expand All @@ -111,8 +113,10 @@ pub trait ALPFloat: Float + Display + 'static {

// this is intentionally branchless
// TODO: batch this into 1024 values at a time to make it more cache friendly
const CHUNK_SIZE: usize = 1024;
for (chunk_idx, chunk) in values.chunks(CHUNK_SIZE).enumerate() {
for chunk in values.chunks(ENCODE_CHUNK_SIZE) {
let num_prev_encoded = encoded_output.len();
let num_prev_patches = patch_indices.len();

let mut chunk_patch_count = 0;
encoded_output.extend(chunk.iter().map(|v| {
let encoded = unsafe { Self::encode_single_unchecked(*v, exp) };
Expand All @@ -131,8 +135,6 @@ pub trait ALPFloat: Float + Display + 'static {
patch_indices.reserve(chunk_patch_count + 1);
patch_values.reserve(chunk_patch_count + 1);

let num_prev_encoded = chunk_idx * CHUNK_SIZE;
let num_prev_patches = patch_indices.len();

// record the patches in this chunk
let patch_indices_mut = patch_indices.spare_capacity_mut();
Expand Down Expand Up @@ -217,6 +219,80 @@ pub trait ALPFloat: Float + Display + 'static {
}
}

/// Encodes one chunk of floats, appending the results to the running output buffers.
///
/// Every value in `chunk` is encoded with `exp` and pushed onto `encoded_output`. Values
/// that do not survive an encode/decode round trip are recorded as patches: their global
/// position (index into `encoded_output`) is appended to `patch_indices` and the original
/// float to `patch_values`.
///
/// `fill_value` carries the first successfully encoded (non-patched) value across calls.
/// Once it is known, the encoded slot of every patched position is overwritten with it for
/// better downstream compression. If the leading chunk(s) consisted solely of patches, no
/// fill value existed when they were processed; they are back-filled by the first call
/// that discovers one, even if that later chunk has no patches of its own.
///
/// `chunk_idx` is currently unused: global offsets are derived from `encoded_output.len()`.
///
/// # Panics
///
/// Panics if `patch_indices` and `patch_values` differ in length on entry, or if
/// `fill_value` is `None` on entry while `encoded_output` already holds values that are
/// not all patches.
fn encode_chunk_unchecked<T: ALPFloat>(
    chunk_idx: usize,
    chunk: &[T],
    exp: Exponents,
    encoded_output: &mut Vec<T::ALPInt>,
    patch_indices: &mut Vec<u64>,
    patch_values: &mut Vec<T>,
    fill_value: &mut Option<T::ALPInt>,
) {
    // Kept in the signature for callers; not needed to compute offsets.
    let _ = chunk_idx;

    let num_prev_encoded = encoded_output.len();
    let num_prev_patches = patch_indices.len();
    assert_eq!(patch_indices.len(), patch_values.len());
    let has_filled = fill_value.is_some();

    // Encode the chunk, counting the number of patches. The comparison is folded into an
    // integer add so the loop body stays branchless.
    let mut chunk_patch_count = 0;
    encoded_output.extend(chunk.iter().map(|v| {
        // SAFETY: NOTE(review): the contract of `encode_single_unchecked` is not visible
        // from here; values it cannot represent are caught by the round-trip comparison
        // below and stored as patches. Confirm against the trait's `# Safety` docs.
        let encoded = unsafe { T::encode_single_unchecked(*v, exp) };
        let decoded = T::decode_single(encoded, exp);
        chunk_patch_count += (decoded != *v) as usize;
        encoded
    }));
    let chunk_patch_count = chunk_patch_count; // immutable hereafter
    assert_eq!(encoded_output.len(), num_prev_encoded + chunk.len());

    if chunk_patch_count > 0 {
        // Preallocate space for the patches (plus one because the loop below always
        // writes to the current slot, which may be one past the last real patch).
        patch_indices.reserve(chunk_patch_count + 1);
        patch_values.reserve(chunk_patch_count + 1);

        // Record the patches in this chunk.
        let patch_indices_mut = patch_indices.spare_capacity_mut();
        let patch_values_mut = patch_values.spare_capacity_mut();
        let newly_encoded = &encoded_output[num_prev_encoded..];
        let mut chunk_patch_index = 0;
        for (offset, (value, encoded)) in chunk.iter().zip(newly_encoded).enumerate() {
            let decoded = T::decode_single(*encoded, exp);
            // write() is only safe to call more than once on the same slot because the
            // values are primitive (i.e., Drop is a no-op). The slot is overwritten until
            // a real patch advances the cursor.
            patch_indices_mut[chunk_patch_index].write((num_prev_encoded + offset) as u64);
            patch_values_mut[chunk_patch_index].write(*value);
            chunk_patch_index += (decoded != *value) as usize;
        }
        assert_eq!(chunk_patch_index, chunk_patch_count);
        // SAFETY: capacity for `chunk_patch_count + 1` extra elements was reserved above,
        // and the cursor only moves past a slot after that slot has been written, so the
        // first `chunk_patch_count` spare slots of both vectors are initialized.
        unsafe {
            patch_indices.set_len(num_prev_patches + chunk_patch_count);
            patch_values.set_len(num_prev_patches + chunk_patch_count);
        }
    }

    // Find the first successfully encoded value (i.e., not patched); this becomes the fill
    // value. Only worth scanning if this chunk contains at least one non-patched value.
    if fill_value.is_none() && num_prev_encoded + chunk_patch_count < encoded_output.len() {
        // No fill value yet means every previously encoded value was a patch, so patch
        // slot `i` holds index `i` for as long as the leading run of patches continues.
        assert_eq!(num_prev_encoded, num_prev_patches);
        let first_clean = (num_prev_encoded..encoded_output.len())
            .find(|&i| patch_indices.get(i).map_or(true, |&p| p != i as u64));
        *fill_value = first_clean.map(|i| encoded_output[i]);
    }

    // Replace the patched values in the encoded array with the fill value.
    if let Some(fill) = *fill_value {
        // If the fill value was only discovered during this call, earlier chunks (which
        // were all patches) have not been filled yet, so start from the very first patch.
        let first_unfilled = if has_filled { num_prev_patches } else { 0 };
        for &patch_idx in &patch_indices[first_unfilled..] {
            encoded_output[patch_idx as usize] = fill;
        }
    }
}

impl ALPFloat for f32 {
type ALPInt = i32;
const FRACTIONAL_BITS: u8 = 23;
Expand Down