Merge pull request #606 from martindurant/fs_multiget

use getitems with FSStore for concurrent reading

martindurant authored Sep 28, 2020
2 parents e3cdd1a + 82d33d7 commit 610db34
Showing 5 changed files with 125 additions and 46 deletions.
11 changes: 11 additions & 0 deletions docs/release.rst
@@ -31,6 +31,17 @@ Next release
   documentation.
   By :user:`Josh Moore <joshmoore>`; :issue:`571`.
 
+* Added support for generic URL opening via ``fsspec``, where the URLs have the
+  form "protocol://[server]/path" or can be chained URLs with "::" separators.
+  The additional argument ``storage_options`` is passed to the backend; see
+  the ``fsspec`` docs.
+  By :user:`Martin Durant <martindurant>`; :issue:`546`.
+
+* Added support for fetching multiple items via a store's ``getitems`` method,
+  if it exists. This allows concurrent fetching of data blocks from stores
+  that implement it (currently HTTP, S3 and GCS). At present this only
+  applies to reading.
+  By :user:`Martin Durant <martindurant>`; :issue:`606`.
 
 .. _release_2.4.0:
 
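Editor's note (not part of the diff): a minimal usage sketch of the two release-note entries above, modelled on the new ``test_s3_complex`` test further down; the bucket name and the ``anon`` credential option are placeholders.

# Sketch only: open a group by URL through fsspec; extra backend options
# travel via storage_options.  "mybucket" and anon=True are placeholders.
import zarr

g = zarr.open_group("s3://mybucket/example.zarr", mode="r",
                    storage_options={"anon": True})

# Reads go through FSStore, which defines getitems, so a slice touching many
# chunks is fetched in one batched, concurrent request rather than key by key.
data = g["data"][:]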
3 changes: 2 additions & 1 deletion requirements_dev_optional.txt
@@ -18,6 +18,7 @@ pytest-cov==2.7.1
 pytest-doctestplus==0.4.0
 pytest-remotedata==0.3.2
 h5py==2.10.0
-s3fs==0.5.0; python_version > '3.6'
+s3fs==0.5.1; python_version > '3.6'
+fsspec==0.8.3; python_version > '3.6'
 moto>=1.3.14; python_version > '3.6'
 flask
130 changes: 85 additions & 45 deletions zarr/core.py
@@ -1020,11 +1020,18 @@ def _get_selection(self, indexer, out=None, fields=None):
             check_array_shape('out', out, out_shape)
 
         # iterate over chunks
-        for chunk_coords, chunk_selection, out_selection in indexer:
-
-            # load chunk selection into output array
-            self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
-                                drop_axes=indexer.drop_axes, fields=fields)
+        if not hasattr(self.chunk_store, "getitems"):
+            # sequentially get one key at a time from storage
+            for chunk_coords, chunk_selection, out_selection in indexer:
+
+                # load chunk selection into output array
+                self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
+                                    drop_axes=indexer.drop_axes, fields=fields)
+        else:
+            # allow storage to get multiple items at once
+            lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
+            self._chunk_getitems(lchunk_coords, lchunk_selection, out, lout_selection,
+                                 drop_axes=indexer.drop_axes, fields=fields)
 
         if out.shape:
             return out
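Editor's note: the ``zip(*indexer)`` line in the new ``else`` branch simply transposes the stream of ``(chunk_coords, chunk_selection, out_selection)`` triples yielded by the indexer into three parallel tuples, so every chunk key is known up front and can be passed to ``getitems`` in one call. A tiny illustrative sketch with stand-in triples:

# Illustrative only: hand-written triples shaped like those an indexer yields.
indexer = [
    ((0, 0), (slice(0, 2), slice(0, 2)), (slice(0, 2), slice(0, 2))),
    ((0, 1), (slice(0, 2), slice(0, 2)), (slice(0, 2), slice(2, 4))),
]
lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
print(lchunk_coords)  # ((0, 0), (0, 1)) -- all chunk coordinates available at once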
@@ -1548,6 +1555,52 @@ def _set_selection(self, indexer, value, fields=None):
             # put data
             self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
 
+    def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
+                       out_is_ndarray, fields, out_selection):
+        """Take binary data from storage and fill output array"""
+        if (out_is_ndarray and
+                not fields and
+                is_contiguous_selection(out_selection) and
+                is_total_slice(chunk_selection, self._chunks) and
+                not self._filters and
+                self._dtype != object):
+
+            dest = out[out_selection]
+            write_direct = (
+                dest.flags.writeable and
+                (
+                    (self._order == 'C' and dest.flags.c_contiguous) or
+                    (self._order == 'F' and dest.flags.f_contiguous)
+                )
+            )
+
+            if write_direct:
+
+                # optimization: we want the whole chunk, and the destination is
+                # contiguous, so we can decompress directly from the chunk
+                # into the destination array
+
+                if self._compressor:
+                    self._compressor.decode(cdata, dest)
+                else:
+                    chunk = ensure_ndarray(cdata).view(self._dtype)
+                    chunk = chunk.reshape(self._chunks, order=self._order)
+                    np.copyto(dest, chunk)
+                return
+
+        # decode chunk
+        chunk = self._decode_chunk(cdata)
+
+        # select data from chunk
+        if fields:
+            chunk = chunk[fields]
+        tmp = chunk[chunk_selection]
+        if drop_axes:
+            tmp = np.squeeze(tmp, axis=drop_axes)
+
+        # store selected data in output
+        out[out_selection] = tmp
+
     def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
                        drop_axes=None, fields=None):
         """Obtain part or whole of a chunk.
@@ -1568,15 +1621,14 @@ def _chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
         TODO
         """
 
-        assert len(chunk_coords) == len(self._cdata_shape)
-
         out_is_ndarray = True
         try:
             out = ensure_ndarray(out)
         except TypeError:
             out_is_ndarray = False
 
+        assert len(chunk_coords) == len(self._cdata_shape)
+
         # obtain key for chunk
         ckey = self._chunk_key(chunk_coords)
 
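Editor's note: the ``write_direct`` fast path in ``_process_chunk`` above only triggers when the whole chunk is requested and the destination view is writeable and contiguous in the array's order. A short numpy check (illustrative only) of when that holds:

import numpy as np

out = np.zeros((4, 4), order='C')
rows = out[0:2, :]   # slice along the leading axis of a C-ordered array
cols = out[:, 0:2]   # slice along the trailing axis
print(rows.flags.c_contiguous, rows.flags.writeable)  # True True -> decompress straight into the view
print(cols.flags.c_contiguous)                        # False -> falls back to decode-then-copy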
@@ -1594,48 +1646,36 @@ def _chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
                 out[out_selection] = fill_value
 
         else:
-
-            if (out_is_ndarray and
-                    not fields and
-                    is_contiguous_selection(out_selection) and
-                    is_total_slice(chunk_selection, self._chunks) and
-                    not self._filters and
-                    self._dtype != object):
-
-                dest = out[out_selection]
-                write_direct = (
-                    dest.flags.writeable and (
-                        (self._order == 'C' and dest.flags.c_contiguous) or
-                        (self._order == 'F' and dest.flags.f_contiguous)
-                    )
-                )
-
-                if write_direct:
-
-                    # optimization: we want the whole chunk, and the destination is
-                    # contiguous, so we can decompress directly from the chunk
-                    # into the destination array
-
-                    if self._compressor:
-                        self._compressor.decode(cdata, dest)
-                    else:
-                        chunk = ensure_ndarray(cdata).view(self._dtype)
-                        chunk = chunk.reshape(self._chunks, order=self._order)
-                        np.copyto(dest, chunk)
-                    return
-
-            # decode chunk
-            chunk = self._decode_chunk(cdata)
-
-            # select data from chunk
-            if fields:
-                chunk = chunk[fields]
-            tmp = chunk[chunk_selection]
-            if drop_axes:
-                tmp = np.squeeze(tmp, axis=drop_axes)
-
-            # store selected data in output
-            out[out_selection] = tmp
+            self._process_chunk(out, cdata, chunk_selection, drop_axes,
+                                out_is_ndarray, fields, out_selection)
+
+    def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
+                        drop_axes=None, fields=None):
+        """As _chunk_getitem, but for lists of chunks
+
+        This gets called where the storage supports ``getitems``, so that
+        it can decide how to fetch the keys, allowing concurrency.
+        """
+        out_is_ndarray = True
+        try:
+            out = ensure_ndarray(out)
+        except TypeError:  # pragma: no cover
+            out_is_ndarray = False
+
+        ckeys = [self._chunk_key(ch) for ch in lchunk_coords]
+        cdatas = self.chunk_store.getitems(ckeys)
+        for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection):
+            if ckey in cdatas:
+                self._process_chunk(out, cdatas[ckey], chunk_select, drop_axes,
+                                    out_is_ndarray, fields, out_select)
+            else:
+                # check exception type
+                if self._fill_value is not None:
+                    if fields:
+                        fill_value = self._fill_value[fields]
+                    else:
+                        fill_value = self._fill_value
+                    out[out_select] = fill_value
 
     def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):
         """Replace part or whole of a chunk.
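Editor's note: ``_chunk_getitems`` above only assumes that the store's ``getitems(keys)`` returns a dict mapping keys to raw chunk bytes, with missing keys omitted; absent chunks then fall back to the fill value. A hypothetical in-memory store sketching that contract (the class and keys are made up; only the return convention mirrors what ``zarr/core.py`` expects):

class BatchedDictStore(dict):
    # Hypothetical store for illustration; a real store would fetch concurrently.

    def getitems(self, keys):
        # Return what exists and silently omit missing keys instead of raising,
        # so the caller can substitute the array's fill value.
        return {key: self[key] for key in keys if key in self}

store = BatchedDictStore({"data/0.0": b"...", "data/0.1": b"..."})
print(store.getitems(["data/0.0", "data/9.9"]))  # {'data/0.0': b'...'}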
4 changes: 4 additions & 0 deletions zarr/storage.py
@@ -1038,6 +1038,10 @@ def _normalize_key(self, key):
             key = '/'.join(bits + [end.replace('.', self.key_separator)])
         return key.lower() if self.normalize_keys else key
 
+    def getitems(self, keys):
+        keys = [self._normalize_key(key) for key in keys]
+        return self.map.getitems(keys, on_error="omit")
+
     def __getitem__(self, key):
         key = self._normalize_key(key)
         try:
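Editor's note: ``FSStore.getitems`` above delegates to the underlying fsspec mapper, which is where the batched (and, for async backends such as HTTP/S3/GCS, concurrent) fetching happens. A usage sketch with a placeholder URL:

# Sketch only: the bucket path and anon=True are placeholders.
import fsspec

m = fsspec.get_mapper("s3://mybucket/example.zarr", anon=True)
# One batched request for several keys; with on_error="omit" (as FSStore passes),
# keys that do not exist are simply left out of the returned dict.
chunks = m.getitems(["data/0.0", "data/0.1"], on_error="omit")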
23 changes: 23 additions & 0 deletions zarr/tests/test_storage.py
@@ -955,6 +955,29 @@ def test_s3(self):
 
         assert g.data[:].tolist() == [0, 1, 2, 3, 0, 0, 0, 0]
 
+    @pytest.mark.usefixtures("s3")
+    def test_s3_complex(self):
+        import zarr
+        g = zarr.open_group("s3://test/out.zarr", mode='w',
+                            storage_options=self.s3so)
+        expected = np.empty((8, 8, 8), dtype='int64')
+        expected[:] = -1
+        a = g.create_dataset("data", shape=(8, 8, 8),
+                             fill_value=-1, chunks=(1, 1, 1))
+        expected[0] = 0
+        expected[3] = 3
+        a[:4] = expected[:4]
+
+        a = g.create_dataset("data_f", shape=(8, ), chunks=(1,),
+                             dtype=[('foo', 'S3'), ('bar', 'i4')],
+                             fill_value=(b"b", 1))
+        a[:4] = (b"aaa", 2)
+        g = zarr.open_group("s3://test/out.zarr", mode='r',
+                            storage_options=self.s3so)
+
+        assert (g.data[:] == expected).all()
+        assert g.data_f['foo'].tolist() == [b"aaa"] * 4 + [b"b"] * 4
+
 
 @pytest.fixture()
 def s3(request):