Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -521,12 +521,16 @@ jobs:
with:
tool: nextest
- name: Rust Tests
env:
# Outline flat layouts for cuda scan.
FLAT_LAYOUT_INLINE_ARRAY_NODE: true
run: |
# Build with full debug info first (helps with caching)
cargo +nightly build --locked -p vortex-cuda --all-features --target x86_64-unknown-linux-gnu
cargo +nightly build --locked -p vortex-cuda -p vortex-file --all-features --target x86_64-unknown-linux-gnu
# Run tests with sanitizers and debug output
cargo +nightly nextest run \
--locked \
-p vortex-file \
-p vortex-cuda \
--all-features \
--no-fail-fast \
Expand Down
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions vortex-array/src/arrays/decimal/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,11 @@ impl VTable for DecimalVTable {

match_each_decimal_value_type!(metadata.values_type(), |D| {
// Check and reinterpret-cast the buffer
if let Some(buffer) = values.as_host_opt() {
vortex_ensure!(
buffer.is_aligned(Alignment::of::<D>()),
"DecimalArray buffer not aligned for values type {:?}",
D::DECIMAL_TYPE
);
}
vortex_ensure!(
values.is_aligned_to(Alignment::of::<D>()),
"DecimalArray buffer not aligned for values type {:?}",
D::DECIMAL_TYPE
);
DecimalArray::try_new_handle(values, metadata.values_type(), *decimal_dtype, validity)
})
}
Expand Down
16 changes: 6 additions & 10 deletions vortex-array/src/arrays/primitive/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,12 @@ impl VTable for PrimitiveVTable {
);
}

// For host buffers, we eagerly check alignment on construction.
// TODO(aduffy): check for device buffers. CUDA buffers are generally 256-byte aligned,
// but not sure about other devices.
if let Some(host_buf) = buffer.as_host_opt() {
vortex_ensure!(
host_buf.is_aligned(Alignment::new(ptype.byte_width())),
"PrimitiveArray::build: Buffer must be aligned to {}",
ptype.byte_width()
);
}
vortex_ensure!(
buffer.is_aligned_to(Alignment::new(ptype.byte_width())),
"PrimitiveArray::build: Buffer (align={}) must be aligned to {}",
buffer.alignment(),
ptype.byte_width()
);

