Skip to content

Commit 0f2c4da

Browse files
feat: use only loop executor for fsspec source (#999)
* install optional fsspec backends in the CI for some python versions * update submit interface * annotation * do not use named arguments for the path/url * Future init * add s3fs and sshfs as test dependencies (fsspec backends) * test fsspec s3 for more combinations of parameters * remove pip install from ci * style: pre-commit fixes * revert test order * remove dependencies as a test * add s3fs to test * exclude s3fs to python version 3.12 * add sshfs test (skipped) * fix pytest * asyncio not available in 3.12 * asyncio not available in 3.12 * add comment for fsspec threads * attempt to close resources * handle s3fs case separate for now * attempt to pass tests * attempt to pass tests * simplified * remove support for use_threads option, run non-async fs in threads using asyncio * stop the loop on resource shutdown * add skip for xrootd due to server issues * remove skip for xrootd * remove shutdown * understand ci fail --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 3341ce0 commit 0f2c4da

File tree

3 files changed

+72
-63
lines changed

3 files changed

+72
-63
lines changed

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,11 @@ dev = [
6060
test = [
6161
"lz4",
6262
"minio",
63-
"aiohttp; python_version<\"3.12\"",
63+
"aiohttp; python_version<\"3.12\"", # asyncio not available
6464
"fsspec",
6565
"fsspec-xrootd",
66+
"s3fs; python_version<\"3.12\"", # asyncio not available
67+
"sshfs; python_version<\"3.12\"", # asyncio not available
6668
"pytest>=6",
6769
"pytest-timeout",
6870
"pytest-rerunfailures",

src/uproot/source/fsspec.py

Lines changed: 40 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -24,44 +24,19 @@ class FSSpecSource(uproot.source.chunk.Source):
2424
"""
2525

2626
def __init__(self, file_path: str, **options):
27-
import fsspec.asyn
2827
import fsspec.core
2928

3029
default_options = uproot.reading.open.defaults
31-
self._use_threads = options.get("use_threads", default_options["use_threads"])
32-
self._num_workers = options.get("num_workers", default_options["num_workers"])
33-
# Add the possibility to set use_async directly as a hidden option.
34-
# It is not encouraged to do so but may be useful for testing purposes.
35-
self._use_async = options.get("use_async", None) if self._use_threads else False
36-
37-
# TODO: is timeout always valid?
3830

39-
# Remove uproot-specific options (should be done earlier)
4031
exclude_keys = set(default_options.keys())
4132
storage_options = {k: v for k, v in options.items() if k not in exclude_keys}
4233

4334
protocol = fsspec.core.split_protocol(file_path)[0]
44-
fs_has_async_impl = fsspec.get_filesystem_class(protocol=protocol).async_impl
45-
# If not explicitly set (default), use async if possible
46-
self._use_async = (
47-
fs_has_async_impl if self._use_async is None else self._use_async
48-
)
49-
if self._use_async and not fs_has_async_impl:
50-
# This should never be triggered unless the user explicitly set the `use_async` flag for a non-async backend
51-
raise ValueError(f"Filesystem {protocol} does not support async")
52-
53-
if not self._use_threads:
54-
self._executor = uproot.source.futures.TrivialExecutor()
55-
elif self._use_async:
56-
self._executor = FSSpecLoopExecutor(fsspec.asyn.get_loop())
57-
else:
58-
self._executor = concurrent.futures.ThreadPoolExecutor(
59-
max_workers=self._num_workers
60-
)
35+
self._async_impl = fsspec.get_filesystem_class(protocol=protocol).async_impl
36+
self._executor = FSSpecLoopExecutor()
6137

6238
self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **storage_options)
6339

64-
# TODO: set mode to "read-only" in a way that works for all filesystems
6540
self._file = self._fs.open(self._file_path)
6641
self._fh = None
6742
self._num_requests = 0
@@ -148,28 +123,49 @@ def chunks(
148123
self._num_requested_chunks += len(ranges)
149124
self._num_requested_bytes += sum(stop - start for start, stop in ranges)
150125

126+
try:
127+
# not available in python 3.8
128+
to_thread = asyncio.to_thread
129+
except AttributeError:
130+
import contextvars
131+
import functools
132+
133+
async def to_thread(func, /, *args, **kwargs):
134+
loop = asyncio.get_running_loop()
135+
ctx = contextvars.copy_context()
136+
func_call = functools.partial(ctx.run, func, *args, **kwargs)
137+
return await loop.run_in_executor(None, func_call)
138+
139+
async def async_wrapper_thread(blocking_func, *args, **kwargs):
140+
if not callable(blocking_func):
141+
raise TypeError("blocking_func must be callable")
142+
# TODO: when python 3.8 is dropped, use `asyncio.to_thread` instead (also remove the try/except block above)
143+
return await to_thread(blocking_func, *args, **kwargs)
144+
151145
chunks = []
152146
for start, stop in ranges:
153147
# _cat_file is async while cat_file is not.
154-
# Loop executor takes a coroutine while ThreadPoolExecutor takes a function.
155-
future = self._executor.submit(
156-
self._fs._cat_file if self._use_async else self._fs.cat_file,
157-
# it is assumed that the first argument is the file path / url (can have different names: 'url', 'path')
158-
self._file_path,
159-
start=start,
160-
end=stop,
148+
coroutine = (
149+
self._fs._cat_file(self._file_path, start=start, end=stop)
150+
if self._async_impl
151+
else async_wrapper_thread(
152+
self._fs.cat_file, self._file_path, start=start, end=stop
153+
)
161154
)
155+
156+
future = self._executor.submit(coroutine)
157+
162158
chunk = uproot.source.chunk.Chunk(self, start, stop, future)
163159
future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
164160
chunks.append(chunk)
165161
return chunks
166162

167163
@property
168-
def use_async(self) -> bool:
164+
def async_impl(self) -> bool:
169165
"""
170166
True if using an async loop executor; False otherwise.
171167
"""
172-
return self._use_async
168+
return self._async_impl
173169

174170
@property
175171
def num_bytes(self) -> int:
@@ -190,13 +186,13 @@ def closed(self) -> bool:
190186

191187

192188
class FSSpecLoopExecutor(uproot.source.futures.Executor):
193-
def __init__(self, loop: asyncio.AbstractEventLoop):
194-
self.loop = loop
189+
@property
190+
def loop(self) -> asyncio.AbstractEventLoop:
191+
import fsspec.asyn
192+
193+
return fsspec.asyn.get_loop()
195194

196-
def submit(self, coroutine, /, *args, **kwargs) -> concurrent.futures.Future:
197-
if not asyncio.iscoroutinefunction(coroutine):
195+
def submit(self, coroutine) -> concurrent.futures.Future:
196+
if not asyncio.iscoroutine(coroutine):
198197
raise TypeError("loop executor can only submit coroutines")
199-
if not self.loop.is_running():
200-
raise RuntimeError("cannot submit coroutine while loop is not running")
201-
coroutine_object = coroutine(*args, **kwargs)
202-
return asyncio.run_coroutine_threadsafe(coroutine_object, self.loop)
198+
return asyncio.run_coroutine_threadsafe(coroutine, self.loop)

tests/test_0692_fsspec.py

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,13 @@
88
import queue
99

1010

11-
@pytest.mark.parametrize("use_threads", [True, False])
12-
def test_open_fsspec_http(server, use_threads):
11+
def test_open_fsspec_http(server):
1312
pytest.importorskip("aiohttp")
1413

1514
url = f"{server}/uproot-issue121.root"
16-
1715
with uproot.open(
1816
url,
1917
handler=uproot.source.fsspec.FSSpecSource,
20-
use_threads=use_threads,
2118
) as f:
2219
data = f["Events/MET_pt"].array(library="np")
2320
assert len(data) == 40
@@ -36,51 +33,65 @@ def test_open_fsspec_github():
3633
assert len(data) == 40
3734

3835

39-
@pytest.mark.parametrize("use_threads", [True, False])
40-
def test_open_fsspec_local(use_threads):
36+
def test_open_fsspec_local():
4137
local_path = skhep_testdata.data_path("uproot-issue121.root")
4238

4339
with uproot.open(
4440
local_path,
4541
handler=uproot.source.fsspec.FSSpecSource,
46-
use_threads=use_threads,
4742
) as f:
4843
data = f["Events/MET_pt"].array(library="np")
4944
assert len(data) == 40
5045

5146

5247
@pytest.mark.network
53-
def test_open_fsspec_s3():
48+
@pytest.mark.parametrize(
49+
"handler",
50+
[
51+
# uproot.source.fsspec.FSSpecSource,
52+
uproot.source.s3.S3Source,
53+
None,
54+
],
55+
)
56+
def test_open_fsspec_s3(handler):
5457
pytest.importorskip("s3fs")
5558

5659
with uproot.open(
5760
"s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root:PicoDst",
5861
anon=True,
59-
handler=uproot.source.fsspec.FSSpecSource,
62+
handler=handler,
6063
) as f:
6164
data = f["Event/Event.mEventId"].array(library="np")
6265
assert len(data) == 8004
6366

6467

68+
@pytest.mark.parametrize("handler", [uproot.source.fsspec.FSSpecSource, None])
69+
@pytest.mark.skip("you must provide an ssh server to test this")
70+
def test_open_fsspec_ssh(handler):
71+
pytest.importorskip("sshfs")
72+
73+
# change this to a server you have access to
74+
uri = "ssh://user@host:22/tmp/file.root"
75+
with uproot.open(uri, handler=handler) as f:
76+
data = f["Events/MET_pt"].array(library="np")
77+
assert len(data) == 40
78+
79+
6580
@pytest.mark.network
6681
@pytest.mark.xrootd
6782
@pytest.mark.parametrize(
68-
"handler, use_threads",
83+
"handler",
6984
[
70-
(uproot.source.fsspec.FSSpecSource, True),
71-
(uproot.source.fsspec.FSSpecSource, False),
72-
(uproot.source.xrootd.XRootDSource, True),
73-
(uproot.source.xrootd.XRootDSource, False),
74-
(None, True),
75-
(None, False),
85+
uproot.source.fsspec.FSSpecSource,
86+
uproot.source.xrootd.XRootDSource,
87+
None,
7688
],
7789
)
78-
def test_open_fsspec_xrootd(handler, use_threads):
90+
def test_open_fsspec_xrootd(handler):
7991
pytest.importorskip("XRootD")
8092
with uproot.open(
8193
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
8294
handler=handler,
83-
use_threads=use_threads,
8495
) as f:
8596
data = f["Events/run"].array(library="np", entry_stop=20)
8697
assert len(data) == 20

0 commit comments

Comments
 (0)