Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -10460,6 +10460,8 @@ pub fn vortex_array::serde::ArrayParts::from_array_tree(array_tree: impl core::c

pub fn vortex_array::serde::ArrayParts::from_flatbuffer_and_segment(array_tree: vortex_buffer::ByteBuffer, segment: vortex_array::buffer::BufferHandle) -> vortex_error::VortexResult<Self>

pub fn vortex_array::serde::ArrayParts::from_flatbuffer_with_buffers(array_tree: impl core::convert::Into<vortex_buffer::ByteBuffer>, buffers: alloc::vec::Vec<vortex_array::buffer::BufferHandle>) -> vortex_error::VortexResult<Self>

pub fn vortex_array::serde::ArrayParts::metadata(&self) -> &[u8]

pub fn vortex_array::serde::ArrayParts::nbuffers(&self) -> usize
Expand Down
89 changes: 52 additions & 37 deletions vortex-array/src/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,35 @@ impl ArrayParts {
.unwrap_or_default()
}

/// Validate and align the array tree flatbuffer, returning the aligned buffer and root location.
fn validate_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<(FlatBuffer, usize)> {
let fb_buffer = FlatBuffer::align_from(array_tree.into());
let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
let fb_root = fb_array
.root()
.ok_or_else(|| vortex_err!("Array must have a root node"))?;
let flatbuffer_loc = fb_root._tab.loc();
Ok((fb_buffer, flatbuffer_loc))
}

/// Create an [`ArrayParts`] from a pre-existing array tree flatbuffer and pre-resolved buffer
/// handles.
///
/// The caller is responsible for resolving buffers from whatever source (device segments, host
/// overrides, or a mix). The buffers must be in the same order as the `Array.buffers` descriptor
/// list in the flatbuffer.
pub fn from_flatbuffer_with_buffers(
array_tree: impl Into<ByteBuffer>,
buffers: Vec<BufferHandle>,
) -> VortexResult<Self> {
let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
Ok(ArrayParts {
flatbuffer,
flatbuffer_loc,
buffers: buffers.into(),
})
}

/// Create an [`ArrayParts`] from a raw array tree flatbuffer (metadata only).
///
/// This constructor creates an `ArrayParts` with no buffer data, useful for
Expand All @@ -470,15 +499,9 @@ impl ArrayParts {
/// Note: Calling `buffer()` on the returned `ArrayParts` will fail since
/// no actual buffer data is available.
pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
let fb_buffer = FlatBuffer::align_from(array_tree.into());
let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
let fb_root = fb_array
.root()
.ok_or_else(|| vortex_err!("Array must have a root node"))?;
let flatbuffer_loc = fb_root._tab.loc();

let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
Ok(ArrayParts {
flatbuffer: fb_buffer,
flatbuffer,
flatbuffer_loc,
buffers: Arc::new([]),
})
Expand Down Expand Up @@ -510,35 +533,27 @@ impl ArrayParts {
// for host-resident buffers. Device buffers are sliced directly.
let segment = segment.ensure_aligned(Alignment::none())?;

let fb_buffer = FlatBuffer::align_from(array_tree);

// Parse the flatbuffer to extract buffer descriptors and root location.
let (flatbuffer_loc, buffers) = {
let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
let fb_root = fb_array.root().vortex_expect("Array must have a root node");
let flatbuffer_loc = fb_root._tab.loc();

let mut offset = 0;
let buffers = fb_array
.buffers()
.unwrap_or_default()
.iter()
.map(|fb_buf| {
// Skip padding
offset += fb_buf.padding() as usize;

let buffer_len = fb_buf.length() as usize;

// Extract a buffer and ensure it's aligned, copying if necessary
let buffer = segment.slice(offset..(offset + buffer_len));
let buffer = buffer
.ensure_aligned(Alignment::from_exponent(fb_buf.alignment_exponent()))?;
offset += buffer_len;
Ok(buffer)
})
.collect::<VortexResult<Arc<[_]>>>()?;
(flatbuffer_loc, buffers)
};
// this can't return the validated array because there is no lifetime to give it, so we
// need to cast it below, which is safe.
let (fb_buffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
// SAFETY: fb_buffer was already validated by validate_array_tree above.
let fb_array = unsafe { fba::root_as_array_unchecked(fb_buffer.as_ref()) };

let mut offset = 0;
let buffers = fb_array
.buffers()
.unwrap_or_default()
.iter()
.map(|fb_buf| {
offset += fb_buf.padding() as usize;
let buffer_len = fb_buf.length() as usize;
let buffer = segment.slice(offset..(offset + buffer_len));
let buffer =
buffer.ensure_aligned(Alignment::from_exponent(fb_buf.alignment_exponent()))?;
offset += buffer_len;
Ok(buffer)
})
.collect::<VortexResult<Arc<[_]>>>()?;

Ok(ArrayParts {
flatbuffer: fb_buffer,
Expand Down
4 changes: 4 additions & 0 deletions vortex-cuda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ arrow-schema = { workspace = true, features = ["ffi"] }
async-trait = { workspace = true }
cudarc = { workspace = true, features = ["f16"] }
fastlanes = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true, features = ["executor"] }
kanal = { workspace = true }
paste = { workspace = true }
prost = { workspace = true }
tracing = { workspace = true }
vortex-alp = { workspace = true }
vortex-array = { workspace = true }
Expand All @@ -42,7 +44,9 @@ vortex-decimal-byte-parts = { workspace = true }
vortex-dtype = { workspace = true, features = ["cudarc"] }
vortex-error = { workspace = true }
vortex-fastlanes = { workspace = true }
vortex-flatbuffers = { workspace = true }
vortex-io = { workspace = true }
vortex-layout = { workspace = true }
vortex-mask = { workspace = true }
vortex-nvcomp = { path = "nvcomp" }
vortex-runend = { workspace = true }
Expand Down
Loading
Loading