// SAFETY: checked ahead of time
unsafe {
Expand Down
33 changes: 33 additions & 0 deletions vortex-array/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash {
/// Returns the length of the buffer in bytes.
fn len(&self) -> usize;

/// Returns the alignment of the buffer.
fn alignment(&self) -> Alignment;

/// Returns true if the buffer is empty.
fn is_empty(&self) -> bool {
self.len() == 0
Expand Down Expand Up @@ -81,6 +84,13 @@ pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash {
/// Create a new buffer that references a subrange of this buffer at the given
/// slice indices.
fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;

/// Return a buffer with the given alignment. Where possible, this will be zero-copy.
///
/// # Errors
///
/// Returns an error if the buffer cannot be aligned (e.g., allocation or copy failure).
fn aligned(self: Arc<Self>, alignment: Alignment) -> VortexResult<Arc<dyn DeviceBuffer>>;
}

impl Hash for dyn DeviceBuffer {
Expand Down Expand Up @@ -130,6 +140,29 @@ impl BufferHandle {
}
}

/// Returns the alignment of the buffer.
pub fn alignment(&self) -> Alignment {
match &self.0 {
Inner::Host(bytes) => bytes.alignment(),
Inner::Device(device) => device.alignment(),
}
}

/// Returns true if the buffer is aligned to the given alignment.
pub fn is_aligned_to(&self, alignment: Alignment) -> bool {
self.alignment().is_aligned_to(alignment)
}

/// Ensure the buffer satisfies the requested alignment.
///
/// Both host and device buffers will be copied if necessary to satisfy the alignment.
pub fn ensure_aligned(self, alignment: Alignment) -> VortexResult<Self> {
match self.0 {
Inner::Host(buffer) => Ok(BufferHandle::new_host(buffer.aligned(alignment))),
Inner::Device(device) => Ok(BufferHandle::new_device(device.aligned(alignment)?)),
}
}

/// Check if the buffer is empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
Expand Down
21 changes: 9 additions & 12 deletions vortex-array/src/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,9 @@ impl ArrayParts {
array_tree: ByteBuffer,
segment: BufferHandle,
) -> VortexResult<Self> {
// TODO: this can also work with device buffers.
let segment = segment.try_to_host_sync()?;
// We align each buffer individually, so we remove alignment requirements on the buffer.
let segment = segment.aligned(Alignment::none());
// We align each buffer individually, so we remove alignment requirements on the segment
// for host-resident buffers. Device buffers are sliced directly.
let segment = segment.ensure_aligned(Alignment::none())?;

let fb_buffer = FlatBuffer::align_from(array_tree);

Expand All @@ -504,7 +503,7 @@ impl ArrayParts {
let flatbuffer_loc = fb_root._tab.loc();

let mut offset = 0;
let buffers: Arc<[_]> = fb_array
let buffers = fb_array
.buffers()
.unwrap_or_default()
.iter()
Expand All @@ -515,15 +514,13 @@ impl ArrayParts {
let buffer_len = fb_buf.length() as usize;

// Extract a buffer and ensure it's aligned, copying if necessary
let buffer = segment
.slice(offset..(offset + buffer_len))
.aligned(Alignment::from_exponent(fb_buf.alignment_exponent()));

let buffer = segment.slice(offset..(offset + buffer_len));
let buffer = buffer
.ensure_aligned(Alignment::from_exponent(fb_buf.alignment_exponent()))?;
offset += buffer_len;
BufferHandle::new_host(buffer)
Ok(buffer)
})
.collect();

.collect::<VortexResult<Arc<[_]>>>()?;
(flatbuffer_loc, buffers)
};

Expand Down
2 changes: 1 addition & 1 deletion vortex-buffer/src/alignment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl Alignment {
Self::new(align_of::<T>())
}

/// Check if this alignment is a "larger" than another alignment.
/// Check if `self` alignment is a "larger" than `other` alignment.
///
/// ## Example
///
Expand Down
5 changes: 4 additions & 1 deletion vortex-buffer/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,10 @@ impl<T> Buffer<T> {
let end_byte = end * size_of::<T>();

if !begin_byte.is_multiple_of(*alignment) {
vortex_panic!("range start must be aligned to {alignment:?}");
vortex_panic!(
"range start must be aligned to {alignment:?}, byte {}",
begin_byte
);
}
if !alignment.is_aligned_to(Alignment::of::<T>()) {
vortex_panic!("Slice alignment must at least align to type T")
Expand Down
1 change: 1 addition & 0 deletions vortex-cuda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ vortex-decimal-byte-parts = { workspace = true }
vortex-dtype = { workspace = true, features = ["cudarc"] }
vortex-error = { workspace = true }
vortex-fastlanes = { workspace = true }
vortex-io = { workspace = true }
vortex-mask = { workspace = true }
vortex-nvcomp = { path = "nvcomp" }
vortex-session = { workspace = true }
Expand Down
42 changes: 42 additions & 0 deletions vortex-cuda/src/device_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::cmp::min;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
Expand All @@ -16,8 +17,10 @@ use vortex_array::buffer::DeviceBuffer;
use vortex_buffer::Alignment;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;

use crate::stream::await_stream_callback;

Expand All @@ -27,6 +30,8 @@ pub struct CudaDeviceBuffer<T> {
offset: usize,
len: usize,
device_ptr: u64,
// This is the min required alignment of the buffer.
alignment: Alignment,
}

impl<T: DeviceRepr> CudaDeviceBuffer<T> {
Expand All @@ -40,6 +45,7 @@ impl<T: DeviceRepr> CudaDeviceBuffer<T> {
offset: 0,
len,
device_ptr,
alignment: Alignment::of::<T>(),
}
}

Expand Down Expand Up @@ -109,6 +115,10 @@ impl<T: DeviceRepr + Send + Sync + 'static> DeviceBuffer for CudaDeviceBuffer<T>
self.len * size_of::<T>()
}

fn alignment(&self) -> Alignment {
self.alignment
}

/// Synchronous copy of CUDA device to host memory.
///
/// The copy is not started before other operations on the streams are completed.
Expand Down Expand Up @@ -185,6 +195,20 @@ impl<T: DeviceRepr + Send + Sync + 'static> DeviceBuffer for CudaDeviceBuffer<T>
fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer> {
let new_offset = self.offset + range.start;
let new_len = range.end - range.start;
let byte_offset = new_offset * size_of::<T>();

let trailing = (self.device_ptr + byte_offset as u64).trailing_zeros();
let exponent =
u8::try_from(min(15, trailing)).vortex_expect("min(15, x) always fits in u8");
let slice_align = Alignment::from_exponent(exponent);
let alignment = Alignment::of::<T>();

assert!(
slice_align.is_aligned_to(alignment),
"slice must respect minimum alignment byte {}, min {}",
slice_align,
alignment
);

assert!(
range.end <= self.len,
Expand All @@ -198,10 +222,28 @@ impl<T: DeviceRepr + Send + Sync + 'static> DeviceBuffer for CudaDeviceBuffer<T>
offset: new_offset,
len: new_len,
device_ptr: self.device_ptr,
alignment: self.alignment,
})
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

fn aligned(self: Arc<Self>, alignment: Alignment) -> VortexResult<Arc<dyn DeviceBuffer>> {
let effective_ptr = self.device_ptr + (self.offset * size_of::<T>()) as u64;
if effective_ptr % (*alignment as u64) == 0 {
Ok(Arc::new(CudaDeviceBuffer {
inner: self.inner.clone(),
offset: self.offset,
len: self.len,
device_ptr: self.device_ptr,
alignment,
}))
} else if alignment > Alignment::new(256) {
vortex_panic!("we do not support alignment greater than 256")
} else {
vortex_panic!("some how we alloc a cuda buffer with alignment less than 256")
}
}
}
Loading
Loading