Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic working FsspecStore #1785

Merged
merged 36 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c21da45
Basic working FsspecStore
martindurant Apr 10, 2024
5489408
Merge branch 'v3' into v3_fsspec
d-v-b Apr 11, 2024
29fa0a4
upath to be optional
martindurant Apr 12, 2024
c2adda5
Merge branch 'v3_fsspec' of https://github.com/martindurant/zarr into…
martindurant Apr 12, 2024
320a671
fill out methods
martindurant Apr 13, 2024
79477f5
add fsspec to deps (I believe we want this)
martindurant Apr 13, 2024
9104bdc
fixes
martindurant Apr 14, 2024
581eafe
Merge branch 'v3' into v3_fsspec
martindurant May 29, 2024
20c9387
importable
martindurant May 29, 2024
b6bfa11
exceptions
martindurant May 29, 2024
02d8149
Merge branch 'v3' into v3_fsspec
martindurant May 29, 2024
9c402b6
Merge branch 'v3' into v3_fsspec
martindurant Jun 4, 2024
2d2a251
Merge branch 'v3' into v3_fsspec
jhamman Jun 4, 2024
e0c0ee4
Add simple test
martindurant Jun 4, 2024
0be84d2
Merge branch 'v3_fsspec' of https://github.com/martindurant/zarr into…
martindurant Jun 4, 2024
682f8e0
Add to test env
martindurant Jun 4, 2024
5315fda
fix typing
normanrz Jun 5, 2024
6eac094
Update src/zarr/store/remote.py
martindurant Jun 5, 2024
f5114a9
Merge remote-tracking branch 'origin/v3' into v3_fsspec
normanrz Jun 5, 2024
7920195
BufferPrototype
normanrz Jun 5, 2024
12baaa7
set up testing infrastructure for remote store
d-v-b Jun 9, 2024
095d72e
broken tests but get and set are implemented correctly for TestRemote…
d-v-b Jun 9, 2024
6ab1099
remove implementation of test_get, and make s3 fixture autoused, to r…
d-v-b Jun 9, 2024
a4ca371
Update tests/v3/test_store/test_remote.py
d-v-b Jun 10, 2024
0896cb8
don't use fsmap, and don't use os.path.join
d-v-b Jun 10, 2024
a4e1c20
Merge branch 'mdurant_v3_fsspec' of https://github.com/d-v-b/zarr-pyt…
d-v-b Jun 10, 2024
93a2c6a
scope s3 fixture to session, mark test_store_supports_partial_writes …
d-v-b Jun 10, 2024
7c22f74
Merge pull request #23 from d-v-b/mdurant_v3_fsspec
martindurant Jun 10, 2024
3c037c3
Update src/zarr/store/remote.py
martindurant Jun 10, 2024
3454da9
Fix most
martindurant Jun 10, 2024
9b6a6d3
fixed more
martindurant Jun 10, 2024
266fa37
fix rest
martindurant Jun 10, 2024
221cebc
Massage old v2 tests
martindurant Jun 10, 2024
1bfcbb5
just skip them..
martindurant Jun 10, 2024
362951e
Attribute rename to allowed_exceptions
martindurant Jun 11, 2024
3fb24da
Merge branch 'v3' into v3_fsspec
jhamman Jun 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies = [
'numpy>=1.24',
'fasteners',
'numcodecs>=0.10.0',
'fsspec>2024',
'crc32c',
'zstandard',
'typing_extensions',
Expand Down Expand Up @@ -112,6 +113,8 @@ extra-dependencies = [
"msgpack",
"lmdb",
"pytest-asyncio",
"moto",
"requests",
"mypy"
]
features = ["extra"]
Expand Down
2 changes: 1 addition & 1 deletion src/zarr/store/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def _dereference_path(root: str, path: str) -> str:
assert isinstance(root, str)
assert isinstance(path, str)
root = root.rstrip("/")
path = f"{root}/{path}" if root != "" else path
path = f"{root}/{path}" if root else path
path = path.rstrip("/")
return path

Expand Down
156 changes: 106 additions & 50 deletions src/zarr/store/remote.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from __future__ import annotations

from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any

import fsspec
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do this import in RemoteStore.__init__? Would help us keep this an optional dep here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since fsspec is very small and itself has no further dependencies, making it required will reduce friction for users

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this is fine -- at least for now.


from zarr.abc.store import Store
from zarr.buffer import Buffer
from zarr.common import OpenMode
Expand All @@ -11,90 +14,143 @@
from fsspec.asyn import AsyncFileSystem
from upath import UPath

from zarr.buffer import Buffer
from zarr.common import BytesLike


class RemoteStore(Store):
# based on FSSpec
supports_writes: bool = True
supports_partial_writes: bool = False
supports_listing: bool = True

root: UPath
_fs: AsyncFileSystem
path: str
exceptions: tuple[type[Exception], ...]
martindurant marked this conversation as resolved.
Show resolved Hide resolved

def __init__(
self, url: UPath | str, *, mode: OpenMode = "r", **storage_options: dict[str, Any]
self,
url: UPath | str,
mode: OpenMode = "r",
allowed_exceptions: tuple[type[Exception], ...] = (
FileNotFoundError,
IsADirectoryError,
NotADirectoryError,
),
**storage_options: Any,
):
import fsspec
from upath import UPath
"""
Parameters
----------
url: root of the datastore. In fsspec notation, this is usually like "protocol://path/to".
Can also be a upath.UPath instance.
allowed_exceptions: when fetching data, these cases will be deemed to correspond to missing
martindurant marked this conversation as resolved.
Show resolved Hide resolved
keys, rather than some other IO failure
storage_options: passed on to fsspec to make the filesystem instance. If url is a UPath,
this must not be used.
"""

super().__init__(mode=mode)

if isinstance(url, str):
self.root = UPath(url, **storage_options)
self._fs, self.path = fsspec.url_to_fs(url, **storage_options)
elif hasattr(url, "protocol") and hasattr(url, "fs"):
# is UPath-like - but without importing
if storage_options:
raise ValueError(
"If constructed with a UPath object, no additional "
"storage_options are allowed"
)
self.path = url.path
self._fs = url._fs
else:
assert (
len(storage_options) == 0
), "If constructed with a UPath object, no additional storage_options are allowed."
self.root = url.rstrip("/")

raise ValueError("URL not understood, %s", url)
self.exceptions = allowed_exceptions
normanrz marked this conversation as resolved.
Show resolved Hide resolved
# test instantiate file system
fs, _ = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs)
assert fs.__class__.async_impl, "FileSystem needs to support async operations."
if not self._fs.async_impl:
raise TypeError("FileSystem needs to support async operations")

