Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion encodings/fsst/goldenfiles/fsst.metadata
Original file line number Diff line number Diff line change
@@ -1 +1 @@


252 changes: 200 additions & 52 deletions encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@ use vortex_array::builders::VarBinViewBuilder;
use vortex_array::serde::ArrayChildren;
use vortex_array::stats::ArrayStats;
use vortex_array::stats::StatsSetRef;
use vortex_array::validity::Validity;
use vortex_array::vtable;
use vortex_array::vtable::ArrayId;
use vortex_array::vtable::BaseArrayVTable;
use vortex_array::vtable::NotSupported;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityChild;
use vortex_array::vtable::ValidityHelper;
use vortex_array::vtable::ValidityVTableFromChild;
use vortex_array::vtable::VisitorVTable;
use vortex_buffer::Buffer;
use vortex_buffer::ByteBuffer;
use vortex_dtype::DType;
use vortex_dtype::Nullability;
use vortex_dtype::PType;
Expand All @@ -59,6 +62,9 @@ vtable!(FSST);
pub struct FSSTMetadata {
#[prost(enumeration = "PType", tag = "1")]
uncompressed_lengths_ptype: i32,

#[prost(enumeration = "PType", tag = "2")]
codes_offsets_ptype: i32,
}

impl FSSTMetadata {
Expand All @@ -85,8 +91,8 @@ impl VTable for FSSTVTable {

fn metadata(array: &FSSTArray) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(FSSTMetadata {
uncompressed_lengths_ptype: PType::try_from(array.uncompressed_lengths().dtype())?
as i32,
uncompressed_lengths_ptype: array.uncompressed_lengths().dtype().as_ptype().into(),
codes_offsets_ptype: array.codes.offsets().dtype().as_ptype().into(),
}))
}

Expand All @@ -100,48 +106,118 @@ impl VTable for FSSTVTable {
))
}

fn append_to_builder(
array: &FSSTArray,
builder: &mut dyn ArrayBuilder,
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
builder.extend_from_array(&array.to_array().execute::<Canonical>(ctx)?.into_array());
return Ok(());
};

// Decompress the whole block of data into a new buffer, and create some views
// from it instead.
let (buffers, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?;

builder.push_buffer_and_adjusted_views(&buffers, &views, array.validity_mask()?);
Ok(())
}

fn build(
dtype: &DType,
len: usize,
metadata: &Self::Metadata,
buffers: &[BufferHandle],
children: &dyn ArrayChildren,
) -> VortexResult<FSSTArray> {
if buffers.len() != 2 {
vortex_bail!(InvalidArgument: "Expected 2 buffers, got {}", buffers.len());
}
let symbols = Buffer::<Symbol>::from_byte_buffer(buffers[0].clone().try_to_host_sync()?);
let symbol_lengths = Buffer::<u8>::from_byte_buffer(buffers[1].clone().try_to_host_sync()?);

if children.len() != 2 {
vortex_bail!(InvalidArgument: "Expected 2 children, got {}", children.len());
// Check for the legacy deserialization path.
if buffers.len() == 2 {
if children.len() != 2 {
vortex_bail!(InvalidArgument: "Expected 2 children, got {}", children.len());
}
let codes = children.get(0, &DType::Binary(dtype.nullability()), len)?;
let codes = codes
.as_opt::<VarBinVTable>()
.ok_or_else(|| {
vortex_err!(
"Expected VarBinArray for codes, got {}",
codes.encoding_id()
)
})?
.clone();
let uncompressed_lengths = children.get(
1,
&DType::Primitive(
metadata.0.get_uncompressed_lengths_ptype()?,
Nullability::NonNullable,
),
len,
)?;

return FSSTArray::try_new(
dtype.clone(),
symbols,
symbol_lengths,
codes,
uncompressed_lengths,
);
}
let codes = children.get(0, &DType::Binary(dtype.nullability()), len)?;
let codes = codes
.as_opt::<VarBinVTable>()
.ok_or_else(|| {
vortex_err!(
"Expected VarBinArray for codes, got {}",
codes.encoding_id()
)
})?
.clone();
let uncompressed_lengths = children.get(
1,
&DType::Primitive(
metadata.0.get_uncompressed_lengths_ptype()?,
Nullability::NonNullable,
),
len,
)?;

FSSTArray::try_new(
dtype.clone(),
symbols,
symbol_lengths,
codes,
uncompressed_lengths,
)

// Check for the current deserialization path.
if buffers.len() == 3 {
let uncompressed_lengths = children.get(
0,
&DType::Primitive(
metadata.0.get_uncompressed_lengths_ptype()?,
Nullability::NonNullable,
),
len,
)?;

let codes_buffer = ByteBuffer::from_byte_buffer(buffers[2].clone().try_to_host_sync()?);
let codes_offsets = children.get(
1,
&DType::Primitive(
PType::try_from(metadata.codes_offsets_ptype)?,
Nullability::NonNullable,
),
// VarBin offsets are len + 1
len + 1,
)?;

let codes_validity = if children.len() == 2 {
Validity::from(dtype.nullability())
} else if children.len() == 3 {
let validity = children.get(2, &Validity::DTYPE, len)?;
Validity::Array(validity)
} else {
vortex_bail!("Expected 0 or 1 child, got {}", children.len());
};

let codes = VarBinArray::try_new(
codes_offsets,
codes_buffer,
DType::Binary(dtype.nullability()),
codes_validity,
)?;

return FSSTArray::try_new(
dtype.clone(),
symbols,
symbol_lengths,
codes,
uncompressed_lengths,
);
}

vortex_bail!(
"InvalidArgument: Expected 2 or 3 buffers, got {}",
buffers.len()
);
}

fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
Expand Down Expand Up @@ -175,24 +251,6 @@ impl VTable for FSSTVTable {
Ok(())
}

fn append_to_builder(
array: &FSSTArray,
builder: &mut dyn ArrayBuilder,
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
builder.extend_from_array(&array.to_array().execute::<Canonical>(ctx)?.into_array());
return Ok(());
};

// Decompress the whole block of data into a new buffer, and create some views
// from it instead.
let (buffers, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?;

builder.push_buffer_and_adjusted_views(&buffers, &views, array.validity_mask()?);
Ok(())
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
canonicalize_fsst(array, ctx)
}
Expand Down Expand Up @@ -423,21 +481,39 @@ impl VisitorVTable<FSSTVTable> for FSSTVTable {
"symbol_lengths",
&BufferHandle::new_host(array.symbol_lengths().clone().into_byte_buffer()),
);
visitor.visit_buffer_handle("compressed_codes", array.codes.bytes_handle())
}

