Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vortex-btrblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ vortex-zigzag = { workspace = true }
[dev-dependencies]
divan = { workspace = true }
env_logger = "0.11"
rstest = { workspace = true }
vortex-array = { workspace = true, features = ["test-harness"] }

[features]
Expand Down
101 changes: 101 additions & 0 deletions vortex-btrblocks/src/float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use vortex_dict::DictArray;
use vortex_dtype::PType;
use vortex_error::{VortexExpect, VortexResult, vortex_panic};
use vortex_scalar::Scalar;
use vortex_sparse::{SparseArray, SparseVTable};

pub use self::stats::FloatStats;
use crate::float::dictionary::dictionary_encode;
Expand Down Expand Up @@ -42,6 +43,7 @@ impl Compressor for FloatCompressor {
&ALPScheme,
&ALPRDScheme,
&DictScheme,
&NullDominated,
&RLE_FLOAT_SCHEME,
]
}
Expand All @@ -63,6 +65,8 @@ const DICT_SCHEME: FloatCode = FloatCode(4);
const RUN_END_SCHEME: FloatCode = FloatCode(5);
const RUN_LENGTH_SCHEME: FloatCode = FloatCode(6);

const SPARSE_SCHEME: FloatCode = FloatCode(7);

#[derive(Debug, Copy, Clone)]
struct UncompressedScheme;

Expand All @@ -78,6 +82,9 @@ struct ALPRDScheme;
#[derive(Debug, Copy, Clone)]
struct DictScheme;

#[derive(Debug, Copy, Clone)]
pub struct NullDominated;

pub const RLE_FLOAT_SCHEME: RLEScheme<FloatStats, FloatCode> = RLEScheme::new(
RUN_LENGTH_SCHEME,
|values, is_sample, allowed_cascading, excludes| {
Expand Down Expand Up @@ -373,14 +380,90 @@ impl Scheme for DictScheme {
}
}

impl Scheme for NullDominated {
type StatsType = FloatStats;
type CodeType = FloatCode;

fn code(&self) -> Self::CodeType {
SPARSE_SCHEME
}

fn expected_compression_ratio(
&self,
stats: &Self::StatsType,
_is_sample: bool,
allowed_cascading: usize,
_excludes: &[Self::CodeType],
) -> VortexResult<f64> {
// Only use `SparseScheme` if we can cascade.
if allowed_cascading == 0 {
return Ok(0.0);
}

if stats.value_count == 0 {
// All nulls should use ConstantScheme
return Ok(0.0);
}

// If the majority is null, will compress well.
if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
return Ok(stats.src.len() as f64 / stats.value_count as f64);
}

// Otherwise we don't go this route
Ok(0.0)
}

fn compress(
&self,
stats: &Self::StatsType,
is_sample: bool,
allowed_cascading: usize,
_excludes: &[Self::CodeType],
) -> VortexResult<ArrayRef> {
assert!(allowed_cascading > 0);

// We pass None as we only run this pathway for NULL-dominated float arrays
let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;

if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
// Compress the values
let new_excludes = vec![integer::SparseScheme.code()];

// Don't attempt to compress the non-null values

let indices = sparse.patches().indices().to_primitive().narrow()?;
let compressed_indices = IntCompressor::compress_no_dict(
&indices,
is_sample,
allowed_cascading - 1,
&new_excludes,
)?;

SparseArray::try_new(
compressed_indices,
sparse.patches().values().clone(),
sparse.len(),
sparse.fill_scalar().clone(),
)
.map(|a| a.into_array())
} else {
Ok(sparse_encoded)
}
}
}

#[cfg(test)]
mod tests {
use std::iter;

use vortex_array::arrays::PrimitiveArray;
use vortex_array::builders::{ArrayBuilder, PrimitiveBuilder};
use vortex_array::validity::Validity;
use vortex_array::{Array, IntoArray, ToCanonical, assert_arrays_eq};
use vortex_buffer::{Buffer, buffer_mut};
use vortex_dtype::Nullability;
use vortex_sparse::SparseEncoding;

use crate::float::{FloatCompressor, RLE_FLOAT_SCHEME};
use crate::{Compressor, CompressorStats, MAX_CASCADE, Scheme};
Expand Down Expand Up @@ -430,4 +513,22 @@ mod tests {
let expected = Buffer::copy_from(&values).into_array();
assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
}

#[test]
fn test_sparse_compression() {
let mut array = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
array.append_value(f32::NAN);
array.append_value(-f32::NAN);
array.append_value(f32::INFINITY);
array.append_value(-f32::INFINITY);
array.append_value(0.0f32);
array.append_value(-0.0f32);
array.append_nulls(90);

let floats = array.finish_into_primitive();

let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();

assert_eq!(compressed.encoding_id(), SparseEncoding.id());
}
}
98 changes: 97 additions & 1 deletion vortex-btrblocks/src/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use vortex_dict::builders::dict_encode;
use vortex_error::{VortexExpect, VortexResult};
use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
use vortex_scalar::Scalar;
use vortex_sparse::{SparseArray, SparseVTable};
use vortex_utils::aliases::hash_set::HashSet;

