
[v3] zarr-python fails to decode sharded array written by other implementations. #2302


Zarr version

3.0.0a5

Numcodecs version

0.13.0

Python Version

3.10.14

Operating System

Linux

Installation

pip install zarr==3.0.0a5

Description

It appears that partial decoding of chunks written with the sharding_indexed codec is buggy: zarr-python fails to read a sharding-encoded array created by another implementation. The same dataset can be read without problems by the zarrita project.

Steps to reproduce

shardingdata.zip

I have attached a zip file containing the filesystem store testdata.zarr used to reproduce the bug.

import zarr

store = zarr.store.LocalStore("./testdata.zarr", mode="r")

a = zarr.Array.open(store.root / "some/group/another")  # the sharded array at path /some/group/another
a[:20, 10:11, :].shape  # fails with a long stacktrace:
# ValueError: When changing to a larger dtype, its size must be a divisor of the total size in bytes of the last axis of the array.

a2 = zarr.Array.open(store.root / "some/group/name")  # a non-sharded array in the same store, at path /some/group/name
a2[:20, 10:11, :].shape  # this works

If I use zarrita to read the problematic array, I get no errors:

import zarrita

store2 = zarrita.LocalStore('./testdata.zarr')
a3 = zarrita.Array.open(store2 / "some/group/another")
a3[:20, 10:11, :].shape  # decodes just fine and prints (20, 1, 50)

Additional output

Here is the metadata of the "problematic" array:

{
  "zarr_format": 3,
  "shape": [100, 100, 50],
  "node_type": "array",
  "data_type": "float64",
  "codecs": [
    {
      "name": "sharding_indexed",
      "configuration": {
        "chunk_shape": [5, 3, 5],
        "index_location": "start",
        "index_codecs": [
          {"name": "bytes", "configuration": {"endian": "big"}},
          {"name": "crc32c"}
        ],
        "codecs": [
          {"name": "transpose", "configuration": {"order": [2, 0, 1]}},
          {"name": "bytes", "configuration": {"endian": "little"}}
        ]
      }
    }
  ],
  "fill_value": "-Infinity",
  "chunk_grid": {"name": "regular", "configuration": {"chunk_shape": [10, 15, 20]}},
  "chunk_key_encoding": {"name": "default", "configuration": {"separator": "/"}}
}

Here is the stacktrace of the exception thrown by zarr-python when reading the sharded array:

stacktrace
ValueError                                Traceback (most recent call last)
Cell In[30], line 1
----> 1 a[:20, 10:11, :].shape

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/array.py:919, in Array.__getitem__(self, selection)
    917     return self.vindex[cast(CoordinateSelection | MaskSelection, selection)]
    918 elif is_pure_orthogonal_indexing(pure_selection, self.ndim):
--> 919     return self.get_orthogonal_selection(pure_selection, fields=fields)
    920 else:
    921     return self.get_basic_selection(cast(BasicSelection, pure_selection), fields=fields)

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/_compat.py:43, in _deprecate_positional_args.<locals>._inner_deprecate_positional_args.<locals>.inner_f(*args, **kwargs)
     41 extra_args = len(args) - len(all_args)
     42 if extra_args <= 0:
---> 43     return f(*args, **kwargs)
     45 # extra_args > 0
     46 args_msg = [
     47     f"{name}={arg}"
     48     for name, arg in zip(kwonly_args[:extra_args], args[-extra_args:], strict=False)
     49 ]

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/array.py:1361, in Array.get_orthogonal_selection(self, selection, out, fields, prototype)
   1359     prototype = default_buffer_prototype()
   1360 indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
-> 1361 return sync(
   1362     self._async_array._get_selection(
   1363         indexer=indexer, out=out, fields=fields, prototype=prototype
   1364     )
   1365 )

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/sync.py:91, in sync(coro, loop, timeout)
     88 return_result = next(iter(finished)).result()
     90 if isinstance(return_result, BaseException):
---> 91     raise return_result
     92 else:
     93     return return_result

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/sync.py:50, in _runner(coro)
     45 """
     46 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
     47 exception, the exception will be returned.
     48 """
     49 try:
---> 50     return await coro
     51 except Exception as ex:
     52     return ex

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/array.py:476, in AsyncArray._get_selection(self, indexer, prototype, out, fields)
    468     out_buffer = prototype.nd_buffer.create(
    469         shape=indexer.shape,
    470         dtype=out_dtype,
    471         order=self.order,
    472         fill_value=self.metadata.fill_value,
    473     )
    474 if product(indexer.shape) > 0:
    475     # reading chunks and decoding them
--> 476     await self.codec_pipeline.read(
    477         [
    478             (
    479                 self.store_path / self.metadata.encode_chunk_key(chunk_coords),
    480                 self.metadata.get_chunk_spec(chunk_coords, self.order, prototype=prototype),
    481                 chunk_selection,
    482                 out_selection,
    483             )
    484             for chunk_coords, chunk_selection, out_selection in indexer
    485         ],
    486         out_buffer,
    487         drop_axes=indexer.drop_axes,
    488     )
    489 return out_buffer.as_ndarray_like()

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:427, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
    421 async def read(
    422     self,
    423     batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
    424     out: NDBuffer,
    425     drop_axes: tuple[int, ...] = (),
    426 ) -> None:
--> 427     await concurrent_map(
    428         [
    429             (single_batch_info, out, drop_axes)
    430             for single_batch_info in batched(batch_info, self.batch_size)
    431         ],
    432         self.read_batch,
    433         config.get("async.concurrency"),
    434     )

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/common.py:53, in concurrent_map(items, func, limit)
     49 async def concurrent_map(
     50     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     51 ) -> list[V]:
     52     if limit is None:
---> 53         return await asyncio.gather(*[func(*item) for item in items])
     55     else:
     56         sem = asyncio.Semaphore(limit)

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:238, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
    231 async def read_batch(
    232     self,
    233     batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
    234     out: NDBuffer,
    235     drop_axes: tuple[int, ...] = (),
    236 ) -> None:
    237     if self.supports_partial_decode:
--> 238         chunk_array_batch = await self.decode_partial_batch(
    239             [
    240                 (byte_getter, chunk_selection, chunk_spec)
    241                 for byte_getter, chunk_spec, chunk_selection, _ in batch_info
    242             ]
    243         )
    244         for chunk_array, (_, chunk_spec, _, out_selection) in zip(
    245             chunk_array_batch, batch_info, strict=False
    246         ):
    247             if chunk_array is not None:

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:194, in BatchedCodecPipeline.decode_partial_batch(self, batch_info)
    192 assert self.supports_partial_decode
    193 assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin)
--> 194 return await self.array_bytes_codec.decode_partial(batch_info)

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/abc/codec.py:200, in ArrayBytesCodecPartialDecodeMixin.decode_partial(self, batch_info)
    180 async def decode_partial(
    181     self,
    182     batch_info: Iterable[tuple[ByteGetter, SelectorTuple, ArraySpec]],
    183 ) -> Iterable[NDBuffer | None]:
    184     """Partially decodes a batch of chunks.
    185     This method determines parts of a chunk from the slice selection,
    186     fetches these parts from the store (via ByteGetter) and decodes them.
   (...)
    198     Iterable[NDBuffer | None]
    199     """
--> 200     return await concurrent_map(
    201         list(batch_info),
    202         self._decode_partial_single,
    203         config.get("async.concurrency"),
    204     )

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/common.py:53, in concurrent_map(items, func, limit)
     49 async def concurrent_map(
     50     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     51 ) -> list[V]:
     52     if limit is None:
---> 53         return await asyncio.gather(*[func(*item) for item in items])
     55     else:
     56         sem = asyncio.Semaphore(limit)

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/sharding.py:510, in ShardingCodec._decode_partial_single(self, byte_getter, selection, shard_spec)
    507                 shard_dict[chunk_coords] = chunk_bytes
    509 # decoding chunks and writing them into the output buffer
--> 510 await self.codec_pipeline.read(
    511     [
    512         (
    513             _ShardingByteGetter(shard_dict, chunk_coords),
    514             chunk_spec,
    515             chunk_selection,
    516             out_selection,
    517         )
    518         for chunk_coords, chunk_selection, out_selection in indexer
    519     ],
    520     out,
    521 )
    522 return out

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:427, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
    421 async def read(
    422     self,
    423     batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
    424     out: NDBuffer,
    425     drop_axes: tuple[int, ...] = (),
    426 ) -> None:
--> 427     await concurrent_map(
    428         [
    429             (single_batch_info, out, drop_axes)
    430             for single_batch_info in batched(batch_info, self.batch_size)
    431         ],
    432         self.read_batch,
    433         config.get("async.concurrency"),
    434     )

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/common.py:53, in concurrent_map(items, func, limit)
     49 async def concurrent_map(
     50     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     51 ) -> list[V]:
     52     if limit is None:
---> 53         return await asyncio.gather(*[func(*item) for item in items])
     55     else:
     56         sem = asyncio.Semaphore(limit)

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:260, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
    251 else:
    252     chunk_bytes_batch = await concurrent_map(
    253         [
    254             (byte_getter, array_spec.prototype)
   (...)
    258         config.get("async.concurrency"),
    259     )
--> 260     chunk_array_batch = await self.decode_batch(
    261         [
    262             (chunk_bytes, chunk_spec)
    263             for chunk_bytes, (_, chunk_spec, _, _) in zip(
    264                 chunk_bytes_batch, batch_info, strict=False
    265             )
    266         ],
    267     )
    268     for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip(
    269         chunk_array_batch, batch_info, strict=False
    270     ):
    271         if chunk_array is not None:

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:177, in BatchedCodecPipeline.decode_batch(self, chunk_bytes_and_specs)
    172     chunk_bytes_batch = await bb_codec.decode(
    173         zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
    174     )
    176 ab_codec, chunk_spec_batch = ab_codec_with_spec
--> 177 chunk_array_batch = await ab_codec.decode(
    178     zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
    179 )
    181 for aa_codec, chunk_spec_batch in aa_codecs_with_spec[::-1]:
    182     chunk_array_batch = await aa_codec.decode(
    183         zip(chunk_array_batch, chunk_spec_batch, strict=False)
    184     )

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/abc/codec.py:125, in _Codec.decode(self, chunks_and_specs)
    109 async def decode(
    110     self,
    111     chunks_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]],
    112 ) -> Iterable[CodecInput | None]:
    113     """Decodes a batch of chunks.
    114     Chunks can be None in which case they are ignored by the codec.
    115
   (...)
    123     Iterable[CodecInput | None]
    124     """
--> 125     return await _batching_helper(self._decode_single, chunks_and_specs)

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/abc/codec.py:409, in _batching_helper(func, batch_info)
    405 async def _batching_helper(
    406     func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
    407     batch_info: Iterable[tuple[CodecInput | None, ArraySpec]],
    408 ) -> list[CodecOutput | None]:
--> 409     return await concurrent_map(
    410         list(batch_info),
    411         _noop_for_none(func),
    412         config.get("async.concurrency"),
    413     )

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/common.py:53, in concurrent_map(items, func, limit)
     49 async def concurrent_map(
     50     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     51 ) -> list[V]:
     52     if limit is None:
---> 53         return await asyncio.gather(*[func(*item) for item in items])
     55     else:
     56         sem = asyncio.Semaphore(limit)

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/abc/codec.py:422, in _noop_for_none.<locals>.wrap(chunk, chunk_spec)
    420 if chunk is None:
    421     return None
--> 422 return await func(chunk, chunk_spec)

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/bytes.py:89, in BytesCodec._decode_single(self, chunk_bytes, chunk_spec)
     86 else:
     87     as_nd_array_like = np.asanyarray(as_array_like)
     88 chunk_array = chunk_spec.prototype.nd_buffer.from_ndarray_like(
---> 89     as_nd_array_like.view(dtype=dtype)
     90 )
     92 # ensure correct chunk shape
     93 if chunk_array.shape != chunk_spec.shape:

ValueError: When changing to a larger dtype, its size must be a divisor of the total size in bytes of the last axis of the array.
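
The final frame is plain numpy behavior: viewing a byte buffer as a wider dtype fails when the byte count is not a multiple of the target itemsize. Here is a standalone illustration (my own, with an arbitrary buffer length; not from zarr), which suggests the bytes handed to the bytes codec do not amount to whole float64 chunks:

import numpy as np

# Reproduce the final ValueError in isolation: 1004 bytes cannot be
# viewed as float64 because 1004 is not a multiple of 8.
buf = np.zeros(1004, dtype=np.uint8)
try:
    buf.view(np.float64)
except ValueError as e:
    print(e)
    # When changing to a larger dtype, its size must be a divisor of the
    # total size in bytes of the last axis of the array.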

NOTE

It's worth noting that if I add a compression codec to the sharding config's codec pipeline (e.g. gzip with compression level 5), I get a different exception: ValueError: cannot reshape array of size 225 into shape (5,5,3). zarrita still reads the array correctly in this scenario.
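
For what it's worth, the sizes in that second error line up suspiciously (my reading of the numbers, not a confirmed diagnosis):

# After transpose(order=(2, 0, 1)), an inner (5, 3, 5) chunk decodes to
# shape (5, 5, 3), i.e. 75 elements; the error reports 225 = 3 * 75,
# as if the pipeline received three inner chunks' worth of data at once.
elements_per_inner_chunk = 5 * 5 * 3   # 75
print(225 / elements_per_inner_chunk)  # 3.0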

Data generation code

The data inside the attached zip can be generated with the following Python code:

Python code
import numpy as np
import zarrita

store = zarrita.LocalStore('testdata.zarr')
a = zarrita.Array.create(
    store / "some/group/another",
    shape=(100, 100, 50),
    dtype='float64',
    chunk_shape=(10, 15, 20),
    chunk_key_encoding=('default', '/'),
    codecs=[
        zarrita.codecs.sharding_codec(
            chunk_shape=(5, 3, 5),
            codecs=[
                zarrita.codecs.transpose_codec(order=(2, 0, 1)),
                zarrita.codecs.bytes_codec(endian="little"),
            ],
            index_codecs=[
                zarrita.codecs.bytes_codec(endian="big"),
                zarrita.codecs.crc32c_codec(),
            ],
            index_location="start",
        )
    ],
    fill_value=-np.inf,
)
a[:30, 10:12, :] = np.random.rand(30, 2, 50)
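
As a sanity check of the generated store, the written region can be read straight back with zarrita (a hypothetical addition, not part of the original script):

# Optional read-back check (my addition).
b = zarrita.Array.open(store / "some/group/another")
assert b[:30, 10:12, :].shape == (30, 2, 50)
assert b[50:, :, :].min() == -np.inf  # untouched shards still report the fill value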
