Skip to content

Codecs without array metadata #1632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 36 additions & 21 deletions src/zarr/v3/abc/codec.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,3 @@
# Notes:
# 1. The following methods described in the spec are missing. I expected to see these method definitions:
# def compute_encoded_representation_type(self, decoded_representation_type):
# def encode(self, decoded_value):
# def decode(self, encoded_value, decoded_representation_type):
# def partial_decode(self, input_handle, decoded_representation_type, decoded_regions):
# def compute_encoded_size(self, input_size):
# 2. Understand why array metadata is included on all codecs


from __future__ import annotations

from abc import abstractmethod, ABC
Expand All @@ -20,30 +10,39 @@


if TYPE_CHECKING:
from zarr.v3.metadata import CoreArrayMetadata, CodecMetadata
from zarr.v3.metadata import (
ArraySpec,
ArrayMetadata,
DataType,
CodecMetadata,
RuntimeConfiguration,
)


class Codec(ABC):
is_fixed_size: bool
array_metadata: CoreArrayMetadata

@classmethod
@abstractmethod
def compute_encoded_size(self, input_byte_length: int) -> int:
def get_metadata_class(cls) -> Type[CodecMetadata]:
pass

def resolve_metadata(self) -> CoreArrayMetadata:
return self.array_metadata

@classmethod
@abstractmethod
def from_metadata(
cls, codec_metadata: "CodecMetadata", array_metadata: CoreArrayMetadata
) -> Codec:
def from_metadata(cls, codec_metadata: CodecMetadata) -> Codec:
pass

@classmethod
@abstractmethod
def get_metadata_class(cls) -> "Type[CodecMetadata]":
def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int:
pass

def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
return chunk_spec

def evolve(self, *, ndim: int, data_type: DataType) -> Codec:
return self

def validate(self, array_metadata: ArrayMetadata) -> None:
pass


