Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions encodings/fsst/benches/fsst_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ const BENCH_ARGS: &[(usize, usize, u8)] = &[
#[divan::bench(args = BENCH_ARGS)]
fn compress_fsst(bencher: Bencher, (string_count, avg_len, unique_chars): (usize, usize, u8)) {
let array = generate_test_data(string_count, avg_len, unique_chars);
let compressor = fsst_train_compressor(array.as_ref()).unwrap();
bencher.bench(|| fsst_compress(array.as_ref(), &compressor).unwrap())
let compressor = fsst_train_compressor(&array);
bencher.bench(|| fsst_compress(&array, &compressor))
}

#[divan::bench(args = BENCH_ARGS)]
fn decompress_fsst(bencher: Bencher, (string_count, avg_len, unique_chars): (usize, usize, u8)) {
let array = generate_test_data(string_count, avg_len, unique_chars);
let compressor = fsst_train_compressor(array.as_ref()).unwrap();
let encoded = fsst_compress(array.as_ref(), &compressor).unwrap();
let compressor = fsst_train_compressor(&array);
let encoded = fsst_compress(array, &compressor);

bencher
.with_inputs(|| encoded.clone())
Expand All @@ -56,14 +56,14 @@ fn decompress_fsst(bencher: Bencher, (string_count, avg_len, unique_chars): (usi
#[divan::bench(args = BENCH_ARGS)]
fn train_compressor(bencher: Bencher, (string_count, avg_len, unique_chars): (usize, usize, u8)) {
let array = generate_test_data(string_count, avg_len, unique_chars);
bencher.bench(|| fsst_train_compressor(array.as_ref()).unwrap())
bencher.bench(|| fsst_train_compressor(&array))
}

#[divan::bench(args = BENCH_ARGS)]
fn pushdown_compare(bencher: Bencher, (string_count, avg_len, unique_chars): (usize, usize, u8)) {
let array = generate_test_data(string_count, avg_len, unique_chars);
let compressor = fsst_train_compressor(array.as_ref()).unwrap();
let fsst_array = fsst_compress(array.as_ref(), &compressor).unwrap();
let compressor = fsst_train_compressor(&array);
let fsst_array = fsst_compress(&array, &compressor);
let constant = ConstantArray::new(Scalar::from(&b"const"[..]), array.len());

bencher
Expand All @@ -79,8 +79,8 @@ fn canonicalize_compare(
(string_count, avg_len, unique_chars): (usize, usize, u8),
) {
let array = generate_test_data(string_count, avg_len, unique_chars);
let compressor = fsst_train_compressor(array.as_ref()).unwrap();
let fsst_array = fsst_compress(array.as_ref(), &compressor).unwrap();
let compressor = fsst_train_compressor(&array);
let fsst_array = fsst_compress(&array, &compressor);
let constant = ConstantArray::new(Scalar::from(&b"const"[..]), array.len());

bencher
Expand Down Expand Up @@ -168,11 +168,9 @@ fn generate_chunked_test_data(
) -> ChunkedArray {
(0..chunk_size)
.map(|_| {
let array = generate_test_data(string_count, avg_len, unique_chars).into_array();
let compressor = fsst_train_compressor(array.as_ref()).unwrap();
fsst_compress(array.as_ref(), &compressor)
.unwrap()
.into_array()
let array = generate_test_data(string_count, avg_len, unique_chars);
let compressor = fsst_train_compressor(&array);
fsst_compress(array, &compressor).into_array()
})
.collect::<ChunkedArray>()
}
22 changes: 8 additions & 14 deletions encodings/fsst/src/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ mod tests {

use crate::{fsst_compress, fsst_train_compressor};

fn make_data() -> (ArrayRef, Vec<Option<Vec<u8>>>) {
fn make_data() -> (VarBinArray, Vec<Option<Vec<u8>>>) {
const STRING_COUNT: usize = 1000;
let mut rng = StdRng::seed_from_u64(0);
let mut strings = Vec::with_capacity(STRING_COUNT);
Expand Down Expand Up @@ -133,8 +133,7 @@ mod tests {
.into_iter()
.map(|opt_s| opt_s.map(Vec::into_boxed_slice)),
DType::Binary(Nullability::Nullable),
)
.into_array(),
),
strings,
)
}
Expand All @@ -144,11 +143,8 @@ mod tests {
let (arr_vec, data_vec): (Vec<ArrayRef>, Vec<Vec<Option<Vec<u8>>>>) = (0..10)
.map(|_| {
let (array, data) = make_data();
let compressor = fsst_train_compressor(&array).unwrap();
(
fsst_compress(&array, &compressor).unwrap().into_array(),
data,
)
let compressor = fsst_train_compressor(&array);
(fsst_compress(&array, &compressor).into_array(), data)
})
.unzip();

Expand All @@ -168,17 +164,15 @@ mod tests {

{
let arr = builder.finish_into_canonical().into_varbinview();
let res1 = arr
.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
.unwrap();
let res1 =
arr.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>());
assert_eq!(data, res1);
};

{
let arr2 = chunked_arr.to_varbinview();
let res2 = arr2
.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
.unwrap();
let res2 =
arr2.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>());
assert_eq!(data, res2)
};
}
Expand Down
56 changes: 11 additions & 45 deletions encodings/fsst/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,64 +6,30 @@
use fsst::{Compressor, Symbol};
use vortex_array::accessor::ArrayAccessor;
use vortex_array::arrays::builder::VarBinBuilder;
use vortex_array::arrays::{VarBinVTable, VarBinViewVTable};
use vortex_array::{Array, IntoArray};
use vortex_buffer::{Buffer, BufferMut};
use vortex_dtype::DType;
use vortex_error::{VortexExpect, VortexResult, VortexUnwrap, vortex_bail};
use vortex_error::{VortexExpect, VortexUnwrap};

use crate::FSSTArray;

/// Compress an array using FSST.
///
/// # Panics
///
/// If the `strings` array is not encoded as either [`vortex_array::arrays::VarBinArray`] or
/// [`vortex_array::arrays::VarBinViewArray`].
pub fn fsst_compress(strings: &dyn Array, compressor: &Compressor) -> VortexResult<FSSTArray> {
let len = strings.len();
let dtype = strings.dtype().clone();

// Compress VarBinArray
if let Some(varbin) = strings.as_opt::<VarBinVTable>() {
return varbin
.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
.map_err(|err| err.with_context("Failed to compress VarBinArray with FSST"));
}

// Compress VarBinViewArray
if let Some(varbin_view) = strings.as_opt::<VarBinViewVTable>() {
return varbin_view
.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
.map_err(|err| err.with_context("Failed to compress VarBinViewArray with FSST"));
}

vortex_bail!(
"cannot fsst_compress array with unsupported encoding {:?}",
strings.encoding_id()
)
/// Compress a string array using FSST.
pub fn fsst_compress<A: ArrayAccessor<[u8]> + AsRef<dyn Array>>(
strings: A,
compressor: &Compressor,
) -> FSSTArray {
let len = strings.as_ref().len();
let dtype = strings.as_ref().dtype().clone();
strings.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
}

/// Train a compressor from an array.
///
/// # Panics
///
/// If the provided array is not FSST compressible.
pub fn fsst_train_compressor(array: &dyn Array) -> VortexResult<Compressor> {
if let Some(varbin) = array.as_opt::<VarBinVTable>() {
varbin
.with_iterator(|iter| fsst_train_compressor_iter(iter))
.map_err(|err| err.with_context("Failed to train FSST Compressor from VarBinArray"))
} else if let Some(varbin_view) = array.as_opt::<VarBinViewVTable>() {
varbin_view
.with_iterator(|iter| fsst_train_compressor_iter(iter))
.map_err(|err| err.with_context("Failed to train FSST Compressor from VarBinViewArray"))
} else {
vortex_bail!(
"cannot fsst_compress array with unsupported encoding {:?}",
array.encoding_id()
)
}
pub fn fsst_train_compressor<A: ArrayAccessor<[u8]>>(array: &A) -> Compressor {
array.with_iterator(|iter| fsst_train_compressor_iter(iter))
}

/// Train a [compressor][Compressor] from an iterator of bytestrings.
Expand Down
8 changes: 4 additions & 4 deletions encodings/fsst/src/compute/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ mod tests {
DType::Utf8(Nullability::NonNullable),
);

let compressor = fsst_train_compressor(strings.as_ref()).unwrap();
let fsst = fsst_compress(strings.as_ref(), &compressor).unwrap();
let compressor = fsst_train_compressor(&strings);
let fsst = fsst_compress(strings, &compressor);

// Cast to nullable
let casted = cast(fsst.as_ref(), &DType::Utf8(Nullability::Nullable)).unwrap();
Expand All @@ -77,8 +77,8 @@ mod tests {
DType::Utf8(Nullability::NonNullable)
))]
fn test_cast_fsst_conformance(#[case] array: VarBinArray) {
let compressor = fsst_train_compressor(array.as_ref()).unwrap();
let fsst = fsst_compress(array.as_ref(), &compressor).unwrap();
let compressor = fsst_train_compressor(&array);
let fsst = fsst_compress(&array, &compressor);
test_cast_conformance(fsst.as_ref());
}
}
4 changes: 2 additions & 2 deletions encodings/fsst/src/compute/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ mod tests {
],
DType::Utf8(Nullability::Nullable),
);
let compressor = fsst_train_compressor(lhs.as_ref()).unwrap();
let lhs = fsst_compress(lhs.as_ref(), &compressor).unwrap();
let compressor = fsst_train_compressor(&lhs);
let lhs = fsst_compress(lhs, &compressor);

let rhs = ConstantArray::new("world", lhs.len());

Expand Down
12 changes: 6 additions & 6 deletions encodings/fsst/src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ mod test {
builder.append_value(b"world");
let varbin = builder.finish(DType::Utf8(Nullability::NonNullable));

let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
let array = fsst_compress(varbin.as_ref(), &compressor).unwrap();
let compressor = fsst_train_compressor(&varbin);
let array = fsst_compress(&varbin, &compressor);
test_filter_conformance(array.as_ref());

// Test with longer strings that benefit from compression
Expand All @@ -59,8 +59,8 @@ mod test {
builder.append_value(b"the lazy dog sleeps");
let varbin = builder.finish(DType::Utf8(Nullability::NonNullable));

let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
let array = fsst_compress(varbin.as_ref(), &compressor).unwrap();
let compressor = fsst_train_compressor(&varbin);
let array = fsst_compress(&varbin, &compressor);
test_filter_conformance(array.as_ref());

// Test with nullable strings
Expand All @@ -72,8 +72,8 @@ mod test {
builder.append_null();
let varbin = builder.finish(DType::Utf8(Nullability::Nullable));

let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
let array = fsst_compress(varbin.as_ref(), &compressor).unwrap();
let compressor = fsst_train_compressor(&varbin);
let array = fsst_compress(&varbin, &compressor);
test_filter_conformance(array.as_ref());
}
}
32 changes: 16 additions & 16 deletions encodings/fsst/src/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ mod tests {
#[test]
fn test_take_null() {
let arr = VarBinArray::from_iter([Some("h")], DType::Utf8(Nullability::NonNullable));
let compr = fsst_train_compressor(arr.as_ref()).unwrap();
let fsst = fsst_compress(arr.as_ref(), &compr).unwrap();
let compr = fsst_train_compressor(&arr);
let fsst = fsst_compress(&arr, &compr);

let idx1: PrimitiveArray = (0..1).collect();

Expand Down Expand Up @@ -86,8 +86,8 @@ mod tests {
DType::Utf8(Nullability::NonNullable),
))]
fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
let array = fsst_compress(varbin.as_ref(), &compressor).unwrap();
let compressor = fsst_train_compressor(&varbin);
let array = fsst_compress(&varbin, &compressor);
test_take_conformance(array.as_ref());
}

Expand All @@ -98,43 +98,43 @@ mod tests {
["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
DType::Utf8(Nullability::NonNullable),
);
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
fsst_compress(varbin.as_ref(), &compressor).unwrap()
let compressor = fsst_train_compressor(&varbin);
fsst_compress(&varbin, &compressor)
})]
// Nullable strings
#[case::fsst_nullable({
let varbin = VarBinArray::from_iter(
[Some("hello"), None, Some("world"), Some("test"), None],
DType::Utf8(Nullability::Nullable),
);
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
fsst_compress(varbin.as_ref(), &compressor).unwrap()
let compressor = fsst_train_compressor(&varbin);
fsst_compress(varbin, &compressor)
})]
// Repetitive patterns (good for FSST compression)
#[case::fsst_repetitive({
let varbin = VarBinArray::from_iter(
["http://example.com", "http://test.com", "http://vortex.dev", "http://data.org"].map(Some),
DType::Utf8(Nullability::NonNullable),
);
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
fsst_compress(varbin.as_ref(), &compressor).unwrap()
let compressor = fsst_train_compressor(&varbin);
fsst_compress(&varbin, &compressor)
})]
// Edge cases
#[case::fsst_single({
let varbin = VarBinArray::from_iter(
["single element"].map(Some),
DType::Utf8(Nullability::NonNullable),
);
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
fsst_compress(varbin.as_ref(), &compressor).unwrap()
let compressor = fsst_train_compressor(&varbin);
fsst_compress(&varbin, &compressor)
})]
#[case::fsst_empty_strings({
let varbin = VarBinArray::from_iter(
["", "test", "", "hello", ""].map(Some),
DType::Utf8(Nullability::NonNullable),
);
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
fsst_compress(varbin.as_ref(), &compressor).unwrap()
let compressor = fsst_train_compressor(&varbin);
fsst_compress(varbin, &compressor)
})]
// Large arrays
#[case::fsst_large({
Expand All @@ -153,8 +153,8 @@ mod tests {
}))
.collect();
let varbin = VarBinArray::from_iter(data, DType::Utf8(Nullability::NonNullable));
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
fsst_compress(varbin.as_ref(), &compressor).unwrap()
let compressor = fsst_train_compressor(&varbin);
fsst_compress(varbin, &compressor)
})]

