9 changes: 7 additions & 2 deletions xarray/backends/api.py
@@ -25,7 +25,7 @@
     combine_by_coords,
 )
 from ..core.dataarray import DataArray
-from ..core.dataset import Dataset
+from ..core.dataset import Dataset, _maybe_chunk
 from ..core.utils import close_on_error, is_grib_path, is_remote_uri
 from .common import AbstractDataStore, ArrayWriter
 from .locks import _get_scheduler
@@ -525,7 +525,12 @@ def maybe_decode_store(store, chunks, lock=False):
                 chunks = dict.fromkeys(ds.dims, chunks)

             variables = {
-                k: store.maybe_chunk(k, v, chunks, overwrite_encoded_chunks)
+                k: _maybe_chunk(
+                    k,
+                    v,
+                    store.get_chunk(k, v, chunks),
+                    overwrite_encoded_chunks=overwrite_encoded_chunks,
+                )
                 for k, v in ds.variables.items()
             }
             ds2 = ds._replace(variables)
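The user-facing behavior of this path is unchanged: the store still decides the on-disk chunk spec via get_chunk, while the dask wrapping now happens in the shared _maybe_chunk helper. A minimal usage sketch of the code path this hunk serves (the store path and chunk sizes are illustrative, not taken from this PR):

import xarray as xr

# Opening a zarr store with explicit chunks flows through maybe_decode_store,
# which now wraps each variable via _maybe_chunk.
ds = xr.open_zarr("example.zarr", chunks={"time": 10}, overwrite_encoded_chunks=True)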
16 changes: 0 additions & 16 deletions xarray/backends/zarr.py
@@ -390,22 +390,6 @@ def get_chunk(self, name, var, chunks):
                 chunk_spec[dim] = chunks[dim]
         return chunk_spec

-    def maybe_chunk(self, name, var, chunks, overwrite_encoded_chunks):
-        chunk_spec = self.get_chunk(name, var, chunks)
-
-        if (var.ndim > 0) and (chunk_spec is not None):
-            from dask.base import tokenize
-
-            # does this cause any data to be read?
-            token2 = tokenize(name, var._data, chunks)
-            name2 = f"xarray-{name}-{token2}"
-            var = var.chunk(chunk_spec, name=name2, lock=None)
-            if overwrite_encoded_chunks and var.chunks is not None:
-                var.encoding["chunks"] = tuple(x[0] for x in var.chunks)
-            return var
-        else:
-            return var
-
     def store(
         self,
         variables,
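The deleted method is not lost; it reappears, generalized, as _maybe_chunk in xarray/core/dataset.py below. The overwrite_encoded_chunks branch it carried is preserved there: after chunking, the encoded (on-disk) chunk size per dimension is replaced by the first dask chunk on that axis. A minimal sketch of that expression, assuming var is a dask-backed Variable:

# var.chunks is a per-dimension tuple of chunk sizes, e.g. ((5, 5), (4,));
# keeping the first entry per dimension gives the uniform spec (5, 4).
var.encoding["chunks"] = tuple(x[0] for x in var.chunks)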
53 changes: 32 additions & 21 deletions xarray/core/dataset.py
@@ -359,6 +359,34 @@ def _assert_empty(args: tuple, msg: str = "%s") -> None:
         raise ValueError(msg % args)


+def _maybe_chunk(
+    name,
+    var,
+    chunks=None,
+    token=None,
+    lock=None,
+    name_prefix="xarray-",
+    overwrite_encoded_chunks=False,
+):
+    from dask.base import tokenize
+
+    if chunks is not None:
+        chunks = {dim: chunks[dim] for dim in var.dims if dim in chunks}
+    if var.ndim:
+        # when rechunking by different amounts, make sure dask names change
+        # by providing chunks as an input to tokenize.
+        # subtle bugs result otherwise. see GH3350
+        token2 = tokenize(name, token if token else var._data, chunks)
+        name2 = f"{name_prefix}{name}-{token2}"
+        var = var.chunk(chunks, name=name2, lock=lock)
+
+        if overwrite_encoded_chunks and var.chunks is not None:
+            var.encoding["chunks"] = tuple(x[0] for x in var.chunks)
+        return var
+    else:
+        return var
+
+
 def as_dataset(obj: Any) -> "Dataset":
     """Cast the given object to a Dataset.

@@ -1761,7 +1789,6 @@ def chunk(
         -------
         chunked : xarray.Dataset
         """
-        from dask.base import tokenize

         if isinstance(chunks, (Number, str)):
             chunks = dict.fromkeys(self.dims, chunks)
@@ -1774,26 +1801,10 @@
                 "object: %s" % bad_dims
             )

-        def selkeys(dict_, keys):
-            if dict_ is None:
-                return None
-            return {d: dict_[d] for d in keys if d in dict_}
-
-        def maybe_chunk(name, var, chunks):
-            chunks = selkeys(chunks, var.dims)
-            if not chunks:
-                chunks = None
-            if var.ndim > 0:
-                # when rechunking by different amounts, make sure dask names change
-                # by providing chunks as an input to tokenize.
-                # subtle bugs result otherwise. see GH3350
-                token2 = tokenize(name, token if token else var._data, chunks)
-                name2 = f"{name_prefix}{name}-{token2}"
-                return var.chunk(chunks, name=name2, lock=lock)
-            else:
-                return var
-
-        variables = {k: maybe_chunk(k, v, chunks) for k, v in self.variables.items()}
+        variables = {
+            k: _maybe_chunk(k, v, chunks, token, lock, name_prefix)
+            for k, v in self.variables.items()
+        }
         return self._replace(variables)

     def _validate_indexers(
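The GH3350 comment in _maybe_chunk is the heart of this refactor: if two rechunks of the same variable reuse one dask graph name, dask can conflate them and produce wrong results, so the chunk spec is fed into tokenize to make the name chunk-dependent. A standalone sketch of that effect, with illustrative names and data (not code from this PR):

import numpy as np
from dask.base import tokenize

data = np.arange(10)
for chunks in ({"x": 5}, {"x": 2}):
    # different chunk specs hash to different tokens, hence distinct dask names
    print(f"xarray-var-{tokenize('var', data, chunks)}")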