
apply_ufunc with dask='parallelized' and vectorize=True fails on compute_meta #3574

Closed
@smartass101

Description


MCVE Code Sample

import numpy as np
import xarray as xr

ds = xr.Dataset({
    'signal': (['das_time', 'das', 'record'], np.empty((1000, 120, 45))),
    'min_height': (['das'], np.empty((120,)))   # each DAS has a different resolution
})

def some_peak_finding_func(data1d, min_height):
    """process data1d with contraints by min_height"""
    result = np.zeros((4,2))  # summary matrix with 2 peak characteristics
    return result

ds_dask = ds.chunk({'record':3})

xr.apply_ufunc(some_peak_finding_func, ds_dask['signal'], ds_dask['min_height'],
               input_core_dims=[['das_time'], []],  # apply peak finding along trace
               output_core_dims=[['peak_pos', 'pulse']],
               vectorize=True, # up to here works without dask!
               dask='parallelized',
               output_sizes={'peak_pos': 4, 'pulse':2},
               output_dtypes=[float],
              )

fails with ValueError: cannot call `vectorize` with a signature including new output dimensions on size 0 inputs, because dask.array.utils.compute_meta() passes it 0-sized arrays.
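The failure is reproducible at the plain numpy level. Here is a sketch of what compute_meta effectively does, assuming it probes the vectorized function with 0-sized arrays of matching ndim:

vfunc = np.vectorize(some_peak_finding_func,
                     signature='(das_time),()->(peak_pos,pulse)',
                     otypes=[float])
# the loop dimensions of the probe inputs have size 0, so the function
# is never actually called and np.vectorize cannot infer the sizes of
# the new output dimensions 'peak_pos' and 'pulse'
vfunc(np.empty((0, 0, 0)), np.empty((0,)))
# ValueError: cannot call `vectorize` with a signature including new
# output dimensions on size 0 inputs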

Expected Output

This should work, and it does work on the non-chunked ds when dask='parallelized' and the associated output* parameters are omitted.
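For reference, the equivalent non-dask call runs fine:

xr.apply_ufunc(some_peak_finding_func, ds['signal'], ds['min_height'],
               input_core_dims=[['das_time'], []],
               output_core_dims=[['peak_pos', 'pulse']],
               vectorize=True)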

Problem Description

I'm trying to parallelize a peak finding routine with dask (it works well without it) and I hoped that dask='parallelized' would make that simple. However, the peak finding needs to be vectorized, and while it works well with vectorize=True, np.vectorize appears to have issues in compute_meta, which dask calls internally during blockwise application, as indicated in the source code:

https://github.com/dask/dask/blob/e6ba8f5de1c56afeaed05c39c2384cd473d7c893/dask/array/utils.py#L118

A possible solution might be for apply_ufunc to pass meta directly to dask, if it is possible to foresee what meta should be. I suppose we are aiming for np.ndarray most of the time, though sparse might change that in the future.
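A minimal sketch of that idea, assuming dask.array.blockwise accepts a meta= keyword; the index layout is illustrative, and vfunc, signal_data and min_height_data are hypothetical stand-ins for what apply_ufunc builds internally:

import dask.array as da

out = da.blockwise(
    vfunc, 'drpq',           # output: das, record, peak_pos, pulse
    signal_data, 'drt',      # input: das, record, das_time (core dim last)
    min_height_data, 'd',
    new_axes={'p': 4, 'q': 2},
    concatenate=True,
    dtype=float,
    # telling dask up front that the output is a 4-d float ndarray
    # would skip the 0-sized probe in compute_meta entirely
    meta=np.empty((0, 0, 0, 0)),
)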

I know I could use groupby-apply as an alternative (a rough sketch follows the list below), but there are several issues that made us use apply_ufunc instead:

  • groupby-apply seems to have much larger overhead
  • the non-core dimensions would have to be stacked into a new dimension over which to groupby, but some of the dimensions to be stacked are already a MultiIndex and cannot be easily stacked.
    • we could unstack the MultiIndex dimensions first, at the risk of introducing quite a number of NaNs
    • extra coords might lose dimension information (they would end up depending on all dimensions) when unstacking after application
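For reference, a rough sketch of that groupby-apply route, assuming all non-core dims can be stacked into a single dimension ('trace' is an illustrative name, not from the real code):

stacked = ds.stack(trace=('das', 'record'))

def per_trace(tr):
    # tr['signal'] is 1-d along 'das_time', tr['min_height'] is a scalar
    return xr.DataArray(
        some_peak_finding_func(tr['signal'].values, tr['min_height'].item()),
        dims=['peak_pos', 'pulse'],
    )

result = stacked.groupby('trace').apply(per_trace).unstack('trace')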

Output of xr.show_versions()

commit: None
python: 3.7.4 (default, Aug 13 2019, 20:35:49)
[GCC 7.3.0]
python-bits: 64
OS: Linux
OS-release: 4.9.0-11-amd64
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.10.4
libnetcdf: 4.6.1

xarray: 0.14.0
pandas: 0.25.1
numpy: 1.17.2
scipy: 1.3.1
netCDF4: 1.4.2
pydap: None
h5netcdf: 0.7.4
h5py: 2.9.0
Nio: None
zarr: None
cftime: 1.0.4.2
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: 1.2.1
dask: 2.5.2
distributed: 2.5.2
matplotlib: 3.1.1
cartopy: None
seaborn: 0.9.0
numbagg: None
setuptools: 41.4.0
pip: 19.2.3
conda: 4.7.12
pytest: 5.2.1
IPython: 7.8.0
sphinx: 2.2.0
