-
-
Notifications
You must be signed in to change notification settings - Fork 282
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Basic working FsspecStore #1785
Merged
Merged
Changes from 17 commits
Commits
Show all changes
36 commits
Select commit
Hold shift + click to select a range
c21da45
Basic working FsspecStore
martindurant 5489408
Merge branch 'v3' into v3_fsspec
d-v-b 29fa0a4
upath to be optional
martindurant c2adda5
Merge branch 'v3_fsspec' of https://github.com/martindurant/zarr into…
martindurant 320a671
fill out methods
martindurant 79477f5
add fsspec to deps (I believe we want this)
martindurant 9104bdc
fixes
martindurant 581eafe
Merge branch 'v3' into v3_fsspec
martindurant 20c9387
importable
martindurant b6bfa11
exceptions
martindurant 02d8149
Merge branch 'v3' into v3_fsspec
martindurant 9c402b6
Merge branch 'v3' into v3_fsspec
martindurant 2d2a251
Merge branch 'v3' into v3_fsspec
jhamman e0c0ee4
Add simple test
martindurant 0be84d2
Merge branch 'v3_fsspec' of https://github.com/martindurant/zarr into…
martindurant 682f8e0
Add to test env
martindurant 5315fda
fix typing
normanrz 6eac094
Update src/zarr/store/remote.py
martindurant f5114a9
Merge remote-tracking branch 'origin/v3' into v3_fsspec
normanrz 7920195
BufferPrototype
normanrz 12baaa7
set up testing infrastructure for remote store
d-v-b 095d72e
broken tests but get and set are implemented correctly for TestRemote…
d-v-b 6ab1099
remove implementation of test_get, and make s3 fixture autoused, to r…
d-v-b a4ca371
Update tests/v3/test_store/test_remote.py
d-v-b 0896cb8
don't use fsmap, and don't use os.path.join
d-v-b a4e1c20
Merge branch 'mdurant_v3_fsspec' of https://github.com/d-v-b/zarr-pyt…
d-v-b 93a2c6a
scope s3 fixture to session, mark test_store_supports_partial_writes …
d-v-b 7c22f74
Merge pull request #23 from d-v-b/mdurant_v3_fsspec
martindurant 3c037c3
Update src/zarr/store/remote.py
martindurant 3454da9
Fix most
martindurant 9b6a6d3
fixed more
martindurant 266fa37
fix rest
martindurant 221cebc
Massage old v2 tests
martindurant 1bfcbb5
just skip them..
martindurant 362951e
Attribute rename to allowed_exceptions
martindurant 3fb24da
Merge branch 'v3' into v3_fsspec
jhamman File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,10 @@ | ||
from __future__ import annotations | ||
|
||
from collections.abc import AsyncGenerator | ||
from typing import TYPE_CHECKING, Any | ||
|
||
import fsspec | ||
|
||
from zarr.abc.store import Store | ||
from zarr.buffer import Buffer | ||
from zarr.common import OpenMode | ||
|
@@ -11,90 +14,143 @@ | |
from fsspec.asyn import AsyncFileSystem | ||
from upath import UPath | ||
|
||
from zarr.buffer import Buffer | ||
from zarr.common import BytesLike | ||
|
||
|
||
class RemoteStore(Store): | ||
# based on FSSpec | ||
supports_writes: bool = True | ||
supports_partial_writes: bool = False | ||
supports_listing: bool = True | ||
|
||
root: UPath | ||
_fs: AsyncFileSystem | ||
path: str | ||
exceptions: tuple[type[Exception], ...] | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def __init__( | ||
self, url: UPath | str, *, mode: OpenMode = "r", **storage_options: dict[str, Any] | ||
self, | ||
url: UPath | str, | ||
mode: OpenMode = "r", | ||
allowed_exceptions: tuple[type[Exception], ...] = ( | ||
FileNotFoundError, | ||
IsADirectoryError, | ||
NotADirectoryError, | ||
), | ||
**storage_options: Any, | ||
): | ||
import fsspec | ||
from upath import UPath | ||
""" | ||
Parameters | ||
---------- | ||
url: root of the datastore. In fsspec notation, this is usually like "protocol://path/to". | ||
Can also be a upath.UPath instance/ | ||
allowed_exceptions: when fetching data, these cases will be deemed to correspond to missinf | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
keys, rather than some other IO failure | ||
storage_options: passed on to fsspec to make the filesystem instance. If url is a UPath, | ||
this must not be used. | ||
""" | ||
|
||
super().__init__(mode=mode) | ||
|
||
if isinstance(url, str): | ||
self.root = UPath(url, **storage_options) | ||
self._fs, self.path = fsspec.url_to_fs(url, **storage_options) | ||
elif hasattr(url, "protocol") and hasattr(url, "fs"): | ||
# is UPath-like - but without importing | ||
if storage_options: | ||
raise ValueError( | ||
"If constructed with a UPath object, no additional " | ||
"storage_options are allowed" | ||
) | ||
self.path = url.path | ||
self._fs = url._fs | ||
else: | ||
assert ( | ||
len(storage_options) == 0 | ||
), "If constructed with a UPath object, no additional storage_options are allowed." | ||
self.root = url.rstrip("/") | ||
|
||
raise ValueError("URL not understood, %s", url) | ||
self.exceptions = allowed_exceptions | ||
normanrz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# test instantiate file system | ||
fs, _ = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs) | ||
assert fs.__class__.async_impl, "FileSystem needs to support async operations." | ||
if not self._fs.async_impl: | ||
raise TypeError("FileSystem needs to support async operations") | ||
|
||
def __str__(self) -> str: | ||
return str(self.root) | ||
return f"Remote fsspec store: {self.path}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should put the protocol in here |
||
|
||
def __repr__(self) -> str: | ||
return f"RemoteStore({str(self)!r})" | ||
|
||
def _make_fs(self) -> tuple[AsyncFileSystem, str]: | ||
import fsspec | ||
|
||
storage_options = self.root._kwargs.copy() | ||
storage_options.pop("_url", None) | ||
fs, root = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs) | ||
assert fs.__class__.async_impl, "FileSystem needs to support async operations." | ||
return fs, root | ||
return f"<FsspecStore({self.path})>" | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
async def get( | ||
self, key: str, byte_range: tuple[int | None, int | None] | None = None | ||
) -> Buffer | None: | ||
assert isinstance(key, str) | ||
fs, root = self._make_fs() | ||
path = _dereference_path(root, key) | ||
path = _dereference_path(self.path, key) | ||
|
||
try: | ||
value: Buffer | None = await ( | ||
fs._cat_file(path, start=byte_range[0], end=byte_range[1]) | ||
if byte_range | ||
else fs._cat_file(path) | ||
value: Buffer = Buffer.from_bytes( | ||
await ( | ||
self._fs._cat_file(path, start=byte_range[0], end=byte_range[1]) | ||
if byte_range | ||
else self._fs._cat_file(path) | ||
) | ||
) | ||
except (FileNotFoundError, IsADirectoryError, NotADirectoryError): | ||
return None | ||
return value | ||
|
||
return value | ||
except self.exceptions: | ||
return None | ||
|
||
async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None = None) -> None: | ||
async def set( | ||
self, | ||
key: str, | ||
value: Buffer, | ||
byte_range: tuple[int, int] | None = None, | ||
) -> None: | ||
self._check_writable() | ||
assert isinstance(key, str) | ||
fs, root = self._make_fs() | ||
path = _dereference_path(root, key) | ||
|
||
path = _dereference_path(self.path, key) | ||
jhamman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# write data | ||
if byte_range: | ||
with fs._open(path, "r+b") as f: | ||
f.seek(byte_range[0]) | ||
f.write(value) | ||
else: | ||
await fs._pipe_file(path, value) | ||
raise NotImplementedError | ||
await self._fs._pipe_file(path, value.to_bytes()) | ||
|
||
async def delete(self, key: str) -> None: | ||
self._check_writable() | ||
fs, root = self._make_fs() | ||
path = _dereference_path(root, key) | ||
if await fs._exists(path): | ||
await fs._rm(path) | ||
path = _dereference_path(self.path, key) | ||
jhamman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try: | ||
await self._fs._rm(path) | ||
except FileNotFoundError: | ||
pass | ||
except self.exceptions: | ||
pass | ||
|
||
async def exists(self, key: str) -> bool: | ||
fs, root = self._make_fs() | ||
path = _dereference_path(root, key) | ||
exists: bool = await fs._exists(path) | ||
path = _dereference_path(self.path, key) | ||
exists: bool = await self._fs._exists(path) | ||
return exists | ||
|
||
async def get_partial_values( | ||
self, key_ranges: list[tuple[str, tuple[int | None, int | None]]] | ||
) -> list[Buffer | None]: | ||
paths, starts, stops = zip( | ||
*((_dereference_path(self.path, k[0]), k[1][0], k[1][1]) for k in key_ranges), | ||
strict=False, | ||
) | ||
# TODO: expectations for exceptions or missing keys? | ||
res = await self._fs._cat_ranges(list(paths), starts, stops, on_error="return") | ||
Comment on lines
+161
to
+162
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think returning the exceptions is the right thing here |
||
for r in res: | ||
if isinstance(r, Exception) and not isinstance(r, self.exceptions): | ||
raise r | ||
|
||
return [None if isinstance(r, Exception) else Buffer.from_bytes(r) for r in res] | ||
|
||
async def set_partial_values(self, key_start_values: list[tuple[str, int, BytesLike]]) -> None: | ||
raise NotImplementedError | ||
|
||
async def list(self) -> AsyncGenerator[str, None]: | ||
allfiles = await self._fs._find(self.path, detail=False, withdirs=False) | ||
for onefile in (a.replace(self.path + "/", "") for a in allfiles): | ||
yield onefile | ||
|
||
async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: | ||
prefix = prefix.rstrip("/") | ||
allfiles = await self._fs._ls(prefix, detail=False) | ||
for onefile in (a.replace(prefix + "/", "") for a in allfiles): | ||
yield onefile | ||
|
||
async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: | ||
for onefile in await self._fs._ls(prefix, detail=False): | ||
yield onefile |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
import os | ||
|
||
import pytest | ||
|
||
from zarr.buffer import Buffer | ||
from zarr.store import RemoteStore | ||
|
||
s3fs = pytest.importorskip("s3fs") | ||
requests = pytest.importorskip("requests") | ||
moto_server = pytest.importorskip("moto.moto_server.threaded_moto_server") | ||
moto = pytest.importorskip("moto") | ||
|
||
# ### amended from s3fs ### # | ||
test_bucket_name = "test" | ||
secure_bucket_name = "test-secure" | ||
port = 5555 | ||
endpoint_uri = f"http://127.0.0.1:{port}/" | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def s3_base(): | ||
# writable local S3 system | ||
|
||
# This fixture is module-scoped, meaning that we can reuse the MotoServer across all tests | ||
server = moto_server.ThreadedMotoServer(ip_address="127.0.0.1", port=port) | ||
server.start() | ||
if "AWS_SECRET_ACCESS_KEY" not in os.environ: | ||
os.environ["AWS_SECRET_ACCESS_KEY"] = "foo" | ||
if "AWS_ACCESS_KEY_ID" not in os.environ: | ||
os.environ["AWS_ACCESS_KEY_ID"] = "foo" | ||
|
||
yield | ||
server.stop() | ||
|
||
|
||
def get_boto3_client(): | ||
from botocore.session import Session | ||
|
||
# NB: we use the sync botocore client for setup | ||
session = Session() | ||
return session.create_client("s3", endpoint_url=endpoint_uri) | ||
|
||
|
||
@pytest.fixture() | ||
def s3(s3_base): | ||
client = get_boto3_client() | ||
client.create_bucket(Bucket=test_bucket_name, ACL="public-read") | ||
s3fs.S3FileSystem.clear_instance_cache() | ||
s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri}) | ||
s3.invalidate_cache() | ||
yield s3 | ||
requests.post(f"{endpoint_uri}/moto-api/reset") | ||
|
||
|
||
# ### end from s3fs ### # | ||
|
||
|
||
async def alist(it): | ||
out = [] | ||
async for a in it: | ||
out.append(a) | ||
return out | ||
|
||
|
||
async def test_basic(s3): | ||
store = RemoteStore(f"s3://{test_bucket_name}", mode="w", endpoint_url=endpoint_uri, anon=False) | ||
assert not await alist(store.list()) | ||
assert not await store.exists("foo") | ||
data = b"hello" | ||
await store.set("foo", Buffer.from_bytes(data)) | ||
assert await store.exists("foo") | ||
assert (await store.get("foo")).to_bytes() == data | ||
out = await store.get_partial_values([("foo", (1, None))]) | ||
assert out[0].to_bytes() == data[1:] |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we do this import in
RemoteStore.__init__
? Would help us keep this an optional dep here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since fsspec is very small and itself has no further dependencies, making it required will reduce friction for users
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose this is fine -- at least for now.