# Remove legacy shuffle, add docs for distributed testing #19
Changes from all commits:
```diff
@@ -77,7 +77,9 @@ def _get_worker_inputs(
     plan_bytes = datafusion_ray.serialize_execution_plan(stage.get_execution_plan())
     futures = []
     opt = {}
-    opt["resources"] = {"worker": 1e-3}
+    # TODO not sure why we had this but my Ray cluster could not find suitable resource
+    # until I commented this out
+    # opt["resources"] = {"worker": 1e-3}
     opt["num_returns"] = output_partitions_count
     for part in range(concurrency):
         ids, inputs = _get_worker_inputs(part)
```

**Contributor** (on the commented-out `opt["resources"]` line): Maybe a comment here is useful?
```diff
@@ -93,7 +95,6 @@ def _get_worker_inputs(
 def execute_query_stage(
     query_stages: list[QueryStage],
     stage_id: int,
-    use_ray_shuffle: bool,
 ) -> tuple[int, list[ray.ObjectRef]]:
     """
     Execute a query stage on the workers.
```
```diff
@@ -106,7 +107,7 @@ def execute_query_stage(
     child_futures = []
     for child_id in stage.get_child_stage_ids():
         child_futures.append(
-            execute_query_stage.remote(query_stages, child_id, use_ray_shuffle)
+            execute_query_stage.remote(query_stages, child_id)
         )
 
     # if the query stage has a single output partition then we need to execute for the output
```
```diff
@@ -133,33 +134,28 @@ def _get_worker_inputs(
     ) -> tuple[list[tuple[int, int, int]], list[ray.ObjectRef]]:
         ids = []
         futures = []
-        if use_ray_shuffle:
-            for child_stage_id, child_futures in child_outputs:
-                for i, lst in enumerate(child_futures):
-                    if isinstance(lst, list):
-                        for j, f in enumerate(lst):
-                            if concurrency == 1 or j == part:
-                                # If concurrency is 1, pass in all shuffle partitions. Otherwise,
-                                # only pass in the partitions that match the current worker partition.
-                                ids.append((child_stage_id, i, j))
-                                futures.append(f)
-                    elif concurrency == 1 or part == 0:
-                        ids.append((child_stage_id, i, 0))
-                        futures.append(lst)
+        for child_stage_id, child_futures in child_outputs:
+            for i, lst in enumerate(child_futures):
+                if isinstance(lst, list):
+                    for j, f in enumerate(lst):
+                        if concurrency == 1 or j == part:
+                            # If concurrency is 1, pass in all shuffle partitions. Otherwise,
+                            # only pass in the partitions that match the current worker partition.
+                            ids.append((child_stage_id, i, j))
+                            futures.append(f)
+                elif concurrency == 1 or part == 0:
+                    ids.append((child_stage_id, i, 0))
+                    futures.append(lst)
         return ids, futures
 
-    # if we are using disk-based shuffle, wait until the child stages to finish
-    # writing the shuffle files to disk first.
-    if not use_ray_shuffle:
-        ray.get([f for _, lst in child_outputs for f in lst])
-
     # schedule the actual execution workers
     plan_bytes = datafusion_ray.serialize_execution_plan(stage.get_execution_plan())
     futures = []
     opt = {}
-    opt["resources"] = {"worker": 1e-3}
-    if use_ray_shuffle:
-        opt["num_returns"] = output_partitions_count
+    # TODO not sure why we had this but my Ray cluster could not find suitable resource
+    # until I commented this out
+    #opt["resources"] = {"worker": 1e-3}
+    opt["num_returns"] = output_partitions_count
     for part in range(concurrency):
         ids, inputs = _get_worker_inputs(part)
         futures.append(
```

**Contributor** (on the commented-out `opt["resources"]` line): Same as above.
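The hunk above also makes `num_returns` unconditional: with the legacy disk-based shuffle gone, every worker invocation returns one Ray object per output shuffle partition. For readers unfamiliar with Ray's dynamic `num_returns` option, here is a small illustrative sketch of how it behaves (generic Ray usage, not code from this repository):

```python
import ray

ray.init()


@ray.remote
def produce_partitions(n: int):
    # With num_returns set via .options(), Ray splits the returned tuple into
    # n separate ObjectRefs, one per output partition.
    return tuple(range(n))


num_parts = 4
refs = produce_partitions.options(num_returns=num_parts).remote(num_parts)
print(ray.get(refs))  # [0, 1, 2, 3]
```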
```diff
@@ -210,10 +206,9 @@ def execute_query_partition(
 
 
 class DatafusionRayContext:
-    def __init__(self, num_workers: int = 1, use_ray_shuffle: bool = False):
-        self.ctx = Context(num_workers, use_ray_shuffle)
+    def __init__(self, num_workers: int = 1):
+        self.ctx = Context(num_workers)
         self.num_workers = num_workers
-        self.use_ray_shuffle = use_ray_shuffle
 
     def register_csv(self, table_name: str, path: str, has_header: bool):
         self.ctx.register_csv(table_name, path, has_header)
```
```diff
@@ -234,23 +229,7 @@ def sql(self, sql: str) -> pa.RecordBatch:
 
         graph = self.ctx.plan(sql)
         final_stage_id = graph.get_final_query_stage().id()
-        if self.use_ray_shuffle:
-            partitions = schedule_execution(graph, final_stage_id, True)
-        else:
-            # serialize the query stages and store in Ray object store
-            query_stages = [
-                datafusion_ray.serialize_execution_plan(
-                    graph.get_query_stage(i).get_execution_plan()
-                )
-                for i in range(final_stage_id + 1)
-            ]
-            # schedule execution
-            future = execute_query_stage.remote(
-                query_stages,
-                final_stage_id,
-                self.use_ray_shuffle,
-            )
-            _, partitions = ray.get(future)
+        partitions = schedule_execution(graph, final_stage_id, True)
         # assert len(partitions) == 1, len(partitions)
         result_set = ray.get(partitions[0])
         return result_set
```
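For reference, a minimal sketch of how the simplified `DatafusionRayContext` API in this diff might be used from a driver script. The import path, file path, and query below are illustrative assumptions, not taken from this PR:

```python
import ray

# Assumed import path for the context class shown in this diff.
from datafusion_ray import DatafusionRayContext

ray.init()

# The use_ray_shuffle flag is gone; Ray-based shuffle is now the only code path.
ctx = DatafusionRayContext(num_workers=4)
ctx.register_csv("tips", "tips.csv", True)

# sql() plans the query, schedules the stages across the Ray cluster via
# schedule_execution(), and returns the collected result.
result = ctx.sql("SELECT sex, AVG(tip) FROM tips GROUP BY sex")
print(result)
```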
The PR also adds a new documentation file for distributed testing:

# Distributed Testing

Install Ray on at least two nodes, following the Ray installation guide:

https://docs.ray.io/en/latest/ray-overview/installation.html

```shell
sudo apt install -y python3-pip python3.12-venv
python3 -m venv venv
source venv/bin/activate
pip3 install -U "ray[default]"
```

## Start Ray Head Node

```shell
ray start --head --node-ip-address=10.0.0.23 --port=6379 --dashboard-host=0.0.0.0
```
## Start Ray Worker Node(s)

```shell
ray start --address=10.0.0.23:6379 --redis-password='5241590000000000'
```

**Contributor:** Did you need to start redis?

**Member (author):** No, this is built into Ray somehow, and this is the password that it uses.
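To sanity-check that the worker node(s) have actually joined the head node, you can inspect the cluster from any node. A quick sketch, assuming the same virtual environment with Ray installed is active:

```python
import ray

# Connect to the already-running cluster instead of starting a local one.
ray.init(address="auto")

# Expect one entry per node (head plus workers) and the aggregate resources.
print(ray.nodes())
print(ray.cluster_resources())
```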
## Install DataFusion Ray (on each node)

Clone the repo at the version that you want to test, then build and install it into the virtual env with `maturin develop --release`, as shown below.

Install Rust:

```shell
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
. "$HOME/.cargo/env"
```

Install maturin:

```shell
pip3 install maturin
```

Build and install DataFusion Ray:

```shell
git clone https://github.com/apache/datafusion-ray.git
cd datafusion-ray
maturin develop --release
```
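A quick way to confirm the build landed in the active virtual environment (a sketch; the module name comes from the imports in this PR):

```python
# Should import without error and point at the venv's site-packages.
import datafusion_ray

print(datafusion_ray.__file__)
```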
## Submit Job

```shell
cd examples
RAY_ADDRESS='http://10.0.0.23:8265' ray job submit --working-dir `pwd` -- python3 tips.py
```
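The same submission can also be done from Python with the Ray Jobs SDK instead of the `ray job submit` CLI. A sketch, assuming `ray[default]` is installed and the dashboard address from the head-node step above:

```python
from ray.job_submission import JobSubmissionClient

# Point the client at the Ray dashboard started on the head node.
client = JobSubmissionClient("http://10.0.0.23:8265")

# Upload the examples directory and run tips.py on the cluster.
job_id = client.submit_job(
    entrypoint="python3 tips.py",
    runtime_env={"working_dir": "./examples"},
)
print(job_id)
```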
**Author** (on the removed `opt["resources"]` option): I had to remove this; otherwise, the Ray cluster could not find a suitable worker node. @franklsf95 do we still need this?

**Reply:** This only works when you start a Ray cluster with custom resources, e.g. the head node with `ray start --head --resources='{"head":1}'` and worker nodes each with `ray start --resources='{"worker":1}'`. I had this resource requirement to make sure the tasks run on worker nodes exclusively (for fair benchmarking). If we don't do this, the task could also run on the driver node. Depending on the use case, this may be harmless.
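To make the custom-resource behavior described above concrete, here is a small sketch of Ray's custom resources (an illustration, not code from this PR). It assumes the head and worker nodes were started with `--resources='{"head":1}'` and `--resources='{"worker":1}'` respectively:

```python
import ray

ray.init(address="auto")


@ray.remote
def where_am_i() -> str:
    # Report which node the task actually ran on.
    import socket

    return socket.gethostname()


# Requesting a tiny amount of the custom "worker" resource pins the task to
# worker nodes, keeping it off the head/driver node (the fair-benchmarking
# setup described in the comment above).
print(ray.get(where_am_i.options(resources={"worker": 1e-3}).remote()))

# Without the resource constraint, Ray may schedule the task on any node,
# including the head node, which may be harmless depending on the use case.
print(ray.get(where_am_i.remote()))
```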