Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vortex-array/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct BufferHandle(Inner);
enum Inner {
    /// On the host/cpu.
    Host(ByteBuffer),
    /// On the device/gpu.
    Device(Arc<dyn DeviceBuffer>),
}

Expand Down
2 changes: 2 additions & 0 deletions vortex-cuda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ workspace = true
[dependencies]
async-trait = { workspace = true }
cudarc = { workspace = true }
futures = { workspace = true }
kanal = { workspace = true }
tracing = { workspace = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion vortex-cuda/src/device_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl CudaBufferExt for BufferHandle {
fn cuda_view<T: DeviceRepr + Send + Sync + 'static>(&self) -> VortexResult<CudaView<'_, T>> {
let device_buffer = self
.as_device_opt()
.ok_or_else(|| vortex_err!("Buffer is not on device, call ensure_on_device first"))?;
.ok_or_else(|| vortex_err!("Buffer is not on device"))?;

let cuda_buf = device_buffer
.as_any()
Expand Down
152 changes: 127 additions & 25 deletions vortex-cuda/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,23 @@ use cudarc::driver::CudaEvent;
use cudarc::driver::CudaFunction;
use cudarc::driver::CudaSlice;
use cudarc::driver::CudaStream;
use cudarc::driver::DevicePtrMut;
use cudarc::driver::DeviceRepr;
use cudarc::driver::DriverError;
use cudarc::driver::LaunchArgs;
use cudarc::driver::ValidAsZeroBits;
use cudarc::driver::result;
use cudarc::driver::result::memcpy_htod_async;
use cudarc::driver::sys;
use cudarc::driver::sys::CUevent_flags;
use futures::future::BoxFuture;
use kanal::Sender;
use result::stream;
use vortex_array::Array;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::VortexSessionExecute;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Buffer;
use vortex_dtype::PType;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
Expand All @@ -28,6 +36,76 @@ use crate::CudaDeviceBuffer;
use crate::CudaSession;
use crate::session::CudaSessionExt;

/// Registers a callback on the stream and asynchronously waits for it to run.
///
/// Use this to asynchronously wait for work previously submitted to the
/// stream to complete, e.g. async device buffer allocations.
///
/// Note: This is not equivalent to synchronizing the stream; it only awaits
/// the callback registered here, i.e. work enqueued before this call.
///
/// # Arguments
///
/// * `stream` - The CUDA stream to wait on
pub async fn await_stream_callback(stream: &CudaStream) -> Result<(), DriverError> {
    let completion = register_stream_callback(stream)?;

    match completion.recv().await {
        Ok(()) => Ok(()),
        // A dropped sender has no dedicated driver error code; map it to unknown.
        Err(_) => Err(DriverError(sys::CUresult::CUDA_ERROR_UNKNOWN)),
    }
}

/// Registers a host function callback on the stream.
///
/// # Returns
///
/// An async receiver that receives a message when all preceding work on the
/// stream completes.
///
/// # Errors
///
/// Returns an error if registering the host callback function fails.
fn register_stream_callback(stream: &CudaStream) -> Result<kanal::AsyncReceiver<()>, DriverError> {
    let (tx, rx) = kanal::bounded::<()>(1);

    // There are 2 different scenarios how `tx` gets freed: when the callback
    // is invoked, or during cleanup in case the registration fails.
    let tx_ptr = Box::into_raw(Box::new(tx));

    /// Called from a CUDA driver thread when all preceding work on the stream completes.
    unsafe extern "C" fn callback(user_data: *mut std::ffi::c_void) {
        // SAFETY: The memory of `tx` is manually managed and has not been
        // freed before. We have unique ownership and can therefore free it.
        let tx = unsafe { Box::from_raw(user_data as *mut Sender<()>) };

        // The receiver may already be gone if the awaiting future was
        // cancelled (dropped) before the callback fired. Ignore the send
        // error in that case: panicking here would unwind across an
        // `extern "C"` boundary on a CUDA driver thread and abort the
        // process.
        let _ = tx.send(());
    }

    // SAFETY:
    // 1. Valid handle from the borrowed `CudaStream`.
    // 2. Valid function pointer with the correct signature.
    // 3. Valid user data pointer which is consumed exactly once.
    unsafe {
        stream::launch_host_function(
            stream.cu_stream(),
            callback,
            tx_ptr as *mut std::ffi::c_void,
        )
        .inspect_err(|_| {
            // SAFETY: Registration failed, so callback will never run.
            // Therefore, we need to free the `user_data` passed to the
            // callback in the error case.
            drop(Box::from_raw(tx_ptr));
        })?;
    }

    Ok(rx.to_async())
}

/// CUDA kernel events recorded before and after kernel launch.
#[derive(Debug)]
pub struct CudaKernelEvents {
Expand Down Expand Up @@ -145,16 +223,18 @@ impl CudaExecutionCtx {
}

/// Allocates a typed buffer of `len` elements of `T` on the GPU.
///
/// Note: Allocation is async in case the CUDA driver supports this.
///
/// The condition for alloc to be async is support for memory pools:
/// `CU_DEVICE_ATTRIBUTE_MEMORY_POOLS_SUPPORTED`.
///
/// Any kernel submitted to the stream after alloc can safely use the
/// memory, as operations on the stream are ordered sequentially.
pub fn device_alloc<T: DeviceRepr>(&self, len: usize) -> VortexResult<CudaSlice<T>> {
    // SAFETY: No safety guarantees for allocations on the GPU; the returned
    // memory must be written (copy or kernel) before it is read.
    unsafe {
        self.stream
            .alloc::<T>(len)
            .map_err(|e| vortex_err!("Failed to allocate device memory: {}", e))
    }
Expand Down Expand Up @@ -197,30 +277,52 @@ impl CudaExecutionCtx {
Ok(CudaDeviceBuffer::new(cuda_slice))
}

/// Ensures the buffer is on the CUDA device.
/// Copies a pinned host buffer to the device asynchronously.
///
/// Allocates device memory, schedules an async copy, and returns a future
/// that completes when the copy is finished.
///
/// # Arguments
///
/// * `handle` - The host buffer to copy. Must be a host buffer (not already on device).
///
/// # Safety
///
/// Copies the data from host to device if the input buffer is on the host.
pub fn ensure_on_device<T: DeviceRepr + Send + Sync + 'static>(
/// The returned future captures the source `BufferHandle` to keep the host
/// memory alive until the copy completes.
pub fn copy_buffer_to_device_async<T: DeviceRepr + Send + Sync + 'static>(
&self,
handle: &BufferHandle,
) -> VortexResult<BufferHandle> {
if handle.is_on_device() {
return Ok(handle.clone());
}

handle: BufferHandle,
) -> VortexResult<BoxFuture<'static, VortexResult<BufferHandle>>> {
let host_buffer = handle
.as_host_opt()
.ok_or_else(|| vortex_err!("Buffer is neither on host nor device"))?;

let typed_slice: &[T] = unsafe {
std::slice::from_raw_parts(
host_buffer.as_ptr().cast(),
host_buffer.len() / size_of::<T>(),
)
};
let mut cuda_slice: CudaSlice<T> = self.device_alloc(host_buffer.len() / size_of::<T>())?;
let device_ptr = cuda_slice.device_ptr_mut(&self.stream).0;

let typed_buffer: Buffer<T> = Buffer::from_byte_buffer(host_buffer.clone());
let src_slice: &[T] = typed_buffer.as_slice();

unsafe {
memcpy_htod_async(device_ptr, src_slice, self.stream.cu_stream())
.map_err(|e| vortex_err!("Failed to schedule async copy to device: {}", e))?;
}

let cuda_buf = CudaDeviceBuffer::new(cuda_slice);
let stream = Arc::clone(&self.stream);

Ok(Box::pin(async move {
// Await async copy completion using callback-based async wait.
await_stream_callback(&stream)
.await
.map_err(|e| vortex_err!("CUDA stream wait failed: {}", e))?;

// Keep source memory alive until copy completes.
let _keep_alive = handle;

let cuda_buf = self.copy_buffer_to_device(typed_slice)?;
Ok(BufferHandle::new_device(Arc::new(cuda_buf)))
Ok(BufferHandle::new_device(Arc::new(cuda_buf)))
}))
}
}

Expand Down
8 changes: 7 additions & 1 deletion vortex-cuda/src/for_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ async fn execute_for_typed<P: DeviceRepr + NativePType>(

let encoded = array.encoded().clone().execute_cuda(ctx).await?;
let (dtype, buffer_handle, validity, ..) = encoded.into_primitive().into_parts();
let device_buffer_handle = ctx.ensure_on_device::<P>(&buffer_handle)?;

let device_buffer_handle = if buffer_handle.is_on_device() {
buffer_handle
} else {
ctx.copy_buffer_to_device_async::<P>(buffer_handle)?.await?
};

let cuda_view = device_buffer_handle.cuda_view::<P>()?;
let array_len = array.len() as u64;

Expand Down
Loading