[FEAT] Swordfish Stateful UDF support #3127
Conversation
CodSpeed Performance Report: Merging #3127 will not alter performance.
Codecov Report

Attention: Patch coverage is

Additional details and impacted files:

@@            Coverage Diff             @@
##              main    #3127      +/-   ##
==========================================
+ Coverage    78.62%   78.80%   +0.17%
==========================================
  Files          621      621
  Lines        74125    74880     +755
==========================================
+ Hits         58284    59008     +724
- Misses       15841    15872      +31
src/daft-local-execution/src/intermediate_ops/actor_pool_project.rs
@@ -1,3 +1,4 @@
pub mod actor_pool_project;
Nit: Would it be possible to apply #[cfg(feature = "python")] at the module level instead of inside the functions? I'm wondering if we can consolidate the #[cfg(feature = "python")] / #[cfg(not(feature = "python"))] logic in a single location, perhaps in the physical_plan_to_pipeline translation.

I did #[cfg(feature = "python")] in here so that it would be possible to write Rust tests for the actor pool project pipeline node. I don't have any right now, but might add some in the future.

^ I did find it very useful to do this, because you might imagine cases where you want the pipeline node to be available (even when not compiling for Python), for example to test optimization passes on the pipeline nodes.

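For reference, a minimal sketch of the pattern settled on here: the module stays compiled without the python feature (so pure-Rust tests can still construct the node), and only the bodies that actually touch Python are feature-gated. The type and method names below are illustrative, not the actual ones from this PR.

```rust
// Hypothetical stand-in for the actor pool project pipeline node. Because the
// module itself is not feature-gated, tests that only exercise planning or
// optimization logic can build this node without Python.
pub struct ActorPoolProjectNode {
    pub concurrency: usize,
}

impl ActorPoolProjectNode {
    pub fn new(concurrency: usize) -> Self {
        Self { concurrency }
    }

    // Only the path that actually calls into Python is compiled conditionally.
    #[cfg(feature = "python")]
    pub fn execute(&self) -> Result<(), String> {
        // ... dispatch work to the Python actor pool here ...
        Ok(())
    }

    #[cfg(not(feature = "python"))]
    pub fn execute(&self) -> Result<(), String> {
        Err("actor pool project requires the `python` feature".to_string())
    }
}

// The reviewer's alternative would instead gate the whole module at its
// declaration site (e.g. in intermediate_ops/mod.rs):
//
//     #[cfg(feature = "python")]
//     pub mod actor_pool_project;
```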
src/daft-local-execution/src/intermediate_ops/actor_pool_project.rs
Looks pretty good. Curious how this performs, though, given that I'm guessing the scheduling is just round-robin? Would this be very sensitive to stragglers?

impl Drop for ActorHandle {
    fn drop(&mut self) {
        // Ignore errors since you shouldn't panic in a drop
        let _ = self.teardown();
We should at least log the error instead of ignoring it silently.

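A minimal sketch of what that could look like, with a placeholder teardown method and plain eprintln! standing in for whichever logging facility the crate actually uses (log, tracing, etc.):

```rust
struct ActorHandle;

impl ActorHandle {
    // Placeholder; the real teardown shuts down the Python actor process.
    fn teardown(&mut self) -> Result<(), String> {
        Ok(())
    }
}

impl Drop for ActorHandle {
    fn drop(&mut self) {
        // Still never panic in drop, but surface the failure instead of discarding it.
        if let Err(e) = self.teardown() {
            eprintln!("failed to tear down actor: {e}");
        }
    }
}

fn main() {
    // Teardown runs (and any failure is logged) when the handle goes out of scope.
    let _handle = ActorHandle;
}
```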
expr_projection = ExpressionsProjection([Expression._from_pyexpr(expr) for expr in projection])
self.actor_process = mp.Process(target=stateful_actor_event_loop, args=(expr_projection, actor_conn))
self.actor_process.start()
This seems quite clean. Was using ProcessPoolExecutor not an option here?

There was no need, since we only spin up one process here. I figured it would be cleaner to implement the event loop myself so there's less magic and singleton stuff.

logger = logging.getLogger(__name__)


def initialize_actor_pool_projection(projection: ExpressionsProjection) -> ExpressionsProjection:
Isn't this code similar to / shared with the PyRunner? Should we use this there as well?

It's sorta similar, but there are some differences, including not setting CUDA_VISIBLE_DEVICES. I could share the rest though; will take a look at that.

Updated the PyRunner to use this function.

        .collect()
}

pub fn extract_stateful_udf_exprs(expr: ExprRef) -> Vec<StatefulPythonUDF> { |
Can we add some docstrings, especially if it's intended to be pub?

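For example, a doc comment along these lines would cover it; the description of the behavior is an assumption based on the name and signature, and the placeholder types exist only so the snippet stands alone:

```rust
// Placeholder types; the real ExprRef and StatefulPythonUDF live elsewhere in the crate.
pub struct StatefulPythonUDF;
pub type ExprRef = std::sync::Arc<()>;

/// Returns every stateful Python UDF referenced by `expr`.
///
/// The result is empty when the expression tree contains no stateful UDFs.
pub fn extract_stateful_udf_exprs(expr: ExprRef) -> Vec<StatefulPythonUDF> {
    // Traversal elided; this sketch only illustrates the requested doc comment.
    let _ = expr;
    Vec::new()
}
```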
The scheduling is currently round-robin, but Colin is working on a PR (#3132) that allows for better scheduling when order isn't needed. When order is needed, we would need to implement a buffer, which I am choosing not to do in this PR. It should not be significantly performance-critical anyway: I don't imagine people will run a stateful UDF with high concurrency on a local machine, so the straggler issue isn't as significant as with the distributed runner.

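To illustrate the round-robin dispatch described above, here is a hypothetical channel-based sketch (not the actual swordfish code); an order-preserving variant would additionally need the reordering buffer mentioned in the comment:

```rust
use std::sync::mpsc::{channel, Sender};

// Hypothetical unit of work; in the real pipeline this would be a morsel of data.
struct Morsel(u64);

// Round-robin dispatch: each incoming morsel goes to the next worker in turn.
fn dispatch_round_robin(inputs: Vec<Morsel>, workers: &[Sender<Morsel>]) {
    for (i, morsel) in inputs.into_iter().enumerate() {
        // A straggling worker only delays the morsels assigned to it; the
        // other workers keep draining their own queues.
        let _ = workers[i % workers.len()].send(morsel);
    }
}

fn main() {
    let (tx, rx) = channel();
    // Two "workers" that share a single output sink, purely for brevity.
    let workers = vec![tx.clone(), tx];
    dispatch_round_robin((0..4).map(Morsel).collect(), &workers);
    drop(workers); // close all senders so the receive loop terminates
    while let Ok(Morsel(id)) = rx.recv() {
        println!("processed morsel {id}");
    }
}
```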
Todos: