This is a dump of some of the performance experiments. It's part of a larger issue of performance setup and best practices for dask/sgkit and genetic data. The goal is to share the findings and continue the discussion.

Where not otherwise stated, the test machine is a GCE VM, 16 cores and 64GB of memory, 400 SPD. Dask cluster is a single node process based. If the data is read from GCS, the bucket is in the same region as the VM:

The issue with suboptimal saturation was originally reported for this code:

import fsspec
import xarray as xr
from import unpack_variables
from dask.diagnostics import ProgressBar, ResourceProfiler, Profiler

path = "gs://foobar/data.zarr"
store = fsspec.mapping.get_mapper(path, check=False, create=False)
ds = xr.open_zarr(store, concat_characters=False, consolidated=False)
ds = unpack_variables(ds, dtype='float16')

ds["variant_dosage_std"] = ds["call_dosage"].astype("float32").std(dim="samples")
with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof:
    ds['variant_dosage_std'] = ds['variant_dosage_std'].compute()

With local input, performance graph:

bokeh_plot (1)

It's pretty clear the cores are well saturated. I also measure GIL, GIL was held for 13% of time and waited on for 2.1%, with each worker thread (16 threads) holding it for 0.7% and waiting for 0.1% of time.

For GCS input (via fsspec):

bokeh_plot (2)

GIL summary: GIL was held for 18% of time and waited on for 3.8%, with each worker thread (16 threads) holding it for 0.6% and waiting for 0.2% of time, with one thread holding GIL for 6.5% and waiting 1.6% time.

held: 0.186 (0.191, 0.187, 0.186)
wait: 0.038 (0.046, 0.041, 0.039)
    held: 0.015 (0.029, 0.017, 0.015)
    wait: 0.002 (0.002, 0.002, 0.002)
    held: 0.065 (0.061, 0.064, 0.065)
    wait: 0.016 (0.015, 0.017, 0.016)
    held: 0.0 (0.0, 0.0, 0.0)
    wait: 0.0 (0.0, 0.0, 0.0)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.002, 0.002, 0.002)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.002, 0.002, 0.001)
    held: 0.006 (0.008, 0.007, 0.007)
    wait: 0.002 (0.001, 0.001, 0.002)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.001, 0.001, 0.002)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.001 (0.001, 0.001, 0.001)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.001 (0.001, 0.001, 0.001)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.002, 0.002, 0.002)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.001, 0.002, 0.002)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.001, 0.002, 0.002)
    held: 0.006 (0.007, 0.007, 0.007)
    wait: 0.002 (0.001, 0.002, 0.002)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.002, 0.002, 0.002)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.002, 0.002, 0.002)
    held: 0.006 (0.006, 0.007, 0.006)
    wait: 0.002 (0.002, 0.002, 0.002)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.001 (0.001, 0.001, 0.001)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.001 (0.002, 0.002, 0.001)
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.001, 0.001, 0.001)
    held: 0.0 (0.0, 0.0, 0.0)
    wait: 0.0 (0.0, 0.0, 0.0)
    held: 0.001 (0.0, 0.001, 0.001)
    wait: 0.001 (0.001, 0.001, 0.001)
    held: 0.002 (0.002, 0.002, 0.002)
    wait: 0.001 (0.001, 0.001, 0.001)
    held: 0.001 (0.001, 0.001, 0.001)
    wait: 0.003 (0.012, 0.004, 0.003)

It's clear that the CPU usage is lower, and not fully saturated, GIL wait time is a bit up (with a concerning spike in one thread). With remote/fsspec input, we have the overhead of data decryption and potential network IO overhead (tho it doesn't seem like we hit network limits).

Some doc for fsspec:

  • most of file object implementation use AbstractBufferedFile which is a buffered implementation. Default block size is 5MiB, and buffer implementation is readahead which is best suited for sequential reads (keeps only a single/current block in memory, and read next block when the control reaches the block boundary)
  • there are 5 buffer implementations in fsspec (doc):
    • BaseCache: no buffering/caching
    • ReadAheadCache : read a block size when the reader crosses the block boundary, keeps one block in memory
    • MMapCache: uses temporary file memory mapped file, which is filled blocks-wise when data is requested
    • BlockCache: data read blocksize at a time, and are stored in an LRU cache
    • BytesCache: read-ahead by the block size, for semi-random reads progressing through the file
  • LocalFileSystem uses local open buffering, usually 4-8KiB block size

