Skip to content

Commit

Permalink
Ref mode (#1224)
Browse files Browse the repository at this point in the history
* safe errors for parquet references

* Should be keyError in the mapper

* another ocurrance

* debug

* fix for metadata

* typo

* more bytes

* debug CI

* debug

* fix dask test
  • Loading branch information
martindurant authored Mar 28, 2023
1 parent f82e73f commit 4576ebe
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
11 changes: 7 additions & 4 deletions fsspec/implementations/dask.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dask
from distributed.client import Client, _get_global_client
from distributed.worker import get_worker
from distributed.worker import Worker

from fsspec import filesystem
from fsspec.spec import AbstractBufferedFile, AbstractFileSystem
Expand All @@ -17,6 +17,10 @@ def _get_client(client):
return Client(client)


def _in_worker():
return bool(Worker._instances)


class DaskWorkerFileSystem(AbstractFileSystem):
"""View files accessible to a worker as any other remote file-system
Expand Down Expand Up @@ -51,14 +55,13 @@ def _get_kwargs_from_urls(path):
return {}

def _determine_worker(self):
try:
get_worker()
if _in_worker():
self.worker = True
if self.fs is None:
self.fs = filesystem(
self.target_protocol, **(self.target_options or {})
)
except ValueError:
else:
self.worker = False
self.client = _get_client(self.client)
self.rfs = dask.delayed(self)
Expand Down
34 changes: 25 additions & 9 deletions fsspec/implementations/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ def _protocol_groups(paths, references):

class RefsValuesView(collections.abc.ValuesView):
def __iter__(self):
yield from self._mapping.zmetadata.values()
for val in self._mapping.zmetadata.values():
yield json.dumps(val).encode()
yield from self._mapping._items.values()
for field in self._mapping.listdir():
chunk_sizes = self._mapping._get_chunk_sizes(field)
Expand All @@ -73,8 +74,17 @@ class LazyReferenceMapper(collections.abc.MutableMapping):
references dict."""

# import is class level to prevent numpy dep requirement for fsspec
import numpy as np
import pandas as pd
@property
def np(self):
import numpy as np

return np

@property
def pd(self):
import pandas as pd

return pd

def __init__(
self,
Expand Down Expand Up @@ -136,12 +146,17 @@ def _load_one_key(self, key):
return self._items[key]
elif key in self.zmetadata:
return json.dumps(self.zmetadata[key]).encode()
elif "/" not in key:
raise KeyError(key)
field, sub_key = key.split("/")
# Chunk keys can be loaded from row group and cached in LRU cache
record, ri, chunk_size = self._key_to_record(key)
if chunk_size == 0:
return b""
_, refs = self.open_refs(field, record)
try:
record, ri, chunk_size = self._key_to_record(key)
if chunk_size == 0:
return b""
_, refs = self.open_refs(field, record)
except (ValueError, TypeError, FileNotFoundError):
raise KeyError(key)
columns = ["path", "offset", "size", "raw"]
selection = [refs[c][ri] if c in refs else None for c in columns]
raw = selection[-1]
Expand Down Expand Up @@ -203,7 +218,8 @@ def __getitem__(self, key):
if val is None:
raise KeyError
if key in self.zmetadata:
return self.zmetadata[key]
# spec requires bytes even if we already decoded the metadata
return json.dumps(self.zmetadata[key]).encode()
return self._load_one_key(key)

def __setitem__(self, key, value):
Expand Down Expand Up @@ -429,7 +445,7 @@ def _cat_common(self, path, start=None, end=None):
return part, None, None

if len(part) == 1:
logger.debug(f"Reference: {path}, whole file")
logger.debug(f"Reference: {path}, whole file => {part}")
url = part[0]
start1, end1 = start, end
else:
Expand Down
3 changes: 1 addition & 2 deletions fsspec/implementations/tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ def setup():
try:
yield client
finally:
client.close()
client.shutdown()


def test_basic(cli):

fs = fsspec.filesystem("dask", target_protocol="memory")
assert fs.ls("", detail=False) == ["/afile"]
assert fs.cat("/afile") == b"data"

0 comments on commit 4576ebe

Please sign in to comment.