Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic working FsspecStore #1785

Merged
merged 36 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c21da45
Basic working FsspecStore
martindurant Apr 10, 2024
5489408
Merge branch 'v3' into v3_fsspec
d-v-b Apr 11, 2024
29fa0a4
upath to be optional
martindurant Apr 12, 2024
c2adda5
Merge branch 'v3_fsspec' of https://github.com/martindurant/zarr into…
martindurant Apr 12, 2024
320a671
fill out methods
martindurant Apr 13, 2024
79477f5
add fsspec to deps (I believe we want this)
martindurant Apr 13, 2024
9104bdc
fixes
martindurant Apr 14, 2024
581eafe
Merge branch 'v3' into v3_fsspec
martindurant May 29, 2024
20c9387
importable
martindurant May 29, 2024
b6bfa11
exceptions
martindurant May 29, 2024
02d8149
Merge branch 'v3' into v3_fsspec
martindurant May 29, 2024
9c402b6
Merge branch 'v3' into v3_fsspec
martindurant Jun 4, 2024
2d2a251
Merge branch 'v3' into v3_fsspec
jhamman Jun 4, 2024
e0c0ee4
Add simple test
martindurant Jun 4, 2024
0be84d2
Merge branch 'v3_fsspec' of https://github.com/martindurant/zarr into…
martindurant Jun 4, 2024
682f8e0
Add to test env
martindurant Jun 4, 2024
5315fda
fix typing
normanrz Jun 5, 2024
6eac094
Update src/zarr/store/remote.py
martindurant Jun 5, 2024
f5114a9
Merge remote-tracking branch 'origin/v3' into v3_fsspec
normanrz Jun 5, 2024
7920195
BufferPrototype
normanrz Jun 5, 2024
12baaa7
set up testing infrastructure for remote store
d-v-b Jun 9, 2024
095d72e
broken tests but get and set are implemented correctly for TestRemote…
d-v-b Jun 9, 2024
6ab1099
remove implementation of test_get, and make s3 fixture autoused, to r…
d-v-b Jun 9, 2024
a4ca371
Update tests/v3/test_store/test_remote.py
d-v-b Jun 10, 2024
0896cb8
don't use fsmap, and don't use os.path.join
d-v-b Jun 10, 2024
a4e1c20
Merge branch 'mdurant_v3_fsspec' of https://github.com/d-v-b/zarr-pyt…
d-v-b Jun 10, 2024
93a2c6a
scope s3 fixture to session, mark test_store_supports_partial_writes …
d-v-b Jun 10, 2024
7c22f74
Merge pull request #23 from d-v-b/mdurant_v3_fsspec
martindurant Jun 10, 2024
3c037c3
Update src/zarr/store/remote.py
martindurant Jun 10, 2024
3454da9
Fix most
martindurant Jun 10, 2024
9b6a6d3
fixed more
martindurant Jun 10, 2024
266fa37
fix rest
martindurant Jun 10, 2024
221cebc
Massage old v2 tests
martindurant Jun 10, 2024
1bfcbb5
just skip them..
martindurant Jun 10, 2024
362951e
Attribute rename to allowed_exceptions
martindurant Jun 11, 2024
3fb24da
Merge branch 'v3' into v3_fsspec
jhamman Jun 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ dependencies = [
'numpy>=1.24',
'fasteners',
'numcodecs>=0.10.0',
'fsspec>2024',
'crc32c',
'zstandard',
'typing_extensions',
'donfig'
'donfig',
'pytest'
]
dynamic = [
"version",
Expand Down Expand Up @@ -111,7 +113,12 @@ extra-dependencies = [
"pytest-cov",
"msgpack",
"lmdb",
"s3fs",
"pytest-asyncio",
"moto[s3]",
"flask-cors",
"flask",
"requests",
"mypy"
]
features = ["extra"]
Expand Down
2 changes: 1 addition & 1 deletion src/zarr/store/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def _dereference_path(root: str, path: str) -> str:
assert isinstance(root, str)
assert isinstance(path, str)
root = root.rstrip("/")
path = f"{root}/{path}" if root != "" else path
path = f"{root}/{path}" if root else path
path = path.rstrip("/")
return path

Expand Down
191 changes: 139 additions & 52 deletions src/zarr/store/remote.py
Original file line number Diff line number Diff line change
@@ -1,103 +1,190 @@
from __future__ import annotations

from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any

import fsspec
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do this import in RemoteStore.__init__? Would help us keep this an optional dep here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since fsspec is very small and itself has no further dependencies, making it required will reduce friction for users

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this is fine -- at least for now.


from zarr.abc.store import Store
from zarr.buffer import Buffer, BufferPrototype
from zarr.buffer import Buffer, BufferPrototype, default_buffer_prototype
from zarr.common import OpenMode
from zarr.store.core import _dereference_path

if TYPE_CHECKING:
from fsspec.asyn import AsyncFileSystem
from upath import UPath

from zarr.buffer import Buffer
from zarr.common import BytesLike


class RemoteStore(Store):
# based on FSSpec
supports_writes: bool = True
supports_partial_writes: bool = False
supports_listing: bool = True

root: UPath
_fs: AsyncFileSystem
path: str
allowed_exceptions: tuple[type[Exception], ...]

def __init__(
self, url: UPath | str, *, mode: OpenMode = "r", **storage_options: dict[str, Any]
self,
url: UPath | str,
mode: OpenMode = "r",
allowed_exceptions: tuple[type[Exception], ...] = (
FileNotFoundError,
IsADirectoryError,
NotADirectoryError,
),
**storage_options: Any,
):
import fsspec
from upath import UPath
"""
Parameters
----------
url: root of the datastore. In fsspec notation, this is usually like "protocol://path/to".
Can also be a upath.UPath instance/
allowed_exceptions: when fetching data, these cases will be deemed to correspond to missing
keys, rather than some other IO failure
storage_options: passed on to fsspec to make the filesystem instance. If url is a UPath,
this must not be used.
"""

super().__init__(mode=mode)

if isinstance(url, str):
self.root = UPath(url, **storage_options)
self._fs, self.path = fsspec.url_to_fs(url, **storage_options)
elif hasattr(url, "protocol") and hasattr(url, "fs"):
# is UPath-like - but without importing
if storage_options:
raise ValueError(
"If constructed with a UPath object, no additional "
"storage_options are allowed"
)
self.path = url.path
self._fs = url._fs
else:
assert (
len(storage_options) == 0
), "If constructed with a UPath object, no additional storage_options are allowed."
self.root = url.rstrip("/")

raise ValueError("URL not understood, %s", url)
self.allowed_exceptions = allowed_exceptions
# test instantiate file system
fs, _ = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs)
assert fs.__class__.async_impl, "FileSystem needs to support async operations."
if not self._fs.async_impl:
raise TypeError("FileSystem needs to support async operations")

