
Separate scheduling API from dask implementation #30

Merged (12 commits) Jul 27, 2020

Conversation

shoyer (Collaborator) commented Jul 24, 2020

xref #29

This PR moves the dask specific scheduling logic into a separate dask.py file, as a first step for adding support for alternative schedulers. (I'm particularly interested in supporting Apache Beam.)

The existing tests pass (with minor modifications), but the documentation still needs updating.

Notes:

  • I put staged_copy into a single function, but perhaps there are other generic methods (execute?) that would justify using a class?
  • Rechunked no longer inherits from dask.delayed.Delayed, and no longer has any dask-specific logic at all. I think this is important for generic scheduler support, but it does make it a little less reusable in larger pipelines. _delayed is currently a private attribute, but we should probably expose the scheduler equivalent of "delayed" objects in some way. I guess this is a use case for the class-based interface from the previous bullet.
  • Rechunked now always contains zarr arrays/groups rather than dask arrays. This makes the repr a little less informative, e.g., it no longer shows chunk size. This should probably be fixed before merging.
  • Will "two stage" copying always suffice? The interface I wrote for staged_copy supports any number of stages (in theory). That might be useful in the future, or it might be unnecessary complexity.
  • To verify that adding a new scheduler is not too painful, I should probably write at least a second example. I'll start with a naive "reference" scheduler in pure Python (this could go in the docs) and I'll think about adding a Beam implementation as well. Beam is perhaps a nice example because its execution model is so different from dask (based on higher-level transforms like "map" rather than individual tasks).


name = "rechunked-" + dask.base.tokenize([x.name for x in stores_delayed])
dsk = dask.base.merge(*[x.dask for x in stores_delayed], dsk2)
dsk[name] = (_barrier,) + tuple(
shoyer (Collaborator, Author) commented:

Note that this no longer makes the target group the return value of the Delayed object. We could do that, but I didn't love how it made the staged_copy API more complicated.

Reply from a Member:

I am fine with that. That was just a superficial feature we added. The user should know how to open the target group.

codecov bot commented Jul 24, 2020

Codecov Report

Merging #30 into master will increase coverage by 2.02%.
The diff coverage is 96.96%.


@@            Coverage Diff             @@
##           master      #30      +/-   ##
==========================================
+ Coverage   90.30%   92.33%   +2.02%     
==========================================
  Files           2        5       +3     
  Lines         196      274      +78     
  Branches       45       57      +12     
==========================================
+ Hits          177      253      +76     
  Misses         10       10              
- Partials        9       11       +2     
Impacted Files                    Coverage Δ
rechunker/api.py                  90.26% <88.88%> (-2.87%) ⬇️
rechunker/executors/dask.py       100.00% <100.00%> (ø)
rechunker/executors/python.py     100.00% <100.00%> (ø)
rechunker/types.py                100.00% <100.00%> (ø)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 270a107...6732366.

@shoyer shoyer marked this pull request as draft July 24, 2020 01:09
rechunker/core.py (outdated review thread, resolved)
rabernat (Member) commented:

Stephan it is really great to get this PR. I'm so happy that you have found time to help contribute to rechunker.

Overall, I really like your design. I will try to find time in the next few days (realistically, early next week) for a more thorough review. In the meantime, maybe @TomAugspurger can have a look?

shoyer (Collaborator, Author) commented Jul 24, 2020

> Stephan it is really great to get this PR. I'm so happy that you have found time to help contribute to rechunker.

Thank you for releasing this tool in the first place! This fills an important niche for our current project, so I'm excited to be able to work with you on it.

Comment on lines 6 to 20

# (imports from the top of the file, for context)
from typing import Any, Iterable, NamedTuple, Tuple

import zarr


class CopySpec(NamedTuple):
    """Specification of how to copy between two arrays."""

    # TODO: remove Any by making CopySpec a Generic, once we only support Python
    # 3.7+: https://stackoverflow.com/questions/50530959
    source: Any
    target: zarr.Array
    chunks: Tuple[int, ...]


class StagedCopySpec:
    stages: Tuple[CopySpec, ...]

    def __init__(self, stages: Iterable[CopySpec]):
        self.stages = tuple(stages)
shoyer (Collaborator, Author) commented:

We might consider adding (or at least documenting) some invariant checks when constructing these objects, e.g.,

  • source and target should have same shape and dtype
  • no element of chunks should be larger than shape
  • the target and source of subsequent stages should be the same array
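A minimal sketch of what those invariant checks might look like. The helper names (`validate_copy_spec`, `validate_stages`) are invented for illustration, and the sketch only assumes objects with numpy-style `.shape`/`.dtype` attributes; it is not code from this PR:

```python
from typing import Any, Sequence, Tuple


def validate_copy_spec(source: Any, target: Any, chunks: Tuple[int, ...]) -> None:
    # Invariant 1: source and target must share shape and dtype.
    if source.shape != target.shape:
        raise ValueError(f"shape mismatch: {source.shape} vs {target.shape}")
    if source.dtype != target.dtype:
        raise ValueError(f"dtype mismatch: {source.dtype} vs {target.dtype}")
    # Invariant 2: no chunk may be larger than the corresponding dimension.
    for c, s in zip(chunks, source.shape):
        if c > s:
            raise ValueError(f"chunk size {c} exceeds dimension size {s}")


def validate_stages(stages: Sequence[Any]) -> None:
    # Invariant 3: each stage must write to the array the next stage reads.
    for first, second in zip(stages, stages[1:]):
        if first.target is not second.source:
            raise ValueError("target of one stage must be the source of the next")
```

Documenting these as invariants (rather than checking them) would also be an option, at the cost of later, more confusing failures.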

TomAugspurger (Member) left a comment:

This all looks good to me. The decision to return a Delayed subclass was primarily out of laziness (on my part).

rechunker/api.py (review thread resolved)
rabernat (Member) commented:

Just a thought that occurred to me last night: it would be awesome to implement a prefect scheduler as well.

shoyer (Collaborator, Author) commented Jul 25, 2020

I implemented a second executor just using Python. It's only ~15 lines of code and should be a useful reference.

Looking at the two executors (Dask and Python), it felt like a class would be appropriate to codify the interface. So now we have a (very lightweight) Executor class.
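The shape of such a pure-Python executor can be sketched in a few lines. This is only an illustration of the idea, not the PR's code: the class name and the `prepare_plan` method are assumptions (`execute_plan` does appear in the diff excerpts below), and the "plan" here is just a flat list of zero-argument callables:

```python
from typing import Callable, List, Sequence

Task = Callable[[], None]  # a zero-argument callable


class SimplePythonExecutor:
    """Toy executor sketch: the plan is a list of callables, run in order."""

    def prepare_plan(self, copy_tasks: Sequence[Task]) -> List[Task]:
        # Nothing to build eagerly -- the list of callables *is* the plan.
        return list(copy_tasks)

    def execute_plan(self, plan: Sequence[Task]) -> None:
        # Sequential, eager execution: no parallelism, no laziness.
        for task in plan:
            task()
```

The same two-method shape is what a dask executor would fill in with graph construction and `compute`.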

@shoyer shoyer marked this pull request as ready for review July 25, 2020 06:18
@shoyer shoyer marked this pull request as draft July 25, 2020 06:20
@shoyer shoyer marked this pull request as ready for review July 27, 2020 04:10
shoyer (Collaborator, Author) commented Jul 27, 2020

This is ready for a full review.

I'm intentionally not documenting the Executor class interface in the docs for now, because I suspect it will change as we write the next few executors. For now, anyone who wants to explore it should be willing to dive into the source code, and ideally would submit their Executor upstream into Rechunker itself!

rabernat (Member) left a comment:

Stephan, this is really wonderful. Thanks for doing this so carefully and thoroughly. I have no issues that can block merging. Just some questions to make sure I understand your code, for maintainability's sake.

@@ -309,6 +301,7 @@ def _rechunk_array(
f"Got array_dims {array_dims}, target_chunks {target_chunks}."
)

# TODO: rewrite to avoid the hard dependency on dask
shoyer (Collaborator, Author) commented:

I agree!

I didn't do that yet because removing the hard dask dependency felt orthogonal to the pain point of this PR. But it's definitely worth doing soon...

ranges = [range(math.ceil(s / c)) for s, c in zip(shape, chunks)]
for indices in itertools.product(*ranges):
key = tuple(slice(c * i, c * (i + 1)) for i, c in zip(indices, chunks))
target_array[key] = source_array[key]
A Member commented:

Can I just say that this is a beautiful little piece of code. 🏅
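To see that loop in action, here is a self-contained run against a toy 1-D array. The `ListArray` wrapper and the `copy_chunked` function name are invented for illustration (in rechunker the arrays would be zarr or dask arrays); the loop body is the same pattern as the excerpt above:

```python
import itertools
import math


class ListArray:
    """Toy 1-D array supporting the tuple-of-slices indexing the loop uses."""

    def __init__(self, data):
        self.data = list(data)
        self.shape = (len(self.data),)

    def __getitem__(self, key):
        (sl,) = key
        return self.data[sl]

    def __setitem__(self, key, value):
        (sl,) = key
        self.data[sl] = value


def copy_chunked(source_array, target_array, chunks):
    # Iterate over the chunk grid and copy one chunk at a time, so memory
    # use is bounded by a single chunk regardless of total array size.
    shape = source_array.shape
    ranges = [range(math.ceil(s / c)) for s, c in zip(shape, chunks)]
    for indices in itertools.product(*ranges):
        key = tuple(slice(c * i, c * (i + 1)) for i, c in zip(indices, chunks))
        target_array[key] = source_array[key]


source = ListArray(range(10))
target = ListArray([0] * 10)
copy_chunked(source, target, chunks=(3,))
```

Note the final chunk may be ragged (here, a single element): the over-long slice `slice(9, 12)` is clipped by the array itself, so no special-casing is needed.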

from rechunker.types import CopySpec, StagedCopySpec, Executor


Thunk = Callable[[], None]
A Member commented:

Those of us who are new to typing would appreciate a comment about what Thunk is supposed to be.

shoyer (Collaborator, Author) replied:

"thunk" is just a name for a "function without any arguments", but the name doesn't provide much value here. See
#33 for removing it.
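For readers new to the term, a thunk is nothing more than a zero-argument callable that packages up deferred work. A tiny illustrative example (the `make_thunk` helper is invented here, not from the PR):

```python
from typing import Callable

Thunk = Callable[[], None]  # a zero-argument callable returning nothing

results = []


def make_thunk(value) -> Thunk:
    # Capture the work now; nothing runs until the thunk is called.
    def thunk() -> None:
        results.append(value)
    return thunk


deferred = make_thunk(42)  # no side effect yet
deferred()                 # the deferred work runs here
```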

from typing import Generic, TypeVar

T = TypeVar("T")


class Executor(Generic[T]):
A Member commented:

Can you help me understand why you didn't use an ABC here?

shoyer (Collaborator, Author) replied:

I don't love ABCs because they add slow/picky runtime checks, though I guess that doesn't really matter here. Python's typing module has zero overhead, and if you don't care about type checking you don't even have to make your code pass the type checker.
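The trade-off being described looks roughly like this: the base class is an interface by convention only, with no ABCMeta and no runtime enforcement. A static type checker flags incomplete subclasses, while at runtime a missing override simply raises when called. The method signatures and the `ListExecutor` subclass below are assumptions for illustration:

```python
from typing import Generic, TypeVar

T = TypeVar("T")  # the executor-specific plan type (e.g. a dask Delayed)


class Executor(Generic[T]):
    """Interface by convention: no @abstractmethod, no metaclass checks."""

    def prepare_plan(self, specs) -> T:
        raise NotImplementedError

    def execute_plan(self, plan: T, **kwargs):
        raise NotImplementedError


class ListExecutor(Executor[list]):
    # Instantiable immediately -- nothing verifies completeness at runtime,
    # which is exactly the lightweight trade-off versus an ABC.
    def prepare_plan(self, specs) -> list:
        return list(specs)

    def execute_plan(self, plan: list, **kwargs):
        return [task() for task in plan]
```

With an ABC, instantiating an incomplete subclass fails eagerly at construction time; with plain `Generic`, the failure is deferred to the first call of a missing method, and the checking burden moves to mypy or another static tool.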

return _staged_copy(specs)

def execute_plan(self, plan: Delayed, **kwargs):
return plan.compute(**kwargs)
A Member commented:

Before, when we subclassed Delayed, we could call persist on the Rechunked object. That was convenient if you wanted to avoid blocking. Now our only option is compute. I'm fine with this change here, but maybe we should think about how to bring back that capability in the future.

shoyer (Collaborator, Author) replied:

It's exposed as the .plan attribute on the Rechunked object. So you could write rechunk(...).plan.persist().

That said, as I think more about how this API would or would not make sense for other schedulers, I wonder if it really makes sense to have an execute() method on Rechunked objects at all. The natural way to execute on particular schedulers may not fit neatly into that API, so perhaps it's better to encourage users to work with the scheduler specific objects representing delayed execution graphs (e.g., dask.delayed or apache_beam.PTransform).

@rabernat rabernat merged commit 721c601 into pangeo-data:master Jul 27, 2020