use crate::integer::IntCompressor;
Expand All @@ -26,6 +27,7 @@ pub struct StringStats {
src: VarBinViewArray,
estimated_distinct_count: u32,
value_count: u32,
null_count: u32,
}

/// Estimate the number of distinct strings in the var bin view array.
Expand Down Expand Up @@ -65,6 +67,7 @@ impl CompressorStats for StringStats {
Self {
src: input.clone(),
value_count: value_count.try_into().vortex_expect("value_count"),
null_count: null_count.try_into().vortex_expect("null_count"),
estimated_distinct_count: estimated_distinct,
}
}
Expand Down Expand Up @@ -94,6 +97,7 @@ impl Compressor for StringCompressor {
&DictScheme,
&FSSTScheme,
&ConstantScheme,
&NullDominated,
]
}

Expand Down Expand Up @@ -122,6 +126,9 @@ pub struct FSSTScheme;
#[derive(Debug, Copy, Clone)]
pub struct ConstantScheme;

#[derive(Debug, Copy, Clone)]
pub struct NullDominated;

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct StringCode(u8);

Expand All @@ -130,6 +137,8 @@ const DICT_SCHEME: StringCode = StringCode(1);
const FSST_SCHEME: StringCode = StringCode(2);
const CONSTANT_SCHEME: StringCode = StringCode(3);

const SPARSE_SCHEME: StringCode = StringCode(4);

impl Scheme for UncompressedScheme {
type StatsType = StringStats;
type CodeType = StringCode;
Expand Down Expand Up @@ -338,13 +347,87 @@ impl Scheme for ConstantScheme {
}
}

impl Scheme for NullDominated {
type StatsType = StringStats;
type CodeType = StringCode;

fn code(&self) -> Self::CodeType {
SPARSE_SCHEME
}

fn expected_compression_ratio(
&self,
stats: &Self::StatsType,
_is_sample: bool,
allowed_cascading: usize,
_excludes: &[Self::CodeType],
) -> VortexResult<f64> {
// Only use `SparseScheme` if we can cascade.
if allowed_cascading == 0 {
return Ok(0.0);
}

if stats.value_count == 0 {
// All nulls should use ConstantScheme
return Ok(0.0);
}

// If the majority is null, will compress well.
if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
return Ok(stats.src.len() as f64 / stats.value_count as f64);
}

// Otherwise we don't go this route
Ok(0.0)
}

fn compress(
&self,
stats: &Self::StatsType,
is_sample: bool,
allowed_cascading: usize,
_excludes: &[Self::CodeType],
) -> VortexResult<ArrayRef> {
assert!(allowed_cascading > 0);

// We pass None as we only run this pathway for NULL-dominated float arrays
let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;

if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
// Compress the values
let new_excludes = vec![integer::SparseScheme.code()];

// Don't attempt to compress the non-null values
let indices = sparse.patches().indices().to_primitive().narrow()?;
let compressed_indices = IntCompressor::compress_no_dict(
&indices,
is_sample,
allowed_cascading - 1,
&new_excludes,
)?;

SparseArray::try_new(
compressed_indices,
sparse.patches().values().clone(),
sparse.len(),
sparse.fill_scalar().clone(),
)
.map(|a| a.into_array())
} else {
Ok(sparse_encoded)
}
}
}

#[cfg(test)]
mod tests {
use vortex_array::arrays::VarBinViewArray;
use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
use vortex_dtype::{DType, Nullability};
use vortex_sparse::SparseEncoding;

use crate::Compressor;
use crate::string::StringCompressor;
use crate::{Compressor, MAX_CASCADE};

#[test]
fn test_strings() {
Expand All @@ -363,4 +446,17 @@ mod tests {

println!("compression tree: {}", compressed.display_tree());
}

#[test]
fn test_sparse_nulls() {
let mut strings = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
strings.append_nulls(99);

strings.append_value("one little string");

let strings = strings.finish_into_varbinview();

let compressed = StringCompressor::compress(&strings, false, MAX_CASCADE, &[]).unwrap();
assert_eq!(compressed.encoding_id(), SparseEncoding.id());
}
}
Loading