Description
Is your feature request related to a problem? Please describe.
Currently, when using apply_ufunc with dask=parallelized
the wrapped function receives numpy arrays upon computation.
Some xarray operations generate enormous amount of chunks (best example : da.groupby('time.dayofyear')
, so any complex script using dask ends up with huge task graphs. Dask's scheduler becomes overloaded, sometimes even hangs, sometimes uses way more RAM than its workers.
Describe the solution you'd like
I'd want to profit from both the tools of xarray and the power of dask parallelization. I'd like to be able to do something like this:
def func(da):
"""Example of an operation not (easily) possible with numpy."""
return da.groupby('time').mean()
xr.apply_ufunc(
da,
func,
input_core_dims=[['time']],
pass_xr=True,
dask='parallelized'
)
I'd like the wrapped func to receive DataArrays resembling the inputs (named dims, coords and all), but only with the subset of that dask chunk. Doing this, the whole function gets parallelized : dask only sees 1 task and I can code using xarray. Depending on the implementation, it might be less efficient than dask=allowed
for small dataset, but I think this could be beneficial for long and complex computations on large datasets.
Describe alternatives you've considered
The alternative is to reduce the size of the datasets (looping on other dimensions), but that defeats the purpose of dask.
Another alternative I am currently testing, is to add a layer between apply_ufunc and the func
. That layer reconstruct a DataArray and deconstructs it before returning the result, so xarray/dask only passing by. If this works and is elegant enough, I can maybe suggest an implementation within xarray.