- 
          
 - 
                Notifications
    
You must be signed in to change notification settings  - Fork 69
 
Implement option to use dask to do image co-addition #388
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
          Codecov ReportAttention: Patch coverage is  
 
 Additional details and impacted files@@            Coverage Diff             @@
##             main     #388      +/-   ##
==========================================
- Coverage   93.60%   89.36%   -4.25%     
==========================================
  Files          25       25              
  Lines         892      959      +67     
==========================================
+ Hits          835      857      +22     
- Misses         57      102      +45     ☔ View full report in Codecov by Sentry.  | 
    
| 
               | 
          ||
| @delayed(pure=True) | ||
| def as_delayed_memmap_path(array, tmp_dir): | ||
| tmp_dir = tempfile.mkdtemp() # FIXME | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure why I didn't catch this before but I ran into an issue where the temporary directory no longer exists by the time the array is computed out of dask when using return_type='dask'.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yikes - are you saying that is a problem with the current implementation, or this line is fixing it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a temporary fix for this PR but I need to figure out a proper solution. It is a problem in main.
| 
           Ok so the   | 
    
| 
           I get the following traceback when trying on my big cubes: Writing
[                                        ] | 0% Completed | 548.30 ms
Traceback (most recent call last):
  File "<ipython-input-3-9471e55417d0>", line 1, in <module>
    make_downsampled_cube(f'{basepath}/mosaics/cubes/CS21_CubeMosaic.fits',
  File "/orange/adamginsburg/ACES/reduction_ACES/aces/imaging/make_mosaic.py", line 721, in make_downsampled_cube
    dscube.write(outcubename, overwrite=overwrite)
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py", line 1382, in write
    super().write(*args, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/io/core.py", line 131, in __call__
    registry.write(self._instance, *args, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/registry/compat.py", line 52, in wrapper
    return getattr(registry, method_name)(*args, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/registry/core.py", line 383, in write
    return writer(data, *args, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/io/fits.py", line 280, in write_fits_cube
    hdulist.writeto(filename, overwrite=overwrite)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/hdulist.py", line 1021, in writeto
    hdu._writeto(hdulist._file)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/base.py", line 710, in _writeto
    self._writeto_internal(fileobj, inplace, copy)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/base.py", line 716, in _writeto_internal
    data_offset, data_size = self._writedata(fileobj)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/base.py", line 647, in _writedata
    size += self._writedata_internal(fileobj)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/image.py", line 649, in _writedata_internal
    return self._writeinternal_dask(fileobj)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/image.py", line 740, in _writeinternal_dask
    output.store(outarr, lock=True, compute=True)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 1770, in store
    r = store([self], [target], **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 1238, in store
    compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/base.py", line 342, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 557, in get_sync
    return get_async(
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 500, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 542, in submit
    fut.set_result(fn(*args, **kwargs))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 238, in batch_execute_tasks
    return [execute_task(*a) for a in it]
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 238, in <listcomp>
    return [execute_task(*a) for a in it]
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 229, in execute_task
    result = pack_exception(e, dumps)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 224, in execute_task
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 122, in getter
    c = a[b]
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py", line 199, in __getitem__
    return self._mask._filled(data=self._data,
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/masks.py", line 240, in _filled
    return np.ma.masked_array(sliced_data, mask=ex).filled(fill)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/numpy/ma/core.py", line 2826, in __new__
    _data = np.array(data, dtype=dtype, copy=copy,
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 1702, in __array__
    x = self.compute()
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/base.py", line 315, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 557, in get_sync
    return get_async(
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 500, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 542, in submit
    fut.set_result(fn(*args, **kwargs))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 238, in batch_execute_tasks
    return [execute_task(*a) for a in it]
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 238, in <listcomp>
    return [execute_task(*a) for a in it]
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 229, in execute_task
    result = pack_exception(e, dumps)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 1284, in finalize
    return concatenate3(results)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 5289, in concatenate3
    result = np.empty(shape=shape, dtype=dtype(deepfirst(arrays)))
MemoryError: Unable to allocate 634. GiB for an array with shape (300, 10200, 27800) and data type >f8I might have copy-pasted the traceback a bit incorrectly, it's very deep  | 
    
| 
           @keflavich - just to check, is that how big you would expect the output cube to be? Or is that incorrect? What is the command you are using? If it is what you expect, have you tried using   | 
    
| 
           that's the size of the input cube, not the output. Call was this: dscube = cube.reproject(hdr, return_type='dask', filled=False, parallel=True, block_size=[1,1000,1000])Yes, passing the memmaped array as output sounds like a good idea - can try that next  | 
    
| 
           Which branch of spectral-cube are you using?  | 
    
| 
           I'm on cubewcs_mosaic_and_dask_reproject, https://github.com/keflavich/spectral-cube/tree/cubewcs_mosaic_and_dask_reproject. It's not really a stable branch... I'm doing all sorts of experiments in parallel and getting a bit lost along the way.  | 
    
| 
           This is going to require some non-trivial rebasing  | 
    
This is an experiment for now but it allows:
combine_function='median'(not implemented here but trivial to do)reproject_and_coaddto optionally return a dask array (return_type='dask') which means actually delaying the computation of the co-addition. You can extract a cutout from a co-added mosaic and compute that without ever having to actually compute the full mosaic.At the moment, setting
block_sizeis what toggles between the old code and the new dask-backed co-addition.Still a work in progress and currently looking into how to get the parallel mode to work correctly (and actually be fast).
@keflavich - would you be able to try this out with some of your data? To use this, you just need to set e.g.
block_size=(100, 100, 100)(or whatever you think makes sense) when calling thereproject_and_coaddfunction. For now, I am still investigating performance with the parallel mode, so I don't recommend trying to setparallel=...yet (though you technically can). I'm just curious to know at this point if it even gets close to working for you.