
async in zarr  #536

Closed
@rabernat

Description

I think there are some places where zarr would benefit immensely from some async capabilities when reading and writing data. I will try to illustrate this with the simplest example I can.

Let's consider a zarr array stored in a public S3 bucket, which we can read with fsspec's HTTPFileSystem interface (no special S3 API needed, just regular http calls).

import fsspec
import zarr

url_base = 'https://mur-sst.s3.us-west-2.amazonaws.com/zarr/time'
mapper = fsspec.get_mapper(url_base)
za = zarr.open(mapper)
za.info

[image: za.info output for the time array]

Note that this is a highly sub-optimal choice of chunks: the 1D array of shape (6443,) is stored in chunks of only (5,) items, so a full read touches 1289 tiny chunks, each a separate object in the bucket.
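For reference, the chunk count follows directly from the shape and chunk size (a quick sanity check using the numbers from za.info above):

import math

shape, chunk = 6443, 5             # from za.info above
n_chunks = math.ceil(shape / chunk)
print(n_chunks)                    # 1289 chunks -> 1289 separate HTTP GETs

Reading the full array therefore takes forever, over 5 minutes: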

%prun tdata = za[:]
         20312192 function calls (20310903 primitive calls) in 342.624 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1289  139.941    0.109  140.077    0.109 {built-in method _openssl.SSL_do_handshake}
     2578   99.914    0.039   99.914    0.039 {built-in method _openssl.SSL_read}
     1289   68.375    0.053   68.375    0.053 {method 'connect' of '_socket.socket' objects}
     1289    9.252    0.007    9.252    0.007 {built-in method _openssl.SSL_CTX_load_verify_locations}
     1289    7.857    0.006    7.868    0.006 {built-in method _socket.getaddrinfo}
     1289    1.619    0.001    1.828    0.001 connectionpool.py:455(close)
   930658    0.980    0.000    2.103    0.000 os.py:674(__getitem__)
...

I believe fsspec is introducing some major overhead by not reusing a connection pool. But regardless, zarr iterates synchronously over each chunk to load the data:

zarr-python/zarr/core.py, lines 1023 to 1028 at commit 994f244:

# iterate over chunks
for chunk_coords, chunk_selection, out_selection in indexer:

    # load chunk selection into output array
    self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
                        drop_axes=indexer.drop_axes, fields=fields)
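
Purely as a sketch of the idea: this loop could instead issue all chunk reads concurrently, assuming a hypothetical store exposing an async_getitem coroutine (no such interface exists in zarr today; handling of missing chunks / fill_value, fields, and drop_axes is omitted):

import asyncio

async def _getitem_concurrent(self, indexer, out):
    # hypothetical sketch only; `async_getitem` is an invented store method

    async def fetch_one(chunk_coords, chunk_selection, out_selection):
        cdata = await self.chunk_store.async_getitem(self._chunk_key(chunk_coords))
        chunk = self._decode_chunk(cdata)            # existing zarr helper
        out[out_selection] = chunk[chunk_selection]

    # issue all chunk requests at once instead of one at a time
    await asyncio.gather(*(fetch_one(*args) for args in indexer))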

As a lower bound on how fast this synchronous approach could be, we can bypass zarr and fsspec and fetch all the chunks directly with requests:

import requests

# reuse a single Session so TCP/TLS connections are pooled across requests
s = requests.Session()

def get_chunk_http(n):
    r = s.get(url_base + f'/{n}')
    r.raise_for_status()
    return r.content

%prun all_data = [get_chunk_http(n) for n in range(za.shape[0] // za.chunks[0])] 
         12550435 function calls (12549147 primitive calls) in 98.508 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     2576   87.798    0.034   87.798    0.034 {built-in method _openssl.SSL_read}
       13    1.436    0.110    1.437    0.111 {built-in method _openssl.SSL_do_handshake}
   929936    1.042    0.000    2.224    0.000 os.py:674(__getitem__)

As expected, reusing a connection pool sped things up, but it still takes nearly 100 s to read the array.
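
For comparison, a similar overlap of the network waits can be had without asyncio by fanning the synchronous helper above out over a thread pool. A minimal sketch reusing get_chunk_http (note that sharing a requests.Session across threads is common practice but not formally guaranteed to be thread-safe):

from concurrent.futures import ThreadPoolExecutor

# threads spend most of their time blocked on the network, so even a
# modest pool overlaps the per-chunk latency
with ThreadPoolExecutor(max_workers=32) as pool:
    all_data = list(pool.map(get_chunk_http,
                             range(za.shape[0] // za.chunks[0])))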

Finally, we can try the same thing with asyncio:

import asyncio
import aiohttp
import time

async def get_chunk_http_async(n, session):
    url = url_base + f'/{n}'
    async with session.get(url) as r:
        r.raise_for_status()
        data = await r.read()
    return data

async with aiohttp.ClientSession() as session:
    tic = time.time()
    all_data = await asyncio.gather(*[get_chunk_http_async(n, session)
                                    for n in range(za.shape[0] // za.chunks[0])])
    print(f"{time.time() - tic} seconds")

# > 1.7969944477081299 seconds

This is a MAJOR speedup: roughly 50× faster than the pooled requests version, and nearly 200× faster than the original zarr read!
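
Note that the top-level await above only runs in an async-aware shell such as IPython/Jupyter; in a plain script, the same code would be wrapped in a coroutine and passed to asyncio.run:

import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[get_chunk_http_async(n, session)
                                      for n in range(za.shape[0] // za.chunks[0])])

all_data = asyncio.run(main())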

I am aware that using dask could possibly help me here. But this isn't big data, and I don't want to use dask. I want zarr to support asyncio natively.

I am quite new to async programming and have no idea how hard / complicated it would be to do this. But based on this experiment, I am quite sure there are major performance benefits to be had, particularly when using zarr with remote storage protocols.

Thoughts?

cc @cgentemann