def __str__(self) -> str:
return str(self.root)
return f"Remote fsspec store: {type(self._fs).__name__} , {self.path}"

def __repr__(self) -> str:
return f"RemoteStore({str(self)!r})"

def _make_fs(self) -> tuple[AsyncFileSystem, str]:
import fsspec

storage_options = self.root._kwargs.copy()
storage_options.pop("_url", None)
fs, root = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs)
assert fs.__class__.async_impl, "FileSystem needs to support async operations."
return fs, root
return f"<RemoteStore({type(self._fs).__name__} , {self.path})>"

async def get(
self,
key: str,
prototype: BufferPrototype,
prototype: BufferPrototype = default_buffer_prototype,
byte_range: tuple[int | None, int | None] | None = None,
) -> Buffer | None:
assert isinstance(key, str)
fs, root = self._make_fs()
path = _dereference_path(root, key)
path = _dereference_path(self.path, key)

try:
value: Buffer | None = await (
fs._cat_file(path, start=byte_range[0], end=byte_range[1])
if byte_range
else fs._cat_file(path)
if byte_range:
# fsspec uses start/end, not start/length
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I count this as additional evidence that we should switch to start/end semantics for the rest of the stores

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kerchunk is the exception, storing start/length, mostly because length is generally smaller for chunks in big files.

start, length = byte_range
if start is not None and length is not None:
end = start + length
elif length is not None:
end = length
else:
end = None
value: Buffer = prototype.buffer.from_bytes(
await (
self._fs._cat_file(path, start=byte_range[0], end=end)
if byte_range
else self._fs._cat_file(path)
)
)
except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
return None
return value

return value
except self.allowed_exceptions:
return None
except OSError as e:
if "not satisfiable" in str(e):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flagging this as s3 abstraction leakage that we might want to address later on by making an s3-specific storage class

# this is an s3-specific condition we probably don't want to leak
return prototype.buffer.from_bytes(b"")
raise

async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None = None) -> None:
async def set(
self,
key: str,
value: Buffer,
byte_range: tuple[int, int] | None = None,
) -> None:
self._check_writable()
assert isinstance(key, str)
fs, root = self._make_fs()
path = _dereference_path(root, key)

