Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions benchmarks/duckdb-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ fn main() -> anyhow::Result<()> {
args.exclude_queries.as_ref(),
);

if args.formats.is_empty() {
anyhow::bail!("provide a format with --formats");
}

// Generate Vortex files from Parquet for any Vortex formats requested
if benchmark.data_url().scheme() == "file" {
// This is ugly, but otherwise some complicated async interaction might result in a deadlock
Expand Down
20 changes: 20 additions & 0 deletions vortex-duckdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,23 @@ VX_DUCKDB_DEBUG=1 cargo test -p vortex-duckdb
# Link against the DuckDB debug build from source with ASAN & TSAN.
ASAN_OPTIONS=detect_container_overflow=0 VX_DUCKDB_DEBUG=1 VX_DUCKDB_SAN=1 cargo test -p vortex-duckdb
```

## Testing the extension with DuckDB

By default, our tests use a precompiled build, which means you don't get a
`.extension` file that you can load in DuckDB. If you want to test a full setup:

1. Clone [duckdb-vortex](https://github.com/vortex-data/duckdb-vortex)
repository.

2. If there is an API difference between duckdb-vortex's duckdb submodule and
   vortex's vortex-duckdb/duckdb submodule, check out duckdb-vortex at a previous
   commit. For example, if duckdb-vortex's HEAD uses the 1.5 API but vortex's HEAD
   uses 1.4.2, check out duckdb-vortex at 8a41ee6ebd9.

3. Update duckdb-vortex's submodules. Replace the `vortex/` submodule with a
   symlink to your local vortex repository.
4. Inside duckdb-vortex, run `make -j`.

./target/release/duckdb will be a duckdb instance with vortex-duckdb already
loaded.
4 changes: 3 additions & 1 deletion vortex-duckdb/cpp/include/duckdb_vx/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ typedef struct {
void *pushdown_expression;
duckdb_vx_string_map (*to_string)(void *bind_data);
// void *dynamic_to_string;
void *table_scan_progress;

double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state);

idx_t (*get_partition_data)(const void *bind_data,
void *init_global_data,
void *init_local_data,
Expand Down
13 changes: 12 additions & 1 deletion vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ struct CTableBindResult {
vector<string> &names;
};

// Bridges DuckDB's table-scan-progress hook to the FFI vtable callback.
// Unwraps the C++ wrapper objects back into the raw pointers the C side
// registered, then forwards the call together with the client context.
double c_table_scan_progress(ClientContext &context,
                             const FunctionData *bind_data,
                             const GlobalTableFunctionState *global_state) {
  auto &c_bind = bind_data->Cast<CTableBindData>();
  auto &c_global = global_state->Cast<CTableGlobalData>();
  auto ctx = reinterpret_cast<duckdb_client_context>(&context);
  return c_bind.info->vtab.table_scan_progress(ctx, c_bind.ffi_data->DataPtr(),
                                               c_global.ffi_data->DataPtr());
}

unique_ptr<FunctionData> c_bind(ClientContext &context,
TableFunctionBindInput &input,
vector<LogicalType> &return_types,
Expand Down Expand Up @@ -351,6 +361,7 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
tf.get_partition_data = c_get_partition_data;
tf.get_virtual_columns = c_get_virtual_columns;
tf.to_string = c_to_string;
tf.table_scan_progress = c_table_scan_progress;

// Set up the parameters
tf.arguments.reserve(vtab->parameter_count);
Expand Down Expand Up @@ -398,4 +409,4 @@ extern "C" void duckdb_vx_string_map_free(duckdb_vx_string_map map) {
auto *cpp_map = reinterpret_cast<InsertionOrderPreservingMap<string> *>(map);
delete cpp_map;
}
} // namespace vortex
} // namespace vortex
11 changes: 10 additions & 1 deletion vortex-duckdb/src/duckdb/table_function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod cardinality;
mod init;
mod partition;
mod pushdown_complex_filter;
mod table_scan_progress;
mod virtual_columns;

pub use bind::*;
Expand All @@ -30,6 +31,7 @@ use crate::duckdb::expr::Expression;
use crate::duckdb::table_function::cardinality::cardinality_callback;
use crate::duckdb::table_function::partition::get_partition_data_callback;
use crate::duckdb::table_function::pushdown_complex_filter::pushdown_complex_filter_callback;
use crate::duckdb::table_function::table_scan_progress::table_scan_progress_callback;
use crate::duckdb::table_function::virtual_columns::get_virtual_columns_callback;
use crate::duckdb_try;

Expand Down Expand Up @@ -103,6 +105,13 @@ pub trait TableFunction: Sized + Debug {
global: &mut Self::GlobalState,
) -> VortexResult<Self::LocalState>;

/// Returns the table scan progress as a percentage in the range `0.0` to `100.0`.
fn table_scan_progress(
client_context: &ClientContext,
bind_data: &mut Self::BindData,
global_state: &mut Self::GlobalState,
) -> f64;

/// Pushes down a filter expression to the table function.
///
/// Returns `true` if the filter was successfully pushed down (and stored on the bind data),
Expand Down Expand Up @@ -181,7 +190,7 @@ impl Database {
pushdown_expression: ptr::null_mut::<c_void>(),
get_virtual_columns: Some(get_virtual_columns_callback::<T>),
to_string: Some(to_string_callback::<T>),
table_scan_progress: ptr::null_mut::<c_void>(),
table_scan_progress: Some(table_scan_progress_callback::<T>),
get_partition_data: Some(get_partition_data_callback::<T>),
projection_pushdown: T::PROJECTION_PUSHDOWN,
filter_pushdown: T::FILTER_PUSHDOWN,
Expand Down
19 changes: 19 additions & 0 deletions vortex-duckdb/src/duckdb/table_function/table_scan_progress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex::error::VortexExpect;

use crate::duckdb::TableFunction;

/// FFI trampoline for `TableFunction::table_scan_progress`.
///
/// Invoked by DuckDB through the C vtable; converts the raw pointers handed
/// over from the C++ glue back into typed Rust references and forwards the
/// call to the concrete table function implementation.
///
/// # Safety
/// `ctx` must be a valid client-context handle, and `bind_data` /
/// `global_state` must be non-null, properly aligned pointers to live
/// `T::BindData` / `T::GlobalState` values with no other references alive
/// for the duration of the call.
pub(crate) unsafe extern "C-unwind" fn table_scan_progress_callback<T: TableFunction>(
    ctx: crate::cpp::duckdb_client_context,
    bind_data: *mut ::std::os::raw::c_void,
    global_state: *mut ::std::os::raw::c_void,
) -> f64 {
    let ctx = unsafe { crate::duckdb::ClientContext::borrow(ctx) };
    // SAFETY: the C++ glue passes the bind data / global state that this
    // table function's own bind/init callbacks produced.
    let bind_data =
        unsafe { bind_data.cast::<T::BindData>().as_mut() }.vortex_expect("bind_data null pointer");
    // NOTE: panic message fixed to name the actual parameter (`global_state`,
    // not `global_init_data`).
    let global_state = unsafe { global_state.cast::<T::GlobalState>().as_mut() }
        .vortex_expect("global_state null pointer");
    T::table_scan_progress(&ctx, bind_data, global_state)
}
8 changes: 8 additions & 0 deletions vortex-duckdb/src/e2e_test/object_cache_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ impl TableFunction for TestTableFunction {
})
}

// Progress hook for the test table function: the scan is a fixed in-memory
// fixture, so it always reports itself as fully complete (100%).
fn table_scan_progress(
    _client_context: &ClientContext,
    _bind_data: &mut Self::BindData,
    _global_state: &mut Self::GlobalState,
) -> f64 {
    100.0
}

fn scan(
_client_context: &ClientContext,
_bind_data: &Self::BindData,
Expand Down
70 changes: 67 additions & 3 deletions vortex-duckdb/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,17 @@ pub struct VortexGlobalData {
iterator: ThreadSafeIterator<VortexResult<(ArrayRef, Arc<ConversionCache>)>>,
batch_id: AtomicU64,
ctx: ExecutionCtx,
bytes_total: Arc<AtomicU64>,
bytes_read: AtomicU64,
}

impl VortexGlobalData {
    /// Returns the scan progress as a percentage in `[0.0, 100.0]`.
    ///
    /// Computed as `bytes_read / bytes_total * 100`. When `bytes_total` is
    /// still 0 (nothing discovered yet) the divisor is clamped to 1 so the
    /// result is 0% rather than NaN from a division by zero.
    ///
    /// NOTE(review): despite the names, these counters are currently fed row
    /// counts (`chunk.len()` / `split.len()`), not bytes — the ratio is still
    /// a valid progress measure as long as both sides use the same unit.
    pub fn progress(&self) -> f64 {
        let read = self.bytes_read.load(Ordering::Relaxed);
        // Clamp to avoid dividing by zero before any splits are counted.
        let total = self.bytes_total.load(Ordering::Relaxed).max(1);
        read as f64 / total as f64 * 100.
    }
}

pub struct VortexLocalData {
Expand Down Expand Up @@ -344,7 +355,6 @@ impl TableFunction for VortexTableFunction {
let Some(result) = local_state.iterator.next() else {
return Ok(());
};

let (array_result, conversion_cache) = result?;

let array_result = array_result.optimize_recursive()?;
Expand Down Expand Up @@ -380,6 +390,9 @@ impl TableFunction for VortexTableFunction {
.vortex_expect("error: exporter missing");

let has_more_data = exporter.export(chunk)?;
global_state
.bytes_read
.fetch_add(chunk.len(), Ordering::Relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wrong right? The variable is called bytes_read, but we're adding the row count of the chunk.

We're also adding this after pulling the chunk? So presumably the progress just hovers around or just below 100% the entire time?

I think we need to figure out a better measure of progress using the Scan API after #6652


if !has_more_data {
// This exporter is fully consumed.
Expand Down Expand Up @@ -415,6 +428,9 @@ impl TableFunction for VortexTableFunction {
.map(|n| n.get())
.unwrap_or(1);

let bytes_total = Arc::new(AtomicU64::new(0));
let bytes_total_copy = bytes_total.clone();

let handle = RUNTIME.handle();
let fs = bind_data.file_system.clone();
let first_file = bind_data.first_file.clone();
Expand All @@ -428,6 +444,7 @@ impl TableFunction for VortexTableFunction {
let conversion_cache = Arc::new(ConversionCache::new(idx as u64));
let object_cache = object_cache;

let bytes_total = bytes_total_copy.clone();
handle
.spawn(async move {
let vxf = if idx == 0 {
Expand All @@ -451,13 +468,15 @@ impl TableFunction for VortexTableFunction {
{
return Ok(None);
};

let scan = vxf
.scan()?
.with_some_filter(filter_expr)
.with_projection(projection_expr)
.with_ordered(false)
.map(move |split| Ok((split, conversion_cache.clone())))
.map(move |split| {
bytes_total.fetch_add(split.len() as u64, Ordering::Relaxed);
Ok((split, conversion_cache.clone()))
})
.into_stream()?
.boxed();

Expand All @@ -479,6 +498,8 @@ impl TableFunction for VortexTableFunction {
batch_id: AtomicU64::new(0),
// TODO(joe): fetch this from somewhere??.
ctx: ExecutionCtx::new(VortexSession::default()),
bytes_read: AtomicU64::new(0),
bytes_total,
})
}

Expand Down Expand Up @@ -508,6 +529,14 @@ impl TableFunction for VortexTableFunction {
})
}

// Progress hook for the Vortex scan: delegates to the atomic byte counters
// maintained on the global state (updated as splits are discovered and
// chunks are exported).
fn table_scan_progress(
    _client_context: &ClientContext,
    _bind_data: &mut Self::BindData,
    global_state: &mut Self::GlobalState,
) -> f64 {
    global_state.progress()
}

fn pushdown_complex_filter(
bind_data: &mut Self::BindData,
expr: &Expression,
Expand Down Expand Up @@ -637,3 +666,38 @@ impl<'rt, T: 'rt> Stream for MultiScan<'rt, T> {
}
}
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;
    use std::sync::atomic::Ordering::Relaxed;

    use vortex::VortexSessionDefault as _;
    use vortex::io::runtime::current::CurrentThreadRuntime;
    use vortex::session::VortexSession;
    use vortex_array::ExecutionCtx;

    use crate::scan::VortexGlobalData;

    /// Progress must track the ratio of the read counter to the total
    /// counter, expressed as a percentage.
    #[test]
    fn test_table_scan_progress() {
        // An empty stream is enough: progress only consults the counters.
        let iterator =
            CurrentThreadRuntime::new().block_on_stream_thread_safe(|_| futures::stream::empty());
        let global = VortexGlobalData {
            iterator,
            batch_id: AtomicU64::new(0),
            ctx: ExecutionCtx::new(VortexSession::default()),
            bytes_read: AtomicU64::new(0),
            bytes_total: Arc::new(AtomicU64::new(100)),
        };

        // Nothing read yet: 0 / 100.
        assert_eq!(global.progress(), 0.0);

        // Fully read: 100 / 100.
        global.bytes_read.fetch_add(100, Relaxed);
        assert_eq!(global.progress(), 100.);

        // Total grows while reading: 100 / 200.
        global.bytes_total.fetch_add(100, Relaxed);
        assert!((global.progress() - 50.).abs() < f64::EPSILON);
    }
}
Loading