
Use config to select implementation #1982


Merged: 44 commits, Jul 26, 2024
Changes from all commits (44 commits)
bdf58a6
make codec pipeline implementation configurable
brokkoli71 Jun 17, 2024
e2a5e11
add test_config_codec_pipeline_class_in_env
brokkoli71 Jun 17, 2024
311cfcc
make codec implementation configurable
brokkoli71 Jun 18, 2024
364c403
remove snake case support for class names in config
brokkoli71 Jun 18, 2024
9e94e36
use registry for codec pipeline config
brokkoli71 Jun 18, 2024
11f184d
typing
brokkoli71 Jun 18, 2024
216a5d4
load codec pipeline from entrypoints
brokkoli71 Jun 18, 2024
2a3b7ea
test if configured codec implementation and codec pipeline is used
brokkoli71 Jun 18, 2024
02d1f6e
make ndbuffer implementation configurable
brokkoli71 Jun 20, 2024
6467149
fix circular import
brokkoli71 Jun 20, 2024
460f853
change class method calls on NDBuffer to use get_ndbuffer_class()
brokkoli71 Jun 20, 2024
acc7f17
make buffer implementation configurable
brokkoli71 Jun 20, 2024
bbb8822
Merge branch 'refs/heads/master' into use-config-to-select-codecs
brokkoli71 Jun 20, 2024
b060722
format
brokkoli71 Jun 20, 2024
1ca197d
fix tests
brokkoli71 Jun 20, 2024
26329f6
ignore mypy in tests
brokkoli71 Jun 20, 2024
9601a6f
add test to lazy load (nd)buffer from entrypoint
brokkoli71 Jun 21, 2024
ffe5832
better assertion message
brokkoli71 Jun 24, 2024
556eed6
Merge branch 'refs/heads/master' into use-config-to-select-codecs
brokkoli71 Jun 24, 2024
d07a127
fix merge
brokkoli71 Jun 24, 2024
7448f36
fix merge
brokkoli71 Jun 24, 2024
5142d95
Merge remote-tracking branch 'origin/use-config-to-select-codecs' int…
brokkoli71 Jun 24, 2024
57ad3b4
formatting
brokkoli71 Jun 24, 2024
0b2cf9a
fix mypy
brokkoli71 Jun 24, 2024
4f6d690
fix ruff formatting
brokkoli71 Jun 24, 2024
3b34b60
Merge branch 'refs/heads/master' into use-config-to-select-codecs
brokkoli71 Jun 26, 2024
96676b7
fix merge
brokkoli71 Jun 26, 2024
8b2a60d
Merge branch 'refs/heads/master' into use-config-to-select-codecs
brokkoli71 Jun 27, 2024
c098d9a
fix mypy
brokkoli71 Jun 27, 2024
97e004b
use numpy_buffer_prototype for reading shard index
brokkoli71 Jul 1, 2024
efbab6b
Merge branch 'refs/heads/master' into use-config-to-select-codecs
brokkoli71 Jul 4, 2024
01ab484
rename buffer and entrypoint test-classes
brokkoli71 Jul 4, 2024
9627157
document interaction registry and config
brokkoli71 Jul 4, 2024
5e83002
change config prefix from zarr_python to zarr
brokkoli71 Jul 8, 2024
cc5f93c
use fully_qualified_name for implementation config
brokkoli71 Jul 8, 2024
885329f
Merge branch 'refs/heads/master' into use-config-to-select-codecs
brokkoli71 Jul 8, 2024
ae1023c
refactor registry dicts
brokkoli71 Jul 8, 2024
2d89931
fix default_buffer_prototype access in tests
brokkoli71 Jul 8, 2024
168efff
allow multiple implementations per entry_point
brokkoli71 Jul 9, 2024
a13e7de
add tests for multiple implementations per entry_point
brokkoli71 Jul 9, 2024
56335e4
fix DeprecationWarning: SelectableGroups in registry.py
brokkoli71 Jul 10, 2024
ca27b1d
fix DeprecationWarning: EntryPoints list interface in registry.py
brokkoli71 Jul 10, 2024
d470ec6
clarify _collect_entrypoints docstring
brokkoli71 Jul 10, 2024
d210403
Merge branch 'refs/heads/master' into use-config-to-select-codecs
brokkoli71 Jul 26, 2024
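
Taken together, these commits route the codec pipeline, codec, Buffer, and NDBuffer lookups through a central zarr.registry module, with the concrete class chosen via zarr's config or discovered from entry points. A minimal sketch of the intended usage follows; the config key name ("buffer"), the `from zarr.config import config` import location, and the MyBuffer class are assumptions inferred from the commit messages and the diff below, not something this page states verbatim:

```python
# Hypothetical sketch: select a Buffer implementation via zarr's config/registry.
# The config key "buffer" and the config import path are assumptions; MyBuffer
# is a placeholder class for illustration.
from zarr.buffer import Buffer
from zarr.config import config
from zarr.registry import get_buffer_class, register_buffer


class MyBuffer(Buffer):
    """Custom Buffer implementation (illustration only)."""


# Register the class so the registry can resolve it by its fully qualified name,
# then point the config at that name. The same pattern applies to "ndbuffer"
# and to the codec pipeline.
register_buffer(MyBuffer)
with config.set({"buffer": f"{MyBuffer.__module__}.{MyBuffer.__name__}"}):
    assert get_buffer_class() is MyBuffer
```

Third-party packages can alternatively expose their implementations through entry points, which the registry collects lazily (see the entrypoint-related commits above).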
12 changes: 11 additions & 1 deletion src/zarr/abc/codec.py
@@ -17,6 +17,7 @@
from typing_extensions import Self

from zarr.array_spec import ArraySpec
from zarr.common import JSON
from zarr.indexing import SelectorTuple

CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer)
@@ -254,7 +255,7 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:

@classmethod
@abstractmethod
def from_list(cls, codecs: list[Codec]) -> Self:
def from_list(cls, codecs: Iterable[Codec]) -> Self:
"""Creates a codec pipeline from a list of codecs.

Parameters
@@ -390,6 +391,15 @@ async def write(
"""
...

@classmethod
def from_dict(cls, data: Iterable[JSON | Codec]) -> Self:
"""
Create an instance of the model from a dictionary
"""
...

return cls(**data)


async def batching_helper(
func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
61 changes: 44 additions & 17 deletions src/zarr/array.py
@@ -25,7 +25,6 @@
from zarr.chunk_key_encodings import ChunkKeyEncoding, DefaultChunkKeyEncoding, V2ChunkKeyEncoding
from zarr.codecs import BytesCodec
from zarr.codecs._v2 import V2Compressor, V2Filters
from zarr.codecs.pipeline import BatchedCodecPipeline
from zarr.common import (
JSON,
ZARR_JSON,
@@ -61,6 +60,7 @@
pop_fields,
)
from zarr.metadata import ArrayMetadata, ArrayV2Metadata, ArrayV3Metadata
from zarr.registry import get_pipeline_class
from zarr.store import StoreLike, StorePath, make_store_path
from zarr.store.core import (
ensure_no_existing_node,
@@ -79,11 +79,11 @@ def parse_array_metadata(data: Any) -> ArrayV2Metadata | ArrayV3Metadata:
raise TypeError


def create_codec_pipeline(metadata: ArrayV2Metadata | ArrayV3Metadata) -> BatchedCodecPipeline:
def create_codec_pipeline(metadata: ArrayV2Metadata | ArrayV3Metadata) -> CodecPipeline:
if isinstance(metadata, ArrayV3Metadata):
return BatchedCodecPipeline.from_list(metadata.codecs)
return get_pipeline_class().from_list(metadata.codecs)
elif isinstance(metadata, ArrayV2Metadata):
return BatchedCodecPipeline.from_list(
return get_pipeline_class().from_list(
[V2Filters(metadata.filters or []), V2Compressor(metadata.compressor)]
)
else:
@@ -483,8 +483,13 @@ async def _get_selection(
return out_buffer.as_ndarray_like()

async def getitem(
self, selection: BasicSelection, *, prototype: BufferPrototype = default_buffer_prototype
self,
selection: BasicSelection,
*,
prototype: BufferPrototype | None = None,
) -> NDArrayLike:
if prototype is None:
prototype = default_buffer_prototype()
indexer = BasicIndexer(
selection,
shape=self.metadata.shape,
@@ -493,7 +498,7 @@ def getitem(
return await self._get_selection(indexer, prototype=prototype)

async def _save_metadata(self, metadata: ArrayMetadata) -> None:
to_save = metadata.to_buffer_dict()
to_save = metadata.to_buffer_dict(default_buffer_prototype())
awaitables = [set_or_delete(self.store_path / key, value) for key, value in to_save.items()]
await gather(*awaitables)

@@ -545,8 +550,10 @@ async def setitem(
self,
selection: BasicSelection,
value: npt.ArrayLike,
prototype: BufferPrototype = default_buffer_prototype,
prototype: BufferPrototype | None = None,
) -> None:
if prototype is None:
prototype = default_buffer_prototype()
indexer = BasicIndexer(
selection,
shape=self.metadata.shape,
@@ -1001,7 +1008,7 @@ def get_basic_selection(
selection: BasicSelection = Ellipsis,
*,
out: NDBuffer | None = None,
prototype: BufferPrototype = default_buffer_prototype,
prototype: BufferPrototype | None = None,
fields: Fields | None = None,
) -> NDArrayLike:
"""Retrieve data for an item or region of the array.
@@ -1108,6 +1115,8 @@ def get_basic_selection(

"""

if prototype is None:
prototype = default_buffer_prototype()
return sync(
self._async_array._get_selection(
BasicIndexer(selection, self.shape, self.metadata.chunk_grid),
@@ -1123,7 +1132,7 @@ def set_basic_selection(
value: npt.ArrayLike,
*,
fields: Fields | None = None,
prototype: BufferPrototype = default_buffer_prototype,
prototype: BufferPrototype | None = None,
) -> None:
"""Modify data for an item or region of the array.

@@ -1207,6 +1216,8 @@ def set_basic_selection(
vindex, oindex, blocks, __getitem__, __setitem__

"""
if prototype is None:
prototype = default_buffer_prototype()
indexer = BasicIndexer(selection, self.shape, self.metadata.chunk_grid)
sync(self._async_array._set_selection(indexer, value, fields=fields, prototype=prototype))

@@ -1216,7 +1227,7 @@ def get_orthogonal_selection(
*,
out: NDBuffer | None = None,
fields: Fields | None = None,
prototype: BufferPrototype = default_buffer_prototype,
prototype: BufferPrototype | None = None,
) -> NDArrayLike:
"""Retrieve data by making a selection for each dimension of the array. For
example, if an array has 2 dimensions, allows selecting specific rows and/or
@@ -1325,6 +1336,8 @@ def get_orthogonal_selection(
vindex, oindex, blocks, __getitem__, __setitem__

"""
if prototype is None:
prototype = default_buffer_prototype()
indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
return sync(
self._async_array._get_selection(
@@ -1338,7 +1351,7 @@ def set_orthogonal_selection(
value: npt.ArrayLike,
*,
fields: Fields | None = None,
prototype: BufferPrototype = default_buffer_prototype,
prototype: BufferPrototype | None = None,
) -> None:
"""Modify data via a selection for each dimension of the array.

@@ -1435,6 +1448,8 @@ def set_orthogonal_selection(
vindex, oindex, blocks, __getitem__, __setitem__

"""
if prototype is None:
prototype = default_buffer_prototype()
indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
return sync(
self._async_array._set_selection(indexer, value, fields=fields, prototype=prototype)
@@ -1446,7 +1461,7 @@ def get_mask_selection(
*,
out: NDBuffer | None = None,
fields: Fields | None = None,
prototype: BufferPrototype = default_buffer_prototype,
prototype: BufferPrototype | None = None,
) -> NDArrayLike:
"""Retrieve a selection of individual items, by providing a Boolean array of the
same shape as the array against which the selection is being made, where True
@@ -1513,6 +1528,8 @@ def get_mask_selection(
vindex, oindex, blocks, __getitem__, __setitem__
"""

if prototype is None:
prototype = default_buffer_prototype()
indexer = MaskIndexer(mask, self.shape, self.metadata.chunk_grid)
return sync(
self._async_array._get_selection(
@@ -1526,7 +1543,7 @@ def set_mask_selection(
value: npt.ArrayLike,
*,
fields: Fields | None = None,
prototype: BufferPrototype = default_buffer_prototype,
prototype: BufferPrototype | None = None,
) -> None:
"""Modify a selection of individual items, by providing a Boolean array of the
same shape as the array against which the selection is being made, where True
@@ -1593,6 +1610,8 @@ def set_mask_selection(
vindex, oindex, blocks, __getitem__, __setitem__

"""
if prototype is None:
prototype = default_buffer_prototype()
indexer = MaskIndexer(mask, self.shape, self.metadata.chunk_grid)
sync(self._async_array._set_selection(indexer, value, fields=fields, prototype=prototype))

@@ -1602,7 +1621,7 @@ def get_coordinate_selection(
*,
out: NDBuffer | None = None,
fields: Fields | None = None,
prototype: BufferPrototype = default_buffer_prototype,
prototype: BufferPrototype | None = None,
) -> NDArrayLike:
"""Retrieve a selection of individual items, by providing the indices
(coordinates) for each selected item.
@@ -1671,6 +1690,8 @@ def get_coordinate_selection(
vindex, oindex, blocks, __getitem__, __setitem__

"""
if prototype is None:
prototype = default_buffer_prototype()
indexer = CoordinateIndexer(selection, self.shape, self.metadata.chunk_grid)
out_array = sync(
self._async_array._get_selection(
@@ -1689,7 +1710,7 @@ def set_coordinate_selection(
value: npt.ArrayLike,
*,
fields: Fields | None = None,
prototype: BufferPrototype = default_buffer_prototype,
prototype: BufferPrototype | None = None,
) -> None:
"""Modify a selection of individual items, by providing the indices (coordinates)
for each item to be modified.
@@ -1753,6 +1774,8 @@ def set_coordinate_selection(
vindex, oindex, blocks, __getitem__, __setitem__

"""
if prototype is None:
prototype = default_buffer_prototype()
# setup indexer
indexer = CoordinateIndexer(selection, self.shape, self.metadata.chunk_grid)

@@ -1776,7 +1799,7 @@ def get_block_selection(
*,
out: NDBuffer | None = None,
fields: Fields | None = None,
prototype: BufferPrototype = default_buffer_prototype,
prototype: BufferPrototype | None = None,
) -> NDArrayLike:
"""Retrieve a selection of individual items, by providing the indices
(coordinates) for each selected item.
@@ -1859,6 +1882,8 @@ def get_block_selection(
vindex, oindex, blocks, __getitem__, __setitem__

"""
if prototype is None:
prototype = default_buffer_prototype()
indexer = BlockIndexer(selection, self.shape, self.metadata.chunk_grid)
return sync(
self._async_array._get_selection(
@@ -1872,7 +1897,7 @@ def set_block_selection(
value: npt.ArrayLike,
*,
fields: Fields | None = None,
prototype: BufferPrototype = default_buffer_prototype,
prototype: BufferPrototype | None = None,
) -> None:
"""Modify a selection of individual blocks, by providing the chunk indices
(coordinates) for each block to be modified.
@@ -1950,6 +1975,8 @@ def set_block_selection(
vindex, oindex, blocks, __getitem__, __setitem__

"""
if prototype is None:
prototype = default_buffer_prototype()
indexer = BlockIndexer(selection, self.shape, self.metadata.chunk_grid)
sync(self._async_array._set_selection(indexer, value, fields=fields, prototype=prototype))

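A recurring change in array.py above: every `prototype: BufferPrototype = default_buffer_prototype` default becomes `prototype: BufferPrototype | None = None`, with the prototype resolved inside the method body. The reason is that `default_buffer_prototype` is no longer a module-level constant but a function (see the buffer.py diff below), so it must be evaluated per call to pick up the currently configured Buffer/NDBuffer classes; a default bound at function-definition time would be frozen to whatever was configured at import. A small sketch of the pattern, using a hypothetical helper name:

```python
from zarr.buffer import BufferPrototype, default_buffer_prototype


def resolve_prototype(prototype: BufferPrototype | None = None) -> BufferPrototype:
    # Hypothetical helper mirroring the pattern used by getitem/setitem and the
    # *_selection methods above: resolve the default lazily, at call time, so
    # changes made via zarr.config / zarr.registry are respected.
    if prototype is None:
        prototype = default_buffer_prototype()
    return prototype
```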
18 changes: 17 additions & 1 deletion src/zarr/buffer.py
@@ -16,6 +16,12 @@
import numpy.typing as npt

from zarr.common import ChunkCoords
from zarr.registry import (
get_buffer_class,
get_ndbuffer_class,
register_buffer,
register_ndbuffer,
)

if TYPE_CHECKING:
from typing_extensions import Self
@@ -479,4 +485,14 @@ class BufferPrototype(NamedTuple):


# The default buffer prototype used throughout the Zarr codebase.
default_buffer_prototype = BufferPrototype(buffer=Buffer, nd_buffer=NDBuffer)
def default_buffer_prototype() -> BufferPrototype:
return BufferPrototype(buffer=get_buffer_class(), nd_buffer=get_ndbuffer_class())


# The NumPy-backed prototype, used e.g. when reading the shard index
def numpy_buffer_prototype() -> BufferPrototype:
return BufferPrototype(buffer=Buffer, nd_buffer=NDBuffer)


register_buffer(Buffer)
register_ndbuffer(NDBuffer)
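
Here `default_buffer_prototype` becomes a function that asks the registry for the configured Buffer and NDBuffer classes, while `numpy_buffer_prototype()` always returns the NumPy-backed classes defined in this module; per the commit above, the latter is used when reading the shard index. A short usage sketch, illustrative only:

```python
from zarr.buffer import default_buffer_prototype, numpy_buffer_prototype

# Registry/config-aware: may resolve to a third-party Buffer/NDBuffer pair.
proto = default_buffer_prototype()
buf = proto.buffer.from_bytes(b"raw chunk bytes")

# Always the NumPy-backed classes from this module, regardless of config;
# used e.g. for reading the shard index.
np_proto = numpy_buffer_prototype()
nd = np_proto.nd_buffer.create(shape=(4,), dtype="int32", fill_value=0)
```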
11 changes: 6 additions & 5 deletions src/zarr/codecs/_v2.py
@@ -7,8 +7,9 @@

from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec
from zarr.array_spec import ArraySpec
from zarr.buffer import Buffer, NDBuffer
from zarr.buffer import Buffer, NDBuffer, default_buffer_prototype
from zarr.common import JSON, to_thread
from zarr.registry import get_ndbuffer_class


@dataclass(frozen=True)
@@ -34,7 +35,7 @@ async def _decode_single(
if str(chunk_numpy_array.dtype) != chunk_spec.dtype:
chunk_numpy_array = chunk_numpy_array.view(chunk_spec.dtype)

return NDBuffer.from_numpy_array(chunk_numpy_array)
return get_ndbuffer_class().from_numpy_array(chunk_numpy_array)

async def _encode_single(
self,
@@ -55,7 +56,7 @@ async def _encode_single(
else:
encoded_chunk_bytes = ensure_bytes(chunk_numpy_array)

return Buffer.from_bytes(encoded_chunk_bytes)
return default_buffer_prototype().buffer.from_bytes(encoded_chunk_bytes)

def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
raise NotImplementedError
@@ -86,7 +87,7 @@ async def _decode_single(
order=chunk_spec.order,
)

return NDBuffer.from_ndarray_like(chunk_ndarray)
return get_ndbuffer_class().from_ndarray_like(chunk_ndarray)

async def _encode_single(
self,
@@ -99,7 +100,7 @@ async def _encode_single(
filter = numcodecs.get_codec(filter_metadata)
chunk_ndarray = await to_thread(filter.encode, chunk_ndarray)

return NDBuffer.from_ndarray_like(chunk_ndarray)
return get_ndbuffer_class().from_ndarray_like(chunk_ndarray)

def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
raise NotImplementedError
2 changes: 1 addition & 1 deletion src/zarr/codecs/blosc.py
@@ -11,8 +11,8 @@
from zarr.abc.codec import BytesBytesCodec
from zarr.array_spec import ArraySpec
from zarr.buffer import Buffer, as_numpy_array_wrapper
from zarr.codecs.registry import register_codec
from zarr.common import JSON, parse_enum, parse_named_configuration, to_thread
from zarr.registry import register_codec

if TYPE_CHECKING:
from typing_extensions import Self
2 changes: 1 addition & 1 deletion src/zarr/codecs/bytes.py
@@ -10,8 +10,8 @@
from zarr.abc.codec import ArrayBytesCodec
from zarr.array_spec import ArraySpec
from zarr.buffer import Buffer, NDArrayLike, NDBuffer
from zarr.codecs.registry import register_codec
from zarr.common import JSON, parse_enum, parse_named_configuration
from zarr.registry import register_codec

if TYPE_CHECKING:
from typing_extensions import Self
2 changes: 1 addition & 1 deletion src/zarr/codecs/crc32c_.py
@@ -9,8 +9,8 @@
from zarr.abc.codec import BytesBytesCodec
from zarr.array_spec import ArraySpec
from zarr.buffer import Buffer
from zarr.codecs.registry import register_codec
from zarr.common import JSON, parse_named_configuration
from zarr.registry import register_codec

if TYPE_CHECKING:
from typing_extensions import Self
2 changes: 1 addition & 1 deletion src/zarr/codecs/gzip.py
@@ -8,8 +8,8 @@
from zarr.abc.codec import BytesBytesCodec
from zarr.array_spec import ArraySpec
from zarr.buffer import Buffer, as_numpy_array_wrapper
from zarr.codecs.registry import register_codec
from zarr.common import JSON, parse_named_configuration, to_thread
from zarr.registry import register_codec

if TYPE_CHECKING:
from typing_extensions import Self