Description
I have a situation where I build large zarr arrays based on chunks which correspond to how I am reading data off a filesystem, for best I/O performance. Then I set these as variables on an xarray dataset which I want to persist to zarr, but with different chunks more optimal for querying.
One problem I ran into is that manually selecting chunks of a dataset prior to to_zarr runs into the check at xarray/xarray/backends/zarr.py, line 83 in 66be9c5.
It's difficult for me to understand exactly how to select chunks manually at the dataset level which would also make this zarr "final chunk" constraint happy. I would have been satisfied however with letting zarr choose chunks for me, but could not find a way to trigger this through the xarray API short of "unchunking" it first, which would lead to loading entire variables into memory. I came up with the following hack to trigger zarr's automatic chunking despite having differently defined chunks on my xarray dataset:
import functools
import types

import xarray as xr

# monkey patch to get zarr to ignore dask chunks and use its own heuristics
def copy_func(f):
    # build an independent copy of f so the original survives the patch below
    g = types.FunctionType(f.__code__, f.__globals__, name=f.__name__,
                           argdefs=f.__defaults__,
                           closure=f.__closure__)
    g = functools.update_wrapper(g, f)
    g.__kwdefaults__ = f.__kwdefaults__
    return g

orig_determine_zarr_chunks = copy_func(xr.backends.zarr._determine_zarr_chunks)
# pass var_chunks=None so the dask chunks are never consulted
xr.backends.zarr._determine_zarr_chunks = lambda enc_chunks, var_chunks, ndim: orig_determine_zarr_chunks(enc_chunks, None, ndim)
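For reference, here is my understanding of the "final chunk" constraint, sketched in plain Python. The helper name is_zarr_compatible is made up for illustration and is not part of xarray or zarr. It assumes the rule is that, along each dimension, all chunks except the last must be the same size and the last must be no larger than the first.

```python
def is_zarr_compatible(dim_chunks):
    """Check one dimension's dask-style chunk sizes, e.g. (100, 100, 40).

    Hypothetical helper: returns True if the sizes are uniform except for
    a final chunk that is no larger than the first one.
    """
    if not dim_chunks:
        return True
    body, last = dim_chunks[:-1], dim_chunks[-1]
    uniform = len(set(body)) <= 1
    return uniform and last <= dim_chunks[0]


print(is_zarr_compatible((100, 100, 100, 40)))  # True
print(is_zarr_compatible((100, 100, 40, 100)))  # False: non-uniform interior
print(is_zarr_compatible((40, 100)))            # False: final chunk is larger
```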
The next problem to contend with is that da.store between zarr stores with differing chunks between source and destination is astronomically slow. The first thing to attempt would be to rechunk the dask arrays according to the destination zarr chunks, but xarray's consistent chunks constraint blocks this strategy as far as I can tell. Once again I took the dirty hack approach and injected a rechunking on a per-variable basis during the to_zarr operation, as follows:
# monkey patch to make dask arrays writable with different chunks than zarr dest
# could do without this but would have to contend with 'inconsistent chunks' on dataset
def sync_using_zarr_copy(self, compute=True):
    if self.sources:
        import dask.array as da
        # align each source with the chunking of its destination before storing
        rechunked_sources = [source.rechunk(target.chunks)
                             for source, target in zip(self.sources, self.targets)]
        delayed_store = da.store(rechunked_sources, self.targets,
                                 lock=self.lock, compute=compute,
                                 flush=True)
        self.sources = []
        self.targets = []
        return delayed_store

xr.backends.common.ArrayWriter.sync = sync_using_zarr_copy
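To illustrate why I think rechunking first helps, here is a rough plain-Python sketch that counts, along one dimension, how many source chunks overlap each destination chunk. The function names are made up for illustration. It assumes that each overlapping source chunk turns into a separate partial write of the destination chunk, which is my guess at the cause of the slowdown and not something I have verified in dask or zarr.

```python
from itertools import accumulate


def chunk_bounds(chunks):
    """Turn chunk sizes such as (3, 3, 2) into [(start, stop), ...]."""
    stops = list(accumulate(chunks))
    starts = [0] + stops[:-1]
    return list(zip(starts, stops))


def writes_per_target_chunk(source_chunks, target_chunk_size):
    """Count how many source chunks overlap each target chunk.

    Hypothetical helper; assumes all chunk sizes are positive.
    """
    total = sum(source_chunks)
    n_targets = -(-total // target_chunk_size)  # ceiling division
    counts = [0] * n_targets
    for start, stop in chunk_bounds(source_chunks):
        first = start // target_chunk_size
        last = (stop - 1) // target_chunk_size
        for i in range(first, last + 1):
            counts[i] += 1
    return counts


# aligned chunks: every destination chunk is written once
print(writes_per_target_chunk((4, 4, 4), 4))
# misaligned or much smaller source chunks: repeated partial writes
print(writes_per_target_chunk((3, 3, 3, 3), 4))
print(writes_per_target_chunk((1,) * 12, 4))
```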
I may have missed something in the API that would have made this easier, or another workaround which would be less hacky, but in any case I'm wondering if this scenario could be handled elegantly in xarray.
I'm not sure if there is a plan going forward to make legal xarray chunks 100% compatible with zarr; if so that would go a fair ways in alleviating the first problem. Alternatively, perhaps the xarray API could expose some ability to adjust chunks according to zarr's liking, as well as the option of defaulting entirely to zarr's heuristics for chunking.
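As a sketch of what "adjusting chunks to zarr's liking" could mean, something like the following would turn a dimension length and a desired chunk size into a layout that is uniform except for a smaller final chunk. The name uniform_chunks is hypothetical and not an existing xarray or zarr function.

```python
def uniform_chunks(dim_len, chunk_size):
    """Split dim_len into chunks of chunk_size plus a smaller remainder.

    Hypothetical helper: the result is uniform except possibly for the
    final chunk, which is never larger than the others.
    """
    if dim_len < 0 or chunk_size <= 0:
        raise ValueError("dim_len must be >= 0 and chunk_size must be > 0")
    full, remainder = divmod(dim_len, chunk_size)
    return (chunk_size,) * full + ((remainder,) if remainder else ())


print(uniform_chunks(340, 100))  # (100, 100, 100, 40)
print(uniform_chunks(300, 100))  # (100, 100, 100)
```

The resulting tuples are in the per-dimension form that dask uses for explicit chunks, so in principle they could be fed to a rechunk call for each variable.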
As for the performance issue with differing chunks, I'm not sure whether my rechunking patch could be applied without causing side effects, or where the right place to solve this would be. Perhaps it could be more naturally addressed within da.store.