Fs multiget #606

Merged 11 commits on Sep 28, 2020
Changes from 7 commits
3 changes: 2 additions & 1 deletion requirements_dev_optional.txt
@@ -18,6 +18,7 @@ pytest-cov==2.7.1
pytest-doctestplus==0.4.0
pytest-remotedata==0.3.2
h5py==2.10.0
s3fs==0.5.0; python_version > '3.6'
s3fs==0.5.1; python_version > '3.6'
git+https://github.com/intake/filesystem_spec; python_version > '3.6'
Member: Is there a package on PyPI or conda-forge that we could install?

Member Author: Not yet, no.

Member: Do we have a rough sense when these might be created?

Member Author: When everything is good here, I can make sure to put out a release before merging, so that this line can be reverted.

Member: Sounds good @martindurant.

moto>=1.3.14; python_version > '3.6'
flask
130 changes: 85 additions & 45 deletions zarr/core.py
@@ -1021,11 +1021,18 @@ def _get_selection(self, indexer, out=None, fields=None):
check_array_shape('out', out, out_shape)

# iterate over chunks
for chunk_coords, chunk_selection, out_selection in indexer:
if not hasattr(self.store, "getitems"):
# sequentially get one key at a time from storage
for chunk_coords, chunk_selection, out_selection in indexer:

# load chunk selection into output array
self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
drop_axes=indexer.drop_axes, fields=fields)
# load chunk selection into output array
self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
drop_axes=indexer.drop_axes, fields=fields)
else:
# allow storage to get multiple items at once
lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
self._chunk_getitems(lchunk_coords, lchunk_selection, out, lout_selection,
drop_axes=indexer.drop_axes, fields=fields)

if out.shape:
return out
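A note on the new batched branch: the indexer yields (chunk_coords, chunk_selection, out_selection) triples, and ``zip(*indexer)`` unzips them into the three parallel tuples that ``_chunk_getitems`` expects. A toy illustration with made-up values:

    # each tuple mimics one (chunk_coords, chunk_selection, out_selection) triple
    indexer = [((0,), slice(0, 10), slice(0, 10)),
               ((1,), slice(0, 10), slice(10, 20))]

    lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
    print(lchunk_coords)     # ((0,), (1,))
    print(lchunk_selection)  # (slice(0, 10), slice(0, 10))
    print(lout_selection)    # (slice(0, 10), slice(10, 20))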
@@ -1549,6 +1556,52 @@ def _set_selection(self, indexer, value, fields=None):
# put data
self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)

def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
out_is_ndarray, fields, out_selection):
"""Take binary data from storage and fill output array"""
if (out_is_ndarray and
not fields and
is_contiguous_selection(out_selection) and
is_total_slice(chunk_selection, self._chunks) and
not self._filters and
self._dtype != object):

dest = out[out_selection]
write_direct = (
dest.flags.writeable and
(
(self._order == 'C' and dest.flags.c_contiguous) or
(self._order == 'F' and dest.flags.f_contiguous)
)
)

if write_direct:

# optimization: we want the whole chunk, and the destination is
# contiguous, so we can decompress directly from the chunk
# into the destination array

if self._compressor:
self._compressor.decode(cdata, dest)
else:
chunk = ensure_ndarray(cdata).view(self._dtype)
chunk = chunk.reshape(self._chunks, order=self._order)
np.copyto(dest, chunk)
return

# decode chunk
chunk = self._decode_chunk(cdata)

# select data from chunk
if fields:
chunk = chunk[fields]
tmp = chunk[chunk_selection]
if drop_axes:
tmp = np.squeeze(tmp, axis=drop_axes)

# store selected data in output
out[out_selection] = tmp
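Aside (not part of the diff): the write-direct branch above only fires when the destination view is contiguous in the array's memory order, which is what the ``dest.flags`` checks are testing. A minimal NumPy illustration with made-up arrays:

    import numpy as np

    out = np.empty((4, 10), dtype="i4", order="C")

    dest = out[2:3, :]   # a whole row block: still C-contiguous
    print(dest.flags.c_contiguous, dest.flags.writeable)  # True True -> decode straight into dest

    dest = out[:, 2:3]   # a column slice: strided view
    print(dest.flags.c_contiguous)  # False -> fall back to _decode_chunk and copy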

def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
drop_axes=None, fields=None):
"""Obtain part or whole of a chunk.
@@ -1569,15 +1622,14 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
TODO

"""

assert len(chunk_coords) == len(self._cdata_shape)

out_is_ndarray = True
try:
out = ensure_ndarray(out)
except TypeError:
out_is_ndarray = False

assert len(chunk_coords) == len(self._cdata_shape)

# obtain key for chunk
ckey = self._chunk_key(chunk_coords)

@@ -1595,48 +1647,36 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
out[out_selection] = fill_value

else:
self._process_chunk(out, cdata, chunk_selection, drop_axes,
out_is_ndarray, fields, out_selection)

if (out_is_ndarray and
not fields and
is_contiguous_selection(out_selection) and
is_total_slice(chunk_selection, self._chunks) and
not self._filters and
self._dtype != object):

dest = out[out_selection]
write_direct = (
dest.flags.writeable and (
(self._order == 'C' and dest.flags.c_contiguous) or
(self._order == 'F' and dest.flags.f_contiguous)
)
)

if write_direct:
def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
drop_axes=None, fields=None):
"""As _chunk_getitem, but for lists of chunks

# optimization: we want the whole chunk, and the destination is
# contiguous, so we can decompress directly from the chunk
# into the destination array
This gets called where the storage supports ``getitems``, so that
it can decide how to fetch the keys, allowing concurrency.
"""
out_is_ndarray = True
try:
out = ensure_ndarray(out)
except TypeError: # pragma: no cover
out_is_ndarray = False

if self._compressor:
self._compressor.decode(cdata, dest)
ckeys = [self._chunk_key(ch) for ch in lchunk_coords]
cdatas = self.chunk_store.getitems(ckeys)
for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection):
if ckey in cdatas:
self._process_chunk(out, cdatas[ckey], chunk_select, drop_axes,
out_is_ndarray, fields, out_select)
else:
# check exception type
if self._fill_value is not None:
if fields:
fill_value = self._fill_value[fields]
else:
chunk = ensure_ndarray(cdata).view(self._dtype)
chunk = chunk.reshape(self._chunks, order=self._order)
np.copyto(dest, chunk)
return

# decode chunk
chunk = self._decode_chunk(cdata)

# select data from chunk
if fields:
chunk = chunk[fields]
tmp = chunk[chunk_selection]
if drop_axes:
tmp = np.squeeze(tmp, axis=drop_axes)

# store selected data in output
out[out_selection] = tmp
fill_value = self._fill_value
out[out_select] = fill_value

def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):
"""Replace part or whole of a chunk.
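Taken together, ``_chunk_getitems`` only decides what to do with each returned chunk; how the keys are fetched is left entirely to the store. That is where the concurrency comes from: an fsspec-backed store can issue all the reads at once. A rough, hypothetical sketch of such a store (a thread pool instead of fsspec's async machinery; all names invented for illustration):

    from concurrent.futures import ThreadPoolExecutor


    class ThreadedStore(dict):
        """Toy in-memory store whose getitems fetches keys in parallel."""

        def _fetch(self, key):
            # stand-in for a slow backend read (one HTTP GET per chunk, say);
            # returns None when the key is absent
            return self.get(key)

        def getitems(self, keys, **kwargs):
            with ThreadPoolExecutor(max_workers=8) as pool:
                results = pool.map(self._fetch, keys)
            # drop missing keys so _chunk_getitems falls back to the fill value,
            # mirroring fsspec's on_error="omit" behaviour
            return {k: v for k, v in zip(keys, results) if v is not None}

Because the class exposes ``getitems``, ``_get_selection`` above routes whole selections through ``_chunk_getitems`` instead of the per-key loop.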
4 changes: 4 additions & 0 deletions zarr/storage.py
@@ -982,6 +982,10 @@ def _normalize_key(self, key):
key = '/'.join(bits + [end.replace('.', self.key_separator)])
return key.lower() if self.normalize_keys else key

def getitems(self, keys):
keys = [self._normalize_key(key) for key in keys]
return self.map.getitems(keys, on_error="omit")

def __getitem__(self, key):
key = self._normalize_key(key)
try:
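``FSStore.getitems`` just normalizes the keys and defers to the underlying fsspec mapper; ``on_error="omit"`` means keys that are missing (or fail to read) are left out of the returned dict rather than raising, which is what lets ``_chunk_getitems`` substitute the fill value for those chunks. A small standalone illustration of that fsspec behaviour, assuming an fsspec recent enough to provide ``FSMap.getitems`` (hence the git requirement pinned above):

    import fsspec

    m = fsspec.get_mapper("memory://example")
    m["present"] = b"\x01\x02"

    # "absent" is silently dropped instead of raising KeyError
    print(m.getitems(["present", "absent"], on_error="omit"))
    # -> {'present': b'\x01\x02'}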
23 changes: 23 additions & 0 deletions zarr/tests/test_storage.py
@@ -923,6 +923,29 @@ def test_s3(self):

assert g.data[:].tolist() == [0, 1, 2, 3, 0, 0, 0, 0]

@pytest.mark.usefixtures("s3")
def test_s3_complex(self):
import zarr
g = zarr.open_group("s3://test/out.zarr", mode='w',
storage_options=self.s3so)
expected = np.empty((8, 8, 8), dtype='int64')
expected[:] = -1
a = g.create_dataset("data", shape=(8, 8, 8),
fill_value=-1, chunks=(1, 1, 1))
expected[0] = 0
expected[3] = 3
a[:4] = expected[:4]

a = g.create_dataset("data_f", shape=(8, ), chunks=(1,),
dtype=[('foo', 'S3'), ('bar', 'i4')],
fill_value=(b"b", 1))
a[:4] = (b"aaa", 2)
g = zarr.open_group("s3://test/out.zarr", mode='r',
storage_options=self.s3so)

assert (g.data[:] == expected).all()
assert g.data_f['foo'].tolist() == [b"aaa"] * 4 + [b"b"] * 4


@pytest.fixture()
def s3(request):
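For completeness, this is roughly how the feature looks from the user side, mirroring the test above; the bucket name and credentials are placeholders:

    import zarr

    # opening via an fsspec URL gives an FSStore, which implements getitems
    g = zarr.open_group("s3://my-bucket/out.zarr", mode="r",
                        storage_options={"anon": True})

    # a slice touching many small chunks is fetched as one batch of keys
    # (run concurrently inside s3fs) rather than one request per chunk
    block = g["data"][:4]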