imread - investigate possible performance improvement #161

Closed
GenevieveBuckley opened this issue Oct 20, 2020 · 16 comments
Labels: enhancement (New feature or request)

Comments

@GenevieveBuckley
Collaborator

It's been found that da.map_blocks performs much better than da.stack when joining large arrays: dask/dask#5913

It's unclear whether da.concatenate (which we use in imread) is also slower, but this seems likely. We should investigate if we can get a performance benefit by switching to da.map_blocks.
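
For reference, here is a minimal sketch contrasting the two ways of assembling a lazy stack of frames (illustrative only: read_frame, read_block and the sizes are made up, this is not dask_image code):

import dask
import dask.array as da
import numpy as np

nframes, ny, nx = 100, 512, 512

def read_frame(i):
    # stand-in for a real file reader returning one (ny, nx) frame
    return np.full((ny, nx), i, dtype=np.uint16)

# stack/concatenate style: one delayed task per frame, stitched together afterwards
lazy = [da.from_delayed(dask.delayed(read_frame)(i), (ny, nx), np.uint16)
        for i in range(nframes)]
stacked = da.stack(lazy)

# map_blocks style: a single blockwise layer; each block works out its own
# frame index range from block_info
def read_block(block_info=None):
    i, j = block_info[None]['array-location'][0]
    return np.stack([read_frame(k) for k in range(i, j)])

mapped = da.map_blocks(read_block,
                       chunks=((1,) * nframes, (ny,), (nx,)),
                       dtype=np.uint16)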

@GenevieveBuckley
Collaborator Author

dask/dask#5913 has several comments describing how to accurately profile speedups

@GenevieveBuckley
Collaborator Author

One potential blocker: jni thinks the reason da.stack is slow is that it allows the flexibility of unequal chunk sizes. I can't remember for sure, but I don't think we support folders full of differently sized images? If I'm wrong about that, it might mean we don't want to give up that functionality for some speed improvement in other cases.

@jni says here:

For reference, a couple of usage examples of map_blocks:

https://github.com/jni/napariboard-proto/blob/4b00303e567328270e02bd7e977cd2656040f8ea/napariboard.py#L70-L107

and

https://github.com/jni/napari-demos/blob/91b5e8ab03852fc93fe6d74d81fed2d39dbdda3b/rootomics.py#L20-L54

@GenevieveBuckley it would be good to know whether da.concatenate, used in dask_image.imread, suffers from the same performance issues as da.stack. My suspicion is that it does, because if I understand correctly, the performance issue comes from da.stack allowing unequal chunk sizes along an axis.

@GenevieveBuckley added the enhancement label on Oct 20, 2020
@jakirkham
Member

One thing worth noting (and I may have mentioned this to Juan as well) is that map_blocks will fill in certain special keywords of the user-defined function if the function accepts them. One of these is block_info. This provides positioning info, like where in the larger Dask Array this chunk sits and how big it should be, which may be helpful when figuring out what data should be filled in for a specific chunk.
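
A tiny illustration of what block_info carries (the array and function here are just for demonstration):

import dask.array as da

def show_location(x, block_info=None):
    # block_info[None] describes the output block; 'array-location' holds
    # the (start, stop) range covered along each axis
    print(block_info[None]['array-location'])
    return x

arr = da.zeros((4, 6), chunks=(2, 3))
arr.map_blocks(show_location, dtype=arr.dtype).compute()
# prints e.g. [(0, 2), (0, 3)], [(0, 2), (3, 6)], [(2, 4), (0, 3)], ...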

@m-albert
Collaborator

Hi guys, I just saw this and remembered that I stumbled upon bad performance of dask.array.stack in the past when creating large arrays.

We should investigate if we can get a performance benefit by switching to da.map_blocks.

So I added a da.map_blocks version of dask_image.imread.imread and compared it to the current implementation using da.concatenate.

map_blocks implementation:

import itertools
import numbers
import warnings

import dask
import dask.array
import dask.delayed
import numpy
import pims

from dask_image.imread import _utils

def imread_mb(fname, nframes=1, *, arraytype="numpy"):
    """
    Read image data into a Dask Array.

    Provides a simple, fast mechanism to ingest image data into a
    Dask Array.

    Parameters
    ----------
    fname : str
        A glob like string that may match one or multiple filenames.
    nframes : int, optional
        Number of frames to include in each chunk (default: 1).
    arraytype : str, optional
        Array type for dask chunks. Available options: "numpy", "cupy".

    Returns
    -------
    array : dask.array.Array
        A Dask Array representing the contents of all image files.
    """

    if not isinstance(nframes, numbers.Integral):
        raise ValueError("`nframes` must be an integer.")
    if (nframes != -1) and not (nframes > 0):
        raise ValueError("`nframes` must be greater than zero.")

    if arraytype == "numpy":
        arrayfunc = numpy.asanyarray
    elif arraytype == "cupy":   # pragma: no cover
        import cupy
        arrayfunc = cupy.asanyarray
    else:
        raise ValueError('`arraytype` must be "numpy" or "cupy".')

    with pims.open(fname) as imgs:
        shape = (len(imgs),) + imgs.frame_shape
        dtype = numpy.dtype(imgs.pixel_type)

    if nframes == -1:
        nframes = shape[0]

    if nframes > shape[0]:
        warnings.warn(
            "`nframes` larger than number of frames in file."
            " Will truncate to number of frames in file.",
            RuntimeWarning
        )
    elif shape[0] % nframes != 0:
        warnings.warn(
            "`nframes` does not nicely divide number of frames in file."
            " Last chunk will contain the remainder.",
            RuntimeWarning
        )

    lower_iter, upper_iter = itertools.tee(itertools.chain(
        range(0, shape[0], nframes),
        [shape[0]]
    ))
    next(upper_iter)
    
    # The original da.concatenate-based implementation, kept for comparison:
#     a = []
#     for i, j in zip(lower_iter, upper_iter):
#         print(i, j)
#         a.append(dask.array.from_delayed(
#             dask.delayed(_utils._read_frame)(fname, slice(i, j),
#                                              arrayfunc=arrayfunc),
#             (j - i,) + shape[1:],
#             dtype,
#             meta=arrayfunc([])
#         ))
#     a = dask.array.concatenate(a)

    def func(fname, arrayfunc, block_info=None):
        # block_info[None] describes the output block; its 'array-location'
        # gives the [i, j) frame range this chunk should hold
        i, j = block_info[None]['array-location'][0]
        return _utils._read_frame(fname, slice(i, j), arrayfunc=arrayfunc)

    from dask.array.core import normalize_chunks
    a = dask.array.map_blocks(
        func,
        chunks=normalize_chunks((nframes, ) + shape[1:], shape),
        fname=fname,
        arrayfunc=arrayfunc,
        meta=arrayfunc([]).astype(dtype),  # meta carries the dtype read from the file
    )

    return a

Comparison:

# write some dummy data
import tifffile
import numpy as np
for t in range(10000):
    tmpim = np.random.randint(0,1000, [2, 2]).astype(np.uint16)
    tifffile.imsave('data/im_t%03d.tif' %t, tmpim)
from dask_image import imread
%timeit imread.imread('data/im_*.tif')

2.96 s ± 46.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

from dask_image import imread
%timeit imread_mb('data/im_*.tif')

150 ms ± 1.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

So there's a big performance difference!

Also, indexing the resulting array is faster in the map_blocks version:

im = imread.imread('data/im_*.tif')
im_mb = imread_mb('data/im_*.tif')

def iterate_through_first_axis(im):
    for i in range(im.shape[0]):
        im[i]
    return
%timeit iterate_through_first_axis(im)

11.4 s ± 199 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit iterate_through_first_axis(im_mb)

1.35 s ± 22.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

So it seems that map_blocks is the way to go for putting together large images. Happy to open a PR.

@GenevieveBuckley
Collaborator Author

Related discussion: #121

@GenevieveBuckley
Collaborator Author

Wow @m-albert, those are some really significant speedups! A pull request would be welcome.

FYI: the tests for imread are currently failing, which will block merging new imread PRs until that's fixed. You can still open the pull request, of course; just be aware that this might slow us down a bit.

@GenevieveBuckley
Collaborator Author

I've just used @m-albert's code above to run a similar comparison, with two slight changes:

  1. I've timed both task graph construction, and also dask compute()
  2. I've changed the dummy data dimensions to be a little more representative of real-life datasets. Generally I see images with ~2000 pixels in x and y, and would expect folders to contain hundreds to thousands of images (but perhaps not tens of thousands).
# write some dummy data
import tifffile
import numpy as np
for t in range(200):
    tmpim = np.random.randint(0,1000, [2000, 2000]).astype(np.uint16)
    tifffile.imsave('data/im_t%03d.tif' %t, tmpim)

Benchmark results

Baseline (current imread implementation)

In [16]: %timeit imread.imread('data/im_*.tif')
122 ms ± 13.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [17]: imgs = imread.imread('data/im_*.tif')
In [18]: %timeit imgs.compute()
9.16 s ± 1.3 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

Proposed change (map_blocks imread implementation)

In [19]: %timeit imread_mb('data/im_*.tif')
14.5 ms ± 1.03 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [20]: imgs_mb = imread_mb('data/im_*.tif')
In [21]: %timeit imgs_mb.compute()
8.87 s ± 774 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Summary

Task graph construction is faster when we use map_blocks. There's not much difference between the two implementations for how long it takes to call .compute().

@m-albert
Collaborator

m-albert commented Oct 21, 2020

@GenevieveBuckley Yes, as you say, graph construction is faster using map_blocks, and the resulting graph also seems to have a cleaner structure:

[len(ar.dask.dependencies.keys()) for ar in [im, im_mb]]

[20001, 2]

That might explain why not only array creation but also tasks like indexing are faster, as shown above (indexing is also the test case used in the issue you linked, dask/dask#5913).

@cgohlke

cgohlke commented Oct 21, 2020

It might be worth comparing performance to tifffile.imread():

In [1]: import tifffile

In [2]: %timeit tifffile.imread('data/im_*.tif')
1.61 s ± 22.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [3]: %timeit tifffile.imread('data/im_*.tif', ioworkers=16)
442 ms ± 7.39 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [4]: from dask_image import imread

In [5]: %timeit imread.imread('data/im_*.tif').compute()
2.6 s ± 9.42 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [7]: %timeit imread_mb('data/im_*.tif').compute()
2.55 s ± 17.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

@m-albert
Collaborator

m-albert commented Oct 21, 2020

@cgohlke cool, tifffile is much faster than pims for these tif images!

Replacing dask_image.imread._utils._read_frame with something like

import glob
import numpy
import tifffile

def _read_frame_tifffile(fn, i, *, arrayfunc=numpy.asanyarray):
    # `i` is a slice selecting which files of the sorted glob to read
    fns = sorted(glob.glob(fn))
    x = tifffile.imread(fns[i])
    if i.stop - i.start == 1:
        # keep a leading frame axis even when only one file is read
        x = x[None, ...]
    return arrayfunc(x)

leads to faster reading than with pims:

%timeit imread_mb_tifffile('data/im_*.tif').compute(scheduler='threads')

1.62 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit imread_mb('data3/im_*.tif').compute(scheduler='threads')

2.46 s ± 53 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
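
(imread_mb_tifffile isn't shown in the thread; below is a minimal sketch of one way it could be put together, combining the map_blocks pattern from imread_mb above with the tifffile-based reader. Names, defaults and structure are illustrative assumptions, not the exact code behind these timings.)

import glob

import dask.array
import tifffile
from dask.array.core import normalize_chunks

def imread_mb_tifffile(fname, nframes=1):
    # sketch only: take shape and dtype from the first file instead of pims
    fns = sorted(glob.glob(fname))
    sample = tifffile.imread(fns[0])
    shape = (len(fns),) + sample.shape

    def func(block_info=None):
        i, j = block_info[None]['array-location'][0]
        x = tifffile.imread(fns[i:j])
        if x.ndim == sample.ndim:
            # keep a leading frame axis when the chunk holds a single file
            x = x[None, ...]
        return x

    return dask.array.map_blocks(
        func,
        chunks=normalize_chunks((nframes,) + sample.shape, shape),
        dtype=sample.dtype,
    )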

Interestingly, despite the threading, dask doesn't improve upon single-threaded tifffile.imread:

%timeit tifffile.imread('data/im_*.tif')

1.26 s ± 33 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In any case I guess dask doesn't aim to achieve the same performance here but rather to efficiently provide chunks of large images in a lazy manner. Also, in my understanding it doesn't make sense to try to use multithreaded tifffile.imread in this context because of the risk of oversubscribing threads.

So I guess this issue is about minimising specifically the overhead due to task graph manipulations.

@cgohlke

cgohlke commented Oct 21, 2020

In any case I guess dask doesn't aim to achieve the same performance here but rather to efficiently provide chunks of large images in a lazy manner. Also, in my understanding it doesn't make sense to try to use multithreaded tifffile.imread in this context because of the risk of oversubscribing threads.

So I guess this issue is about minimising specifically the overhead due to task graph manipulations.

Sure. It was just meant as a baseline.

If you are concerned about oversubscribing threads, it's probably better to disable multi-threading in tifffile with tifffile.imread(..., maxworkers=1).

I guess dask_image performance should be comparable to this (requires latest version of tifffile):

import tifffile
import zarr
import dask.array

def imread(filenames):
    with tifffile.imread(filenames, aszarr=True) as storage:
        return dask.array.from_zarr(storage)

%timeit imread('data/im_*.tif').compute()
1.52 s ± 12.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

@jakirkham
Member

Do we have a sense of why stack is slow?

@m-albert
Collaborator

m-albert commented Oct 21, 2020

If you are concerned about oversubscribing threads, it's probably better to disable multi-threading in tifffile with tifffile.imread(..., maxworkers=1).

@cgohlke that makes sense.

Do we have a sense of why stack is slow?

@jakirkham I suspect it might have to do with da.map_blocks using symbolic mappings (dask.blockwise.Blockwise), as opposed to da.stack or da.concatenate, which leads to a much lighter dask graph.

[len(ar.dask.dependencies.keys()) for ar in [im, im_mb]]
[len(ar.dask.layers.keys()) for ar in [im, im_mb]]
[sys.getsizeof(ar.dask.layers) for ar in [im, im_mb]]

[401, 2]
[401, 2]
[18520, 232]

E.g. an entire layer in the da.map_blocks version is given by just this expression:

im_mb.dask.layers['func-019ae6ced5c051d76aced6041c13072a']
Blockwise<((<function imread_mb.<locals>.func at 0x10e66ef70>, None), ((<class 'tuple'>, ['block_info']), None), ('block-info-func-019ae6ced5c051d76aced6041c13072a', ('.0', '.1', '.2'))) -> func-019ae6ced5c051d76aced6041c13072a>

See also the docstring of dask.blockwise.Blockwise:

class Blockwise(Layer):
    """Tensor Operation

    This is a lazily constructed mapping for tensor operation graphs.
    This defines a dictionary using an operation and an indexing pattern.
    It is built for many operations like elementwise, transpose, tensordot, and
    so on.  We choose to keep these as symbolic mappings rather than raw
    dictionaries because we are able to fuse them during optimization,
    sometimes resulting in much lower overhead.
    """

@jakirkham
Member

Maybe it's worth profiling? stack and concatenate are such common operations that it would be valuable for them to perform well. Plus if we fix them, we not only fix this use case, but also other use cases 😉

@m-albert
Collaborator

@jakirkham True, improving the performance of stack and concatenate would probably be very useful.

At least for the arrays they produce, I found that calling dask.optimize on the output recovers the fast indexing (see dask/dask#5913 (comment)).
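
A minimal sketch of that workaround, reusing im and iterate_through_first_axis from the comparison above:

import dask

# dask.optimize fuses the many layers of the concatenate-built graph,
# so subsequent operations such as indexing hit a much smaller graph
im_opt, = dask.optimize(im)
iterate_through_first_axis(im_opt)  # should now be close to the map_blocks timing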

@GenevieveBuckley
Collaborator Author

Closed by #165
