P2P shuffle skeleton #5520
Changes from all commits: 5a1f00e, a46246c, 3d4bbf3, 433ce17, 743215e, 6f0b531, 4a4f430, c39309c, 432d149, cad4717, 74ddf00, a998141, c480684, 3fadb0a, 10c0c64, 6de77db, 0e403ec, 9b9a68b, 20a2ce4, 506dda3, c8a2f7a, 20c9312, 9f4304d, ab2c1d1, c6fc157
New file (`@@ -0,0 +1,18 @@`):

```python
try:
    import pandas
except ImportError:
    SHUFFLE_AVAILABLE = False
else:
    del pandas
    SHUFFLE_AVAILABLE = True

    from .shuffle import rearrange_by_column_p2p
    from .shuffle_extension import ShuffleId, ShuffleMetadata, ShuffleWorkerExtension

__all__ = [
    "SHUFFLE_AVAILABLE",
    "rearrange_by_column_p2p",
    "ShuffleId",
    "ShuffleMetadata",
    "ShuffleWorkerExtension",
]
```
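The import guard above gates the whole P2P shuffle API on pandas being installed. The same pattern can be exercised on its own; this is a minimal, self-contained sketch (the helper name and probed modules are illustrative, not from the PR):

```python
import importlib


def optional_import_available(module_name: str) -> bool:
    """Return True if `module_name` can be imported, mirroring the
    try/except ImportError guard used in the __init__ above."""
    try:
        importlib.import_module(module_name)
    except ImportError:
        return False
    return True


# "json" is stdlib and always importable; the second name stands in for
# an absent optional dependency (like pandas missing on a worker).
HAVE_JSON = optional_import_available("json")
HAVE_MISSING = optional_import_available("definitely_missing_pkg_xyz")
```

Note that `del pandas` in the real module avoids leaking the probe import into the package namespace once availability has been recorded.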
New file (`@@ -0,0 +1,111 @@`):
```python
from __future__ import annotations

from typing import TYPE_CHECKING

from dask.base import tokenize
from dask.dataframe import DataFrame
from dask.delayed import Delayed, delayed
from dask.highlevelgraph import HighLevelGraph

from .shuffle_extension import NewShuffleMetadata, ShuffleId, ShuffleWorkerExtension

if TYPE_CHECKING:
    import pandas as pd


def get_ext() -> ShuffleWorkerExtension:
    from distributed import get_worker

    try:
        worker = get_worker()
    except ValueError as e:
        raise RuntimeError(
            "`shuffle='p2p'` requires Dask's distributed scheduler. This task is not running on a Worker; "
            "please confirm that you've created a distributed Client and are submitting this computation through it."
        ) from e
    extension: ShuffleWorkerExtension | None = worker.extensions.get("shuffle")
    if not extension:
        raise RuntimeError(
            f"The worker {worker.address} does not have a ShuffleExtension. "
            "Is pandas installed on the worker?"
        )
    return extension


def shuffle_setup(metadata: NewShuffleMetadata) -> None:
    get_ext().create_shuffle(metadata)


def shuffle_transfer(input: pd.DataFrame, id: ShuffleId, setup=None) -> None:
    get_ext().add_partition(input, id)


def shuffle_unpack(id: ShuffleId, output_partition: int, barrier=None) -> pd.DataFrame:
    return get_ext().get_output_partition(id, output_partition)


def shuffle_barrier(id: ShuffleId, transfers: list[None]) -> None:
    get_ext().barrier(id)


def rearrange_by_column_p2p(
    df: DataFrame,
    column: str,
    npartitions: int | None = None,
):
    npartitions = npartitions or df.npartitions
    token = tokenize(df, column, npartitions)

    setup = delayed(shuffle_setup, pure=True)(
        NewShuffleMetadata(
            ShuffleId(token),
            df._meta,
            column,
            npartitions,
        )
    )

    transferred = df.map_partitions(
        shuffle_transfer,
        token,
        setup,
        meta=df,
        enforce_metadata=False,
        transform_divisions=False,
    )

    barrier_key = "shuffle-barrier-" + token
    barrier_dsk = {barrier_key: (shuffle_barrier, token, transferred.__dask_keys__())}
    barrier = Delayed(
        barrier_key,
        HighLevelGraph.from_collections(
            barrier_key, barrier_dsk, dependencies=[transferred]
        ),
    )

    name = "shuffle-unpack-" + token
    dsk = {
        (name, i): (shuffle_unpack, token, i, barrier_key) for i in range(npartitions)
    }
    # TODO use this blockwise (https://github.com/coiled/oss-engineering/issues/49)
    # Changes task names, so breaks setting worker restrictions at the moment.
```
Member:

> IIUC, neither of these PRs solves this problem, do they? Do we have an idea how to solve this problem down the road? What would need to change to enable this?

Collaborator (author):

> #5524 solves it (that's one of the main points). The scheduler plugin handles the unpredictable task names by traversing the graph from the barrier task and parsing the key names of its dependents. That's what the discussion in https://github.com/dask/distributed/pull/5524/files#r765203083 is about.
>
> I think the idea we have right now is something like:
>
> ```python
> with dask.annotate(
>     shuffle_id=token,
>     output_partition={(name, i): i for i in range(npartitions)},
>     fuse="propagate",
> ):
>     dsk = blockwise(...)
> # or
> with dask.annotate(
>     shuffle_id=token,
>     resources={(name, i): f"shuffle-{token}-{i}" for i in range(npartitions)},
>     fuse="propagate",
> ):
>     dsk = blockwise(...)
> ```
>
> where we add annotations to the graph explicitly marking which output partition number each task is. The only thing these annotations gain us is not having to parse the keys of tasks that depend on the barrier to pull out that partition number.
>
> Basically, key-parsing would be fine for 95% of situations; the annotations would only solve some edge cases and make it feel more proper.
```python
    # Also maybe would be nice if the `DataFrameIOLayer` interface supported this?
    # dsk = blockwise(
    #     shuffle_unpack,
    #     name,
    #     "i",
    #     token,
    #     None,
    #     BlockwiseDepDict({(i,): i for i in range(npartitions)}),
    #     "i",
    #     barrier_key,
    #     None,
    #     numblocks={},
    # )
```
Comment on lines +92 to +104.

Member:

> In an earlier offline conversation, I mentioned that I am doubtful whether or not this should actually be considered a blockwise operation. My line of argumentation is mostly about the fact that the individual pieces are not independently calculable. The blockwise definition you propose also doesn't really look healthy in my opinion, since we're encoding the […].
>
> The most important reason why we'd want to use this blockwise layer here is to enable downstream task fusion. IMHO, this should not require us to define this layer as blockwise; rather, subsequent blockwise operations should care about, and be capable of, fusing with this. We may need to teach them this first, and in the intermediate term we can use blockwise. My point is, I consider blockwise the shortcut rather than the final solution.
>
> Regardless of how the fusion is supposed to work, I am doubtful about whether or not this is even required. To my understanding, fusion is extremely important for the input/transfer layer, since it needs to be fused with whatever the input is, e.g. if we're starting off with a […].

Collaborator (author):

> That's fair. You're right that if the data is all in memory anyway, overproduction is irrelevant. And there are probably other hacks we can do to protect against it on the unpack side. I think making a dedicated layer is the best approach here.
```python
    return DataFrame(
        HighLevelGraph.from_collections(name, dsk, [barrier]),
        name,
        df._meta,
        [None] * (npartitions + 1),
    )
```
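To make the overall graph shape concrete: `rearrange_by_column_p2p` wires one transfer task per input partition into a single barrier, then one unpack task per output partition out of it. The sketch below builds just that fan-in/fan-out dependency structure with plain dicts; the transfer-layer key name is illustrative (the real one comes from `map_partitions`), while the barrier and unpack names mirror the diff:

```python
npartitions = 3
token = "deadbeef"  # stand-in for tokenize(df, column, npartitions)

transfer_keys = [("shuffle-transfer-" + token, i) for i in range(npartitions)]
barrier_key = "shuffle-barrier-" + token
unpack_keys = [("shuffle-unpack-" + token, i) for i in range(npartitions)]

# Dependency map: task key -> list of task keys it depends on.
deps = {k: [] for k in transfer_keys}            # transfers depend on input (elided)
deps[barrier_key] = list(transfer_keys)          # barrier fans in every transfer
deps.update({k: [barrier_key] for k in unpack_keys})  # each unpack waits on barrier
```

The barrier is what makes the unpack tasks non-independent, which is the crux of the review discussion above about whether the unpack layer is really "blockwise".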