fn visit_children(array: &FSSTArray, visitor: &mut dyn ArrayChildVisitor) {
visitor.visit_child("codes", &array.codes().to_array());
visitor.visit_child("uncompressed_lengths", array.uncompressed_lengths());
visitor.visit_child("codes_offsets", array.codes.offsets());
visitor.visit_validity(array.codes.validity(), array.codes.len());
}
}

#[cfg(test)]
mod test {
use fsst::Compressor;
use fsst::Symbol;
use vortex_array::Array;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::ProstMetadata;
use vortex_array::VortexSessionExecute;
use vortex_array::accessor::ArrayAccessor;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::buffer::BufferHandle;
use vortex_array::test_harness::check_metadata;
use vortex_array::vtable::VTable;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_dtype::Nullability;
use vortex_dtype::PType;
use vortex_error::VortexError;

use crate::FSSTVTable;
use crate::array::FSSTMetadata;
use crate::fsst_compress_iter;

#[cfg_attr(miri, ignore)]
#[test]
Expand All @@ -446,7 +522,79 @@ mod test {
"fsst.metadata",
ProstMetadata(FSSTMetadata {
uncompressed_lengths_ptype: PType::U64 as i32,
codes_offsets_ptype: PType::I32 as i32,
}),
);
}

/// The original FSST array stored codes as a VarBinArray child and required that the child
/// have this encoding. Vortex forbids this kind of introspection, therefore we had to fix
/// the array to store the compressed offsets and compressed data buffer separately, and only
/// use VarBinArray to delegate behavior.
///
/// This test manually constructs an old-style FSST array and ensures that it can still be
/// deserialized.
#[test]
fn test_back_compat() {
let symbols = Buffer::<Symbol>::copy_from([
Symbol::from_slice(b"abc00000"),
Symbol::from_slice(b"defghijk"),
]);
let symbol_lengths = Buffer::<u8>::copy_from([3, 8]);

let compressor = Compressor::rebuild_from(symbols.as_slice(), symbol_lengths.as_slice());
let fsst_array = fsst_compress_iter(
[Some(b"abcabcab".as_ref()), Some(b"defghijk".as_ref())].into_iter(),
2,
DType::Utf8(Nullability::NonNullable),
&compressor,
);

let compressed_codes = fsst_array.codes().clone();

// There were two buffers:
// 1. The 8 byte symbols
// 2. The symbol lengths as u8.
let buffers = [
BufferHandle::new_host(symbols.into_byte_buffer()),
BufferHandle::new_host(symbol_lengths.into_byte_buffer()),
];

// There were 2 children:
// 1. The compressed codes, stored as a VarBinArray.
// 2. The uncompressed lengths, stored as a Primitive array.
let children = vec![
compressed_codes.into_array(),
fsst_array.uncompressed_lengths().clone(),
];

let fsst = FSSTVTable::build(
&DType::Utf8(Nullability::NonNullable),
2,
&ProstMetadata(FSSTMetadata {
uncompressed_lengths_ptype: fsst_array
.uncompressed_lengths()
.dtype()
.as_ptype()
.into(),
// Legacy array did not store this field, use Protobuf default of 0.
codes_offsets_ptype: 0,
}),
&buffers,
&children.as_slice(),
)
.unwrap();

let decompressed = fsst
.into_array()
.execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
.unwrap();
decompressed
.with_iterator(|it| {
assert_eq!(it.next().unwrap(), Some(b"abcabcab".as_ref()));
assert_eq!(it.next().unwrap(), Some(b"defghijk".as_ref()));
Ok::<_, VortexError>(())
})
.unwrap()
}
}
13 changes: 13 additions & 0 deletions vortex-array/src/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,19 @@ pub trait ArrayChildren {
}
}

impl ArrayChildren for &[ArrayRef] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this?

fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
let array = self[index].clone();
assert_eq!(array.len(), len);
assert_eq!(array.dtype(), dtype);
Ok(array)
}

fn len(&self) -> usize {
<[_]>::len(self)
}
}

/// [`ArrayParts`] represents a parsed but not-yet-decoded deserialized [`Array`].
/// It contains all the information from the serialized form, without anything extra. i.e.
/// it is missing a [`DType`] and `len`, and the `encoding_id` is not yet resolved to a concrete
Expand Down
Loading