Added Store.getsize #2426

Open · wants to merge 9 commits into main
8 changes: 8 additions & 0 deletions docs/guide/storage.rst
@@ -97,5 +97,13 @@
The Zarr-Python :class:`zarr.abc.store.Store` API is meant to be extended. The Store
class includes all of the methods needed to be a fully operational store in Zarr Python.
Zarr also provides a test harness for custom stores: :class:`zarr.testing.store.StoreTests`.

``Store.get``
~~~~~~~~~~~~~

The ``prototype`` keyword of :meth:`zarr.abc.store.Store.get` defaults to
``None``. When passed ``None``, implementations should call
:func:`zarr.buffer.default_buffer_prototype` to look up the prototype the user
has configured.

.. _Zip Store Specification: https://github.com/zarr-developers/zarr-specs/pull/311
.. _Fsspec: https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#consolidated-metadata
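
A minimal sketch of the implementer-side pattern this implies — the same handling the built-in stores adopt later in this diff (the method body is abbreviated):

```python
from zarr.buffer import default_buffer_prototype  # re-exported by this PR

async def get(self, key, prototype=None, byte_range=None):
    # docstring inherited
    if prototype is None:
        # None means: look up the buffer prototype the user has configured.
        prototype = default_buffer_prototype()
    ...  # proceed with the store-specific read
```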
68 changes: 67 additions & 1 deletion src/zarr/abc/store.py
@@ -5,6 +5,10 @@
from itertools import starmap
from typing import TYPE_CHECKING, NamedTuple, Protocol, runtime_checkable

from zarr.core.buffer.core import default_buffer_prototype
from zarr.core.common import concurrent_map
from zarr.core.config import config

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Iterable
from types import TracebackType
@@ -197,14 +201,17 @@ def __eq__(self, value: object) -> bool:
async def get(
self,
key: str,
prototype: BufferPrototype,
prototype: BufferPrototype | None = None,
byte_range: ByteRangeRequest | None = None,
) -> Buffer | None:
"""Retrieve the value associated with a given key.

Parameters
----------
key : str
prototype : BufferPrototype, optional
The prototype giving the buffer classes to use for buffers and nd-buffers.
By default, :func:`zarr.buffer.default_buffer_prototype` is used.
byte_range : tuple[int | None, int | None], optional

Returns
@@ -399,6 +406,65 @@ async def _get_many(
for req in requests:
yield (req[0], await self.get(*req))

async def getsize(self, key: str) -> int:
"""
Return the size, in bytes, of a value in a Store.

Parameters
----------
key : str

Returns
-------
nbytes : int
The size of the value (in bytes).

Raises
------
FileNotFoundError
When the given key does not exist in the store.
"""
# Note to implementers: this default implementation is very inefficient since
# it requires reading the entire object. Many systems will have ways to get the
# size of an object without reading it.
value = await self.get(key, prototype=default_buffer_prototype())
Contributor Author:

Is there a reason default_buffer_prototype() isn't the default to get?

Contributor:

I think there's a good reason to avoid dynamic default arguments, e.g. function invocations (and I think there's a linting rule that complains about such things). But that doesn't change the fact that default_buffer_prototype() is the default, which makes it annoying to specify. One idea that came up in the past was to statically associate a store with a buffer prototype, e.g. by making the store class generic, but this would reduce the flexibility of the store... although whether we realistically need the same store instance to return multiple different types of buffers is yet to be seen.

Contributor Author:

Would you be OK with prototype being an optional argument (BufferPrototype | None, with a default of None, which means use default_buffer_prototype())?

Contributor:

Yeah, some value like None or the literal string "default" would be perfect for this, at least in my opinion.

Contributor Author:

Two small bits of awkwardness:

  1. We'll now require Store implementors to read the documentation to understand what the value of None / "default" for prototype means when they're implementing get. I think that's fine, but merits documenting.
  2. I might have missed it, but IIUC zarr.core is private, but zarr.core.buffer isn't re-exported from anywhere else. I'll add it to zarr.buffer unless anyone objects.

Reviewer:

> One idea that came up in the past was to statically associate a store with a buffer prototype

This thread came up in #1661, where I was struggling to understand the purpose of the prototype argument.

I think it would be nice to either statically associate stores with buffer prototypes or remove the prototypes altogether and let a store return a protocol instead of a specific class.

if value is None:
raise FileNotFoundError(key)
return len(value)

async def getsize_prefix(self, prefix: str) -> int:
"""
Return the size, in bytes, of all values under a prefix.

Parameters
----------
prefix : str
The prefix of the directory to measure.
Reviewer:

Can we offer implementers the following in documentation?:

This function will be called by zarr using a prefix that is the path of a group, an array, or the root. Implementations may exhibit undefined behavior when that is not the case.

Contributor Author:

Sure... I was hoping we could somehow ensure that we don't call it with anything other than a group / array / root path, but users can directly use Store.getsize_prefix and they can do whatever.

LMK if you want any more specific guidance on what to do (e.g. raise a ValueError). I'm hesitant about trying to force required exceptions into an ABC / interface.

TomAugspurger (Contributor Author), Nov 5, 2024:

> I'm hesitant about trying to force required exceptions into an ABC / interface.

And now I'm noticing that I've done exactly that in getsize, by requiring implementations to raise FileNotFoundError if the key isn't found :)


Returns
-------
nbytes : int
The sum of the sizes of the values in the directory (in bytes).

See Also
--------
zarr.Array.nbytes_stored
Store.getsize

Notes
-----
``getsize_prefix`` is provided as a potentially faster alternative to listing
all the keys under a prefix and calling :meth:`Store.getsize` on each.

In general, ``prefix`` should be the path of an Array or Group in the Store.
Implementations may differ on the behavior when some other ``prefix``
is provided.
"""
keys = ((x,) async for x in self.list_prefix(prefix))
limit = config.get("async.concurrency")
sizes = await concurrent_map(keys, self.getsize, limit=limit)
return sum(sizes)
Reviewer:

This materializes the full list of keys in memory, can we maintain the generator longer to avoid that?

Also, this has unlimited concurrency, for a potentially very large number of keys. It could easily create millions of async tasks. We should probably run in chunks limited by the value of the concurrency setting.

Reviewer:

See concurrent_map for an example

TomAugspurger (Contributor Author), Nov 5, 2024:

> This materializes the full list of keys in memory, can we maintain the generator longer to avoid that?

I don't immediately see how that's possible.

The best I'm coming up with is a fold-like function that asynchronously iterates through keys from list_prefix and (asynchronously) calls self.getsize to update the size. Sounds kinda complicated.

FWIW, it looks like concurrent_map wants an iterable of items:

```
>           return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
E           TypeError: 'async_generator' object is not iterable
```

In 7cbc5003eb4d397a7cf0f019bad85cd1c3d0927a I've hacked in some support for AsyncIterable there. I haven't had enough coffee to figure out what the flow of

return await asyncio.gather(*[asyncio.ensure_future(run(item)) async for item in items])

is. I'm a bit worried the async for item in items is happening immediately, so we end up building that list of keys in memory anyway.

> We should probably run in chunks limited by the value of the concurrency setting.

Fixed. We should probably replace all instances of asyncio.gather with a concurrency-limited version. I'll make a separate issue for that.
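
A hedged sketch of the fold-like approach floated above — not what this PR implements — bounding in-flight getsize calls with a semaphore while consuming list_prefix lazily, so the full key list is never materialized (store is assumed to satisfy the Store ABC):

```python
import asyncio

async def getsize_prefix_streaming(store, prefix: str, limit: int = 10) -> int:
    sem = asyncio.Semaphore(limit)
    pending: set[asyncio.Task[None]] = set()
    total = 0

    async def measure(key: str) -> None:
        nonlocal total
        try:
            size = await store.getsize(key)
            # No await between the read and write of ``total``, so the
            # increment cannot interleave with another task on the event loop.
            total += size
        finally:
            sem.release()

    async for key in store.list_prefix(prefix):
        await sem.acquire()  # blocks while ``limit`` tasks are in flight
        task = asyncio.create_task(measure(key))
        pending.add(task)
        task.add_done_callback(pending.discard)

    if pending:
        await asyncio.gather(*pending)
    return total
```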



@runtime_checkable
class ByteGetter(Protocol):
5 changes: 5 additions & 0 deletions src/zarr/buffer.py
@@ -0,0 +1,5 @@
from zarr.core.buffer import default_buffer_prototype

__all__ = [
"default_buffer_prototype",
]
13 changes: 13 additions & 0 deletions src/zarr/core/array.py
@@ -865,6 +865,9 @@ async def nchunks_initialized(self) -> int:
"""
return len(await chunks_initialized(self))

async def nbytes_stored(self) -> int:
return await self.store_path.store.getsize_prefix(self.store_path.path)

def _iter_chunk_coords(
self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
) -> Iterator[ChunkCoords]:
@@ -1540,6 +1543,16 @@ def nchunks_initialized(self) -> int:
"""
return sync(self._async_array.nchunks_initialized())

def nbytes_stored(self) -> int:
"""
Determine the size, in bytes, of the array actually written to the store.

Returns
-------
size : int
"""
return sync(self._async_array.nbytes_stored())

def _iter_chunk_keys(
self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
) -> Iterator[str]:
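A hedged usage sketch for the new method; the exact byte counts depend on the store and the metadata encoding (see the tests near the end of this diff for concrete values):

```python
import zarr

arr = zarr.create(shape=(100,), chunks=(10,), dtype="i4")
print(arr.nbytes_stored())  # metadata document only; no chunks written yet
arr[:] = 1
print(arr.nbytes_stored())  # metadata plus all ten written chunks
```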
16 changes: 12 additions & 4 deletions src/zarr/core/common.py
@@ -3,7 +3,7 @@
import asyncio
import functools
import operator
from collections.abc import Iterable, Mapping
from collections.abc import AsyncIterable, Iterable, Mapping
from enum import Enum
from itertools import starmap
from typing import (
@@ -50,10 +50,15 @@ def product(tup: ChunkCoords) -> int:


async def concurrent_map(
items: Iterable[T], func: Callable[..., Awaitable[V]], limit: int | None = None
items: Iterable[T] | AsyncIterable[T],
func: Callable[..., Awaitable[V]],
limit: int | None = None,
) -> list[V]:
if limit is None:
return await asyncio.gather(*list(starmap(func, items)))
if isinstance(items, AsyncIterable):
return await asyncio.gather(*list(starmap(func, [x async for x in items])))
else:
return await asyncio.gather(*list(starmap(func, items)))

else:
sem = asyncio.Semaphore(limit)
@@ -62,7 +67,10 @@ async def run(item: tuple[Any]) -> V:
async with sem:
return await func(*item)

return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
if isinstance(items, AsyncIterable):
return await asyncio.gather(*[asyncio.ensure_future(run(item)) async for item in items])
else:
return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])


E = TypeVar("E", bound=Enum)
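A hedged usage sketch of the extended concurrent_map (zarr.core is private, so this import is for illustration only): each item is a tuple of positional arguments for ``func``, and items may now come from an async generator.

```python
import asyncio

from zarr.core.common import concurrent_map  # private module; illustration only

async def double(x: int) -> int:
    return 2 * x

async def main() -> None:
    async def args():
        for i in range(5):
            yield (i,)  # each item is an argument tuple for ``func``

    # At most two ``double`` calls run concurrently.
    results = await concurrent_map(args(), double, limit=2)
    assert results == [0, 2, 4, 6, 8]

asyncio.run(main())
```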
8 changes: 7 additions & 1 deletion src/zarr/storage/local.py
@@ -9,6 +9,7 @@

from zarr.abc.store import ByteRangeRequest, Store
from zarr.core.buffer import Buffer
from zarr.core.buffer.core import default_buffer_prototype
from zarr.core.common import concurrent_map

if TYPE_CHECKING:
@@ -143,10 +144,12 @@ def __eq__(self, other: object) -> bool:
async def get(
self,
key: str,
prototype: BufferPrototype,
prototype: BufferPrototype | None = None,
byte_range: tuple[int | None, int | None] | None = None,
) -> Buffer | None:
# docstring inherited
if prototype is None:
prototype = default_buffer_prototype()
if not self._is_open:
await self._open()
assert isinstance(key, str)
@@ -243,3 +246,6 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
yield key.as_posix().replace(to_strip, "")
except (FileNotFoundError, NotADirectoryError):
pass

async def getsize(self, key: str) -> int:
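# docstring inherited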
return os.path.getsize(self.root / key)
2 changes: 1 addition & 1 deletion src/zarr/storage/logging.py
@@ -159,7 +159,7 @@ def __eq__(self, other: object) -> bool:
async def get(
self,
key: str,
prototype: BufferPrototype,
prototype: BufferPrototype | None = None,
byte_range: tuple[int | None, int | None] | None = None,
) -> Buffer | None:
# docstring inherited
5 changes: 4 additions & 1 deletion src/zarr/storage/memory.py
@@ -5,6 +5,7 @@

from zarr.abc.store import ByteRangeRequest, Store
from zarr.core.buffer import Buffer, gpu
from zarr.core.buffer.core import default_buffer_prototype
from zarr.core.common import concurrent_map
from zarr.storage._utils import _normalize_interval_index

@@ -83,10 +84,12 @@ def __eq__(self, other: object) -> bool:
async def get(
self,
key: str,
prototype: BufferPrototype,
prototype: BufferPrototype | None = None,
byte_range: tuple[int | None, int | None] | None = None,
) -> Buffer | None:
# docstring inherited
if prototype is None:
prototype = default_buffer_prototype()
if not self._is_open:
await self._open()
assert isinstance(key, str)
18 changes: 17 additions & 1 deletion src/zarr/storage/remote.py
@@ -4,6 +4,7 @@
from typing import TYPE_CHECKING, Any, Self

from zarr.abc.store import ByteRangeRequest, Store
from zarr.core.buffer.core import default_buffer_prototype
from zarr.storage.common import _dereference_path

if TYPE_CHECKING:
@@ -217,10 +218,12 @@ def __eq__(self, other: object) -> bool:
async def get(
self,
key: str,
prototype: BufferPrototype,
prototype: BufferPrototype | None = None,
byte_range: ByteRangeRequest | None = None,
) -> Buffer | None:
# docstring inherited
if prototype is None:
prototype = default_buffer_prototype()
if not self._is_open:
await self._open()
path = _dereference_path(self.path, key)
@@ -344,3 +347,16 @@ async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
f"{self.path}/{prefix}", detail=False, maxdepth=None, withdirs=False
):
yield onefile.removeprefix(f"{self.path}/")

async def getsize(self, key: str) -> int:
path = _dereference_path(self.path, key)
info = await self.fs._info(path)

size = info.get("size")

if size is None:
# Not all filesystems support size. Fall back to reading the entire object
return await super().getsize(key)
else:
# fsspec isn't typed; assume (or verify) that ``size`` is an int
return int(size)
5 changes: 4 additions & 1 deletion src/zarr/storage/zip.py
@@ -9,6 +9,7 @@

from zarr.abc.store import ByteRangeRequest, Store
from zarr.core.buffer import Buffer, BufferPrototype
from zarr.core.buffer.core import default_buffer_prototype

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Iterable
@@ -166,11 +167,13 @@ def _get(
async def get(
self,
key: str,
prototype: BufferPrototype,
prototype: BufferPrototype | None = None,
byte_range: ByteRangeRequest | None = None,
) -> Buffer | None:
# docstring inherited
assert isinstance(key, str)
if prototype is None:
prototype = default_buffer_prototype()

with self._lock:
return self._get(key, prototype=prototype, byte_range=byte_range)
32 changes: 32 additions & 0 deletions src/zarr/testing/store.py
@@ -121,6 +121,15 @@ async def test_get(
expected = data_buf[start : start + length]
assert_bytes_equal(observed, expected)

async def test_get_default_prototype(self, store: S) -> None:
key = "c/0"
data = b"\x01\x02\x03\x04"
data_buf = self.buffer_cls.from_bytes(data)
await self.set(store, key, data_buf)
observed = await store.get(key)
expected = data_buf[:]
assert_bytes_equal(observed, expected)

async def test_get_many(self, store: S) -> None:
"""
Ensure that multiple keys can be retrieved at once with the _get_many method.
@@ -352,3 +361,26 @@ async def test_set_if_not_exists(self, store: S) -> None:

result = await store.get("k2", default_buffer_prototype())
assert result == new

async def test_getsize(self, store: S) -> None:
key = "k"
data = self.buffer_cls.from_bytes(b"0" * 10)
await self.set(store, key, data)

result = await store.getsize(key)
assert isinstance(result, int)
assert result > 0

async def test_getsize_raises(self, store: S) -> None:
with pytest.raises(FileNotFoundError):
await store.getsize("not-a-real-key")

async def test_getsize_prefix(self, store: S) -> None:
prefix = "array/c/"
for i in range(10):
data = self.buffer_cls.from_bytes(b"0" * 10)
await self.set(store, f"{prefix}{i}", data)

result = await store.getsize_prefix(prefix)
assert isinstance(result, int)
assert result > 0
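
A hedged sketch of how a store implementation picks up these new tests through the harness (the generic parameters follow zarr.testing.store.StoreTests; any further fixture wiring is store-specific and omitted):

```python
from zarr.core.buffer import cpu
from zarr.storage.memory import MemoryStore
from zarr.testing.store import StoreTests

# Subclassing the harness runs test_getsize, test_getsize_raises, and
# test_getsize_prefix (plus the existing suite) against MemoryStore.
class TestMemoryStore(StoreTests[MemoryStore, cpu.Buffer]):
    store_cls = MemoryStore
    buffer_cls = cpu.Buffer
```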
24 changes: 24 additions & 0 deletions tests/test_array.py
@@ -370,6 +370,30 @@ async def test_chunks_initialized() -> None:
assert observed == expected


def test_nbytes_stored() -> None:
arr = zarr.create(shape=(100,), chunks=(10,), dtype="i4")
result = arr.nbytes_stored()
assert result == 366 # the size of the metadata document. This is a fragile test.
arr[:50] = 1
result = arr.nbytes_stored()
assert result == 566 # the size with 5 chunks filled.
arr[50:] = 2
result = arr.nbytes_stored()
assert result == 766 # the size with all chunks filled.


async def test_nbytes_stored_async() -> None:
arr = await zarr.api.asynchronous.create(shape=(100,), chunks=(10,), dtype="i4")
result = await arr.nbytes_stored()
assert result == 366 # the size of the metadata document. This is a fragile test.
await arr.setitem(slice(50), 1)
result = await arr.nbytes_stored()
assert result == 566 # the size with 5 chunks filled.
await arr.setitem(slice(50, 100), 2)
result = await arr.nbytes_stored()
assert result == 766 # the size with all chunks filled.


def test_default_fill_values() -> None:
a = Array.create(MemoryStore({}, mode="w"), shape=5, chunk_shape=5, dtype="<U4")
assert a.fill_value == ""
2 changes: 1 addition & 1 deletion tests/test_codecs/test_blosc.py
@@ -5,8 +5,8 @@

from zarr import AsyncArray
from zarr.abc.store import Store
from zarr.buffer import default_buffer_prototype
from zarr.codecs import BloscCodec, BytesCodec, ShardingCodec
from zarr.core.buffer import default_buffer_prototype
from zarr.storage.common import StorePath

