Description
Zarr version
3.0.0a5
Numcodecs version
0.13.0
Python Version
3.10.14
Operating System
Linux
Installation
pip install zarr==3.0.0a5
Description
It appears that the implementation for partially decoding chunks written using the sharding indexed codec is buggy: the implementation fails to read a sharding-encoded array created by another implementation. The same dataset can be read without problem by the zarrita
project.
Steps to reproduce
I have attached a zip file containing the filesystem store testdata.zarr
used to reproduce the bug.
import numpy
import zarr
store = zarr.store.LocalStore("./testdata.zarr", mode="r")
a = zarr.Array.open(store.root / "some/group/another") # reads the sharded array with path /some/group/another.
a[:20, 10:11, :].shape # this fails with a long stacktrace.
# ValueError: When changing to a larger dtype, its size must be a divisor of the total size in bytes of the last axis of the array.
a2 = zarr.Array.open(store.root / "some/group/name") # reads a non-sharded array from same store with path /some/group/name.
a2[:20, 10:11, :].shape # this works
If I use zarrita
to read the problematic array, I get no errors:
import zarrita
store2 = zarrita.LocalStore('./testdata.zarr')
a3 = zarrita.Array.open(store2 / "some/group/another")
a3[:20, 10:11, :].shape # decodes just fine and prints (20, 1, 50)
Additional output
Here is the metadata of the "problematic" array:
{
"zarr_format":3,
"shape":[100,100,50],
"node_type":"array",
"data_type":"float64",
"codecs":
[
{"name":"sharding_indexed",
"configuration":
{"chunk_shape":[5,3,5],
"index_location":"start",
"index_codecs":[{"name":"bytes","configuration":{"endian":"big"}},{"name":"crc32c"}],
"codecs":[{"name":"transpose","configuration":{"order":[2,0,1]}},
{"name":"bytes","configuration":{"endian":"little"}}]}}],
"fill_value":"-Infinity",
"chunk_grid":{"name":"regular","configuration":{"chunk_shape":[10,15,20]}},
"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}}}
Here is the stacktrace of the exception thrown by zarr-python
when reading the sharded array:
stacktrace
ValueError Traceback (most recent call last)
Cell In[30], line 1
----> 1 a[:20, 10:11, :].shape
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/array.py:919,
in Array.__getitem__(self, selection)
917 return self.vindex[cast(CoordinateSelection | MaskSelection, selection)]
918 elif is_pure_orthogonal_indexing(pure_selection, self.ndim):
--> 919 return self.get_orthogonal_selection(pure_selection, fields=fields)
920 else:
921 return self.get_basic_selection(cast(BasicSelection, pure_selection), fi
elds=fields)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/_compat.py:43, in _
deprecate_positional_args.<locals>._inner_deprecate_positional_args.<locals>.inner_f
(*args, **kwargs)
41 extra_args = len(args) - len(all_args)
42 if extra_args <= 0:
---> 43 return f(*args, **kwargs)
45 # extra_args > 0
46 args_msg = [
47 f"{name}={arg}"
48 for name, arg in zip(kwonly_args[:extra_args], args[-extra_args:], stric
t=False)
49 ]
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/array.py:1361,
in Array.get_orthogonal_selection(self, selection, out, fields, prototype)
1359 prototype = default_buffer_prototype()
1360 indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
-> 1361 return sync(
1362 self._async_array._get_selection(
1363 indexer=indexer, out=out, fields=fields, prototype=prototype
1364 )
1365 )
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/sync.py:91, in
sync(coro, loop, timeout)
88 return_result = next(iter(finished)).result()
90 if isinstance(return_result, BaseException):
---> 91 raise return_result
92 else:
93 return return_result
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/sync.py:50, in
_runner(coro)
45 """
46 Await a coroutine and return the result of running it. If awaiting the corou
tine raises an
47 exception, the exception will be returned.
48 """
49 try:
---> 50 return await coro
51 except Exception as ex:
52 return ex
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/array.py:476,
in AsyncArray._get_selection(self, indexer, prototype, out, fields)
468 out_buffer = prototype.nd_buffer.create(
469 shape=indexer.shape,
470 dtype=out_dtype,
471 order=self.order,
472 fill_value=self.metadata.fill_value,
473 )
474 if product(indexer.shape) > 0:
475 # reading chunks and decoding them
--> 476 await self.codec_pipeline.read(
477 [
478 (
479 self.store_path / self.metadata.encode_chunk_key(chunk_coord
s),
480 self.metadata.get_chunk_spec(chunk_coords, self.order, proto
type=prototype),
481 chunk_selection,
482 out_selection,
483 )
484 for chunk_coords, chunk_selection, out_selection in indexer
485 ],
486 out_buffer,
487 drop_axes=indexer.drop_axes,
488 )
489 return out_buffer.as_ndarray_like()
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:
427, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
421 async def read(
422 self,
423 batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, Selecto
rTuple]],
424 out: NDBuffer,
425 drop_axes: tuple[int, ...] = (),
426 ) -> None:
--> 427 await concurrent_map(
428 [
429 (single_batch_info, out, drop_axes)
430 for single_batch_info in batched(batch_info, self.batch_size)
431 ],
432 self.read_batch,
433 config.get("async.concurrency"),
434 )
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/common.py:53,
in concurrent_map(items, func, limit)
49 async def concurrent_map(
50 items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = N
one
51 ) -> list[V]:
52 if limit is None:
---> 53 return await asyncio.gather(*[func(*item) for item in items])
55 else:
56 sem = asyncio.Semaphore(limit)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:
238, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
231 async def read_batch(
232 self,
233 batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, Selecto
rTuple]],
234 out: NDBuffer,
235 drop_axes: tuple[int, ...] = (),
236 ) -> None:
237 if self.supports_partial_decode:
--> 238 chunk_array_batch = await self.decode_partial_batch(
239 [
240 (byte_getter, chunk_selection, chunk_spec)
241 for byte_getter, chunk_spec, chunk_selection, _ in batch_inf
o
242 ]
243 )
244 for chunk_array, (_, chunk_spec, _, out_selection) in zip(
245 chunk_array_batch, batch_info, strict=False
246 ):
247 if chunk_array is not None:
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:
194, in BatchedCodecPipeline.decode_partial_batch(self, batch_info)
192 assert self.supports_partial_decode
193 assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin)
--> 194 return await self.array_bytes_codec.decode_partial(batch_info)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/abc/codec.py:200, i
n ArrayBytesCodecPartialDecodeMixin.decode_partial(self, batch_info)
180 async def decode_partial(
181 self,
182 batch_info: Iterable[tuple[ByteGetter, SelectorTuple, ArraySpec]],
183 ) -> Iterable[NDBuffer | None]:
184 """Partially decodes a batch of chunks.
185 This method determines parts of a chunk from the slice selection,
186 fetches these parts from the store (via ByteGetter) and decodes them.
(...)
198 Iterable[NDBuffer | None]
199 """
--> 200 return await concurrent_map(
201 list(batch_info),
202 self._decode_partial_single,
203 config.get("async.concurrency"),
204 )
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/common.py:53,
in concurrent_map(items, func, limit)
49 async def concurrent_map(
50 items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = N
one
51 ) -> list[V]:
52 if limit is None:
---> 53 return await asyncio.gather(*[func(*item) for item in items])
55 else:
56 sem = asyncio.Semaphore(limit)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/sharding.py:
510, in ShardingCodec._decode_partial_single(self, byte_getter, selection, shard_spe
c)
507 shard_dict[chunk_coords] = chunk_bytes
509 # decoding chunks and writing them into the output buffer
--> 510 await self.codec_pipeline.read(
511 [
512 (
513 _ShardingByteGetter(shard_dict, chunk_coords),
514 chunk_spec,
515 chunk_selection,
516 out_selection,
517 )
518 for chunk_coords, chunk_selection, out_selection in indexer
519 ],
520 out,
521 )
522 return out
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:
427, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
421 async def read(
422 self,
423 batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, Selecto
rTuple]],
424 out: NDBuffer,
425 drop_axes: tuple[int, ...] = (),
426 ) -> None:
--> 427 await concurrent_map(
428 [
429 (single_batch_info, out, drop_axes)
430 for single_batch_info in batched(batch_info, self.batch_size)
431 ],
432 self.read_batch,
433 config.get("async.concurrency"),
434 )
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/common.py:53,
in concurrent_map(items, func, limit)
49 async def concurrent_map(
50 items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = N
one
51 ) -> list[V]:
52 if limit is None:
---> 53 return await asyncio.gather(*[func(*item) for item in items])
55 else:
56 sem = asyncio.Semaphore(limit)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:
260, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
251 else:
252 chunk_bytes_batch = await concurrent_map(
253 [
254 (byte_getter, array_spec.prototype)
(...)
258 config.get("async.concurrency"),
259 )
--> 260 chunk_array_batch = await self.decode_batch(
261 [
262 (chunk_bytes, chunk_spec)
263 for chunk_bytes, (_, chunk_spec, _, _) in zip(
264 chunk_bytes_batch, batch_info, strict=False
265 )
266 ],
267 )
268 for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip(
269 chunk_array_batch, batch_info, strict=False
270 ):
271 if chunk_array is not None:
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:
177, in BatchedCodecPipeline.decode_batch(self, chunk_bytes_and_specs)
172 chunk_bytes_batch = await bb_codec.decode(
173 zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
174 )
176 ab_codec, chunk_spec_batch = ab_codec_with_spec
--> 177 chunk_array_batch = await ab_codec.decode(
178 zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
179 )
181 for aa_codec, chunk_spec_batch in aa_codecs_with_spec[::-1]:
182 chunk_array_batch = await aa_codec.decode(
183 zip(chunk_array_batch, chunk_spec_batch, strict=False)
184 )
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/abc/codec.py:125, i
n _Codec.decode(self, chunks_and_specs)
109 async def decode(
110 self,
111 chunks_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]],
112 ) -> Iterable[CodecInput | None]:
113 """Decodes a batch of chunks.
114 Chunks can be None in which case they are ignored by the codec.
115
(...)
123 Iterable[CodecInput | None]
124 """
--> 125 return await _batching_helper(self._decode_single, chunks_and_specs)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/abc/codec.py:409, i
n _batching_helper(func, batch_info)
405 async def _batching_helper(
406 func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
407 batch_info: Iterable[tuple[CodecInput | None, ArraySpec]],
408 ) -> list[CodecOutput | None]:
--> 409 return await concurrent_map(
410 list(batch_info),
411 _noop_for_none(func),
412 config.get("async.concurrency"),
413 )
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/common.py:53,
in concurrent_map(items, func, limit)
49 async def concurrent_map(
50 items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = N
one
51 ) -> list[V]:
52 if limit is None:
---> 53 return await asyncio.gather(*[func(*item) for item in items])
55 else:
56 sem = asyncio.Semaphore(limit)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/abc/codec.py:422, i
n _noop_for_none.<locals>.wrap(chunk, chunk_spec)
420 if chunk is None:
421 return None
--> 422 return await func(chunk, chunk_spec)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/bytes.py:89,
in BytesCodec._decode_single(self, chunk_bytes, chunk_spec)
86 else:
87 as_nd_array_like = np.asanyarray(as_array_like)
88 chunk_array = chunk_spec.prototype.nd_buffer.from_ndarray_like(
---> 89 as_nd_array_like.view(dtype=dtype)
90 )
92 # ensure correct chunk shape
93 if chunk_array.shape != chunk_spec.shape:
ValueError: When changing to a larger dtype, its size must be a divisor of the total
size in bytes of the last axis of the array.
NOTE
It's worth noting that if I add a compression codec to the sharding config's codec pipeline (e.g. Gzip with compression level 5), I get a different exception: ValueError: cannot reshape array of size 225 into shape (5,5,3)
. zarrita
still reads the array correctly even in this scenario.
Data generation code
The data inside the attached zip can be generated using the following python code:
Python code
import numpy as np
import zarrita
store = zarrita.LocalStore('testdata.zarr')
a = zarrita.Array.create(
store / "some/group/another",
shape=(100, 100, 50),
dtype='float64',
chunk_shape=(10, 15, 20),
chunk_key_encoding=('default', '/'),
codecs=[
zarrita.codecs.sharding_codec(
chunk_shape=(5,3,5),
codecs=[
zarrita.codecs.transpose_codec(order=(2,0,1)),
zarrita.codecs.bytes_codec(endian="little")
],
index_codecs=[
zarrita.codecs.bytes_codec(endian="big"),
zarrita.codecs.crc32c_codec()
],
index_location="start")
],
fill_value=-np.inf
)
a[:30, 10:12, :] = np.random.rand(30, 2, 50)