-
-
Notifications
You must be signed in to change notification settings - Fork 282
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Basic working FsspecStore #1785
Changes from all commits
c21da45
5489408
29fa0a4
c2adda5
320a671
79477f5
9104bdc
581eafe
20c9387
b6bfa11
02d8149
9c402b6
2d2a251
e0c0ee4
0be84d2
682f8e0
5315fda
6eac094
f5114a9
7920195
12baaa7
095d72e
6ab1099
a4ca371
0896cb8
a4e1c20
93a2c6a
7c22f74
3c037c3
3454da9
9b6a6d3
266fa37
221cebc
1bfcbb5
362951e
3fb24da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,103 +1,190 @@ | ||
from __future__ import annotations | ||
|
||
from collections.abc import AsyncGenerator | ||
from typing import TYPE_CHECKING, Any | ||
|
||
import fsspec | ||
|
||
from zarr.abc.store import Store | ||
from zarr.buffer import Buffer, BufferPrototype | ||
from zarr.buffer import Buffer, BufferPrototype, default_buffer_prototype | ||
from zarr.common import OpenMode | ||
from zarr.store.core import _dereference_path | ||
|
||
if TYPE_CHECKING: | ||
from fsspec.asyn import AsyncFileSystem | ||
from upath import UPath | ||
|
||
from zarr.buffer import Buffer | ||
from zarr.common import BytesLike | ||
|
||
|
||
class RemoteStore(Store): | ||
# based on FSSpec | ||
supports_writes: bool = True | ||
supports_partial_writes: bool = False | ||
supports_listing: bool = True | ||
|
||
root: UPath | ||
_fs: AsyncFileSystem | ||
path: str | ||
allowed_exceptions: tuple[type[Exception], ...] | ||
|
||
def __init__( | ||
self, url: UPath | str, *, mode: OpenMode = "r", **storage_options: dict[str, Any] | ||
self, | ||
url: UPath | str, | ||
mode: OpenMode = "r", | ||
allowed_exceptions: tuple[type[Exception], ...] = ( | ||
FileNotFoundError, | ||
IsADirectoryError, | ||
NotADirectoryError, | ||
), | ||
**storage_options: Any, | ||
): | ||
import fsspec | ||
from upath import UPath | ||
""" | ||
Parameters | ||
---------- | ||
url: root of the datastore. In fsspec notation, this is usually like "protocol://path/to". | ||
Can also be a upath.UPath instance/ | ||
allowed_exceptions: when fetching data, these cases will be deemed to correspond to missing | ||
keys, rather than some other IO failure | ||
storage_options: passed on to fsspec to make the filesystem instance. If url is a UPath, | ||
this must not be used. | ||
""" | ||
|
||
super().__init__(mode=mode) | ||
|
||
if isinstance(url, str): | ||
self.root = UPath(url, **storage_options) | ||
self._fs, self.path = fsspec.url_to_fs(url, **storage_options) | ||
elif hasattr(url, "protocol") and hasattr(url, "fs"): | ||
# is UPath-like - but without importing | ||
if storage_options: | ||
raise ValueError( | ||
"If constructed with a UPath object, no additional " | ||
"storage_options are allowed" | ||
) | ||
self.path = url.path | ||
self._fs = url._fs | ||
else: | ||
assert ( | ||
len(storage_options) == 0 | ||
), "If constructed with a UPath object, no additional storage_options are allowed." | ||
self.root = url.rstrip("/") | ||
|
||
raise ValueError("URL not understood, %s", url) | ||
self.allowed_exceptions = allowed_exceptions | ||
# test instantiate file system | ||
fs, _ = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs) | ||
assert fs.__class__.async_impl, "FileSystem needs to support async operations." | ||
if not self._fs.async_impl: | ||
raise TypeError("FileSystem needs to support async operations") | ||
|
||
def __str__(self) -> str: | ||
return str(self.root) | ||
return f"Remote fsspec store: {type(self._fs).__name__} , {self.path}" | ||
|
||
def __repr__(self) -> str: | ||
return f"RemoteStore({str(self)!r})" | ||
|
||
def _make_fs(self) -> tuple[AsyncFileSystem, str]: | ||
import fsspec | ||
|
||
storage_options = self.root._kwargs.copy() | ||
storage_options.pop("_url", None) | ||
fs, root = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs) | ||
assert fs.__class__.async_impl, "FileSystem needs to support async operations." | ||
return fs, root | ||
return f"<RemoteStore({type(self._fs).__name__} , {self.path})>" | ||
|
||
async def get( | ||
self, | ||
key: str, | ||
prototype: BufferPrototype, | ||
prototype: BufferPrototype = default_buffer_prototype, | ||
byte_range: tuple[int | None, int | None] | None = None, | ||
) -> Buffer | None: | ||
assert isinstance(key, str) | ||
fs, root = self._make_fs() | ||
path = _dereference_path(root, key) | ||
path = _dereference_path(self.path, key) | ||
|
||
try: | ||
value: Buffer | None = await ( | ||
fs._cat_file(path, start=byte_range[0], end=byte_range[1]) | ||
if byte_range | ||
else fs._cat_file(path) | ||
if byte_range: | ||
# fsspec uses start/end, not start/length | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I count this as additional evidence that we should switch to start/end semantics for the rest of the stores There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kerchunk is the exception, storing start/length, mostly because length is generally smaller for chunks in big files. |
||
start, length = byte_range | ||
if start is not None and length is not None: | ||
end = start + length | ||
elif length is not None: | ||
end = length | ||
else: | ||
end = None | ||
value: Buffer = prototype.buffer.from_bytes( | ||
await ( | ||
self._fs._cat_file(path, start=byte_range[0], end=end) | ||
if byte_range | ||
else self._fs._cat_file(path) | ||
) | ||
) | ||
except (FileNotFoundError, IsADirectoryError, NotADirectoryError): | ||
return None | ||
return value | ||
|
||
return value | ||
except self.allowed_exceptions: | ||
return None | ||
except OSError as e: | ||
if "not satisfiable" in str(e): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. flagging this as s3 abstraction leakage that we might want to address later on by making an s3-specific storage class |
||
# this is an s3-specific condition we probably don't want to leak | ||
return prototype.buffer.from_bytes(b"") | ||
raise | ||
|
||
async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None = None) -> None: | ||
async def set( | ||
self, | ||
key: str, | ||
value: Buffer, | ||
byte_range: tuple[int, int] | None = None, | ||
) -> None: | ||
self._check_writable() | ||
assert isinstance(key, str) | ||
fs, root = self._make_fs() | ||
path = _dereference_path(root, key) | ||
|
||
path = _dereference_path(self.path, key) | ||
jhamman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# write data | ||
if byte_range: | ||
with fs._open(path, "r+b") as f: | ||
f.seek(byte_range[0]) | ||
f.write(value) | ||
else: | ||
await fs._pipe_file(path, value) | ||
raise NotImplementedError | ||
await self._fs._pipe_file(path, value.to_bytes()) | ||
|
||
async def delete(self, key: str) -> None: | ||
self._check_writable() | ||
fs, root = self._make_fs() | ||
path = _dereference_path(root, key) | ||
if await fs._exists(path): | ||
await fs._rm(path) | ||
path = _dereference_path(self.path, key) | ||
jhamman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try: | ||
await self._fs._rm(path) | ||
except FileNotFoundError: | ||
pass | ||
except self.allowed_exceptions: | ||
pass | ||
|
||
async def exists(self, key: str) -> bool: | ||
fs, root = self._make_fs() | ||
path = _dereference_path(root, key) | ||
exists: bool = await fs._exists(path) | ||
path = _dereference_path(self.path, key) | ||
exists: bool = await self._fs._exists(path) | ||
return exists | ||
|
||
async def get_partial_values( | ||
self, | ||
prototype: BufferPrototype, | ||
key_ranges: list[tuple[str, tuple[int | None, int | None]]], | ||
) -> list[Buffer | None]: | ||
if key_ranges: | ||
paths, starts, stops = zip( | ||
*( | ||
( | ||
_dereference_path(self.path, k[0]), | ||
k[1][0], | ||
((k[1][0] or 0) + k[1][1]) if k[1][1] is not None else None, | ||
) | ||
for k in key_ranges | ||
), | ||
strict=False, | ||
) | ||
else: | ||
return [] | ||
# TODO: expectations for exceptions or missing keys? | ||
res = await self._fs._cat_ranges(list(paths), starts, stops, on_error="return") | ||
Comment on lines
+161
to
+162
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think returning the exceptions is the right thing here |
||
# the following is an s3-specific condition we probably don't want to leak | ||
res = [b"" if (isinstance(r, OSError) and "not satisfiable" in str(r)) else r for r in res] | ||
for r in res: | ||
if isinstance(r, Exception) and not isinstance(r, self.allowed_exceptions): | ||
raise r | ||
|
||
return [None if isinstance(r, Exception) else prototype.buffer.from_bytes(r) for r in res] | ||
|
||
async def set_partial_values(self, key_start_values: list[tuple[str, int, BytesLike]]) -> None: | ||
raise NotImplementedError | ||
|
||
async def list(self) -> AsyncGenerator[str, None]: | ||
allfiles = await self._fs._find(self.path, detail=False, withdirs=False) | ||
for onefile in (a.replace(self.path + "/", "") for a in allfiles): | ||
yield onefile | ||
|
||
async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: | ||
prefix = f"{self.path}/{prefix.rstrip('/')}" | ||
try: | ||
allfiles = await self._fs._ls(prefix, detail=False) | ||
except FileNotFoundError: | ||
return | ||
for onefile in (a.replace(prefix + "/", "") for a in allfiles): | ||
yield onefile | ||
|
||
async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: | ||
for onefile in await self._fs._ls(prefix, detail=False): | ||
yield onefile |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -399,7 +399,9 @@ def test_hierarchy(self): | |
assert [] == store.listdir(self.root + "c/x/y") | ||
assert [] == store.listdir(self.root + "c/d/y") | ||
assert [] == store.listdir(self.root + "c/d/y/z") | ||
assert [] == store.listdir(self.root + "c/e/f") | ||
# the following is listdir(filepath), for which fsspec gives [filepath] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what's the advantage of going with POSIX semantics here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fsspec tries to adhere to posix as much as possible. If we want to exclude the [file] case, we'd have to code that special case into our store. |
||
# as posix would, but an empty list was previously assumed | ||
# assert [] == store.listdir(self.root + "c/e/f") | ||
|
||
# test rename (optional) | ||
if store.is_erasable(): | ||
|
@@ -1064,9 +1066,8 @@ def test_complex(self): | |
store[self.root + "foo"] = b"hello" | ||
assert "foo" in os.listdir(str(path1) + "/" + self.root) | ||
assert self.root + "foo" in store | ||
assert not os.listdir(str(path2)) | ||
assert store[self.root + "foo"] == b"hello" | ||
assert "foo" in os.listdir(str(path2)) | ||
assert store[self.root + "foo"] == b"hello" | ||
|
||
def test_deep_ndim(self): | ||
import zarr.v2 | ||
|
@@ -1285,6 +1286,8 @@ def create_store(self, normalize_keys=False, dimension_separator=".", path=None, | |
@pytest.fixture() | ||
def s3(request): | ||
# writable local S3 system | ||
pytest.skip("old v3 tests are disabled", allow_module_level=True) | ||
|
||
import shlex | ||
import subprocess | ||
import time | ||
|
@@ -1299,7 +1302,7 @@ def s3(request): | |
s3fs = pytest.importorskip("s3fs") | ||
pytest.importorskip("moto") | ||
|
||
port = 5555 | ||
port = 5556 | ||
endpoint_uri = "http://127.0.0.1:%d/" % port | ||
proc = subprocess.Popen( | ||
shlex.split("moto_server s3 -p %d" % port), | ||
|
@@ -1318,6 +1321,7 @@ def s3(request): | |
timeout -= 0.1 # pragma: no cover | ||
time.sleep(0.1) # pragma: no cover | ||
s3so = dict(client_kwargs={"endpoint_url": endpoint_uri}, use_listings_cache=False) | ||
s3fs.S3FileSystem.clear_instance_cache() | ||
s3 = s3fs.S3FileSystem(anon=False, **s3so) | ||
s3.mkdir("test") | ||
request.cls.s3so = s3so | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we do this import in
RemoteStore.__init__
? Would help us keep this an optional dep here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since fsspec is very small and itself has no further dependencies, making it required will reduce friction for users
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose this is fine -- at least for now.