def __str__(self) -> str:
return str(self.root)
return f"Remote fsspec store: {self.path}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should put the protocol in here


def __repr__(self) -> str:
return f"RemoteStore({str(self)!r})"

def _make_fs(self) -> tuple[AsyncFileSystem, str]:
import fsspec

storage_options = self.root._kwargs.copy()
storage_options.pop("_url", None)
fs, root = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs)
assert fs.__class__.async_impl, "FileSystem needs to support async operations."
return fs, root
return f"<FsspecStore({self.path})>"
martindurant marked this conversation as resolved.
Show resolved Hide resolved

async def get(
self, key: str, byte_range: tuple[int | None, int | None] | None = None
) -> Buffer | None:
assert isinstance(key, str)
fs, root = self._make_fs()
path = _dereference_path(root, key)
path = _dereference_path(self.path, key)

try:
value: Buffer | None = await (
fs._cat_file(path, start=byte_range[0], end=byte_range[1])
if byte_range
else fs._cat_file(path)
value: Buffer = Buffer.from_bytes(
await (
self._fs._cat_file(path, start=byte_range[0], end=byte_range[1])
if byte_range
else self._fs._cat_file(path)
)
)
except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
return None
return value

return value
except self.exceptions:
return None

async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None = None) -> None:
async def set(
self,
key: str,
value: Buffer,
byte_range: tuple[int, int] | None = None,
) -> None:
self._check_writable()
assert isinstance(key, str)
fs, root = self._make_fs()
path = _dereference_path(root, key)

path = _dereference_path(self.path, key)
jhamman marked this conversation as resolved.
Show resolved Hide resolved
# write data
if byte_range:
with fs._open(path, "r+b") as f:
f.seek(byte_range[0])
f.write(value)
else:
await fs._pipe_file(path, value)
raise NotImplementedError
await self._fs._pipe_file(path, value.to_bytes())

async def delete(self, key: str) -> None:
self._check_writable()
fs, root = self._make_fs()
path = _dereference_path(root, key)
if await fs._exists(path):
await fs._rm(path)
path = _dereference_path(self.path, key)
jhamman marked this conversation as resolved.
Show resolved Hide resolved
try:
await self._fs._rm(path)
except FileNotFoundError:
pass
except self.exceptions:
pass

async def exists(self, key: str) -> bool:
fs, root = self._make_fs()
path = _dereference_path(root, key)
exists: bool = await fs._exists(path)
path = _dereference_path(self.path, key)
exists: bool = await self._fs._exists(path)
return exists