Expand All @@ -52,13 +51,17 @@ class ArrayArrayCodec(Codec):
async def decode(
self,
chunk_array: np.ndarray,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> np.ndarray:
pass

@abstractmethod
async def encode(
self,
chunk_array: np.ndarray,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> Optional[np.ndarray]:
pass

Expand All @@ -68,13 +71,17 @@ class ArrayBytesCodec(Codec):
async def decode(
self,
chunk_array: BytesLike,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> np.ndarray:
pass

@abstractmethod
async def encode(
self,
chunk_array: np.ndarray,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> Optional[BytesLike]:
pass

Expand All @@ -85,6 +92,8 @@ async def decode_partial(
self,
store_path: StorePath,
selection: SliceSelection,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> Optional[np.ndarray]:
pass

Expand All @@ -96,6 +105,8 @@ async def encode_partial(
store_path: StorePath,
chunk_array: np.ndarray,
selection: SliceSelection,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> None:
pass

Expand All @@ -105,12 +116,16 @@ class BytesBytesCodec(Codec):
async def decode(
self,
chunk_array: BytesLike,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> BytesLike:
pass

@abstractmethod
async def encode(
self,
chunk_array: BytesLike,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> Optional[BytesLike]:
pass
62 changes: 38 additions & 24 deletions src/zarr/v3/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
import numpy as np
from attr import evolve, frozen

from zarr.v3.abc.codec import ArrayBytesCodecPartialDecodeMixin


# from zarr.v3.array_v2 import ArrayV2
from zarr.v3.codecs import CodecMetadata, CodecPipeline, bytes_codec
from zarr.v3.codecs.registry import get_codec_from_metadata
from zarr.v3.common import (
ZARR_JSON,
ChunkCoords,
Expand All @@ -31,6 +30,7 @@
from zarr.v3.indexing import BasicIndexer, all_chunk_coords, is_total_slice
from zarr.v3.metadata import (
ArrayMetadata,
ArraySpec,
DataType,
DefaultChunkKeyEncodingConfigurationMetadata,
DefaultChunkKeyEncodingMetadata,
Expand All @@ -41,7 +41,6 @@
V2ChunkKeyEncodingMetadata,
dtype_to_data_type,
)
from zarr.v3.codecs.sharding import ShardingCodec
from zarr.v3.store import StoreLike, StorePath, make_store_path
from zarr.v3.sync import sync

Expand Down Expand Up @@ -118,8 +117,11 @@ async def create(
metadata=metadata,
store_path=store_path,
runtime_configuration=runtime_configuration,
codec_pipeline=CodecPipeline.from_metadata(
metadata.codecs, metadata.get_core_metadata(runtime_configuration)
codec_pipeline=CodecPipeline.create(
[
get_codec_from_metadata(codec).evolve(ndim=len(shape), data_type=data_type)
for codec in codecs
]
),
)

Expand All @@ -134,13 +136,17 @@ def from_json(
runtime_configuration: RuntimeConfiguration,
) -> AsyncArray:
metadata = ArrayMetadata.from_json(zarr_json)
codecs = [
get_codec_from_metadata(codec).evolve(
ndim=len(metadata.shape), data_type=metadata.data_type
)
for codec in metadata.codecs
]
async_array = cls(
metadata=metadata,
store_path=store_path,
runtime_configuration=runtime_configuration,
codec_pipeline=CodecPipeline.from_metadata(
metadata.codecs, metadata.get_core_metadata(runtime_configuration)
),
codec_pipeline=CodecPipeline.create(codecs),
)
async_array._validate_metadata()
return async_array
Expand Down Expand Up @@ -240,6 +246,7 @@ def _validate_metadata(self) -> None:
self.metadata.dimension_names
), "`dimension_names` and `shape` need to have the same number of dimensions."
assert self.metadata.fill_value is not None, "`fill_value` is required."
self.codec_pipeline.validate(self.metadata)

async def _read_chunk(
self,
Expand All @@ -248,15 +255,14 @@ async def _read_chunk(
out_selection: SliceSelection,
out: np.ndarray,
):
chunk_spec = self.metadata.get_chunk_spec(chunk_coords)
chunk_key_encoding = self.metadata.chunk_key_encoding
chunk_key = chunk_key_encoding.encode_chunk_key(chunk_coords)
store_path = self.store_path / chunk_key

if len(self.codec_pipeline.codecs) == 1 and isinstance(
self.codec_pipeline.codecs[0], ArrayBytesCodecPartialDecodeMixin
):
chunk_array = await self.codec_pipeline.codecs[0].decode_partial(
store_path, chunk_selection
if self.codec_pipeline.supports_partial_decode:
chunk_array = await self.codec_pipeline.decode_partial(
store_path, chunk_selection, chunk_spec, self.runtime_configuration
)
if chunk_array is not None:
out[out_selection] = chunk_array
Expand All @@ -265,7 +271,9 @@ async def _read_chunk(
else:
chunk_bytes = await store_path.get()
if chunk_bytes is not None:
chunk_array = await self.codec_pipeline.decode(chunk_bytes)
chunk_array = await self.codec_pipeline.decode(
chunk_bytes, chunk_spec, self.runtime_configuration
)
tmp = chunk_array[chunk_selection]
out[out_selection] = tmp
else:
Expand Down Expand Up @@ -316,6 +324,7 @@ async def _write_chunk(
chunk_selection: SliceSelection,
out_selection: SliceSelection,
):
chunk_spec = self.metadata.get_chunk_spec(chunk_coords)
chunk_key_encoding = self.metadata.chunk_key_encoding
chunk_key = chunk_key_encoding.encode_chunk_key(chunk_coords)
store_path = self.store_path / chunk_key
Expand All @@ -330,17 +339,16 @@ async def _write_chunk(
chunk_array.fill(value)
else:
chunk_array = value[out_selection]
await self._write_chunk_to_store(store_path, chunk_array)
await self._write_chunk_to_store(store_path, chunk_array, chunk_spec)

elif len(self.codec_pipeline.codecs) == 1 and isinstance(
self.codec_pipeline.codecs[0], ShardingCodec
):
sharding_codec = self.codec_pipeline.codecs[0]
elif self.codec_pipeline.supports_partial_encode:
# print("encode_partial", chunk_coords, chunk_selection, repr(self))
await sharding_codec.encode_partial(
await self.codec_pipeline.encode_partial(
store_path,
value[out_selection],
chunk_selection,
chunk_spec,
self.runtime_configuration,
)
else:
# writing partial chunks
Expand All @@ -356,18 +364,24 @@ async def _write_chunk(
chunk_array.fill(self.metadata.fill_value)
else:
chunk_array = (
await self.codec_pipeline.decode(chunk_bytes)
await self.codec_pipeline.decode(
chunk_bytes, chunk_spec, self.runtime_configuration
)
).copy() # make a writable copy
chunk_array[chunk_selection] = value[out_selection]

await self._write_chunk_to_store(store_path, chunk_array)
await self._write_chunk_to_store(store_path, chunk_array, chunk_spec)

async def _write_chunk_to_store(self, store_path: StorePath, chunk_array: np.ndarray):
async def _write_chunk_to_store(
self, store_path: StorePath, chunk_array: np.ndarray, chunk_spec: ArraySpec
):
if np.all(chunk_array == self.metadata.fill_value):
# chunks that only contain fill_value will be removed
await store_path.delete()
else:
chunk_bytes = await self.codec_pipeline.encode(chunk_array)
chunk_bytes = await self.codec_pipeline.encode(
chunk_array, chunk_spec, self.runtime_configuration
)
if chunk_bytes is None:
await store_path.delete()
else:
Expand Down
45 changes: 29 additions & 16 deletions src/zarr/v3/array_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
to_thread,
)
from zarr.v3.indexing import BasicIndexer, all_chunk_coords, is_total_slice
from zarr.v3.metadata import ArrayV2Metadata, RuntimeConfiguration
from zarr.v3.metadata import ArrayV2Metadata, CodecMetadata, RuntimeConfiguration
from zarr.v3.store import StoreLike, StorePath, make_store_path
from zarr.v3.sync import sync

Expand Down Expand Up @@ -83,12 +83,14 @@ async def create_async(
order=order,
dimension_separator=dimension_separator,
fill_value=0 if fill_value is None else fill_value,
compressor=numcodecs.get_codec(compressor).get_config()
if compressor is not None
else None,
filters=[numcodecs.get_codec(filter).get_config() for filter in filters]
if filters is not None
else None,
compressor=(
numcodecs.get_codec(compressor).get_config() if compressor is not None else None
),
filters=(
[numcodecs.get_codec(filter).get_config() for filter in filters]
if filters is not None
else None
),
)
array = cls(
metadata=metadata,
Expand Down Expand Up @@ -441,22 +443,29 @@ async def convert_to_v3_async(self) -> Array:
from zarr.v3.common import ZARR_JSON
from zarr.v3.metadata import (
ArrayMetadata,
DataType,
RegularChunkGridConfigurationMetadata,
RegularChunkGridMetadata,
V2ChunkKeyEncodingConfigurationMetadata,
V2ChunkKeyEncodingMetadata,
dtype_to_data_type,
)
from zarr.v3.codecs.blosc import (
BloscCodecConfigurationMetadata,
BloscCodecMetadata,
blosc_shuffle_int_to_str,
)
from zarr.v3.codecs.bytes import (
BytesCodecConfigurationMetadata,
BytesCodecMetadata,
CodecMetadata,
DataType,
)
from zarr.v3.codecs.gzip import (
GzipCodecConfigurationMetadata,
GzipCodecMetadata,
RegularChunkGridConfigurationMetadata,
RegularChunkGridMetadata,
)
from zarr.v3.codecs.transpose import (
TransposeCodecConfigurationMetadata,
TransposeCodecMetadata,
V2ChunkKeyEncodingConfigurationMetadata,
V2ChunkKeyEncodingMetadata,
blosc_shuffle_int_to_str,
dtype_to_data_type,
)

data_type = DataType[dtype_to_data_type[self.metadata.dtype.str]]
Expand All @@ -476,7 +485,11 @@ async def convert_to_v3_async(self) -> Array:

if self.metadata.order == "F":
codecs.append(
TransposeCodecMetadata(configuration=TransposeCodecConfigurationMetadata(order="F"))
TransposeCodecMetadata(
configuration=TransposeCodecConfigurationMetadata(
order=tuple(reversed(range(self.metadata.ndim)))
)
)
)
codecs.append(
BytesCodecMetadata(configuration=BytesCodecConfigurationMetadata(endian=endian))
Expand Down
Loading