Separate scheduling API from dask implementation #30
Conversation
xref pangeo-data#29

This PR moves the dask-specific scheduling logic into a separate `dask.py` file, as a first step toward adding support for alternative schedulers. (I'm particularly interested in supporting Apache Beam.) The existing tests pass (with minor modifications), but the documentation still needs updating.

Notes:
- I put `staged_copy` into a single function, but perhaps there are other generic methods (`execute`?) that would justify using a class?
- `Rechunked` no longer inherits from `dask.delayed.Delayed`, and no longer has any dask-specific logic at all. I think this is important for generic scheduler support, but it does make it a little less reusable in larger pipelines. `_delayed` is currently a private attribute, but we should probably expose the scheduler equivalent of "delayed" objects in some way. I guess this is a use case for the class-based interface from the previous bullet.
- `Rechunked` now always contains zarr arrays/groups rather than dask arrays. This makes the repr a little less informative, e.g., it no longer shows chunk size. This should probably be fixed before merging.
- Will "two stage" copying always suffice? The interface I wrote for `staged_copy` supports any number of stages (in theory). That might be useful in the future, or it might be unnecessary complexity.
- To verify that adding a new scheduler is not too painful, I should probably write at least a second example. I'll start with a naive "reference" scheduler in pure Python (this could go in the docs) and think about adding a Beam implementation as well. Beam is perhaps a nice example because its execution model is so different from dask (based on higher-level transforms like "map" rather than individual tasks).
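The multi-stage idea can be sketched with the `CopySpec`/`StagedCopySpec` types introduced in this PR (simplified here: in the real code `target` is a `zarr.Array`, while plain dicts stand in for stores in this sketch):

```python
from typing import Any, Iterable, NamedTuple, Tuple


class CopySpec(NamedTuple):
    """One copy step: read from source, write to target in the given chunks."""
    source: Any
    target: Any
    chunks: Tuple[int, ...]


class StagedCopySpec:
    """A pipeline of copy steps; each stage's target feeds the next stage's source."""
    def __init__(self, stages: Iterable[CopySpec]):
        self.stages = tuple(stages)


# A two-stage rechunk: source -> intermediate store -> target
src, intermediate, dst = {}, {}, {}
spec = StagedCopySpec([
    CopySpec(src, intermediate, (100,)),
    CopySpec(intermediate, dst, (10,)),
])
assert len(spec.stages) == 2
assert spec.stages[0].target is spec.stages[1].source
```

Nothing in the interface caps the number of stages, which is what "any number of stages (in theory)" refers to.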
rechunker/dask.py
Outdated
name = "rechunked-" + dask.base.tokenize([x.name for x in stores_delayed])
dsk = dask.base.merge(*[x.dask for x in stores_delayed], dsk2)
dsk[name] = (_barrier,) + tuple(
Note that this no longer makes the target group the return value of the Delayed object. We could do that, but I didn't love how it made the `staged_copy` API more complicated.
I am fine with that. That was just a superficial feature we added. The user should know how to open the target group.
Codecov Report
@@ Coverage Diff @@
## master #30 +/- ##
==========================================
+ Coverage 90.30% 92.33% +2.02%
==========================================
Files 2 5 +3
Lines 196 274 +78
Branches 45 57 +12
==========================================
+ Hits 177 253 +76
Misses 10 10
- Partials 9 11 +2
Continue to review full report at Codecov.
Stephan, it is really great to get this PR. I'm so happy that you have found time to help contribute to rechunker. Overall, I really like your design. I will try to find time in the next few days (realistically, early next week) for a more thorough review. In the meantime, maybe @TomAugspurger can have a look?

Thank you for releasing this tool in the first place! This fills an important niche for our current project, so I'm excited to be able to work with you on it.
rechunker/types.py
Outdated
class CopySpec(NamedTuple):
    """Specification of how to copy between two arrays."""

    # TODO: remove Any by making CopySpec a Generic, once we only support Python
    # 3.7+: https://stackoverflow.com/questions/50530959
    source: Any
    target: zarr.Array
    chunks: Tuple[int, ...]


class StagedCopySpec:
    stages: Tuple[CopySpec, ...]

    def __init__(self, stages: Iterable[CopySpec]):
        self.stages = tuple(stages)
We might consider adding (or at least documenting) some invariant checks when constructing these objects, e.g.,
- `source` and `target` should have the same `shape` and `dtype`
- no element of `chunks` should be larger than `shape`
- the `target` and `source` of subsequent stages should be the same array
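A sketch of what such checks might look like (the helper name `validate_staged_copy` and the standalone-function shape are assumptions for illustration, not part of this PR):

```python
def validate_staged_copy(stages):
    """Hypothetical invariant checks for a sequence of CopySpec-like stages.

    Assumes each stage exposes .source, .target, and .chunks, and that
    source/target expose .shape and .dtype (true for zarr and dask arrays).
    """
    for spec in stages:
        if spec.source.shape != spec.target.shape:
            raise ValueError("source and target must have the same shape")
        if spec.source.dtype != spec.target.dtype:
            raise ValueError("source and target must have the same dtype")
        if any(c > s for c, s in zip(spec.chunks, spec.target.shape)):
            raise ValueError("no element of chunks may exceed shape")
    for first, second in zip(stages, stages[1:]):
        if first.target is not second.source:
            raise ValueError("target of each stage must be the source of the next")
```

Running these at construction time would turn silent shape/dtype mismatches into immediate errors instead of failures deep inside an executor.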
This all looks good to me. The decision to return a Delayed subclass was primarily out of laziness (on my part).

Just a thought that occurred to me last night: it would be awesome to implement a prefect scheduler as well.
I implemented a second executor just using Python. It's only ~15 lines of code and should be a useful reference. Looking at the two executors (Dask and Python), it felt like a class would be appropriate to codify the interface. So now we have a (very lightweight) `Executor` class.
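A minimal sketch of what such a pure-Python reference executor could look like, assuming a plan is just a list of thunks (zero-argument callables) run sequentially; the method names follow the `Executor` interface in this PR, but the exact signatures here are an assumption:

```python
from typing import Callable, Iterable, List

Thunk = Callable[[], None]


class PythonExecutor:
    """Sketch of a reference executor: no parallelism, no dependencies.

    prepare_plan turns an iterable of tasks into a concrete plan (a list),
    and execute_plan simply runs every task in order.
    """

    def prepare_plan(self, tasks: Iterable[Thunk]) -> List[Thunk]:
        return list(tasks)

    def execute_plan(self, plan: List[Thunk]) -> None:
        for task in plan:
            task()


# Usage: each thunk performs one copy; here they just record their order.
results = []
executor = PythonExecutor()
plan = executor.prepare_plan([lambda: results.append(1), lambda: results.append(2)])
executor.execute_plan(plan)
assert results == [1, 2]
```

Because the "plan" type is opaque to callers, the same two-method shape can wrap a dask graph, a Beam pipeline, or a plain list.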
This is ready for a full review. I'm intentionally not documenting the Executor class interface in the docs for now, because I suspect it will change as we write the next few executors. For now, anyone who wants to explore it should be willing to dive into the source code, and ideally would submit their Executor upstream into Rechunker itself!
Stephan, this is really wonderful. Thanks for doing this so carefully and thoroughly. I have no issues that can block merging. Just some questions to make sure I understand your code, for maintainability's sake.
@@ -309,6 +301,7 @@ def _rechunk_array(
    f"Got array_dims {array_dims}, target_chunks {target_chunks}."
)

# TODO: rewrite to avoid the hard dependency on dask
I think we could easily vendor this function.
https://github.com/dask/dask/blob/cd2a42b1735dbd0eb66b95b2989047a7f02bbe50/dask/utils.py#L1185-L1236
I agree!
I didn't do that yet because removing the hard dask dependency felt orthogonal to the pain point of this PR. But it's definitely worth doing soon...
ranges = [range(math.ceil(s / c)) for s, c in zip(shape, chunks)]
for indices in itertools.product(*ranges):
    key = tuple(slice(c * i, c * (i + 1)) for i, c in zip(indices, chunks))
    target_array[key] = source_array[key]
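The loop above can be exercised with plain numpy arrays standing in for zarr arrays (the `copy_chunked` wrapper name is just for this sketch):

```python
import itertools
import math

import numpy as np


def copy_chunked(source_array, target_array, chunks):
    """Copy source to target one chunk at a time, as in the loop above."""
    shape = source_array.shape
    # One index range per dimension; the last chunk may be partial.
    ranges = [range(math.ceil(s / c)) for s, c in zip(shape, chunks)]
    for indices in itertools.product(*ranges):
        key = tuple(slice(c * i, c * (i + 1)) for i, c in zip(indices, chunks))
        target_array[key] = source_array[key]


src = np.arange(12).reshape(3, 4)
dst = np.zeros_like(src)
copy_chunked(src, dst, chunks=(2, 3))
assert (src == dst).all()
```

Note that slices past the end of a dimension are clipped automatically by numpy (and zarr), so partial edge chunks need no special casing.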
Can I just say that this is a beautiful little piece of code. 🏅
from rechunker.types import CopySpec, StagedCopySpec, Executor


Thunk = Callable[[], None]
Those of us who are new to typing would appreciate a comment about what `Thunk` is supposed to be.
"thunk" is just a name for a "function without any arguments", but the name doesn't provide much value here. See
#33 for removing it.
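For readers new to typing, a minimal illustration of the idea (the `make_thunk` helper is hypothetical, just for this sketch):

```python
from typing import Callable, List

# A thunk is a function taking no arguments and returning nothing;
# it packages up work to be run later.
Thunk = Callable[[], None]


def make_thunk(message: str, out: List[str]) -> Thunk:
    """Return a thunk that appends message to out when (and only when) called."""
    def thunk() -> None:
        out.append(message)
    return thunk


log: List[str] = []
t = make_thunk("copied chunk (0, 0)", log)
assert log == []  # nothing has run yet
t()               # the deferred work happens here
assert log == ["copied chunk (0, 0)"]
```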
T = TypeVar("T")


class Executor(Generic[T]):
Can you help me understand why you didn't use an ABC here?
I don't love ABCs because they add slow/picky runtime checks, though I guess that doesn't really matter here. Python's typing module has zero overhead, and if you don't care about type checking you don't even have to make your code pass the type checker.
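The trade-off can be seen side by side; this is a sketch, with the `Executor` method names taken from this PR and the `AbstractExecutor`/`Incomplete` names invented for illustration:

```python
import abc
from typing import Generic, TypeVar

T = TypeVar("T")


# typing-only interface (the approach taken here): a type checker flags
# missing methods, but there is no runtime enforcement and no runtime cost.
class Executor(Generic[T]):
    def prepare_plan(self, specs) -> T:
        raise NotImplementedError

    def execute_plan(self, plan: T, **kwargs):
        raise NotImplementedError


# ABC alternative: an incomplete subclass fails at instantiation time.
class AbstractExecutor(abc.ABC):
    @abc.abstractmethod
    def prepare_plan(self, specs): ...

    @abc.abstractmethod
    def execute_plan(self, plan, **kwargs): ...


class Incomplete(AbstractExecutor):
    def prepare_plan(self, specs):
        return specs
    # execute_plan is missing


failed = False
try:
    Incomplete()  # TypeError: abstract method execute_plan not implemented
except TypeError:
    failed = True
assert failed
```

With the typing-only version, an incomplete subclass raises `NotImplementedError` only when the missing method is actually called.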
        return _staged_copy(specs)

    def execute_plan(self, plan: Delayed, **kwargs):
        return plan.compute(**kwargs)
Before, when we subclassed Delayed, we could call `persist` on the Rechunked object. That was convenient if you wanted to avoid blocking. Now our only option is `compute`. I'm fine with this change here, but maybe we should think about how to bring back that capability in the future.
It's exposed as the `.plan` attribute on the Rechunked object. So you could write `rechunk(...).plan.persist()`.

That said, as I think more about how this API would or would not make sense for other schedulers, I wonder if it really makes sense to have an `execute()` method on Rechunked objects at all. The natural way to execute on particular schedulers may not fit neatly into that API, so perhaps it's better to encourage users to work with the scheduler-specific objects representing delayed execution graphs (e.g., `dask.delayed` or `apache_beam.PTransform`).