async def get_partial_values(
self, key_ranges: list[tuple[str, tuple[int | None, int | None]]]
) -> list[Buffer | None]:
paths, starts, stops = zip(
*((_dereference_path(self.path, k[0]), k[1][0], k[1][1]) for k in key_ranges),
strict=False,
)
# TODO: expectations for exceptions or missing keys?
res = await self._fs._cat_ranges(list(paths), starts, stops, on_error="return")
Comment on lines +161 to +162
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think returning the exceptions is the right thing here

for r in res:
if isinstance(r, Exception) and not isinstance(r, self.exceptions):
raise r

return [None if isinstance(r, Exception) else Buffer.from_bytes(r) for r in res]

async def set_partial_values(self, key_start_values: list[tuple[str, int, BytesLike]]) -> None:
raise NotImplementedError

async def list(self) -> AsyncGenerator[str, None]:
allfiles = await self._fs._find(self.path, detail=False, withdirs=False)
for onefile in (a.replace(self.path + "/", "") for a in allfiles):
yield onefile

async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
prefix = prefix.rstrip("/")
allfiles = await self._fs._ls(prefix, detail=False)
for onefile in (a.replace(prefix + "/", "") for a in allfiles):
yield onefile

async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
for onefile in await self._fs._ls(prefix, detail=False):
yield onefile
74 changes: 74 additions & 0 deletions tests/v3/test_remote_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import os

import pytest

from zarr.buffer import Buffer
from zarr.store import RemoteStore

# Skip this whole test module when any optional S3 test dependency is missing.
s3fs = pytest.importorskip("s3fs")
requests = pytest.importorskip("requests")
moto_server = pytest.importorskip("moto.moto_server.threaded_moto_server")
moto = pytest.importorskip("moto")

# ### amended from s3fs ### #
# Bucket that the ``s3`` fixture creates before each test.
test_bucket_name = "test"
# Not referenced by the tests visible in this module.
secure_bucket_name = "test-secure"
# Port and endpoint URL of the local mock S3 (moto) server; note the trailing "/".
port = 5555
endpoint_uri = f"http://127.0.0.1:{port}/"


@pytest.fixture(scope="module")
def s3_base():
    """Run a writable local mock S3 server for the tests in this module.

    The fixture is module-scoped, so one ThreadedMotoServer is shared by every
    test that requests it. The server is stopped after the last one finishes.
    """
    mock_server = moto_server.ThreadedMotoServer(ip_address="127.0.0.1", port=port)
    mock_server.start()

    # Install placeholder credentials only where the environment defines none.
    for credential in ("AWS_SECRET_ACCESS_KEY", "AWS_ACCESS_KEY_ID"):
        os.environ.setdefault(credential, "foo")

    yield
    mock_server.stop()


def get_boto3_client():
    """Return a synchronous botocore S3 client bound to the mock endpoint."""
    from botocore.session import Session

    # NB: the sync botocore client is what we use for setup
    return Session().create_client("s3", endpoint_url=endpoint_uri)


@pytest.fixture()
def s3(s3_base):
    """Provide an S3FileSystem on the mock server with the test bucket created.

    Requesting ``s3_base`` ensures the mock server is running first. After the
    test, the mock server's state is cleared through moto's reset endpoint.
    """
    client = get_boto3_client()
    client.create_bucket(Bucket=test_bucket_name, ACL="public-read")
    # Drop cached filesystem instances and listings left over from earlier tests.
    s3fs.S3FileSystem.clear_instance_cache()
    fs = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri})
    fs.invalidate_cache()
    yield fs
    # endpoint_uri ends with "/"; strip it so the path is not "//moto-api/reset".
    requests.post(f"{endpoint_uri.rstrip('/')}/moto-api/reset")


# ### end from s3fs ### #


async def alist(it):
    """Drain the async iterable ``it`` and return its items as a list."""
    return [item async for item in it]


async def test_basic(s3):
    """Round-trip a single key through a RemoteStore on the mock S3 bucket."""
    payload = b"hello"
    remote = RemoteStore(
        f"s3://{test_bucket_name}", mode="w", endpoint_url=endpoint_uri, anon=False
    )

    # The bucket was just created, so the store starts out empty.
    assert not await alist(remote.list())
    assert not await remote.exists("foo")

    await remote.set("foo", Buffer.from_bytes(payload))
    assert await remote.exists("foo")
    fetched = await remote.get("foo")
    assert fetched.to_bytes() == payload

    # An open-ended range starting at offset 1 skips only the first byte.
    partial = await remote.get_partial_values([("foo", (1, None))])
    assert partial[0].to_bytes() == payload[1:]
Loading