path = _dereference_path(self.path, key)
jhamman marked this conversation as resolved.
Show resolved Hide resolved
# write data
if byte_range:
with fs._open(path, "r+b") as f:
f.seek(byte_range[0])
f.write(value)
else:
await fs._pipe_file(path, value)
raise NotImplementedError
await self._fs._pipe_file(path, value.to_bytes())

async def delete(self, key: str) -> None:
self._check_writable()
fs, root = self._make_fs()
path = _dereference_path(root, key)
if await fs._exists(path):
await fs._rm(path)
path = _dereference_path(self.path, key)
jhamman marked this conversation as resolved.
Show resolved Hide resolved
try:
await self._fs._rm(path)
except FileNotFoundError:
pass
except self.allowed_exceptions:
pass

async def exists(self, key: str) -> bool:
fs, root = self._make_fs()
path = _dereference_path(root, key)
exists: bool = await fs._exists(path)
path = _dereference_path(self.path, key)
exists: bool = await self._fs._exists(path)
return exists

async def get_partial_values(
self,
prototype: BufferPrototype,
key_ranges: list[tuple[str, tuple[int | None, int | None]]],
) -> list[Buffer | None]:
if key_ranges:
paths, starts, stops = zip(
*(
(
_dereference_path(self.path, k[0]),
k[1][0],
((k[1][0] or 0) + k[1][1]) if k[1][1] is not None else None,
)
for k in key_ranges
),
strict=False,
)
else:
return []
# TODO: expectations for exceptions or missing keys?
res = await self._fs._cat_ranges(list(paths), starts, stops, on_error="return")
Comment on lines +161 to +162
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think returning the exceptions is the right thing here

# the following is an s3-specific condition we probably don't want to leak
res = [b"" if (isinstance(r, OSError) and "not satisfiable" in str(r)) else r for r in res]
for r in res:
if isinstance(r, Exception) and not isinstance(r, self.allowed_exceptions):
raise r

return [None if isinstance(r, Exception) else prototype.buffer.from_bytes(r) for r in res]

async def set_partial_values(self, key_start_values: list[tuple[str, int, BytesLike]]) -> None:
raise NotImplementedError

async def list(self) -> AsyncGenerator[str, None]:
allfiles = await self._fs._find(self.path, detail=False, withdirs=False)
for onefile in (a.replace(self.path + "/", "") for a in allfiles):
yield onefile

async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
prefix = f"{self.path}/{prefix.rstrip('/')}"
try:
allfiles = await self._fs._ls(prefix, detail=False)
except FileNotFoundError:
return
for onefile in (a.replace(prefix + "/", "") for a in allfiles):
yield onefile

async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
for onefile in await self._fs._ls(prefix, detail=False):
yield onefile
4 changes: 3 additions & 1 deletion src/zarr/testing/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,14 @@ async def test_list(self, store: S) -> None:
f"foo/c/{i}", Buffer.from_bytes(i.to_bytes(length=3, byteorder="little"))
)

@pytest.mark.xfail
async def test_list_prefix(self, store: S) -> None:
# TODO: we currently don't use list_prefix anywhere
raise NotImplementedError

