-
Notifications
You must be signed in to change notification settings - Fork 135
Expose table scan progress to DuckDB #6596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // SPDX-FileCopyrightText: Copyright the Vortex contributors | ||
|
|
||
| use vortex::error::VortexExpect; | ||
|
|
||
| use crate::duckdb::TableFunction; | ||
|
|
||
| pub(crate) unsafe extern "C-unwind" fn table_scan_progress_callback<T: TableFunction>( | ||
| ctx: crate::cpp::duckdb_client_context, | ||
| bind_data: *mut ::std::os::raw::c_void, | ||
| global_state: *mut ::std::os::raw::c_void, | ||
| ) -> f64 { | ||
| let ctx = unsafe { crate::duckdb::ClientContext::borrow(ctx) }; | ||
| let bind_data = | ||
| unsafe { bind_data.cast::<T::BindData>().as_mut() }.vortex_expect("bind_data null pointer"); | ||
| let global_state = unsafe { global_state.cast::<T::GlobalState>().as_mut() } | ||
| .vortex_expect("global_init_data null pointer"); | ||
| T::table_scan_progress(&ctx, bind_data, global_state) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -113,6 +113,17 @@ pub struct VortexGlobalData { | |
| iterator: ThreadSafeIterator<VortexResult<(ArrayRef, Arc<ConversionCache>)>>, | ||
| batch_id: AtomicU64, | ||
| ctx: ExecutionCtx, | ||
| bytes_total: Arc<AtomicU64>, | ||
| bytes_read: AtomicU64, | ||
| } | ||
|
|
||
| impl VortexGlobalData { | ||
| pub fn progress(&self) -> f64 { | ||
| let read = self.bytes_read.load(Ordering::Relaxed); | ||
| let mut total = self.bytes_total.load(Ordering::Relaxed); | ||
| total += (total == 0) as u64; | ||
| read as f64 / total as f64 * 100. // return 100. when nothing is read | ||
| } | ||
| } | ||
|
|
||
| pub struct VortexLocalData { | ||
|
|
@@ -344,7 +355,6 @@ impl TableFunction for VortexTableFunction { | |
| let Some(result) = local_state.iterator.next() else { | ||
| return Ok(()); | ||
| }; | ||
|
|
||
| let (array_result, conversion_cache) = result?; | ||
|
|
||
| let array_result = array_result.optimize_recursive()?; | ||
|
|
@@ -380,6 +390,9 @@ impl TableFunction for VortexTableFunction { | |
| .vortex_expect("error: exporter missing"); | ||
|
|
||
| let has_more_data = exporter.export(chunk)?; | ||
| global_state | ||
| .bytes_read | ||
| .fetch_add(chunk.len(), Ordering::Relaxed); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is wrong right? The variable is called bytes_read, but we're adding the row count of the chunk. We're also adding this after pulling the chunk? So presumably the progress just hovers around or just below 100% the entire time? I think we need to figure out a better measure of progress using the Scan API after #6652 |
||
|
|
||
| if !has_more_data { | ||
| // This exporter is fully consumed. | ||
|
|
@@ -415,6 +428,9 @@ impl TableFunction for VortexTableFunction { | |
| .map(|n| n.get()) | ||
| .unwrap_or(1); | ||
|
|
||
| let bytes_total = Arc::new(AtomicU64::new(0)); | ||
| let bytes_total_copy = bytes_total.clone(); | ||
|
|
||
| let handle = RUNTIME.handle(); | ||
| let fs = bind_data.file_system.clone(); | ||
| let first_file = bind_data.first_file.clone(); | ||
|
|
@@ -428,6 +444,7 @@ impl TableFunction for VortexTableFunction { | |
| let conversion_cache = Arc::new(ConversionCache::new(idx as u64)); | ||
| let object_cache = object_cache; | ||
|
|
||
| let bytes_total = bytes_total_copy.clone(); | ||
| handle | ||
| .spawn(async move { | ||
| let vxf = if idx == 0 { | ||
|
|
@@ -451,13 +468,15 @@ impl TableFunction for VortexTableFunction { | |
| { | ||
| return Ok(None); | ||
| }; | ||
|
|
||
| let scan = vxf | ||
| .scan()? | ||
| .with_some_filter(filter_expr) | ||
| .with_projection(projection_expr) | ||
| .with_ordered(false) | ||
| .map(move |split| Ok((split, conversion_cache.clone()))) | ||
| .map(move |split| { | ||
| bytes_total.fetch_add(split.len() as u64, Ordering::Relaxed); | ||
| Ok((split, conversion_cache.clone())) | ||
| }) | ||
| .into_stream()? | ||
| .boxed(); | ||
|
|
||
|
|
@@ -479,6 +498,8 @@ impl TableFunction for VortexTableFunction { | |
| batch_id: AtomicU64::new(0), | ||
| // TODO(joe): fetch this from somewhere??. | ||
| ctx: ExecutionCtx::new(VortexSession::default()), | ||
| bytes_read: AtomicU64::new(0), | ||
| bytes_total, | ||
| }) | ||
| } | ||
|
|
||
|
|
@@ -508,6 +529,14 @@ impl TableFunction for VortexTableFunction { | |
| }) | ||
| } | ||
|
|
||
| fn table_scan_progress( | ||
| _client_context: &ClientContext, | ||
| _bind_data: &mut Self::BindData, | ||
| global_state: &mut Self::GlobalState, | ||
| ) -> f64 { | ||
| global_state.progress() | ||
| } | ||
|
|
||
| fn pushdown_complex_filter( | ||
| bind_data: &mut Self::BindData, | ||
| expr: &Expression, | ||
|
|
@@ -637,3 +666,38 @@ impl<'rt, T: 'rt> Stream for MultiScan<'rt, T> { | |
| } | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use std::sync::Arc; | ||
| use std::sync::atomic::AtomicU64; | ||
| use std::sync::atomic::Ordering::Relaxed; | ||
|
|
||
| use vortex::VortexSessionDefault as _; | ||
| use vortex::io::runtime::current::CurrentThreadRuntime; | ||
| use vortex::session::VortexSession; | ||
| use vortex_array::ExecutionCtx; | ||
|
|
||
| use crate::scan::VortexGlobalData; | ||
|
|
||
| #[test] | ||
| fn test_table_scan_progress() { | ||
| let iterator = | ||
| CurrentThreadRuntime::new().block_on_stream_thread_safe(|_| futures::stream::empty()); | ||
| let state = VortexGlobalData { | ||
| iterator, | ||
| batch_id: AtomicU64::new(0), | ||
| ctx: ExecutionCtx::new(VortexSession::default()), | ||
| bytes_total: Arc::new(AtomicU64::new(100)), | ||
| bytes_read: AtomicU64::new(0), | ||
| }; | ||
|
|
||
| assert_eq!(state.progress(), 0.0); | ||
|
|
||
| state.bytes_read.fetch_add(100, Relaxed); | ||
| assert_eq!(state.progress(), 100.); | ||
|
|
||
| state.bytes_total.fetch_add(100, Relaxed); | ||
| assert!((state.progress() - 50.).abs() < f64::EPSILON); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.