[FEAT] Compute pool for native executor #2986
CodSpeed Performance Report: merging #2986 will not alter performance.
src/daft-local-execution/src/lib.rs
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("Executor-Worker-{}", id)
})
TODO: look into thread priority (https://docs.rs/thread-priority/latest/thread_priority/); we could potentially set different priorities for the compute and IO pools.
This would be done in https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.on_thread_start
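For reference, a minimal sketch of the naming scheme from the snippet above. It uses plain `std::thread::Builder` rather than the tokio runtime builder, so it stands alone; `next_worker_name` and `spawn_named_workers` are hypothetical helper names, not part of this PR. A per-thread hook such as `on_thread_start` would run at the top of the spawned closure.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Returns a unique worker name such as "Executor-Worker-0".
/// Hypothetical helper mirroring the closure in the diff above.
fn next_worker_name() -> String {
    static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
    let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
    format!("Executor-Worker-{}", id)
}

/// Spawns `n` named threads and returns the name each thread reports for itself.
/// Assumption: std threads stand in for the runtime's worker threads.
fn spawn_named_workers(n: usize) -> Vec<String> {
    let handles: Vec<_> = (0..n)
        .map(|_| {
            thread::Builder::new()
                .name(next_worker_name())
                .spawn(|| {
                    // Per-thread setup (for example, adjusting priority)
                    // would go here, before any work is picked up.
                    thread::current().name().unwrap_or("unnamed").to_string()
                })
                .expect("failed to spawn worker thread")
        })
        .collect();

    // Wait for every worker and gather the names they observed.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Each call to `spawn_named_workers` draws fresh ids from the shared counter, so names stay unique across pools created in the same process.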
@@ -377,7 +380,8 @@ impl StreamingSink for OuterHashJoinProbeSink {
Ok(StreamingSinkOutput::NeedMoreInput(None))
}
_ => {
let state = state
let mut guard = state.inner.lock().unwrap();
We can probably put this in a method on StreamingSinkState that does the following:
- as_any
- downcast
- error message
like `fn get_state_mut<T>(&self) -> &mut T`
Opted for
pub(crate) fn with_state_mut<T: DynStreamingSinkState + 'static, F, R>(&self, f: F) -> R
where
F: FnOnce(&mut T) -> R,
since a returned `&mut T` would outlive the mutex guard that protects it.
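A minimal sketch of how that closure-based accessor could look. Only the `with_state_mut` signature comes from this thread; the trait method `as_any_mut`, the `StreamingSinkState` layout, the `new` constructor, and `ProbeState` are assumptions made to keep the example self-contained.

```rust
use std::any::Any;
use std::sync::Mutex;

/// Stand-in for the PR's `DynStreamingSinkState` (method name is an assumption).
trait DynStreamingSinkState: Send {
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

/// Assumed layout: type-erased state behind a mutex, matching `state.inner.lock()`.
struct StreamingSinkState {
    inner: Mutex<Box<dyn DynStreamingSinkState>>,
}

impl StreamingSinkState {
    fn new(state: Box<dyn DynStreamingSinkState>) -> Self {
        Self { inner: Mutex::new(state) }
    }

    /// Locks the state, downcasts it to `T`, and runs `f` while the guard is held.
    /// The borrow handed to `f` cannot escape, so it never outlives the guard.
    pub(crate) fn with_state_mut<T: DynStreamingSinkState + 'static, F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        let mut guard = self.inner.lock().unwrap();
        let state = guard
            .as_any_mut()
            .downcast_mut::<T>()
            .expect("streaming sink state has an unexpected concrete type");
        f(state)
    }
}

/// Hypothetical concrete state used only for illustration.
struct ProbeState {
    rows_seen: usize,
}

impl DynStreamingSinkState for ProbeState {
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
```

A caller would write something like `sink.with_state_mut::<ProbeState, _, _>(|s| s.rows_seen += 1)`; the lock is released as soon as the closure returns.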
Create a multithreaded compute runtime for swordfish compute tasks. Switch the query runtime to be single-threaded, and use the IO pool for scan task streams.
Additionally, adds a `tokio::select!` over `tokio::signal::ctrl_c` and the main async execution loop so that queries can be cancelled.
If you run this script you can see that the number of threads increases by only 1 per dataframe.
TODO: