#### Description

#### MCVE Code Sample
```python
import numpy as np
import xarray as xr

ds = xr.Dataset({
    'signal': (['das_time', 'das', 'record'], np.empty((1000, 120, 45))),
    'min_height': (['das'], np.empty((120,)))  # each DAS has a different resolution
})

def some_peak_finding_func(data1d, min_height):
    """Process data1d with constraints given by min_height."""
    result = np.zeros((4, 2))  # summary matrix with 2 peak characteristics
    return result

ds_dask = ds.chunk({'record': 3})

xr.apply_ufunc(some_peak_finding_func, ds_dask['signal'], ds_dask['min_height'],
               input_core_dims=[['das_time'], []],  # apply peak finding along trace
               output_core_dims=[['peak_pos', 'pulse']],
               vectorize=True,  # up to here works without dask!
               dask='parallelized',
               output_sizes={'peak_pos': 4, 'pulse': 2},
               output_dtypes=[float],
               )
```
This fails with `ValueError: cannot call vectorize with a signature including new output dimensions on size 0 inputs`, because `dask.array.utils.compute_meta()` passes 0-sized arrays to the vectorized function.
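The underlying failure can be reproduced with plain NumPy, independent of xarray and dask: `np.vectorize` with a signature that introduces new output core dimensions cannot infer those dimensions from a size-0 input. A minimal sketch:

```python
import numpy as np

# vectorized function with new output core dims, like the peak finder above
vf = np.vectorize(lambda x: np.zeros((4, 2)), signature='(n)->(p,q)')

out = vf(np.ones((3, 5)))   # works: one (4, 2) result per row -> shape (3, 4, 2)

try:
    vf(np.empty((0, 0)))    # the kind of 0-sized trial array compute_meta uses
except ValueError as e:
    print(e)                # cannot call `vectorize` with a signature including
                            # new output dimensions on size 0 inputs
```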
#### Expected Output

This should work on the chunked dataset; it already works well on the non-chunked `ds` without `dask='parallelized'` and the associated `output_*` parameters.
#### Problem Description

I'm trying to parallelize a peak-finding routine with dask (it works well without it), and I hoped that `dask='parallelized'` would make that simple. However, the peak finding needs to be vectorized. It works well with `vectorize=True`, but `np.vectorize` appears to have issues in `compute_meta`, which dask calls internally during blockwise application, as indicated in the source code:
https://github.com/dask/dask/blob/e6ba8f5de1c56afeaed05c39c2384cd473d7c893/dask/array/utils.py#L118
A possible solution might be for `apply_ufunc` to pass `meta` directly to dask, if it is possible to foresee what `meta` should be. I suppose we are aiming for `np.ndarray` most of the time, though `sparse` might change that in the future.
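Dask already accepts a `meta=` argument in some entry points such as `dask.array.map_blocks`, which skips the zero-size trial call entirely. A minimal sketch of that mechanism, independent of `apply_ufunc`:

```python
import numpy as np
import dask.array as da

x = da.ones((6,), chunks=3)

# without meta=, dask would call the function on a 0-sized array
# (compute_meta) to infer the output type; supplying meta= skips that
y = x.map_blocks(lambda b: b * 2, meta=np.array((), dtype=np.float64))

print(y.compute())  # [2. 2. 2. 2. 2. 2.]
```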
I know I could use groupby-apply as an alternative, but several issues made us use `apply_ufunc` instead:

- groupby-apply seems to have much larger overhead;
- the non-core dimensions would have to be stacked into a new dimension over which to group, but some of the dimensions to be stacked already form a MultiIndex and cannot easily be stacked further;
- we could unstack the MultiIndex dimensions first, at the risk of introducing quite a number of NaNs;
- extra coords might lose dimension information (they will depend on all dimensions) after unstacking following the application.
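For comparison, the groupby-apply alternative would look roughly like this: a sketch with a toy stand-in for the peak finder and a small dataset (`.map` is called `.apply` on the xarray 0.14 used here):

```python
import numpy as np
import xarray as xr

ds = xr.Dataset({
    'signal': (['das_time', 'das', 'record'], np.random.rand(100, 4, 3)),
})

# stack the non-core dims into one dimension to group over
stacked = ds['signal'].stack(trace=('das', 'record'))

# toy stand-in for the real peak finder: one (4, 2) summary per trace
peaks = stacked.groupby('trace').map(
    lambda tr: xr.DataArray(np.zeros((4, 2)), dims=['peak_pos', 'pulse'])
)
# peaks has dims ('trace', 'peak_pos', 'pulse'); unstacking 'trace'
# would recover the separate das/record dims
```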
#### Output of `xr.show_versions()`

```
commit: None
python: 3.7.4 (default, Aug 13 2019, 20:35:49)
[GCC 7.3.0]
python-bits: 64
OS: Linux
OS-release: 4.9.0-11-amd64
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.10.4
libnetcdf: 4.6.1
xarray: 0.14.0
pandas: 0.25.1
numpy: 1.17.2
scipy: 1.3.1
netCDF4: 1.4.2
pydap: None
h5netcdf: 0.7.4
h5py: 2.9.0
Nio: None
zarr: None
cftime: 1.0.4.2
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: 1.2.1
dask: 2.5.2
distributed: 2.5.2
matplotlib: 3.1.1
cartopy: None
seaborn: 0.9.0
numbagg: None
setuptools: 41.4.0
pip: 19.2.3
conda: 4.7.12
pytest: 5.2.1
IPython: 7.8.0
sphinx: 2.2.0
```