implement dask methods on DataTree #9670
Merged
Changes from 20 of 26 commits

Commits:
f625f5d implement `compute` and `load` (keewis)
507fb7d also shallow-copy variables (keewis)
ce1683a implement `chunksizes` (keewis)
f2a4683 add tests for `load` (keewis)
f0ff30f add tests for `chunksizes` (keewis)
d12203c improve the `load` tests using `DataTree.chunksizes` (keewis)
dda02ed add a test for `compute` (keewis)
329c689 un-xfail a xpassing test (keewis)
c9fb461 implement and test `DataTree.chunk` (keewis)
0305fc5 link to `Dataset.load` (keewis)
e2a3a14 use `tree.subtree` to get absolute paths (keewis)
7f57ffa filter out missing dims before delegating to `Dataset.chunk` (keewis)
900701b fix the type hints for `DataTree.chunksizes` (keewis)
d45dbd0 try using `self.from_dict` instead (keewis)
5f88937 type-hint intermediate test variables (keewis)
b515972 Merge branch 'main' into datatree-dask-methods (keewis)
39d95f6 use `_node_dims` instead (keewis)
73b4466 raise on unknown chunk dim (keewis)
8b35676 check that errors in `chunk` are raised properly (keewis)
d70f4f0 adapt the docstrings of the new methods (keewis)
3e9745f allow computing / loading unchunked trees (keewis)
b6c5f9a reword the `chunksizes` properties (keewis)
da8df36 also freeze the top-level chunk sizes (keewis)
b11a1ef also reword `DataArray.chunksizes` (keewis)
53c0897 fix a copy-paste error (keewis)
f7e31b4 same for `NamedArray.chunksizes` (keewis)
@@ -18,7 +18,7 @@
 from xarray.core._aggregations import DataTreeAggregations
 from xarray.core._typed_ops import DataTreeOpsMixin
 from xarray.core.alignment import align
-from xarray.core.common import TreeAttrAccessMixin
+from xarray.core.common import TreeAttrAccessMixin, get_chunksizes
 from xarray.core.coordinates import Coordinates, DataTreeCoordinates
 from xarray.core.dataarray import DataArray
 from xarray.core.dataset import Dataset, DataVariables
@@ -49,6 +49,8 @@
     parse_dims_as_set,
 )
 from xarray.core.variable import Variable
+from xarray.namedarray.parallelcompat import get_chunked_array_type
+from xarray.namedarray.pycompat import is_chunked_array

 try:
     from xarray.core.variable import calculate_dimensions
@@ -68,8 +70,11 @@
     ErrorOptions,
     ErrorOptionsWithWarn,
     NetcdfWriteModes,
+    T_ChunkDimFreq,
+    T_ChunksFreq,
     ZarrWriteModes,
 )
+from xarray.namedarray.parallelcompat import ChunkManagerEntrypoint

 # """
 # DEVELOPERS' NOTE
@@ -862,9 +867,9 @@ def _copy_node(
     ) -> Self:
         """Copy just one node of a tree."""
         new_node = super()._copy_node(inherit=inherit, deep=deep, memo=memo)
-        data = self._to_dataset_view(rebuild_dims=False, inherit=inherit)
-        if deep:
-            data = data._copy(deep=True, memo=memo)
+        data = self._to_dataset_view(rebuild_dims=False, inherit=inherit)._copy(
+            deep=deep, memo=memo
+        )
         new_node._set_node_data(data)
         return new_node
@@ -1896,3 +1901,198 @@ def apply_indexers(dataset, node_indexers):

         indexers = either_dict_or_kwargs(indexers, indexers_kwargs, "sel")
         return self._selective_indexing(apply_indexers, indexers)
+
+    def load(self, **kwargs) -> Self:
+        """Manually trigger loading and/or computation of this datatree's data
+        from disk or a remote source into memory and return this datatree.
+        Unlike compute, the original datatree is modified and returned.
+
+        Normally, it should not be necessary to call this method in user code,
+        because all xarray functions should either work on deferred data or
+        load data automatically. However, this method can be necessary when
+        working with many file objects on disk.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Additional keyword arguments passed on to ``dask.compute``.
+
+        See Also
+        --------
+        Dataset.load
+        dask.compute
+        """
+        # access .data to coerce everything to numpy or dask arrays
+        lazy_data = {
+            path: {
+                k: v._data
+                for k, v in node.variables.items()
+                if is_chunked_array(v._data)
+            }
+            for path, node in self.subtree_with_keys
+        }
+        flat_lazy_data = {
+            (path, var_name): array
+            for path, node in lazy_data.items()
+            for var_name, array in node.items()
+        }
+        if flat_lazy_data:
+            chunkmanager = get_chunked_array_type(*flat_lazy_data.values())
+
+            # evaluate all the chunked arrays simultaneously
+            evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
+                *flat_lazy_data.values(), **kwargs
+            )
+
+            for (path, var_name), data in zip(
+                flat_lazy_data, evaluated_data, strict=False
+            ):
+                self[path].variables[var_name].data = data
+
+        # load everything else sequentially
+        for path, node in self.subtree_with_keys:
+            for k, v in node.variables.items():
+                if (path, k) not in flat_lazy_data:
+                    v.load()
+
+        return self
+
+    def compute(self, **kwargs) -> Self:
+        """Manually trigger loading and/or computation of this datatree's data
+        from disk or a remote source into memory and return a new datatree.
+        Unlike load, the original datatree is left unaltered.
+
+        Normally, it should not be necessary to call this method in user code,
+        because all xarray functions should either work on deferred data or
+        load data automatically. However, this method can be necessary when
+        working with many file objects on disk.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Additional keyword arguments passed on to ``dask.compute``.
+
+        Returns
+        -------
+        object : DataTree
+            New object with lazy data variables and coordinates as in-memory arrays.
+
+        See Also
+        --------
+        dask.compute
+        """
+        new = self.copy(deep=False)
+        return new.load(**kwargs)
+
+    @property
+    def chunksizes(self) -> Mapping[str, Mapping[Hashable, tuple[int, ...]]]:
+        """
+        Mapping from group paths to a mapping of dimension names to block lengths
+        for this datatree's data. If a group contains no chunked data, its
+        mapping will be empty.
+
+        Cannot be modified directly, but can be modified by calling .chunk().
+
+        See Also
+        --------
+        DataTree.chunk
+        Dataset.chunksizes
+        """
+        return {
+            node.path: get_chunksizes(node.variables.values()) for node in self.subtree
+        }
+
+    def chunk(
+        self,
+        chunks: T_ChunksFreq = {},  # noqa: B006  # {} even though it's technically unsafe, is being used intentionally here (#4667)
+        name_prefix: str = "xarray-",
+        token: str | None = None,
+        lock: bool = False,
+        inline_array: bool = False,
+        chunked_array_type: str | ChunkManagerEntrypoint | None = None,
+        from_array_kwargs=None,
+        **chunks_kwargs: T_ChunkDimFreq,
+    ) -> Self:
+        """Coerce all arrays in all groups in this tree into dask arrays with the given
+        chunks.
+
+        Non-dask arrays in this tree will be converted to dask arrays. Dask
+        arrays will be rechunked to the given chunk sizes.
+
+        If no chunks are provided for one or more dimensions, chunk
+        sizes along those dimensions will not be updated; non-dask arrays will be
+        converted into dask arrays with a single block.
+
+        Along datetime-like dimensions, a :py:class:`groupers.TimeResampler` object is also accepted.
+
+        Parameters
+        ----------
+        chunks : int, tuple of int, "auto" or mapping of hashable to int or a TimeResampler, optional
+            Chunk sizes along each dimension, e.g., ``5``, ``"auto"``, or
+            ``{"x": 5, "y": 5}`` or ``{"x": 5, "time": TimeResampler(freq="YE")}``.
+        name_prefix : str, default: "xarray-"
+            Prefix for the name of any new dask arrays.
+        token : str, optional
+            Token uniquely identifying this datatree.
+        lock : bool, default: False
+            Passed on to :py:func:`dask.array.from_array`, if the array is not
+            already a dask array.
+        inline_array : bool, default: False
+            Passed on to :py:func:`dask.array.from_array`, if the array is not
+            already a dask array.
+        chunked_array_type : str, optional
+            Which chunked array type to coerce this datatree's arrays to.
+            Defaults to 'dask' if installed, else whatever is registered via the `ChunkManagerEntryPoint` system.
+            Experimental API that should not be relied upon.
+        from_array_kwargs : dict, optional
+            Additional keyword arguments passed on to the `ChunkManagerEntrypoint.from_array` method used to create
+            chunked arrays, via whichever chunk manager is specified through the `chunked_array_type` kwarg.
+            For example, with dask as the default chunked array type, this method would pass additional kwargs
+            to :py:func:`dask.array.from_array`. Experimental API that should not be relied upon.
+        **chunks_kwargs : {dim: chunks, ...}, optional
+            The keyword arguments form of ``chunks``.
+            One of ``chunks`` or ``chunks_kwargs`` must be provided.
+
+        Returns
+        -------
+        chunked : xarray.DataTree
+
+        See Also
+        --------
+        Dataset.chunk
+        Dataset.chunksizes
+        xarray.unify_chunks
+        dask.array.from_array
+        """
+        # don't support deprecated ways of passing chunks
+        if not isinstance(chunks, Mapping):
+            raise TypeError(
+                f"invalid type for chunks: {type(chunks)}. Only mappings are supported."
+            )
+        combined_chunks = either_dict_or_kwargs(chunks, chunks_kwargs, "chunk")
+
+        all_dims = self._get_all_dims()
+
+        bad_dims = combined_chunks.keys() - all_dims
+        if bad_dims:
+            raise ValueError(
+                f"chunks keys {tuple(bad_dims)} not found in data dimensions {tuple(all_dims)}"
+            )
+
+        rechunked_groups = {
+            path: node.dataset.chunk(
+                {
+                    dim: size
+                    for dim, size in combined_chunks.items()
+                    if dim in node._node_dims
+                },
+                name_prefix=name_prefix,
+                token=token,
+                lock=lock,
+                inline_array=inline_array,
+                chunked_array_type=chunked_array_type,
+                from_array_kwargs=from_array_kwargs,
+            )
+            for path, node in self.subtree_with_keys
+        }
+
+        return self.from_dict(rechunked_groups, name=self.name)
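
For reference, a minimal usage sketch of the new methods (not part of the diff): it assumes a recent xarray exposing the public DataTree API with dask installed, and the group and variable names are invented for illustration.

import numpy as np
import xarray as xr

# build a small two-group tree; the "y" dimension exists only in the child group
tree = xr.DataTree.from_dict(
    {
        "/": xr.Dataset({"a": ("x", np.arange(4))}),
        "/child": xr.Dataset({"b": (("x", "y"), np.ones((4, 3)))}),
    }
)

# chunk the whole tree; chunk entries for dimensions a node does not have
# are filtered out before delegating to Dataset.chunk
chunked = tree.chunk({"x": 2, "y": 3})

# chunksizes maps each group path to that group's dimension -> block lengths
print(chunked.chunksizes)
# e.g. {'/': Frozen({'x': (2, 2)}), '/child': Frozen({'x': (2, 2), 'y': (3,)})}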
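The bad_dims check above (commit 73b4466, "raise on unknown chunk dim") rejects chunk keys that are not a dimension anywhere in the tree. A sketch of the expected failure mode, continuing the example:

try:
    tree.chunk({"z": 2})  # "z" is not a dimension in any group
except ValueError as e:
    print(e)  # chunks keys ('z',) not found in data dimensions ('x', 'y') (order may vary)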
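Finally, the load/compute distinction from the docstrings: compute shallow-copies the tree and loads the copy (per commit 507fb7d, variables are shallow-copied so the original is untouched), while load computes in place. A sketch, relying on DataArray.chunks being None once data is an in-memory numpy array:

computed = chunked.compute()
assert computed["child/b"].chunks is None     # the new tree holds numpy arrays
assert chunked["child/b"].chunks is not None  # the original is still chunked

chunked.load()  # computes in place and returns the same tree
assert chunked["child/b"].chunks is None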