
Commit 109f71f

Refactors v2 codec handling (#2425)

* refactors codec pipeline for v2
* async
* rm ensure_bytes

1 parent 87ca150 commit 109f71f

File tree

3 files changed: +60 −71 lines changed
src/zarr/codecs/_v2.py

Lines changed: 48 additions & 67 deletions
@@ -5,20 +5,21 @@
 from typing import TYPE_CHECKING
 
 import numcodecs
-from numcodecs.compat import ensure_bytes, ensure_ndarray
+from numcodecs.compat import ensure_ndarray_like
 
-from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec
-from zarr.core.buffer import Buffer, NDBuffer, default_buffer_prototype
+from zarr.abc.codec import ArrayBytesCodec
 from zarr.registry import get_ndbuffer_class
 
 if TYPE_CHECKING:
     import numcodecs.abc
 
     from zarr.core.array_spec import ArraySpec
+    from zarr.core.buffer import Buffer, NDBuffer
 
 
 @dataclass(frozen=True)
-class V2Compressor(ArrayBytesCodec):
+class V2Codec(ArrayBytesCodec):
+    filters: tuple[numcodecs.abc.Codec, ...] | None
     compressor: numcodecs.abc.Codec | None
 
     is_fixed_size = False
@@ -28,81 +29,61 @@ async def _decode_single(
         chunk_bytes: Buffer,
         chunk_spec: ArraySpec,
     ) -> NDBuffer:
-        if self.compressor is not None:
-            chunk_numpy_array = ensure_ndarray(
-                await asyncio.to_thread(self.compressor.decode, chunk_bytes.as_array_like())
-            )
+        cdata = chunk_bytes.as_array_like()
+        # decompress
+        if self.compressor:
+            chunk = await asyncio.to_thread(self.compressor.decode, cdata)
         else:
-            chunk_numpy_array = ensure_ndarray(chunk_bytes.as_array_like())
+            chunk = cdata
+
+        # apply filters
+        if self.filters:
+            for f in reversed(self.filters):
+                chunk = await asyncio.to_thread(f.decode, chunk)
+
+        # view as numpy array with correct dtype
+        chunk = ensure_ndarray_like(chunk)
+        # special case object dtype, because incorrect handling can lead to
+        # segfaults and other bad things happening
+        if chunk_spec.dtype != object:
+            chunk = chunk.view(chunk_spec.dtype)
+        elif chunk.dtype != object:
+            # If we end up here, someone must have hacked around with the filters.
+            # We cannot deal with object arrays unless there is an object
+            # codec in the filter chain, i.e., a filter that converts from object
+            # array to something else during encoding, and converts back to object
+            # array during decoding.
+            raise RuntimeError("cannot read object array without object codec")
 
-        # ensure correct dtype
-        if str(chunk_numpy_array.dtype) != chunk_spec.dtype and not chunk_spec.dtype.hasobject:
-            chunk_numpy_array = chunk_numpy_array.view(chunk_spec.dtype)
+        # ensure correct chunk shape
+        chunk = chunk.reshape(-1, order="A")
+        chunk = chunk.reshape(chunk_spec.shape, order=chunk_spec.order)
 
-        return get_ndbuffer_class().from_numpy_array(chunk_numpy_array)
+        return get_ndbuffer_class().from_ndarray_like(chunk)
 
     async def _encode_single(
-        self,
-        chunk_array: NDBuffer,
-        _chunk_spec: ArraySpec,
-    ) -> Buffer | None:
-        chunk_numpy_array = chunk_array.as_numpy_array()
-        if self.compressor is not None:
-            if (
-                not chunk_numpy_array.flags.c_contiguous
-                and not chunk_numpy_array.flags.f_contiguous
-            ):
-                chunk_numpy_array = chunk_numpy_array.copy(order="A")
-            encoded_chunk_bytes = ensure_bytes(
-                await asyncio.to_thread(self.compressor.encode, chunk_numpy_array)
-            )
-        else:
-            encoded_chunk_bytes = ensure_bytes(chunk_numpy_array)
-
-        return default_buffer_prototype().buffer.from_bytes(encoded_chunk_bytes)
-
-    def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
-        raise NotImplementedError
-
-
-@dataclass(frozen=True)
-class V2Filters(ArrayArrayCodec):
-    filters: tuple[numcodecs.abc.Codec, ...] | None
-
-    is_fixed_size = False
-
-    async def _decode_single(
         self,
         chunk_array: NDBuffer,
         chunk_spec: ArraySpec,
-    ) -> NDBuffer:
-        chunk_ndarray = chunk_array.as_ndarray_like()
-        # apply filters in reverse order
-        if self.filters is not None:
-            for filter in self.filters[::-1]:
-                chunk_ndarray = await asyncio.to_thread(filter.decode, chunk_ndarray)
-
-        # ensure correct chunk shape
-        if chunk_ndarray.shape != chunk_spec.shape:
-            chunk_ndarray = chunk_ndarray.reshape(
-                chunk_spec.shape,
-                order=chunk_spec.order,
-            )
+    ) -> Buffer | None:
+        chunk = chunk_array.as_ndarray_like()
 
-        return get_ndbuffer_class().from_ndarray_like(chunk_ndarray)
+        # apply filters
+        if self.filters:
+            for f in self.filters:
+                chunk = await asyncio.to_thread(f.encode, chunk)
 
-    async def _encode_single(
-        self,
-        chunk_array: NDBuffer,
-        chunk_spec: ArraySpec,
-    ) -> NDBuffer | None:
-        chunk_ndarray = chunk_array.as_ndarray_like().ravel(order=chunk_spec.order)
+        # check object encoding
+        if ensure_ndarray_like(chunk).dtype == object:
+            raise RuntimeError("cannot write object array without object codec")
 
-        if self.filters is not None:
-            for filter in self.filters:
-                chunk_ndarray = await asyncio.to_thread(filter.encode, chunk_ndarray)
+        # compress
+        if self.compressor:
+            cdata = await asyncio.to_thread(self.compressor.encode, chunk)
+        else:
+            cdata = chunk
 
-        return get_ndbuffer_class().from_ndarray_like(chunk_ndarray)
+        return chunk_spec.prototype.buffer.from_bytes(cdata)
 
     def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        raise NotImplementedError
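
The refactor folds the former V2Filters/V2Compressor pair into a single V2Codec so the v2 ordering lives in one place: decode decompresses first and then applies filters in reverse, encode applies filters in declaration order and then compresses, with each blocking numcodecs call offloaded via asyncio.to_thread. A minimal synchronous sketch of that round trip using plain numcodecs objects (illustration only, not part of this commit):

import numpy as np
import numcodecs
from numcodecs.compat import ensure_ndarray_like

filters = [numcodecs.Delta(dtype="<i4")]
compressor = numcodecs.Zlib(level=2)
original = np.arange(8, dtype="<i4")

# encode: filters in declaration order, then compress
chunk = original
for f in filters:
    chunk = f.encode(chunk)
cdata = compressor.encode(chunk)

# decode: decompress, then filters in reverse order
chunk = compressor.decode(cdata)
for f in reversed(filters):
    chunk = f.decode(chunk)
# view with the target dtype and restore the chunk shape, as V2Codec does
decoded = ensure_ndarray_like(chunk).view("<i4").reshape(original.shape)

np.testing.assert_array_equal(decoded, original)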

src/zarr/core/array.py

Lines changed: 3 additions & 4 deletions
@@ -13,7 +13,7 @@
 from zarr._compat import _deprecate_positional_args
 from zarr.abc.store import Store, set_or_delete
 from zarr.codecs import _get_default_array_bytes_codec
-from zarr.codecs._v2 import V2Compressor, V2Filters
+from zarr.codecs._v2 import V2Codec
 from zarr.core.attributes import Attributes
 from zarr.core.buffer import (
     BufferPrototype,
@@ -118,9 +118,8 @@ def create_codec_pipeline(metadata: ArrayMetadata) -> CodecPipeline:
     if isinstance(metadata, ArrayV3Metadata):
         return get_pipeline_class().from_codecs(metadata.codecs)
     elif isinstance(metadata, ArrayV2Metadata):
-        return get_pipeline_class().from_codecs(
-            [V2Filters(metadata.filters), V2Compressor(metadata.compressor)]
-        )
+        v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
+        return get_pipeline_class().from_codecs([v2_codec])
     else:
         raise TypeError
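
With a single codec, create_codec_pipeline no longer splits v2 metadata across an array-to-array stage and an array-to-bytes stage. A hedged sketch of constructing the new codec directly, using only names that appear in this diff:

import numcodecs
from zarr.codecs._v2 import V2Codec

# old: two pipeline stages, one per metadata field
# codecs = [V2Filters(metadata.filters), V2Compressor(metadata.compressor)]

# new: one ArrayBytesCodec owns the full v2 chunk format
v2_codec = V2Codec(
    filters=(numcodecs.Delta(dtype="<i4"),),
    compressor=numcodecs.Zlib(level=2),
)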

tests/test_v2.py

Lines changed: 9 additions & 0 deletions
@@ -121,3 +121,12 @@ async def test_create_dtype_str(dtype: Any) -> None:
     arr[:] = ["a", "bb", "ccc"]
     result = arr[:]
     np.testing.assert_array_equal(result, np.array(["a", "bb", "ccc"], dtype="object"))
+
+
+@pytest.mark.parametrize("filters", [[], [numcodecs.Delta(dtype="<i4")], [numcodecs.Zlib(level=2)]])
+def test_v2_filters_codecs(filters: Any) -> None:
+    array_fixture = [42]
+    arr = zarr.create(shape=1, dtype="<i4", zarr_format=2, filters=filters)
+    arr[:] = array_fixture
+    result = arr[:]
+    np.testing.assert_array_equal(result, array_fixture)
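
The two new RuntimeError guards in V2Codec refuse to read or write object arrays unless an object codec sits in the filter chain. The numcodecs side of that contract can be shown in isolation: an object codec such as VLenUTF8 is exactly a filter whose encode leaves object dtype and whose decode returns to it (illustrative sketch, not one of this commit's tests):

import numpy as np
import numcodecs

vlen = numcodecs.VLenUTF8()  # an "object codec" in zarr v2 terms
obj = np.array(["a", "bb", "ccc"], dtype=object)

buf = vlen.encode(obj)  # object array -> bytes; leaves object dtype behind
dec = vlen.decode(buf)  # bytes -> object array on the way back

np.testing.assert_array_equal(dec, obj)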
