
[FEAT] Swordfish Stateful UDF support #3127

Merged: 3 commits merged into main from kevin/swordfish-stateful-udf on Oct 29, 2024

Conversation

@kevinzwang (Member) commented on Oct 25, 2024

Todos:

  • test error handling
  • test proper cleanup upon finish, Ctrl+C, exception, etc.

github-actions bot added the enhancement (New feature or request) label on Oct 25, 2024

codspeed-hq bot commented Oct 25, 2024

CodSpeed Performance Report

Merging #3127 will not alter performance

Comparing kevin/swordfish-stateful-udf (4b61d28) with main (b0a5a40)

Summary

✅ 17 untouched benchmarks


codecov bot commented Oct 25, 2024

Codecov Report

Attention: Patch coverage is 72.56637% with 62 lines in your changes missing coverage. Please review.

Project coverage is 78.80%. Comparing base (5b450fb) to head (4b61d28).
Report is 8 commits behind head on main.

Files with missing lines Patch % Lines
daft/execution/stateful_actor.py 0.00% 44 Missing ⚠️
...ecution/src/intermediate_ops/actor_pool_project.rs 90.00% 11 Missing ⚠️
daft/runners/pyrunner.py 14.28% 6 Missing ⚠️
src/daft-physical-plan/src/local_plan.rs 92.85% 1 Missing ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #3127      +/-   ##
==========================================
+ Coverage   78.62%   78.80%   +0.17%     
==========================================
  Files         621      621              
  Lines       74125    74880     +755     
==========================================
+ Hits        58284    59008     +724     
- Misses      15841    15872      +31     
Files with missing lines Coverage Δ
src/daft-dsl/src/functions/python/mod.rs 81.52% <100.00%> (+0.99%) ⬆️
.../src/intermediate_ops/anti_semi_hash_join_probe.rs 94.44% <100.00%> (ø)
...tion/src/intermediate_ops/inner_hash_join_probe.rs 95.76% <100.00%> (ø)
...-execution/src/intermediate_ops/intermediate_op.rs 82.16% <100.00%> (+2.43%) ⬆️
src/daft-local-execution/src/pipeline.rs 94.50% <100.00%> (+0.09%) ⬆️
src/daft-physical-plan/src/translate.rs 93.58% <100.00%> (-4.27%) ⬇️
src/daft-physical-plan/src/local_plan.rs 95.85% <92.85%> (-0.19%) ⬇️
daft/runners/pyrunner.py 87.54% <14.28%> (+2.54%) ⬆️
...ecution/src/intermediate_ops/actor_pool_project.rs 90.00% <90.00%> (ø)
daft/execution/stateful_actor.py 0.00% <0.00%> (ø)

... and 41 files with indirect coverage changes

kevinzwang marked this pull request as ready for review on October 25, 2024, 05:41
@@ -1,3 +1,4 @@
pub mod actor_pool_project;
@colin-ho (Contributor) commented on Oct 25, 2024

Nit: Would it be possible to apply #[cfg(feature = "python")] at the module level instead of inside the functions? I'm wondering if we can consolidate the #[cfg(feature = "python")] / #[cfg(not(feature = "python"))] logic into a single location, perhaps in the physical_plan_to_pipeline translation.

@kevinzwang (Member, Author) replied:

I did #[cfg(feature = "python")] inside the functions here so that it would be possible to write Rust tests for the actor pool project pipeline node. I don't have any right now but might add some in the future.

A contributor replied:

^ I did find it very useful to do this, because you might imagine cases where you want the pipeline node to be available (even when not compiling for Python) to do things such as testing optimization passes on the pipeline nodes.
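
For readers outside this thread, a minimal sketch of the two options being weighed; the module names below are made up for illustration and are not Daft's actual code:

```rust
// Option A (the nit): gate the whole module behind the feature, so none of it
// is compiled in a Rust-only build. Simplest, but the pipeline node then
// doesn't exist without the "python" feature.
#[cfg(feature = "python")]
pub mod gated_at_module_level {
    pub fn evaluate() { /* Python-dependent logic */ }
}

// Option B (roughly the pattern described above): always compile the module so
// the node can be exercised by Rust-only tests (e.g. of optimization passes),
// and gate only the Python-dependent bodies inside it.
pub mod gated_inside_functions {
    pub fn evaluate() {
        #[cfg(feature = "python")]
        {
            // Python-dependent logic goes here.
        }
        #[cfg(not(feature = "python"))]
        unimplemented!("this operation requires the 'python' feature");
    }
}
```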

@jaychia (Contributor) left a comment

Looks pretty good. Curious how this performs, though, given that I'm guessing scheduling is just doing round-robin? Would this be very sensitive to stragglers?

impl Drop for ActorHandle {
    fn drop(&mut self) {
        // Ignore errors since you shouldn't panic in a drop
        let _ = self.teardown();
A contributor commented:

We should at least log the error instead of ignoring it silently.
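
A minimal sketch of that suggestion, using a stub handle type and assuming teardown() returns a Result and the log crate is available; the real ActorHandle is more involved:

```rust
// Stub standing in for the real ActorHandle.
struct ActorHandle;

impl ActorHandle {
    fn teardown(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

impl Drop for ActorHandle {
    fn drop(&mut self) {
        // Still avoid panicking in a drop, but surface the failure in the
        // logs instead of discarding it silently.
        if let Err(e) = self.teardown() {
            log::error!("Error tearing down actor: {}", e);
        }
    }
}
```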


expr_projection = ExpressionsProjection([Expression._from_pyexpr(expr) for expr in projection])
self.actor_process = mp.Process(target=stateful_actor_event_loop, args=(expr_projection, actor_conn))
self.actor_process.start()
A contributor commented:

This seems quite clean. Was using ProcessPoolExecutor not an option here?

@kevinzwang (Member, Author) replied on Oct 28, 2024:

There was no need, since we only need to spin up one process here. I figured it would be cleaner to implement the event loop myself so there's less magic and singleton machinery.

logger = logging.getLogger(__name__)


def initialize_actor_pool_projection(projection: ExpressionsProjection) -> ExpressionsProjection:
A contributor commented:

Isn't this code similar to / shared with the PyRunner? Should we use this there as well?

@kevinzwang (Member, Author) replied on Oct 28, 2024:

It's sorta similar, but there are some differences, including not setting CUDA_VISIBLE_DEVICES. I could share the rest though; will take a look at that.

@kevinzwang (Member, Author) replied:

Updated the pyrunner to use this function.

.collect()
}

pub fn extract_stateful_udf_exprs(expr: ExprRef) -> Vec<StatefulPythonUDF> {
A contributor commented:

Can we add some docstrings, especially if it's intended to be pub?
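
Purely as an illustration of the kind of doc comment being requested; the placeholder types and the described behavior below are assumptions, not the text that actually landed:

```rust
// Hypothetical placeholder types so this sketch stands alone; Daft's real
// ExprRef and StatefulPythonUDF are defined elsewhere in the crate.
pub type ExprRef = std::sync::Arc<()>;
pub struct StatefulPythonUDF;

/// Collects all stateful Python UDFs referenced by the given expression.
///
/// Walks the expression tree rooted at `expr` and returns every
/// `StatefulPythonUDF` it finds; the result is empty if the expression
/// contains no stateful UDFs.
pub fn extract_stateful_udf_exprs(expr: ExprRef) -> Vec<StatefulPythonUDF> {
    let _ = expr;
    Vec::new() // placeholder body; the doc comment above is the point
}
```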

@kevinzwang (Member, Author) commented:

> Looks pretty good. Curious how this performs, though, given that I'm guessing scheduling is just doing round-robin? Would this be very sensitive to stragglers?

The scheduling is currently round-robin, but Colin is actually working on a PR (#3132) that allows for better scheduling when order isn't needed. When order is needed, we would have to implement a buffer, which I am choosing not to do in this PR. It should not be significantly performance critical anyway; I don't imagine people will run a stateful UDF with high concurrency on a local machine, so the straggler issue isn't as significant as with the distributed runner.
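
To make the scheduling discussion concrete, here is a small illustrative sketch of round-robin dispatch with hypothetical names; it is not Swordfish's actual implementation, but it shows why a slow worker keeps accumulating its share of input while faster workers may go idle:

```rust
use std::sync::mpsc::Sender;

// Hypothetical sketch: morsels are dealt out to a pool of actor workers
// strictly in turn, regardless of how busy each worker is.
struct RoundRobinDispatcher<T> {
    workers: Vec<Sender<T>>,
    next: usize,
}

impl<T> RoundRobinDispatcher<T> {
    fn new(workers: Vec<Sender<T>>) -> Self {
        Self { workers, next: 0 }
    }

    fn dispatch(&mut self, morsel: T) {
        // A straggler still receives every N-th morsel, so its queue grows
        // while faster workers may sit idle; least-loaded or work-stealing
        // dispatch avoids this when output order doesn't need to be preserved.
        let _ = self.workers[self.next].send(morsel);
        self.next = (self.next + 1) % self.workers.len();
    }
}

fn main() {
    // Tiny usage example: three workers, nine morsels dealt out in turn.
    let (txs, rxs): (Vec<_>, Vec<_>) =
        (0..3).map(|_| std::sync::mpsc::channel::<u32>()).unzip();
    let mut dispatcher = RoundRobinDispatcher::new(txs);
    for morsel in 0..9 {
        dispatcher.dispatch(morsel);
    }
    for (i, rx) in rxs.iter().enumerate() {
        let got: Vec<u32> = rx.try_iter().collect();
        println!("worker {i} received {got:?}");
    }
}
```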

kevinzwang merged commit 14b2246 into main on Oct 29, 2024
45 of 46 checks passed
kevinzwang deleted the kevin/swordfish-stateful-udf branch on October 29, 2024, 18:16