# dask-image imread v0.5.0 not working with dask distributed Client & napari #194
Talley says: interesting. This does appear to be at least partly related to the interplay between napari and `dask.distributed`, since this works:

```python
from dask.distributed import Client
import napari
import dask.array as da

client = Client()
data = da.ones((64, 64))
napari.view_image(data)
```

Also, if I cast the dask-image array to numpy prior to passing it to napari, it also seems to work:

```python
from dask.distributed import Client
import napari
from dask_image.imread import imread
import numpy as np

client = Client()
data = imread('./myFiles/*.tif')
napari.view_image(np.array(data))
```
From @kpasko, here's the full trace.
@jakirkham adds this: in particular it would be good to test 0.4.0 to see if this works there, as 0.5.0 started using `map_blocks` for reading.

And from @GenevieveBuckley there's this: Can confirm, this problem was introduced with dask-image 0.5.0. You can use either dask-image 0.4.0 or `dask.array.image.imread` as a workaround:

```python
from dask.distributed import Client
import napari
from dask.array.image import imread

client = Client()
data = imread('./myFiles/*.tif')
napari.view_image(data)
```
cc @jrbourbeau (if you still want some example data, you can download some here, but honestly you can use any single image to try this out - .png, .jpg, .tiff, I don't think it matters). I wonder if #182 might be the issue. I did not consider what might happen with this in a distributed context (oops). So that's probably a good first place to look. cc @m-albert
Would just add it's possible the issue tracks back to Dask, though I don't think we've isolated it yet. It might be worth trying to reproduce using just `dask.array` and `distributed`.
This is all very fascinating. 👀
So, the plot thickens... I can't reproduce. @GenevieveBuckley what is your cloudpickle version? I just installed it at 1.6 and I was able to run your example without issues (though it should be):

```python
from dask.distributed import Client
import napari
from dask_image.imread import imread

client = Client()
data = imread('./*.tif')
napari.view_image(data)
```
Ok, I will edit the example up top. I also think there's probably no need to involve napari in the example at all; it's entirely possible that calling `np.asarray` on the array is enough to trigger the error.
Well, @tlambert03's example (using np.asarray) would seem to disprove that... But I haven't checked it locally.
Oh yeah, I'd almost forgotten about that. It turns out it's even more interesting than I'd thought.

```python
from dask.distributed import Client
import napari
from dask_image.imread import imread

client = Client()
data = imread('*.tif')
napari.view_image(data)  # you get an error
data.compute()           # this works fine
napari.view_image(data)  # works fine now
```

Error message:

```
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-9-4d014add55bf> in <module>
----> 1 napari.view_image(data)
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/view_layers.py in view_image(data, channel_axis, rgb, colormap, contrast_limits, gamma, interpolation, rendering, iso_threshold, attenuation, name, metadata, scale, translate, rotate, shear, affine, opacity, blending, visible, multiscale, title, ndisplay, order, axis_labels, show)
7 and the ``Viewer.add_<layer_type>`` methods. The final generated functions
8 follow this pattern
----> 9 (where <layer_type> is replaced with one of the layer types):
10
11 def view_<layer_type>(*args, **kwargs):
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/components/viewer_model.py in add_image(self, data, channel_axis, rgb, colormap, contrast_limits, gamma, interpolation, rendering, iso_threshold, attenuation, name, metadata, scale, translate, rotate, shear, affine, opacity, blending, visible, multiscale)
674 "did you mean to specify a 'channel_axis'? "
675 )
--> 676 layer = image_class(data, **kwargs)
677 self.layers.append(layer)
678
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/image.py in __init__(self, data, rgb, colormap, contrast_limits, gamma, interpolation, rendering, iso_threshold, attenuation, name, metadata, scale, translate, rotate, shear, affine, opacity, blending, visible, multiscale)
274
275 # Trigger generation of view slice and thumbnail
--> 276 self._update_dims()
277
278 def _new_empty_slice(self):
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/base/base.py in _update_dims(self, event)
528 self._ndim = ndim
529
--> 530 self.refresh()
531 self._value = self.get_value(self.position, world=True)
532
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/base/base.py in refresh(self, event)
938 """Refresh all layer data based on current view slice."""
939 if self.visible:
--> 940 self.set_view_slice()
941 self.events.set_data()
942 self._update_thumbnail()
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/base/base.py in set_view_slice(self)
798 def set_view_slice(self):
799 with self.dask_optimized_slicing():
--> 800 self._set_view_slice()
801
802 @abstractmethod
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/image.py in _set_view_slice(self)
611 # Load our images, might be sync or async.
612 data = SliceDataClass(self, image_indices, image, thumbnail_source)
--> 613 self._load_slice(data)
614
615 def _load_slice(self, data: SliceDataClass):
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/image.py in _load_slice(self, data)
620 data : Slice
621 """
--> 622 if self._slice.load(data):
623 # The load was synchronous.
624 self._on_data_loaded(data, sync=True)
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/_image_slice.py in load(self, data)
117 """
118 self.loaded = False # False until self._on_loaded is calls
--> 119 return self.loader.load(data)
120
121 def on_loaded(self, data: ImageSliceData) -> bool:
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/_image_loader.py in load(self, data)
20 True if load happened synchronously.
21 """
---> 22 data.load_sync()
23 return True
24
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/_image_slice_data.py in load_sync(self)
38 def load_sync(self) -> None:
39 """Call asarray on our images to load them."""
---> 40 self.image = np.asarray(self.image)
41
42 if self.thumbnail_source is not None:
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
100 return _asarray_with_like(a, dtype=dtype, order=order, like=like)
101
--> 102 return array(a, dtype, copy=False, order=order)
103
104
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/dask/array/core.py in __array__(self, dtype, **kwargs)
1446
1447 def __array__(self, dtype=None, **kwargs):
-> 1448 x = self.compute()
1449 if dtype and x.dtype != dtype:
1450 x = x.astype(dtype)
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
279 dask.base.compute
280 """
--> 281 (result,) = compute(self, traverse=False, **kwargs)
282 return result
283
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
561 postcomputes.append(x.__dask_postcompute__())
562
--> 563 results = schedule(dsk, keys, **kwargs)
564 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
565
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2633 Client.compute : Compute asynchronous collections
2634 """
-> 2635 futures = self._graph_to_futures(
2636 dsk,
2637 keys=set(flatten([keys])),
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
2541 dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
2542
-> 2543 dsk = highlevelgraph_pack(dsk, self, keyset)
2544
2545 annotations = {}
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/distributed/protocol/highlevelgraph.py in highlevelgraph_pack(hlg, client, client_keys)
122 }
123 )
--> 124 return dumps_msgpack({"layers": layers})
125
126
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/distributed/protocol/core.py in dumps_msgpack(msg, compression)
182 """
183 header = {}
--> 184 payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
185
186 fmt, payload = maybe_compress(payload, compression=compression)
~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/msgpack/__init__.py in packb(o, **kwargs)
33 See :class:`Packer` for options.
34 """
---> 35 return Packer(**kwargs).pack(o)
36
37
msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()
TypeError: can not serialize 'function' object
```
Here's the environment and how I made the env (the details were in a collapsed section of the original comment).
Just to clarify, should this...

```python
data.compute()  # this works fine
```

...be this?

```python
data = data.compute()  # this works fine
```

Or does it occur without that assignment?
No, that's the crazy thing - I did not assign the result back to `data`. It seems that any kind of call that results in the dask array being computed (I tried a few variations) makes the next `napari.view_image(data)` call work.
I was afraid you might say that 😅 Is Napari doing some kind of caching these days?
Critically, `dask.array.image.imread` works. Would we lose much functionality if that became the canonical way to open images? Perhaps we could adapt it to use pims instead of skimage by default. This one wasn't on the list of imread functions profiled here. I'd say profiling the performance here might be a good next step. If it performs decently, maybe we move in that direction.
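For context on the "adapt it to use pims" idea: `dask.array.image.imread` already accepts a custom reader via its `imread=` parameter, so swapping out the default skimage reader requires no API change. A minimal sketch; the choice of tifffile here is our illustration, not something proposed in the thread:

```python
# Sketch: dask.array.image.imread with a non-default per-file reader.
# `imread=` is an existing parameter of this function; a pims-based
# reader would need a small wrapper returning a numpy array per file.
import tifffile
from dask.array.image import imread

data = imread('./myFiles/*.tif', imread=tifffile.imread)
```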
Well, I still suspect there is an underlying Dask issue that needs to be fixed. The error that we are seeing has to do with the serialization of the Dask task graph, which has seen changes in recent releases. I think if we can narrow it down to something using just `dask.array` and `distributed`, that would help.
@jakirkham yes, napari sets the dask cache to 1/10th of RAM on import. Yes, I know it's bad, I've tried to argue to change that. 😅
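For readers unfamiliar with dask's opportunistic cache, here's a minimal sketch of the mechanism being described (not napari's actual code, which may differ in detail):

```python
# Register a global opportunistic cache at ~1/10th of system RAM,
# roughly what napari is described as doing on import.
# (dask.cache requires the `cachey` package.)
import psutil
from dask.cache import Cache

cache = Cache(psutil.virtual_memory().total / 10)
cache.register()      # affects every subsequent dask computation
# cache.unregister()  # the only way to switch it off again
```

Once registered, intermediate results of dask computations are kept around, which is why a prior `compute()` can change what a later computation needs to do.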
Ah ok. So the call to `compute` fills the cache, which would explain why the second `view_image` call works.
Idea: I was trying to figure out why the error goes away after computing, and the cache would explain it.
Wait, which part is bad? I don't remember any arguments about this (and is it really only 1/10 now?)
Hi guys, I found the following: actually, the latest commit in #182 fixes the reported problem when using `dask.distributed`.

I checked this by changing the code from the last broken commit 5afde9a from

```python
a = dask.array.map_blocks(
    _map_read_frame,
    chunks=dask.array.core.normalize_chunks(
        (nframes,) + shape[1:], shape),
    fn=sfname,
    arrayfunc=arrayfunc,
    meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
)
...
def _map_read_frame(x, block_info=None, **kwargs):
```

into

```python
import numpy as np

a0 = dask.array.from_array(np.ones((2, 5, 5)), chunks=(1, 5, 5))
a = a0.map_blocks(
    _map_read_frame,
    chunks=dask.array.core.normalize_chunks(
        (nframes,) + shape[1:], shape),
    fn=sfname,
    arrayfunc=arrayfunc,
    meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
)
...
def _map_read_frame(x, block_info=None, **kwargs):
```

and confirming that the latter indeed resolves the issue (using two random input images). So the change in #182, mapping over an input array instead of calling `dask.array.map_blocks` without one, appears to fix the reported problem.

Regarding the root of the problem, I think @jakirkham is right that there could be an underlying Dask issue. Compare:

```python
import dask.array as da
import numpy as np

def func(block_info=None):
    return 1

da.map_blocks(func, chunks=((4, 4),), dtype=np.float_).compute()[0]
da.map_blocks(func, chunks=((4, 4),), dtype=np.float_)[0].compute()
```
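For what it's worth, the two lines in that snippet differ only in where the indexing happens; a short sketch of the distinction (our annotation, not from the thread):

```python
import dask.array as da
import numpy as np

def func(block_info=None):
    return np.ones(4)  # one block of the chunk spec below

arr = da.map_blocks(func, chunks=((4, 4),), dtype=np.float_)

first = arr.compute()[0]  # computes all blocks, then indexes the numpy result
first = arr[0].compute()  # slices the dask graph first, computes only the blocks needed
```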
Thank you Marvin! 😄 Could you please file the Dask MRE as a Dask issue? 🙂 We can then follow up with people there.
@jakirkham 😅 Sorry, I just realised the dask part of my comment is completely wrong, as the example doesn't actually fail. In conclusion, this doesn't hint at the underlying Dask issue.
No worries 🙂 Yeah, then it seems we still have more digging to do.
Well, it seems to me that modifying the behaviour of a popular third party library (dask) just by importing is undesirable. We saw above that just importing napari altered the behaviour of dask during diagnostics. In general, imho there should be a visible function call to alter the dask caching behaviour. At any rate, we discussed this in various group meetings, just never seriously. The basic issue I've run into is that the dask cache is good-ish when exploring, but bad when using "play", because you fill up the cache and then spend the whole time putting stuff into the cache queue and deleting it from the back of it.
totally agree. let's change it!
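A sketch of what such a visible call could look like (a hypothetical helper, not napari's API at the time; napari did later grow a `resize_dask_cache` utility along these lines, so check the current docs before relying on this):

```python
from dask.cache import Cache

_active_cache = None

def set_dask_cache(nbytes: int) -> None:
    """Hypothetical helper: opt in to an opportunistic dask cache of
    `nbytes` bytes; pass 0 to switch caching off entirely."""
    global _active_cache
    if _active_cache is not None:
        _active_cache.unregister()
        _active_cache = None
    if nbytes > 0:
        _active_cache = Cache(nbytes)
        _active_cache.register()
```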
@GenevieveBuckley it shouldn't be that many commits to check which one did it? i.e. can you do a git bisect to find the commit that fixes things? (I would do this but, as mentioned, I couldn't reproduce this issue.)
Do we have any idea why you couldn't reproduce this? I'm pretty sure you're running the same OS as me, Ubuntu 20.04.
No, my laptop remains on 18.04. But anyway, I doubt it's an OS issue. I tried various combinations of cloudpickle/dask/dask-image and couldn't get the error... I did not try the complete env at the top of this issue, might try that this arvo.
The breaking commit is 17ec4c2, which implements

```python
...
a = dask.array.map_blocks(
    _map_read_frame,
    chunks=dask.array.core.normalize_chunks(
        (nframes,) + shape[1:], shape),
    fn=sfname,
    arrayfunc=arrayfunc,
    meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
)
return a

def _map_read_frame(block_info=None, **kwargs):
    i, j = block_info[None]['array-location'][0]
    return _utils._read_frame(i=slice(i, j), **kwargs)
```

After the latest commit 91fe6e1, the problem doesn't occur anymore. This commit changes the way the source filenames are passed to `map_blocks`:

```python
...
# place source filenames into dask array
filenames = sorted(glob.glob(sfname))  # pims also does this
if len(filenames) > 1:
    ar = dask.array.from_array(filenames, chunks=(nframes,))
    multiple_files = True
else:
    ar = dask.array.from_array(filenames * shape[0], chunks=(nframes,))
    multiple_files = False

# read in data using encoded filenames
a = ar.map_blocks(
    _map_read_frame,
    chunks=dask.array.core.normalize_chunks(
        (nframes,) + shape[1:], shape),
    multiple_files=multiple_files,
    new_axis=list(range(1, len(shape))),
    arrayfunc=arrayfunc,
    meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
)
return a

def _map_read_frame(x, multiple_files, block_info=None, **kwargs):
    fn = x[0]  # get filename from input chunk
    if multiple_files:
        i, j = 0, 1
    else:
        i, j = block_info[None]['array-location'][0]
    return _utils._read_frame(fn=fn, i=slice(i, j), **kwargs)
```

These code snippets include all relevant changed lines. Interestingly, the problem seems to be related to the differences in the use of `map_blocks`: the broken version calls the module-level `dask.array.map_blocks` without an input array, while the fixed version maps over an existing array. For example, the following modification of the broken commit also resolves the issue:

```python
import dask.array as da
import numpy as np

x = da.from_array(
    np.zeros(shape, dtype=dtype),
    chunks=dask.array.core.normalize_chunks((nframes,) + shape[1:], shape),
)
a = x.map_blocks(
    _map_read_frame,
    chunks=dask.array.core.normalize_chunks(
        (nframes,) + shape[1:], shape),
    fn=sfname,
    arrayfunc=arrayfunc,
    meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
)
```

However, in my understanding, using `da.map_blocks` without an input array should be perfectly valid; for instance, this analogous example works fine:

```python
import dask.array as da
import numpy as np

xn = np.random.randint(0, 100, (20, 30, 30))
xd = da.from_array(xn, chunks=(1, 5, 5))

def func():
    return np.random.randint(0, 100, (1, 5, 5))

xm = da.map_blocks(func, chunks=xd.chunks, dtype=xd.dtype)
napari.view_image(xm)
```

Something else to consider is the observation by @GenevieveBuckley that calling `compute()` on the array makes the subsequent `napari.view_image` call work.
Okay, I tried different dask/distributed versions: with older releases the problem doesn't occur, but with the recent ones it does. EDIT: 2021.03.01 still shows the problem. See https://docs.dask.org/en/latest/changelog.html#id5 and https://distributed.dask.org/en/latest/changelog.html#id5. It could be something related to one of the serialization points listed there, as the traceback shows that this line fails: `payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)`.
Does it occur with 2021.03.0?
Yes, it does.
Thanks Marvin! 😄 Sorry, I may have missed this: what is the reproducer now? Were we able to do this without napari (like just calling `compute` on the array)?
Couldn't reproduce the issue with a different/simpler example than the one reported here yet...
Do we know what napari is calling when the error occurs?
This line in napari's image slice loading:

```
     38     def load_sync(self) -> None:
     39         """Call asarray on our images to load them."""
---> 40         self.image = np.asarray(self.image)
```

Strangely, when rerunning it in the interactive debugger it works fine.
Yeah.
Though what is interesting is this comes from the graph serialization path, not from reading the data itself.
Interestingly, if I run the failing call a second time, it works. However, if I set up a fresh dask array, the first call fails again.
Or that might hint towards the state of the dask array being changed at some point within the first failed execution of `napari.view_image`.
Interesting. Thanks for exploring this! 😄 Are you able to see what the Dask task graph looks like before calling `napari.view_image`? Agree there is something magical going on here that we don't fully comprehend yet.
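One way to look at the graph without triggering any computation, as a sketch using the `xm` array from the snippet above:

```python
# Inspect the high-level graph before anything is computed.
print(type(xm.dask))              # dask.highlevelgraph.HighLevelGraph
print(list(xm.dask.layers))       # one entry per graph layer
print(dict(xm.__dask_graph__()))  # materialized low-level key -> task mapping
# xm.visualize('graph.png')       # optional: render it (needs graphviz)
```

Comparing this output before and after a failed `view_image` call would show whether the graph state really changes during the first execution.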
Btw, I found another hook of some type. This reproduces the problem without actually using `dask_image.imread`:

```python
%gui qt
from dask.distributed import Client
import napari
from dask_image.imread import imread

client = Client()

import dask.array as da
import numpy as np

xn = np.random.randint(0, 100, (20, 30, 30))
xd = da.from_array(xn, chunks=(1, 5, 5))

def func(block_info=None):
    return np.random.randint(0, 100, (1, 5, 5))

xm = da.map_blocks(func, chunks=xd.chunks, dtype=xd.dtype)
```

Now, replacing

```python
def func(block_info=None):
```

by

```python
def func():
```

makes the error disappear. As if the mere presence of the `block_info` argument changed how the graph is packed.
In this failing line, the packed layer state contains the raw function:

```
ipdb> p layers[1]['state']['indices']
[<function func at 0x150d21a60>, None, (<class 'tuple'>, ['block_info']), None, 'block-info-func-b78f8575db855a3a3f1010f0ef59e206', ('.0', '.1', '.2')]
```
We did make some changes to how MsgPack is used in PR ( dask/distributed#4531 ), which was merged after the recent release. Curious if that still encounters the issue as well.
Okay, I narrowed it down (still importing napari, but without viewing anything):

```python
import dask.array as da
import numpy as np
from dask.distributed import Client
import napari

client = Client()

xn = np.random.randint(0, 100, (2, 4, 4))
xd = da.from_array(xn, chunks=(1, 2, 2))

# fails
# def func(block_info=None):
#     return np.random.randint(0, 100, (1, 2, 2))

# works
def func():
    return np.random.randint(0, 100, (1, 2, 2))

xm = da.map_blocks(func, chunks=xd.chunks, dtype=xd.dtype)

from dask.core import flatten
keyset = set(flatten(xm.__dask_keys__()))
xm.dask.__dask_distributed_pack__(client, keyset)
```

Same error:

```
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-93-163cc014c931> in <module>
21 from dask.core import flatten
22 keyset = set(flatten(xm.__dask_keys__()))
---> 23 xm.dask.__dask_distributed_pack__(client, keyset)
~/miniconda3/envs/dask_image_delme/lib/python3.9/site-packages/dask/highlevelgraph.py in __dask_distributed_pack__(self, client, client_keys)
942 }
943 )
--> 944 return dumps_msgpack({"layers": layers})
945
946 @staticmethod
~/miniconda3/envs/dask_image_delme/lib/python3.9/site-packages/distributed/protocol/core.py in dumps_msgpack(msg, compression)
161 """
162 header = {}
--> 163 payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
164
165 fmt, payload = maybe_compress(payload, compression=compression)
~/miniconda3/envs/dask_image_delme/lib/python3.9/site-packages/msgpack/__init__.py in packb(o, **kwargs)
33 See :class:`Packer` for options.
34 """
---> 35 return Packer(**kwargs).pack(o)
36
37
msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()
msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()
TypeError: can not serialize 'function' object
```
Well done! 😄 👏 Can you please file this as a new issue on Distributed ( https://github.com/dask/distributed/issues )?
Wow, this is the kind of sleuthing that one loves to wake up to! 😂 👏 👏 Awesome work @m-albert! I should have remembered and mentioned one more probably-relevant detail, which is that napari turns off dask task fusion when slicing dask arrays. That might account for why it was so hard to reproduce without napari. Sorry, I only just remembered that!
Yep, that was the missing piece 😄 Here's another repro ( dask/distributed#4574 (comment) ). Fusion on: things work. Fusion off: fails!
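For reference, low-level task fusion can be toggled through dask's config. A sketch of the fusion-off reproduction under the affected versions (adapted from the linked comment, not copied verbatim):

```python
import dask
import dask.array as da
import numpy as np
from dask.distributed import Client

client = Client()

def func(block_info=None):
    return np.random.randint(0, 100, (1, 2, 2))

xm = da.map_blocks(func, chunks=((1, 1), (2, 2), (2, 2)), dtype=int)

with dask.config.set({"optimization.fuse.active": True}):
    xm.sum().compute()  # fusion on: works

with dask.config.set({"optimization.fuse.active": False}):
    xm.sum().compute()  # fusion off: "can not serialize 'function' object"
                        # (on the affected dask/distributed versions)
```

This matches the napari detail above: napari disables fusion while slicing, so the unfused graph path was only exercised inside napari.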
Nice, so something goes wrong when trying to pack unfused graphs. For some reason, choosing a mapping function that accepts `block_info` produces a graph that fails to pack.
Does PR ( dask/dask#7353 ) solve the issue? We just merged it, fwiw.
I tested the changes in dask/dask#7353 against the example snippet posted in dask/distributed#4574 and the issue was resolved (a corresponding test was also added). It'd be great if someone could double check with the original napari workflow.
@jakirkham @jrbourbeau
Really nice work here @m-albert.
@kpasko made a bug report napari/napari#2304, but it turns out this is a problem caused by dask-image. I've copied the contents of the report into this issue (sadly I'm unable to transfer issues between different organisations).

**What happened:**

```
TypeError: can not serialize 'function' object
```

In distributed/client.py line 2635: `futures = self._graph_to_futures`

**What you expected to happen:**

Successful image viewing.

**Minimal Complete Verifiable Example:**

(Edited; see the corrected snippet earlier in the thread.)

**Anything else we need to know?:**

Works fine when not initializing the client, i.e. the same example without `client = Client()` works as expected (see the sketch below).
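Presumably the working variant looked like this (our reconstruction; the exact snippet was edited in the original report):

```python
import napari
from dask_image.imread import imread

data = imread('./myFiles/*.tif')
napari.view_image(data)  # works when no distributed Client has been created
```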
**Environment:**

- Napari/Dask version:

```
dask                  2021.2.0  pyhd8ed1ab_0        conda-forge
dask-core             2021.2.0  pyhd8ed1ab_0        conda-forge
dask-image            0.5.0     pyh44b312d_0        conda-forge
distributed           2021.2.0  py39h6e9494a_0      conda-forge
napari                0.4.5     pyhd8ed1ab_0        conda-forge
napari-console        0.0.3     pyhd8ed1ab_0        conda-forge
napari-plugin-engine  0.1.9     py39h6e9494a_1      conda-forge
napari-svg            0.1.4     py_0                conda-forge
```

- Python version: python 3.9.2 h2502468_0_cpython conda-forge
- Operating System: OS X 11.2.1
- Install method (conda, pip, source): conda