fn test_fsst_consistency(#[case] array: FSSTArray) {
Expand Down
4 changes: 2 additions & 2 deletions encodings/fsst/src/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ impl EncodeVTable<FSSTVTable> for FSSTVTable {

let compressor = match like {
Some(like) => Compressor::rebuild_from(like.symbols(), like.symbol_lengths()),
None => fsst_train_compressor(array.as_ref())?,
None => fsst_train_compressor(&array),
};

Ok(Some(fsst_compress(array.as_ref(), &compressor)?))
Ok(Some(fsst_compress(array, &compressor)))
}
}

Expand Down
6 changes: 2 additions & 4 deletions encodings/fsst/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ pub fn gen_fsst_test_data(len: usize, avg_str_len: usize, unique_chars: u8) -> A
.map(|opt_s| opt_s.map(Vec::into_boxed_slice)),
DType::Binary(Nullability::NonNullable),
);
let compressor = fsst_train_compressor(varbin.as_ref()).vortex_unwrap();
let compressor = fsst_train_compressor(&varbin);

fsst_compress(varbin.as_ref(), &compressor)
.vortex_unwrap()
.into_array()
fsst_compress(varbin, &compressor).into_array()
}

pub fn gen_dict_fsst_test_data<T: NativePType>(
Expand Down
6 changes: 2 additions & 4 deletions encodings/fsst/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ pub(crate) fn build_fsst_array() -> ArrayRef {
input_array.append_value(b"Nothing in present history can contradict them");
let input_array = input_array.finish(DType::Utf8(Nullability::NonNullable));

let compressor = fsst_train_compressor(input_array.as_ref()).unwrap();
fsst_compress(input_array.as_ref(), &compressor)
.unwrap()
.into_array()
let compressor = fsst_train_compressor(&input_array);
fsst_compress(input_array, &compressor).into_array()
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion encodings/zstd/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ fn collect_valid_vbv(vbv: &VarBinViewArray) -> VortexResult<(ByteBuffer, Vec<usi
buffer.extend_from_slice(value);
}
Ok::<_, VortexError>(())
})??;
})?;
(buffer.freeze(), value_byte_indices)
}
};
Expand Down
Loading
Loading