Skip to content

Commit 122760f

Browse files
d-v-bjhammandcherian
authored
Feat/latency store (#2474)
* feat: add wrapperstore * feat: add latencystore * rename noisysetter -> noisygetter * rename _wrapped to _store * loggingstore inherits from wrapperstore * Update src/zarr/storage/wrapper.py Co-authored-by: Joe Hamman <jhamman1@gmail.com> * back to asynciterators * update docstrings --------- Co-authored-by: Joe Hamman <jhamman1@gmail.com> Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>
1 parent 01b73a7 commit 122760f

File tree

5 files changed

+266
-29
lines changed

5 files changed

+266
-29
lines changed

src/zarr/storage/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from zarr.storage.logging import LoggingStore
44
from zarr.storage.memory import MemoryStore
55
from zarr.storage.remote import RemoteStore
6+
from zarr.storage.wrapper import WrapperStore
67
from zarr.storage.zip import ZipStore
78

89
__all__ = [
@@ -12,6 +13,7 @@
1213
"RemoteStore",
1314
"StoreLike",
1415
"StorePath",
16+
"WrapperStore",
1517
"ZipStore",
1618
"make_store_path",
1719
]

src/zarr/storage/logging.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,19 @@
77
from contextlib import contextmanager
88
from typing import TYPE_CHECKING, Any
99

10-
from zarr.abc.store import ByteRangeRequest, Store
10+
from zarr.abc.store import Store
11+
from zarr.storage.wrapper import WrapperStore
1112

1213
if TYPE_CHECKING:
1314
from collections.abc import AsyncIterator, Generator, Iterable
1415

16+
from zarr.abc.store import ByteRangeRequest
1517
from zarr.core.buffer import Buffer, BufferPrototype
1618

19+
counter: defaultdict[str, int]
20+
1721

18-
class LoggingStore(Store):
22+
class LoggingStore(WrapperStore[Store]):
1923
"""
2024
Store wrapper that logs all calls to the wrapped store.
2125
@@ -34,7 +38,6 @@ class LoggingStore(Store):
3438
Counter of number of times each method has been called
3539
"""
3640

37-
_store: Store
3841
counter: defaultdict[str, int]
3942

4043
def __init__(
@@ -43,11 +46,10 @@ def __init__(
4346
log_level: str = "DEBUG",
4447
log_handler: logging.Handler | None = None,
4548
) -> None:
46-
self._store = store
49+
super().__init__(store)
4750
self.counter = defaultdict(int)
4851
self.log_level = log_level
4952
self.log_handler = log_handler
50-
5153
self._configure_logger(log_level, log_handler)
5254

5355
def _configure_logger(

src/zarr/storage/wrapper.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING, Generic, TypeVar
4+
5+
if TYPE_CHECKING:
6+
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
7+
from types import TracebackType
8+
from typing import Any, Self
9+
10+
from zarr.abc.store import ByteRangeRequest
11+
from zarr.core.buffer import Buffer, BufferPrototype
12+
from zarr.core.common import BytesLike
13+
14+
from zarr.abc.store import Store
15+
16+
T_Store = TypeVar("T_Store", bound=Store)
17+
18+
19+
class WrapperStore(Store, Generic[T_Store]):
20+
"""
21+
A store class that wraps an existing ``Store`` instance.
22+
By default all of the store methods are delegated to the wrapped store instance, which is
23+
accessible via the ``._store`` attribute of this class.
24+
25+
Use this class to modify or extend the behavior of the other store classes.
26+
"""
27+
28+
_store: T_Store
29+
30+
def __init__(self, store: T_Store) -> None:
31+
self._store = store
32+
33+
@classmethod
34+
async def open(cls: type[Self], store_cls: type[T_Store], *args: Any, **kwargs: Any) -> Self:
35+
store = store_cls(*args, **kwargs)
36+
await store._open()
37+
return cls(store=store)
38+
39+
def __enter__(self) -> Self:
40+
return type(self)(self._store.__enter__())
41+
42+
def __exit__(
43+
self,
44+
exc_type: type[BaseException] | None,
45+
exc_value: BaseException | None,
46+
traceback: TracebackType | None,
47+
) -> None:
48+
return self._store.__exit__(exc_type, exc_value, traceback)
49+
50+
async def _open(self) -> None:
51+
await self._store._open()
52+
53+
async def _ensure_open(self) -> None:
54+
await self._store._ensure_open()
55+
56+
async def is_empty(self, prefix: str) -> bool:
57+
return await self._store.is_empty(prefix)
58+
59+
async def clear(self) -> None:
60+
return await self._store.clear()
61+
62+
@property
63+
def read_only(self) -> bool:
64+
return self._store.read_only
65+
66+
def _check_writable(self) -> None:
67+
return self._store._check_writable()
68+
69+
def __eq__(self, value: object) -> bool:
70+
return type(self) is type(value) and self._store.__eq__(value)
71+
72+
async def get(
73+
self, key: str, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None
74+
) -> Buffer | None:
75+
return await self._store.get(key, prototype, byte_range)
76+
77+
async def get_partial_values(
78+
self,
79+
prototype: BufferPrototype,
80+
key_ranges: Iterable[tuple[str, ByteRangeRequest]],
81+
) -> list[Buffer | None]:
82+
return await self._store.get_partial_values(prototype, key_ranges)
83+
84+
async def exists(self, key: str) -> bool:
85+
return await self._store.exists(key)
86+
87+
async def set(self, key: str, value: Buffer) -> None:
88+
await self._store.set(key, value)
89+
90+
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
91+
return await self._store.set_if_not_exists(key, value)
92+
93+
async def _set_many(self, values: Iterable[tuple[str, Buffer]]) -> None:
94+
await self._store._set_many(values)
95+
96+
@property
97+
def supports_writes(self) -> bool:
98+
return self._store.supports_writes
99+
100+
@property
101+
def supports_deletes(self) -> bool:
102+
return self._store.supports_deletes
103+
104+
async def delete(self, key: str) -> None:
105+
await self._store.delete(key)
106+
107+
@property
108+
def supports_partial_writes(self) -> bool:
109+
return self._store.supports_partial_writes
110+
111+
async def set_partial_values(
112+
self, key_start_values: Iterable[tuple[str, int, BytesLike]]
113+
) -> None:
114+
return await self._store.set_partial_values(key_start_values)
115+
116+
@property
117+
def supports_listing(self) -> bool:
118+
return self._store.supports_listing
119+
120+
def list(self) -> AsyncIterator[str]:
121+
return self._store.list()
122+
123+
def list_prefix(self, prefix: str) -> AsyncIterator[str]:
124+
return self._store.list_prefix(prefix)
125+
126+
def list_dir(self, prefix: str) -> AsyncIterator[str]:
127+
return self._store.list_dir(prefix)
128+
129+
async def delete_dir(self, prefix: str) -> None:
130+
return await self._store.delete_dir(prefix)
131+
132+
def close(self) -> None:
133+
self._store.close()
134+
135+
async def _get_many(
136+
self, requests: Iterable[tuple[str, BufferPrototype, ByteRangeRequest | None]]
137+
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
138+
async for req in self._store._get_many(requests):
139+
yield req

src/zarr/testing/store.py

Lines changed: 72 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,20 @@
1+
from __future__ import annotations
2+
3+
import asyncio
14
import pickle
2-
from typing import Any, Generic, TypeVar
5+
from typing import TYPE_CHECKING, Generic, TypeVar
6+
7+
from zarr.storage.wrapper import WrapperStore
8+
9+
if TYPE_CHECKING:
10+
from typing import Any
11+
12+
from zarr.abc.store import ByteRangeRequest
13+
from zarr.core.buffer.core import BufferPrototype
314

415
import pytest
516

6-
from zarr.abc.store import Store
17+
from zarr.abc.store import ByteRangeRequest, Store
718
from zarr.core.buffer import Buffer, default_buffer_prototype
819
from zarr.core.sync import _collect_aiterator
920
from zarr.storage._utils import _normalize_interval_index
@@ -319,25 +330,62 @@ async def test_set_if_not_exists(self, store: S) -> None:
319330
result = await store.get("k2", default_buffer_prototype())
320331
assert result == new
321332

322-
async def test_getsize(self, store: S) -> None:
323-
key = "k"
324-
data = self.buffer_cls.from_bytes(b"0" * 10)
325-
await self.set(store, key, data)
326-
327-
result = await store.getsize(key)
328-
assert isinstance(result, int)
329-
assert result > 0
330-
331-
async def test_getsize_raises(self, store: S) -> None:
332-
with pytest.raises(FileNotFoundError):
333-
await store.getsize("not-a-real-key")
334-
335-
async def test_getsize_prefix(self, store: S) -> None:
336-
prefix = "array/c/"
337-
for i in range(10):
338-
data = self.buffer_cls.from_bytes(b"0" * 10)
339-
await self.set(store, f"{prefix}/{i}", data)
340-
341-
result = await store.getsize_prefix(prefix)
342-
assert isinstance(result, int)
343-
assert result > 0
333+
334+
class LatencyStore(WrapperStore[Store]):
335+
"""
336+
A wrapper class that takes any store class in its constructor and
337+
adds latency to the `set` and `get` methods. This can be used for
338+
performance testing.
339+
"""
340+
341+
get_latency: float
342+
set_latency: float
343+
344+
def __init__(self, cls: Store, *, get_latency: float = 0, set_latency: float = 0) -> None:
345+
self.get_latency = float(get_latency)
346+
self.set_latency = float(set_latency)
347+
self._store = cls
348+
349+
async def set(self, key: str, value: Buffer) -> None:
350+
"""
351+
Add latency to the ``set`` method.
352+
353+
Calls ``asyncio.sleep(self.set_latency)`` before invoking the wrapped ``set`` method.
354+
355+
Parameters
356+
----------
357+
key : str
358+
The key to set
359+
value : Buffer
360+
The value to set
361+
362+
Returns
363+
-------
364+
None
365+
"""
366+
await asyncio.sleep(self.set_latency)
367+
await self._store.set(key, value)
368+
369+
async def get(
370+
self, key: str, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None
371+
) -> Buffer | None:
372+
"""
373+
Add latency to the ``get`` method.
374+
375+
Calls ``asyncio.sleep(self.get_latency)`` before invoking the wrapped ``get`` method.
376+
377+
Parameters
378+
----------
379+
key : str
380+
The key to get
381+
prototype : BufferPrototype
382+
The BufferPrototype to use.
383+
byte_range : ByteRangeRequest, optional
384+
An optional byte range.
385+
386+
Returns
387+
-------
388+
buffer : Buffer or None
389+
"""
390+
await asyncio.sleep(self.get_latency)
391+
return await self._store.get(key, prototype=prototype, byte_range=byte_range)

tests/test_store/test_wrapper.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING
4+
5+
import pytest
6+
7+
from zarr.core.buffer.cpu import Buffer, buffer_prototype
8+
from zarr.storage.wrapper import WrapperStore
9+
10+
if TYPE_CHECKING:
11+
from zarr.abc.store import Store
12+
from zarr.core.buffer.core import BufferPrototype
13+
14+
15+
@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=True)
16+
async def test_wrapped_set(store: Store, capsys: pytest.CaptureFixture[str]) -> None:
17+
# define a class that prints when it sets
18+
class NoisySetter(WrapperStore):
19+
async def set(self, key: str, value: Buffer) -> None:
20+
print(f"setting {key}")
21+
await super().set(key, value)
22+
23+
key = "foo"
24+
value = Buffer.from_bytes(b"bar")
25+
store_wrapped = NoisySetter(store)
26+
await store_wrapped.set(key, value)
27+
captured = capsys.readouterr()
28+
assert f"setting {key}" in captured.out
29+
assert await store_wrapped.get(key, buffer_prototype) == value
30+
31+
32+
@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=True)
33+
async def test_wrapped_get(store: Store, capsys: pytest.CaptureFixture[str]) -> None:
34+
# define a class that prints when it sets
35+
class NoisyGetter(WrapperStore):
36+
def get(self, key: str, prototype: BufferPrototype) -> None:
37+
print(f"getting {key}")
38+
return super().get(key, prototype=prototype)
39+
40+
key = "foo"
41+
value = Buffer.from_bytes(b"bar")
42+
store_wrapped = NoisyGetter(store)
43+
await store_wrapped.set(key, value)
44+
assert await store_wrapped.get(key, buffer_prototype) == value
45+
captured = capsys.readouterr()
46+
assert f"getting {key}" in captured.out

0 commit comments

Comments
 (0)