Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions encodings/alp/src/alp/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl VTable for ALPVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(
Expand Down
1 change: 1 addition & 0 deletions encodings/alp/src/alp_rd/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl VTable for ALPRDVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(
Expand Down
1 change: 1 addition & 0 deletions encodings/bytebool/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl VTable for ByteBoolVTable {
_bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(EmptyMetadata)
Expand Down
1 change: 1 addition & 0 deletions encodings/datetime-parts/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ impl VTable for DateTimePartsVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(
Expand Down
1 change: 1 addition & 0 deletions encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl VTable for DecimalBytePartsVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(DecimalBytesPartsMetadata::decode(bytes)?))
Expand Down
1 change: 1 addition & 0 deletions encodings/fastlanes/src/bitpacking/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl VTable for BitPackedVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
let inner = <ProstMetadata<BitPackedMetadata> as DeserializeMetadata>::deserialize(bytes)?;
Expand Down
1 change: 1 addition & 0 deletions encodings/fastlanes/src/delta/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl VTable for DeltaVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(DeltaMetadata::decode(bytes)?))
Expand Down
1 change: 1 addition & 0 deletions encodings/fastlanes/src/for/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl VTable for FoRVTable {
bytes: &[u8],
dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
let scalar_value = ScalarValue::from_proto_bytes(bytes, dtype)?;
Expand Down
1 change: 1 addition & 0 deletions encodings/fastlanes/src/rle/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl VTable for RLEVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(RLEMetadata::decode(bytes)?))
Expand Down
1 change: 1 addition & 0 deletions encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl VTable for FSSTVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(
Expand Down
1 change: 1 addition & 0 deletions encodings/pco/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl VTable for PcoVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(PcoMetadata::decode(bytes)?))
Expand Down
1 change: 1 addition & 0 deletions encodings/runend/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl VTable for RunEndVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(bytes)?;
Expand Down
1 change: 1 addition & 0 deletions encodings/sequence/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ impl VTable for SequenceVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(
Expand Down
1 change: 1 addition & 0 deletions encodings/sparse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl VTable for SparseVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(SparseMetadata::decode(bytes)?))
Expand Down
1 change: 1 addition & 0 deletions encodings/zigzag/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl VTable for ZigZagVTable {
_bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(EmptyMetadata)
Expand Down
1 change: 1 addition & 0 deletions encodings/zstd/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ impl VTable for ZstdVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(ZstdMetadata::decode(bytes)?))
Expand Down
1 change: 1 addition & 0 deletions encodings/zstd/src/zstd_buffers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ impl VTable for ZstdBuffersVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(ZstdBuffersMetadata::decode(bytes)?))
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/arrays/bool/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl VTable for BoolVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
let metadata = <Self::Metadata as DeserializeMetadata>::deserialize(bytes)?;
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/arrays/chunked/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl VTable for ChunkedVTable {
_bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(EmptyMetadata)
Expand Down
26 changes: 19 additions & 7 deletions vortex-array/src/arrays/constant/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,38 @@ impl ConstantArray {
#[cfg(test)]
mod tests {
use rstest::rstest;
use vortex_buffer::ByteBufferMut;
use vortex_dtype::DType;
use vortex_dtype::Nullability;
use vortex_dtype::PType;
use vortex_error::VortexResult;
use vortex_scalar::Scalar;
use vortex_scalar::ScalarValue;
use vortex_session::VortexSession;

use crate::arrays::ConstantArray;
use crate::arrays::constant::vtable::CONSTANT_INLINE_THRESHOLD;
use crate::arrays::constant::vtable::ConstantVTable;
use crate::buffer::BufferHandle;
use crate::vtable::VTable;

#[rstest]
#[case::below_threshold(CONSTANT_INLINE_THRESHOLD - 1, true)]
#[case::at_threshold(CONSTANT_INLINE_THRESHOLD, true)]
#[case::above_threshold(CONSTANT_INLINE_THRESHOLD + 1, false)]
fn test_metadata_inlining(
fn test_serialize_inlining(
#[case] nbytes: usize,
#[case] should_inline: bool,
) -> VortexResult<()> {
// UTF-8 scalar `nbytes` equals the string length.
let string = "x".repeat(nbytes);
let array = ConstantArray::new(Scalar::from(string.as_str()), 10);
let metadata = ConstantVTable::metadata(&array)?;
let serialized =
ConstantVTable::serialize(metadata)?.expect("serialize should produce Some bytes");

assert_eq!(
metadata.is_some(),
!serialized.is_empty(),
should_inline,
"scalar of {nbytes} bytes: expected inlined={should_inline}"
);
Expand All @@ -81,25 +88,30 @@ mod tests {
let session = VortexSession::empty();
let deserialized = ConstantVTable::deserialize(
&bytes,
&vortex_dtype::DType::Primitive(vortex_dtype::PType::I64, Nullability::NonNullable),
&DType::Primitive(PType::I64, Nullability::NonNullable),
5,
&[],
&session,
)?;

assert_eq!(deserialized.unwrap(), scalar);
assert_eq!(deserialized, scalar);
Ok(())
}

#[test]
fn test_empty_bytes_deserializes_to_none() -> VortexResult<()> {
fn test_empty_bytes_deserializes_from_buffer() -> VortexResult<()> {
let scalar_value = ScalarValue::from(42i32);
let buffer = ScalarValue::to_proto_bytes::<ByteBufferMut>(Some(&scalar_value)).freeze();
let buffer_handle = BufferHandle::new_host(buffer);
let session = VortexSession::empty();
let metadata = ConstantVTable::deserialize(
&[],
&vortex_dtype::DType::Primitive(vortex_dtype::PType::I32, Nullability::NonNullable),
&DType::Primitive(PType::I32, Nullability::NonNullable),
10,
&[buffer_handle],
&session,
)?;
assert!(metadata.is_none(), "empty bytes should deserialize to None");
assert_eq!(metadata, Scalar::from(42i32));
Ok(())
}
}
75 changes: 34 additions & 41 deletions vortex-array/src/arrays/constant/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,12 @@ pub(crate) const CONSTANT_INLINE_THRESHOLD: usize = 1024;
impl VTable for ConstantVTable {
type Array = ConstantArray;

/// Optional inlined scalar constant.
/// The scalar constant value.
///
/// When the scalar value is small enough (<= `CONSTANT_INLINE_THRESHOLD` bytes), it is stored
/// directly in the metadata to avoid an extra buffer allocation and potential
/// device-to-host copy during deserialization.
///
/// Currently, scalars are **always** stored in a separate buffer, regardless of if we inline a
/// small scalar into the metadata.
type Metadata = Option<Scalar>;
/// During serialization, scalars small enough (<= `CONSTANT_INLINE_THRESHOLD` bytes) are
/// inlined into the metadata bytes. Larger scalars are stored only in the buffer and
/// reconstructed from it during deserialization.
type Metadata = Scalar;

type ArrayVTable = Self;
type OperationsVTable = Self;
Expand All @@ -64,62 +61,58 @@ impl VTable for ConstantVTable {
}

fn metadata(array: &ConstantArray) -> VortexResult<Self::Metadata> {
let constant = array.scalar();

// If the scalar is small enough, we can simply carry it around as metadata.
Ok((constant.nbytes() <= CONSTANT_INLINE_THRESHOLD).then_some(constant.clone()))
Ok(array.scalar().clone())
}

fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
// If we do not have a scalar to serialize, just return empty bytes.
Ok(Some(metadata.map_or_else(Vec::new, |c| {
// Note that we **only** serialize the optional scalar value (not including the dtype).
ScalarValue::to_proto_bytes(c.value())
})))
// If the scalar is small enough, inline it into the metadata bytes.
// Note that we **only** serialize the scalar value (not including the dtype).
Ok(Some(if metadata.nbytes() <= CONSTANT_INLINE_THRESHOLD {
ScalarValue::to_proto_bytes(metadata.value())
} else {
// Large scalars are stored only in the buffer; return empty bytes.
Vec::new()
}))
}

fn deserialize(
bytes: &[u8],
dtype: &DType,
_len: usize,
buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
// Empty bytes indicates an old writer that didn't produce metadata.
if bytes.is_empty() {
return Ok(None);
}

// Otherwise, deserialize the constant scalar from the metadata.
let scalar_value = ScalarValue::from_proto_bytes(bytes, dtype)?;
Some(Scalar::try_new(dtype.clone(), scalar_value)).transpose()
}

fn build(
dtype: &DType,
len: usize,
metadata: &Self::Metadata,
buffers: &[BufferHandle],
_children: &dyn ArrayChildren,
) -> VortexResult<ConstantArray> {
// Prefer reading the scalar from inlined metadata to avoid device-to-host copies.
if let Some(constant) = metadata {
return Ok(ConstantArray::new(constant.clone(), len));
if !bytes.is_empty() {
// If metadata has been deserialized, then it means we can fast-path read the constant
// directly from the metadata bytes.
let scalar_value = ScalarValue::from_proto_bytes(bytes, dtype)?;
return Scalar::try_new(dtype.clone(), scalar_value);
}

// Otherwise, get the constant scalar from the buffers.
vortex_ensure!(
buffers.len() == 1,
"Expected 1 buffer, got {}",
"Expected 1 buffer for the constant scalar, got {}",
buffers.len()
);

// Otherwise, the scalar was too large to inline / serialize into metadata, so we
// reconstruct it now from the buffer.

let buffer = buffers[0].clone().try_to_host_sync()?;
let bytes: &[u8] = buffer.as_ref();

let scalar_value = ScalarValue::from_proto_bytes(bytes, dtype)?;
let scalar = Scalar::try_new(dtype.clone(), scalar_value)?;
Scalar::try_new(dtype.clone(), scalar_value)
}

Ok(ConstantArray::new(scalar, len))
fn build(
_dtype: &DType,
len: usize,
metadata: &Scalar,
_buffers: &[BufferHandle],
_children: &dyn ArrayChildren,
) -> VortexResult<ConstantArray> {
Ok(ConstantArray::new(metadata.clone(), len))
}

fn with_children(_array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/arrays/decimal/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl VTable for DecimalVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
let metadata = ProstMetadata::<DecimalMetadata>::deserialize(bytes)?;
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/arrays/dict/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl VTable for DictVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
let metadata = <Self::Metadata as DeserializeMetadata>::deserialize(bytes)?;
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/arrays/extension/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl VTable for ExtensionVTable {
_bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(EmptyMetadata)
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/arrays/filter/vtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl VTable for FilterVTable {
_bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
vortex_bail!("Filter array is not serializable")
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/arrays/fixed_size_list/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl VTable for FixedSizeListVTable {
_bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(EmptyMetadata)
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/arrays/list/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl VTable for ListVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/arrays/listview/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl VTable for ListViewVTable {
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
let metadata = <Self::Metadata as DeserializeMetadata>::deserialize(bytes)?;
Expand Down
Loading
Loading