This is a partial output of the method profile on the code above (

  %Own   %Total  OwnTime  TotalTime  Function (filename:line)
 40.00%  40.00%    3811s     3811s   npy_floatbits_to_halfbits (numpy/core/
 20.00%  20.00%    2593s     5033s   npy_half_to_float (numpy/core/
  0.00%   0.00%    2269s     2269s   npy_halfbits_to_floatbits (numpy/core/
 20.00%  20.00%    1825s     5854s   HALF_add (numpy/core/
 40.00%  80.00%    1770s     3848s   _aligned_contig_cast_float_to_half (numpy/core/
  0.00%  20.00%    1720s     5337s   HALF_multiply (numpy/core/
 20.00%  20.00%    1393s     1413s   PyArray_Where (numpy/core/
 20.00%  20.00%   847.4s     1521s   _aligned_contig_cast_half_to_float (numpy/core/
  0.00%   0.00%   844.4s    844.4s   0x7fe616f5fdf0 (numpy/core/
  0.00%   0.00%   757.2s    757.2s   BOOL_logical_or (numpy/core/
 20.00%  20.00%   752.4s    752.4s   DOUBLE_subtract (numpy/core/
  0.00%   0.00%   687.0s    687.0s   _aligned_contig_cast_ubyte_to_float (numpy/core/

Unsurprising we get numpy methods, what is interesting and can't be seen here, are the sporadic spikes in CPU usage, looking at the network IO:

11:42:17        IFACE   rxpck/s   txpck/s    rxkB/s    txkB/s   rxcmp/s   txcmp/s  rxmcst/s   %ifutil
11:42:18         ens4     64.00     64.00     11.58     20.74      0.00      0.00      0.00      0.00
11:42:19         ens4    397.00     70.00   4581.27     14.38      0.00      0.00      0.00      0.00
11:42:20         ens4    146.00     67.00   1501.01     15.61      0.00      0.00      0.00      0.00
11:42:21         ens4    276.00    103.00   2982.42     27.17      0.00      0.00      0.00      0.00
11:42:22         ens4    235.00     91.00   4396.02     19.31      0.00      0.00      0.00      0.00
11:42:23         ens4     43.00     43.00      6.50     12.35      0.00      0.00      0.00      0.00

There are also spikes of network IO, which could suggest that we should look into smoothing out the network buffering. This reminded me about some stats I once stumbled upon in Beam codebase (here):

# This is the size of each partial-file read operation from GCS.  This
# parameter was chosen to give good throughput while keeping memory usage at
# a reasonable level; the following table shows throughput reached when
# reading files of a given size with a chosen buffer size and informed the
# choice of the value, as of 11/2016:
# +---------------+------------+-------------+-------------+-------------+
# |               | 50 MB file | 100 MB file | 200 MB file | 400 MB file |
# +---------------+------------+-------------+-------------+-------------+
# | 8 MB buffer   | 17.12 MB/s | 22.67 MB/s  | 23.81 MB/s  | 26.05 MB/s  |
# | 16 MB buffer  | 24.21 MB/s | 42.70 MB/s  | 42.89 MB/s  | 46.92 MB/s  |
# | 32 MB buffer  | 28.53 MB/s | 48.08 MB/s  | 54.30 MB/s  | 54.65 MB/s  |
# | 400 MB buffer | 34.72 MB/s | 71.13 MB/s  | 79.13 MB/s  | 85.39 MB/s  |
# +---------------+------------+-------------+-------------+-------------+
DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024

So Beam is using 16MiB read buffer. But it's also using Google's storage API client (python-storage), whilst fsspec calls GCS API directly via fsspec implementation of GCS gcsfs and there definitely are implementation differences between those two and thus the consequences of buffer size might be different.

In the original issue: I have tried fsspec block size of 16MiB, but the results look very similar to the 5MiB block size, which was somewhat suspicious, and turns out that the zarr data ("gs://foobar/data.zarr") is saved as a large number of small files, for example for the call_genotype_probability, there are 20570 chunks/objects with average object size of about 2MiB. This might be suboptimal for sequential reads.

As a general rule, when you download large objects within a Region from Amazon S3 to Amazon EC2, we suggest making concurrent requests for byte ranges of an object at the granularity of 8–16 MB.

From S3 best practices.

The Pangeo project has found that chunk sizes in the range of 10–200MB work well with Dask reading from Zarr data in cloud storage

From cloud performant reading of netcdf4 hdf5 data using the zarr library.

Interesting/related issue "File Chunk Store" zarr-python#556.

To validate the impact of the chunk size, let's rechunked the call_genotype arrays, specifically call_genotype_probability array from 20570 chunks (chunk size: (5216, 5792, 2), avg. GCS object size 2MiB, average in memory chunk size: 230MiB) to 2349 chunks (chunk size: (15648, 17376, 2), avg. GCS object size 17MiB, average in memory chunk size: 2GiB):

Performance graph:

bokeh_plot (3)

There is a cost to it tho, visible in the memory usage, notice that compressed chunk of 17MiB translates to 2GiB in memory (vs 2MiB -> 230MiB), this bumps the memory usage roughly from 6GB to 40GB.

We could look into investigating the cost of reading zarr data in smaller chunks (then it was written as). This is a profile of the computation with zarr chunks (15648, 17376, 2) read as (5216, 5792, 2) via xarray.open_zarr:

bokeh_plot (4)


  • memory usage is much less, akin to the usage with zarr chunks at (5216, 5792, 2), which is expected
  • the whole computation takes much longer about 1.7h vs 1.3h, which likely due to the extra cost of rereading zarr chunks to split into smaller chunks for xarray
  • when zarr reads subset of a chunk, it needs to fetch full compressed chunk, decompress and then retrieve part of it, see below:

Say we have a random float zarr array shape (10_000, 100_000) saved on local disk, with chunk shape (10_000, 10_000), thus 10 chunks of size ~660MiB on disk (compressed).

We retrieve a single element:

zs ="/tmp/zarr", mode="r")
print([0, 0])

Zarr will:

  • read full compressed ~660M into memory
  • decompress the full chunk, whilst keeping the compressed chunk on the side (thus memory usage is right now full compressed chunk + decompressed data), in case of random floats ~2x (1.4G)
  • then slicing happens to select the 1st element of the full decompressed data
  • 1st element is returned and memory (in this case 1.4G) is freed up

So overall the lesson from this exercise is that - slicing zarr in a way that doesn't align with the native chunks is inefficient as a common step in pipelines.

Zarr issues for partial chunk reads:


some compressors also support partial decompression which would allow for extracting out part of a compressed chunk (e.g. the blosc_getitem method in Blosc)

  • it sounds like even with partial decompress, we will need to fetch full compressed file

Some experiments that incorporate xarray + zarr + dask:


  • dask uses slicing to fetch data from underlying "array"
  • zarr won't use async chunk fetching if dask slice/chunk aligns with zarr chunk
  • we can force zarr async chunk fetching if dask chunking multiplies zarr chunk shape
  • if dask chunking doesn't align with zarr chunking, performance degrades

Long story

Say we have a dataset:

fsmap = fsspec.get_mapper("/tmp/zarr")

ar = np.random.random((10_000, 100_000))
xar = xr.Dataset(data_vars={"foo": (("x", "y"), ar)})["chunks"] = (10_000, 10_000)


This gives us a zarr store with one 2d array stored as 10 chunks (10_000, 10_000). Now, say we read this back via:

xar_back = xr.open_zarr(fsmap)

Dask graph looks like this:

Screenshot 2020-11-23 at 13 01 02

and an example of one zarr task definition:

('zarr-9c3a2bfde447a3cde69b39f8c28181f9', 0, 0):
   (<function dask.array.core.getter(a, b, asarray=True, lock=None)>,
   (slice(0, 10000, None), slice(0, 10000, None)))


  • we get 10 tasks
  • 1 for each zarr chunk
  • each task reads data via slice (that perfectly fits the chunk shape)
  • this also means this graph does not leverage async multiget from zarr, since there is just one chunk to fetch per dask task

The question now is - how can we force async zarr multiget via dask.

  1. If we follow up xr.open_zarr(fsmap) with a rechunk, like this:
xar_back = xr.open_zarr(fsmap).chunk({"y": 20_000, "x": 10_000})

This won't work:
Screenshot 2020-11-23 at 13 09 34


  • we still read each chunk
  • we do rechunking at later stage (which is even worse)
  1. But if we specify chunk at read time, like this:
xar_back = xr.open_zarr(fsmap, chunks={"y": 20_000, "x": 10_000})

we get:

Screenshot 2020-11-23 at 13 11 37

and an example of one zarr task definition:

('zarr-351e7d43530c734ce1fd649b79f9fc59', 0,0):
   (<function dask.array.core.getter(a, b, asarray=True, lock=None)>,
   (slice(0, 10000, None), slice(0, 20000, None)))


  • notice the slice definition on y is 20000 (which is double the chunk size on that dimension)
  • from previous comments, this will result in zarr potentially fetching 2 chunks asynchronously

Another interesting question is: how would dask react to chunking that doesn't align with zarr native chunks:

xar_back = xr.open_zarr(fsmap, chunks={"y": 12_500, "x": 10_000})

Notice that y now has 2_500 more elements than the chunk, so based on the comments above, dask will likely use slicing to request chunks, which in turn would delegate that to zarr slicing, and zarr would have to read 2 chunks for each dask task.

When we run the code above, xarray tells us:

/Users/rav/miniconda3/envs/tr-dev/lib/python3.8/site-packages/xarray/backends/ UserWarning: Specified Dask chunks 12500 would separate Zarr chunk shape 10000 for dimension 'y'. This significantly degrades performance. Consider rechunking after loading instead. chunk_spec = get_chunk(name, var, chunks)

Let's see the graph:

Screenshot 2020-11-23 at 13 21 27

and an example of two zarr task definitions:

('zarr-c5cbd628b3db23f6ff80870f5d0d52ec', 0, 0):
   (<function dask.array.core.getter(a, b, asarray=True, lock=None)>,
   (slice(0, 10000, None), slice(0, 12500, None))),

  ('zarr-c5cbd628b3db23f6ff80870f5d0d52ec', 0, 1):
   (<function dask.array.core.getter(a, b, asarray=True, lock=None)>,
   (slice(0, 10000, None), slice(12500, 25000, None)))


  • looking at the graph doesn't tell us much, we just see 8 tasks (8 because 10_000/12_500 = 8)
  • but looking at the zarr task definition we can see the slice definitions, and how they span across chunks
  • based on the comments above, dask tasks would use zarr to slice the data, and based on the slices zarr would need to fetch multiple chunks, in some cases to read just small parts, thus performance degrades

Experiments with simplified computation and random data.

We generate reasonably large float array 1e5 x 1e5 and save it in large chunks (400MB) and small chunks (4MB). Then run 3 experiments (many times 30 - 100):

  1. read the large chunks using native zarr chunks, this will read one 400MB chunk per task (aka 400/400/4)
  2. read the small chunks using native zarr chunks, this will read one 4MB chunk per task (aka 4/4/4)
  3. read the small chunks using large zarr chunks, this will read 100 small chunks concurrently (using zarr -> fsspec getitems) which accumulate to 400MB per task (aka 4/400/4)

In the (X/Y/Z) notation, X stands for chunk size at rest time (storage), Y is read chunk size (the size zarr will use to read data from storage, at this point the data is already in memory), Z stands for chunk size used for computation.

In each experiment we multiple the array by a scalar, and use 4MB chunking for this computation (this is to imitate that our problem requires certain chunk size that doesn't necessarily correspond to the zarr chunk size). We run the computations many times and measure time/memory/network. All data is stored on GCS (in the same region as the VM).

Dask cluster setup:

We are using distributed process pool based cluster on a single 16 core VM, with 16 workers/processes with 1 thread each. This is to avoid any GIL issues, and since the computation is embarrassingly parallel, unless there is some job stilling happening there is very little communication between workers. We run 2 rounds of computation to warm up the cluster. I have also tried a thread pool based cluster but was seeing GIL impact.


Storage chunks IO chunks Computation chunks Min time Mean ± stdev Mean GCS BW
400MB 400MB 4MB 37.06 47.68 ± 17.35 774.45 MB/s
4MB 4MB 4MB 112.83 126.8 ± 10.19 287.62 MB/s
4MB 400MB 4MB 52.89 62.05 ± 15.13 572.71 MB/s


  • Mean GCS BW (bandwidth) is over the period of all 30 repeats of each experiment
  • peak GCS BW was ~4GB/s

newplot (44)

And next a macro (much less accurate than the data used for the network histogram above) view of the CPU/IO (Network)/Memory usage for the cases (400/400/4) and (4/400/4). First we perform many 400/400/4 from 3pm until around 4:30, and then switch to 4/400/4 until the end.


  • this is obviously an IO bound task
  • 400/400/4 has best network bandwidth and fastest computation
  • it arguable makes more sense to look at the min than average time for these experiment
  • 4/4/4 will have lowest memory footprint since it reads the least amount of data into memory

Conclusion: this validates that larger chunks result in better throughput even when compared with concurrent read of many small chunks. But reading large chunks (or many small chunks) comes at the cost of memory (as mentioned in the previous comments).


Code: generate the data
def create_data(path, chunks):
    fs_map = gcs.get_mapper(path)

    ar_dsk = da.random.random(size=(100_000, 100_000), chunks=chunks).astype(np.float32)
    ar_xr = xr.Dataset(data_vars=(dict(foo=(("x", "y"), ar_dsk))))

    with ProgressBar(), ResourceProfiler() as rprof:
        ar_xr.to_zarr(fs_map, mode="w")
Code: computations
def large_chunks(path):
    # 16MB block size
    gcs = gcsfs.GCSFileSystem(block_size=16 * 1024 * 1024)

    fs_map = gcs.get_mapper(path)

    # use zarr chunking
    ar_xr = xr.open_zarr(fs_map)
    # rechunk/split to smaller chunks
    ar_xr["foo"] =, y=1_000))

    # Simple computation on part of the matrix
    dat = ( * 2.0).data.persist()
    del dat

def small_chunks(path):
    # default block size of 5MB
    gcs = gcsfs.GCSFileSystem()

    fs_map = gcs.get_mapper(path)

    # read in many zarr chunks at the same time
    ar_xr = xr.open_zarr(fs_map, chunks=dict(x=10_000, y=10_000))
    # rechunk/split to smaller chunks
    ar_xr["foo"] =, y=1_000))

    # Simple computation on part of the matrix
    dat = ( * 2.0).data.persist()
    del dat

def native_chunks(path):
    # default block size of 5MB
    gcs = gcsfs.GCSFileSystem()

    fs_map = gcs.get_mapper(path)

    # use zarr chunking
    ar_xr = xr.open_zarr(fs_map)

    # Simple computation on part of the matrix
    dat = ( * 2.0).data.persist()
    del dat

  • reads 1GiB file in sequential reads
  • both VM and GCS bucket are in EU
  • using gcsfs 0.7.1 as the GCS client
    newplot (22)
    newplot (23)

  • when dask.array fetches data it uses slicing on the underlying array implementation
  • fsspec has async "multi_get", but to see the benefits in the dask context, the task must slice across multiple FS objects
  • reading zarr data in chunks smaller than the native/storage chunk is suboptimal
  • genetics data is likely going to compress extremely well, resulting in small objects on storage, but large dense arrays in memory
  • large number of small chunks results in large number of dask tasks

Another discussion that came up is the memory impact on the performance, given that larger chunk will require more memory. afaiu by default in a dask cluster:

  • dask will use as many workers (W) as available nodes (1 worker per node)
  • dask worker uses as many "worker threads" (WT) as CPUs
  • each worker gets access to memory (M) available for all worker threads (WT)
  • so each WT gets about ~M/WT memory

In GCP, default VMs are divided into classes of machines like “standard” or “highmem”, in each class the memory to CPU ratio is constant, for standard class it's 4GB/CPU and for highmem it's 8GB/CPU. So in theory increasing the size of the VM within a class doesn’t affect the M/WT ratio. In the context of Spark or MR/YARN, a user would often set the executor/container memory and cores request per pipeline (each pipeline having different requirements), in the context of Dask the same thing can be achieved, but is arguably less explicit. There are worker resources and more recently layer annotations dask/dask#6701, but we can also use the cluster configuration to manipulate the M/WT ratio, by setting the number of WT (+ making sure the worker gets full memory, and potentially adjusting the number of thread available for numpy ops).

A lot to digest here, thanks for the great work @ravwojdyla!

And a way to connect these with #390, I personally would be very curious to see the #390 GWAS benchmark with:

Based on the performance tests done above, here are some high level guidelines for dask performance experiments (this is a starting point, we might find a better home for this later, and potentially have someone from Dask review them):

  • disable spilling during performance tuning (for example: when you search for the right chunking scheme, spilling involves serde and IO, both are expensive, it's better to immediately fail spilling job since the chunking and/or cluster spec is likely suboptimal)
def get_dask_cluster(n_workers=1, threads_per_worker=None):
    dk.config.set({"distributed.worker.memory.terminate": False})
    workers_kwargs = {"memory_target_fraction": False,
                      "memory_spill_fraction": False,
                      "memory_pause_fraction": .9}
    return Client(n_workers=n_workers,
  • use distributed cluster (and if there is a single worker and you perform non-numpy operations measure GIL, see gil_load, if you want to measure cluster communication overhead, you need more workers)
  • capture performance report, see diagnostics distributed
  • if you need high granularity of VM status use atop to record stats, remember to install netatop to capture network stats
  • if you read data from GCS, make sure your VM is in the same region as the data bucket


  • add VM specs

