Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions vortex-duckdb/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,19 @@ impl CopyFunction for VortexCopyFunction {

fn copy_to_sink(
bind_data: &Self::BindData,
init_global: &mut Self::GlobalState,
init_global: &Self::GlobalState,
_init_local: &mut Self::LocalState,
chunk: &mut DataChunkRef,
) -> VortexResult<()> {
let chunk = data_chunk_to_vortex(bind_data.fields.names(), chunk);
RUNTIME.block_on(async {
init_global
.sink
.as_mut()
.vortex_expect("sink closed early")
.send(chunk)
.await
.map_err(|e| vortex_err!("send error {}", e.to_string()))
})?;

let mut sink = init_global
.sink
.as_ref()
.ok_or_else(|| vortex_err!("sink closed early"))?
.clone();
RUNTIME
.block_on(sink.send(chunk))
.map_err(|e| vortex_err!("send error {}", e.to_string()))?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion vortex-duckdb/src/duckdb/copy_function/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub(crate) unsafe extern "C-unwind" fn copy_to_sink_callback<T: CopyFunction>(
) {
let bind_data =
unsafe { bind_data.cast::<T::BindData>().as_ref() }.vortex_expect("bind_data null pointer");
let global_data = unsafe { global_data.cast::<T::GlobalState>().as_mut() }
let global_data = unsafe { global_data.cast::<T::GlobalState>().as_ref() }
.vortex_expect("bind_data null pointer");
let local_data = unsafe { local_data.cast::<T::LocalState>().as_mut() }
.vortex_expect("bind_data null pointer");
Expand Down
2 changes: 1 addition & 1 deletion vortex-duckdb/src/duckdb/copy_function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub trait CopyFunction: Sized + Debug {
/// The function is called during query execution and is responsible for consuming the output
fn copy_to_sink(
bind_data: &Self::BindData,
init_global: &mut Self::GlobalState,
init_global: &Self::GlobalState,
init_local: &mut Self::LocalState,
chunk: &mut DataChunkRef,
) -> VortexResult<()>;
Expand Down
2 changes: 1 addition & 1 deletion vortex-duckdb/src/duckdb/table_function/cardinality.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) unsafe extern "C-unwind" fn cardinality_callback<T: TableFunction>(
node_stats_out: *mut cpp::duckdb_vx_node_statistics,
) {
let bind_data =
unsafe { bind_data.cast::<T::BindData>().as_mut() }.vortex_expect("bind_data null pointer");
unsafe { bind_data.cast::<T::BindData>().as_ref() }.vortex_expect("bind_data null pointer");
let node_stats =
unsafe { node_stats_out.as_mut() }.vortex_expect("node_stats_out null pointer");

Expand Down
2 changes: 1 addition & 1 deletion vortex-duckdb/src/duckdb/table_function/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub(crate) unsafe extern "C-unwind" fn init_local_callback<T: TableFunction>(
unsafe { init_input.as_ref() }.vortex_expect("init_input null pointer"),
);

let global_init_data = unsafe { global_init_data.cast::<T::GlobalState>().as_mut() }
let global_init_data = unsafe { global_init_data.cast::<T::GlobalState>().as_ref() }
.vortex_expect("global_init_data null pointer");

match T::init_local(&init_input, global_init_data) {
Expand Down
12 changes: 6 additions & 6 deletions vortex-duckdb/src/duckdb/table_function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub trait TableFunction: Sized + Debug {
client_context: &ClientContextRef,
bind_data: &Self::BindData,
init_local: &mut Self::LocalState,
init_global: &mut Self::GlobalState,
init_global: &Self::GlobalState,
chunk: &mut DataChunkRef,
) -> VortexResult<()>;

Expand All @@ -105,14 +105,14 @@ pub trait TableFunction: Sized + Debug {
/// is thread-local.
fn init_local(
init: &TableInitInput<Self>,
global: &mut Self::GlobalState,
global: &Self::GlobalState,
) -> VortexResult<Self::LocalState>;

/// Return table scanning progress from 0. to 100.
fn table_scan_progress(
client_context: &ClientContextRef,
bind_data: &mut Self::BindData,
global_state: &mut Self::GlobalState,
bind_data: &Self::BindData,
global_state: &Self::GlobalState,
) -> f64;

/// Pushes down a filter expression to the table function.
Expand All @@ -136,7 +136,7 @@ pub trait TableFunction: Sized + Debug {
/// This *must* be globally unique.
fn partition_data(
_bind_data: &Self::BindData,
_global_init_data: &mut Self::GlobalState,
_global_init_data: &Self::GlobalState,
_local_init_data: &mut Self::LocalState,
) -> VortexResult<u64>;

Expand Down Expand Up @@ -255,7 +255,7 @@ unsafe extern "C-unwind" fn function<T: TableFunction>(
) {
let client_context = unsafe { ClientContext::borrow(duckdb_client_context) };
let bind_data = unsafe { &*(bind_data as *const T::BindData) };
let global_init_data = unsafe { global_init_data.cast::<T::GlobalState>().as_mut() }
let global_init_data = unsafe { global_init_data.cast::<T::GlobalState>().as_ref() }
.vortex_expect("global_init_data null pointer");
let local_init_data = unsafe { local_init_data.cast::<T::LocalState>().as_mut() }
.vortex_expect("local_init_data null pointer");
Expand Down
2 changes: 1 addition & 1 deletion vortex-duckdb/src/duckdb/table_function/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) unsafe extern "C-unwind" fn get_partition_data_callback<T: TableFunct
) -> idx_t {
let bind_data =
unsafe { bind_data.cast::<T::BindData>().as_ref() }.vortex_expect("bind_data null pointer");
let global_init_data = unsafe { global_init_data.cast::<T::GlobalState>().as_mut() }
let global_init_data = unsafe { global_init_data.cast::<T::GlobalState>().as_ref() }
.vortex_expect("global_init_data null pointer");
let local_init_data = unsafe { local_init_data.cast::<T::LocalState>().as_mut() }
.vortex_expect("local_init_data null pointer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ pub(crate) unsafe extern "C-unwind" fn table_scan_progress_callback<T: TableFunc
) -> f64 {
let ctx = unsafe { crate::duckdb::ClientContext::borrow(ctx) };
let bind_data =
unsafe { bind_data.cast::<T::BindData>().as_mut() }.vortex_expect("bind_data null pointer");
let global_state = unsafe { global_state.cast::<T::GlobalState>().as_mut() }
unsafe { bind_data.cast::<T::BindData>().as_ref() }.vortex_expect("bind_data null pointer");
let global_state = unsafe { global_state.cast::<T::GlobalState>().as_ref() }
.vortex_expect("global_init_data null pointer");
T::table_scan_progress(ctx, bind_data, global_state)
}
10 changes: 5 additions & 5 deletions vortex-duckdb/src/e2e_test/object_cache_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ impl TableFunction for TestTableFunction {

fn table_scan_progress(
_client_context: &ClientContextRef,
_bind_data: &mut Self::BindData,
_global_state: &mut Self::GlobalState,
_bind_data: &Self::BindData,
_global_state: &Self::GlobalState,
) -> f64 {
100.0
}
Expand All @@ -76,7 +76,7 @@ impl TableFunction for TestTableFunction {
_client_context: &ClientContextRef,
_bind_data: &Self::BindData,
_local_state: &mut Self::LocalState,
_global_state: &mut Self::GlobalState,
_global_state: &Self::GlobalState,
chunk: &mut DataChunkRef,
) -> VortexResult<()> {
chunk.set_len(0);
Expand All @@ -100,14 +100,14 @@ impl TableFunction for TestTableFunction {

fn init_local(
_init: &TableInitInput<Self>,
_global: &mut Self::GlobalState,
_global: &Self::GlobalState,
) -> VortexResult<Self::LocalState> {
Ok(TestLocalState)
}

fn partition_data(
_bind_data: &Self::BindData,
_global_init_data: &mut Self::GlobalState,
_global_init_data: &Self::GlobalState,
_local_init_data: &mut Self::LocalState,
) -> VortexResult<u64> {
Ok(0)
Expand Down
18 changes: 16 additions & 2 deletions vortex-duckdb/src/exporter/all_invalid.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex::array::ExecutionCtx;
use vortex::error::VortexResult;
use vortex::error::vortex_ensure;

Expand All @@ -22,7 +23,13 @@ pub(crate) fn new_exporter(len: usize, logical_type: &LogicalTypeRef) -> Box<dyn
}

impl ColumnExporter for AllInvalidExporter {
fn export(&self, offset: usize, len: usize, vector: &mut VectorRef) -> VortexResult<()> {
fn export(
&self,
offset: usize,
len: usize,
vector: &mut VectorRef,
_ctx: &mut ExecutionCtx,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bonus feature

) -> VortexResult<()> {
vortex_ensure!(
offset + len <= self.len,
"invalid exporter: offset + len must be less than or equal to len"
Expand All @@ -36,8 +43,10 @@ impl ColumnExporter for AllInvalidExporter {
#[cfg(test)]
mod tests {
use vortex::array::arrays::PrimitiveArray;
use vortex_array::VortexSessionExecute;

use super::*;
use crate::SESSION;
use crate::duckdb::DataChunk;
use crate::duckdb::LogicalType;

Expand All @@ -49,7 +58,12 @@ mod tests {
let mut chunk = DataChunk::new([ltype.clone()]);

new_exporter(arr.len(), &ltype)
.export(0, 3, chunk.get_vector_mut(0))
.export(
0,
3,
chunk.get_vector_mut(0),
&mut SESSION.create_execution_ctx(),
)
.unwrap();
chunk.set_len(3);

Expand Down
28 changes: 19 additions & 9 deletions vortex-duckdb/src/exporter/bool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ pub(crate) fn new_exporter(
}

impl ColumnExporter for BoolExporter {
fn export(&self, offset: usize, len: usize, vector: &mut VectorRef) -> VortexResult<()> {
fn export(
&self,
offset: usize,
len: usize,
vector: &mut VectorRef,
_ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
// DuckDB uses byte bools, not bit bools.
// maybe we can convert into these from a compressed array sometimes?.
unsafe { vector.as_slice_mut(len) }.copy_from_slice(
Expand Down Expand Up @@ -68,10 +74,11 @@ mod tests {
fn test_bool() {
let arr = BoolArray::from_iter([true, false, true]);
let mut chunk = DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_BOOLEAN)]);
let mut ctx = SESSION.create_execution_ctx();

new_exporter(arr, &mut SESSION.create_execution_ctx())
new_exporter(arr, &mut ctx)
.unwrap()
.export(1, 2, chunk.get_vector_mut(0))
.export(1, 2, chunk.get_vector_mut(0), &mut ctx)
.unwrap();
chunk.set_len(2);

Expand All @@ -88,10 +95,11 @@ mod tests {
let arr = BoolArray::from_iter([true; 128]);

let mut chunk = DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_BOOLEAN)]);
let mut ctx = SESSION.create_execution_ctx();

new_exporter(arr, &mut SESSION.create_execution_ctx())
new_exporter(arr, &mut ctx)
.unwrap()
.export(1, 66, chunk.get_vector_mut(0))
.export(1, 66, chunk.get_vector_mut(0), &mut ctx)
.unwrap();
chunk.set_len(65);

Expand All @@ -111,10 +119,11 @@ mod tests {
let arr = BoolArray::from_iter([Some(true), None, Some(false)]);

let mut chunk = DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_BOOLEAN)]);
let mut ctx = SESSION.create_execution_ctx();

new_exporter(arr, &mut SESSION.create_execution_ctx())
new_exporter(arr, &mut ctx)
.unwrap()
.export(1, 2, chunk.get_vector_mut(0))
.export(1, 2, chunk.get_vector_mut(0), &mut ctx)
.unwrap();
chunk.set_len(2);

Expand All @@ -131,10 +140,11 @@ mod tests {
let arr = BoolArray::from_iter([None; 3]);

let mut chunk = DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_BOOLEAN)]);
let mut ctx = SESSION.create_execution_ctx();

new_exporter(arr, &mut SESSION.create_execution_ctx())
new_exporter(arr, &mut ctx)
.unwrap()
.export(1, 2, chunk.get_vector_mut(0))
.export(1, 2, chunk.get_vector_mut(0), &mut ctx)
.unwrap();
chunk.set_len(2);

Expand Down
8 changes: 7 additions & 1 deletion vortex-duckdb/src/exporter/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,13 @@ pub(crate) fn new_exporter(array: ConstantArray) -> VortexResult<Box<dyn ColumnE
}

impl ColumnExporter for ConstantExporter {
fn export(&self, _offset: usize, len: usize, vector: &mut VectorRef) -> VortexResult<()> {
fn export(
&self,
_offset: usize,
len: usize,
vector: &mut VectorRef,
_ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
match self.value.as_ref() {
None => {
// TODO(ngates): would be good if DuckDB supported constant null vectors.
Expand Down
39 changes: 34 additions & 5 deletions vortex-duckdb/src/exporter/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ where
D: ToPrimitive,
N: BigCast,
{
fn export(&self, offset: usize, len: usize, vector: &mut VectorRef) -> VortexResult<()> {
fn export(
&self,
offset: usize,
len: usize,
vector: &mut VectorRef,
_ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
// Copy the values from the Vortex array to the DuckDB vector.
for (src, dst) in self.values[offset..offset + len]
.iter()
Expand All @@ -100,7 +106,13 @@ where
}

impl<D: NativeDecimalType> ColumnExporter for DecimalZeroCopyExporter<D> {
fn export(&self, offset: usize, len: usize, vector: &mut VectorRef) -> VortexResult<()> {
fn export(
&self,
offset: usize,
len: usize,
vector: &mut VectorRef,
_ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
assert!(self.values.len() >= offset + len);

let pos = unsafe { self.values.as_ptr().add(offset) };
Expand Down Expand Up @@ -129,8 +141,10 @@ mod tests {
use vortex::array::arrays::DecimalArray;
use vortex::dtype::DecimalDType;
use vortex::error::VortexExpect;
use vortex_array::VortexSessionExecute;

use super::*;
use crate::SESSION;
use crate::duckdb::DataChunk;
use crate::duckdb::LogicalType;

Expand Down Expand Up @@ -168,7 +182,12 @@ mod tests {

new_zero_copy_exporter(&arr)
.unwrap()
.export(0, 3, chunk.get_vector_mut(0))
.export(
0,
3,
chunk.get_vector_mut(0),
&mut SESSION.create_execution_ctx(),
)
.unwrap();
chunk.set_len(3);

Expand Down Expand Up @@ -196,7 +215,12 @@ mod tests {
// Export first 3 elements
new_zero_copy_exporter(&arr)
.unwrap()
.export(0, 3, chunk.get_vector_mut(0))
.export(
0,
3,
chunk.get_vector_mut(0),
&mut SESSION.create_execution_ctx(),
)
.unwrap();
chunk.set_len(3);

Expand All @@ -221,7 +245,12 @@ mod tests {

new_zero_copy_exporter(&arr)
.unwrap()
.export(0, 3, chunk.get_vector_mut(0))
.export(
0,
3,
chunk.get_vector_mut(0),
&mut SESSION.create_execution_ctx(),
)
.unwrap();
chunk.set_len(3);

Expand Down
Loading
Loading