async def test_list_dir(self, store: S) -> None:
assert [k async for k in store.list_dir("")] == []
out = [k async for k in store.list_dir("")]
assert out == []
assert [k async for k in store.list_dir("foo")] == []
await store.set("foo/zarr.json", Buffer.from_bytes(b"bar"))
await store.set("foo/c/1", Buffer.from_bytes(b"\x01"))
Expand Down
12 changes: 8 additions & 4 deletions tests/v2/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,9 @@ def test_hierarchy(self):
assert [] == store.listdir(self.root + "c/x/y")
assert [] == store.listdir(self.root + "c/d/y")
assert [] == store.listdir(self.root + "c/d/y/z")
assert [] == store.listdir(self.root + "c/e/f")
# the following is listdir(filepath), for which fsspec gives [filepath]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the advantage of going with POSIX semantics here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fsspec tries to adhere to posix as much as possible. If we want to exclude the [file] case, we'd have to code that special case into our store.

# as posix would, but an empty list was previously assumed
# assert [] == store.listdir(self.root + "c/e/f")

# test rename (optional)
if store.is_erasable():
Expand Down Expand Up @@ -1064,9 +1066,8 @@ def test_complex(self):
store[self.root + "foo"] = b"hello"
assert "foo" in os.listdir(str(path1) + "/" + self.root)
assert self.root + "foo" in store
assert not os.listdir(str(path2))
assert store[self.root + "foo"] == b"hello"
assert "foo" in os.listdir(str(path2))
assert store[self.root + "foo"] == b"hello"

def test_deep_ndim(self):
import zarr.v2
Expand Down Expand Up @@ -1285,6 +1286,8 @@ def create_store(self, normalize_keys=False, dimension_separator=".", path=None,
@pytest.fixture()
def s3(request):
# writable local S3 system
pytest.skip("old v3 tests are disabled", allow_module_level=True)

import shlex
import subprocess
import time
Expand All @@ -1299,7 +1302,7 @@ def s3(request):
s3fs = pytest.importorskip("s3fs")
pytest.importorskip("moto")

port = 5555
port = 5556
endpoint_uri = "http://127.0.0.1:%d/" % port
proc = subprocess.Popen(
shlex.split("moto_server s3 -p %d" % port),
Expand All @@ -1318,6 +1321,7 @@ def s3(request):
timeout -= 0.1 # pragma: no cover
time.sleep(0.1) # pragma: no cover
s3so = dict(client_kwargs={"endpoint_url": endpoint_uri}, use_listings_cache=False)
s3fs.S3FileSystem.clear_instance_cache()
s3 = s3fs.S3FileSystem(anon=False, **s3so)
s3.mkdir("test")
request.cls.s3so = s3so
Expand Down
36 changes: 0 additions & 36 deletions tests/v3/test_store.py → tests/v3/test_store/test_local.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,12 @@
from __future__ import annotations

from typing import Any

import pytest

from zarr.buffer import Buffer
from zarr.store.local import LocalStore
from zarr.store.memory import MemoryStore
from zarr.testing.store import StoreTests


class TestMemoryStore(StoreTests[MemoryStore]):
store_cls = MemoryStore

def set(self, store: MemoryStore, key: str, value: Buffer) -> None:
store._store_dict[key] = value

def get(self, store: MemoryStore, key: str) -> Buffer:
return store._store_dict[key]

@pytest.fixture(scope="function", params=[None, {}])
def store_kwargs(self, request) -> dict[str, Any]:
return {"store_dict": request.param, "mode": "w"}

@pytest.fixture(scope="function")
def store(self, store_kwargs: dict[str, Any]) -> MemoryStore:
return self.store_cls(**store_kwargs)

def test_store_repr(self, store: MemoryStore) -> None:
assert str(store) == f"memory://{id(store._store_dict)}"

def test_store_supports_writes(self, store: MemoryStore) -> None:
assert store.supports_writes

def test_store_supports_listing(self, store: MemoryStore) -> None:
assert store.supports_listing

def test_store_supports_partial_writes(self, store: MemoryStore) -> None:
assert store.supports_partial_writes

def test_list_prefix(self, store: MemoryStore) -> None:
assert True


class TestLocalStore(StoreTests[LocalStore]):
store_cls = LocalStore

Expand Down
Loading