
Path.iterdir/glob/rglob methods do not delegate file system calls to a thread #1308

Closed
vxgmichel opened this issue Nov 15, 2019 · 8 comments

@vxgmichel
Contributor

Notice how the actual iteration is not performed in the executor thread:

trio/trio/_path.py

Lines 56 to 62 in 20da9af

def iter_wrapper_factory(cls, meth_name):
@async_wraps(cls, cls._wraps, meth_name)
async def wrapper(self, *args, **kwargs):
meth = getattr(self._wrapped, meth_name)
func = partial(meth, *args, **kwargs)
items = await trio.to_thread.run_sync(func)
return (rewrap_path(item) for item in items)

Here's a possible fix:

def iter_wrapper_factory(cls, meth_name):
    @async_wraps(cls, cls._wraps, meth_name)
    async def wrapper(self, *args, **kwargs):
        meth = getattr(self._wrapped, meth_name)
        func = partial(meth, *args, **kwargs)
        items = await trio.to_thread.run_sync(lambda: list(func()))
        return (rewrap_path(item) for item in items)
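The key point is that `lambda: list(func())` forces the whole iteration to happen inside the worker thread, whereas calling `func` alone only creates the generator. A trio-free sketch of why, with a hypothetical `fake_iterdir` standing in for the real file system call:

```python
# Hypothetical stand-in for an I/O-performing generator like Path.iterdir.
consumed = []

def fake_iterdir():
    for name in ["a", "b"]:
        consumed.append(name)  # stands in for a blocking file system call
        yield name

func = fake_iterdir

# Calling the generator function does no work yet...
gen = func()
assert consumed == []

# ...so run_sync(func) would merely create the generator in the thread.
# Wrapping it in list() is what actually drives the iteration:
items = (lambda: list(func()))()
assert consumed == ["a", "b"]
assert items == ["a", "b"]
```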
@belm0
Member

belm0 commented Nov 17, 2019

To clarify, you mean that these methods return a generator object, and potentially some blocking I/O might be done in the main thread during next() calls within the iteration.

The amount of I/O done in the method call vs. subsequent yields could depend on the pathlib implementation, OS, and size of the data and directories involved.

Expanding all the results immediately into a list doesn't sound good.

Use a memory channel to iterate results from the background thread?
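For illustration, `Path.iterdir` hands back a lazy iterator rather than a list (exactly how much I/O happens up front varies between CPython versions), which is why the scan can end up running in the main thread during iteration:

```python
import pathlib
import tempfile

with tempfile.TemporaryDirectory() as d:
    pathlib.Path(d, "a.txt").touch()
    it = pathlib.Path(d).iterdir()
    # An iterator object, not a pre-built list (how much I/O happens up
    # front depends on the CPython version):
    assert not isinstance(it, (list, tuple))
    names = sorted(p.name for p in it)

assert names == ["a.txt"]
```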

@vxgmichel
Contributor Author

To clarify, you mean that these methods return a generator object, and potentially some blocking I/O might be done in the main thread during next() calls within the iteration.

Exactly, this is definitely the case for iterdir (see the implementation).

The amount of I/O done in the method call vs. subsequent yields could depend on the pathlib implementation, OS, and size of the data and directories involved.

At the moment every I/O call seems to happen during the iteration, but I agree that both the method call and the iteration should run in a thread, since this implementation might change in the future.

Expanding all the results immediately into a list doesn't sound good.

Apparently this has been discussed in #501 and implemented using list expansion in #502.

Then PR #969 came along and got rid of the list expansion, probably by mistake (if I read this comment in #917 correctly).

@belm0
Member

belm0 commented Nov 17, 2019

thank you for investigating

I see that iterdir is actually still documented to read everything up front:

Note that it actually loads the whole directory list into memory immediately, during the initial call.

agreed, just fix everything to do that

@vxgmichel
Contributor Author

Now that I think about it, there are two separate issues here:

  1. iterdir that could be fixed with list expansion
  2. glob/rglob that could be fixed with list expansion although this is not so great since the subtree is recursively iterated

Ideally, we'd like to be able to do:

async for item in path.iterdir():
    [...]
async for item in path.glob("..."):
    [...]
async for item in path.rglob("..."):
    [...]

This can be easily achieved with the naive implementation:

    @async_wraps(cls, cls._wraps, meth_name)
    async def wrapper(self, *args, **kwargs):
        meth = getattr(self._wrapped, meth_name)
        func = partial(meth, *args, **kwargs)
        items_iter = await trio.to_thread.run_sync(func)
        stop_iteration = object()

        def safe_next():
            try:
                return items_iter.__next__()
            except StopIteration:
                return stop_iteration

        while True:
            item = await trio.to_thread.run_sync(safe_next)
            if item is stop_iteration:
                return
            yield rewrap_path(item)

However, it would be quite slow, as pointed out by @njsmith in this comment.

You did mention a more efficient approach:

Use a memory channel to iterate results from the background thread

Here's a possible implementation:

    @async_wraps(cls, cls._wraps, meth_name)
    async def wrapper(self, *args, **kwargs):
        meth = getattr(self._wrapped, meth_name)
        send_channel, receive_channel = trio.open_memory_channel(1)

        def run_in_thread():
            try:
                for item in meth(*args, **kwargs):
                    trio.from_thread.run(send_channel.send, item)
            finally:
                # close the channel so the async iteration below terminates
                trio.from_thread.run(send_channel.aclose)

        async with trio.open_nursery() as nursery:
            nursery.start_soon(trio.to_thread.run_sync, run_in_thread)
            async for item in receive_channel:
                yield rewrap_path(item)

... except this falls into the nursery-inside-generator trap.

In any case I think it's an interesting problem for trio in general. Is there another approach that I missed?

@vxgmichel
Contributor Author

vxgmichel commented Nov 18, 2019

I ended up benchmarking several ways of wrapping an iterator into an async iterator:

1.77s call     test_trio.py::test_to_aiter[run each next call in a separate thread]
1.29s call     test_trio.py::test_to_aiter[run iteration in a single thread (buffer_size=inf)]
1.28s call     test_trio.py::test_to_aiter[run iteration in a single thread (buffer_size=0)]
1.28s call     test_trio.py::test_to_aiter[run iteration in a single thread (buffer_size=1)]
0.45s call     test_trio.py::test_to_aiter[run the iteration and sleep(0) between each item]
0.02s call     test_trio.py::test_to_aiter[run iteration in a single thread with batches]
0.01s call     test_trio.py::test_to_aiter[run each batch iteration in a separate thread]
0.00s call     test_trio.py::test_to_aiter[run the iteration in a thread and return a list]
0.00s call     test_trio.py::test_to_aiter[run the iteration in a blocking way]

This made me realize that performing a context switch for each item is also a bottleneck (notice the 0.45 s result for "run the iteration and sleep(0) between each item").

Maybe an acceptable compromise is to use the following implementation:

import trio
import time

BATCH_TIME_LIMIT = 0.001
BATCH_LENGTH_LIMIT = 1000


async def to_aiter(fn, *args, **kwargs):

    def instantiate():
        return iter(fn(*args, **kwargs))

    items_iter = await trio.to_thread.run_sync(instantiate)

    def run_batch():
        batch = []
        deadline = time.time() + BATCH_TIME_LIMIT
        while time.time() < deadline and len(batch) < BATCH_LENGTH_LIMIT:
            try:
                batch.append((next(items_iter), None))
            except Exception as exc:
                batch.append((None, exc))
                break
        return batch

    while True:
        batch = await trio.to_thread.run_sync(run_batch)
        for result, exception in batch:
            if isinstance(exception, StopIteration):
                return
            if exception is not None:
                raise exception
            yield result

I think it would work fine for glob and rglob (and probably wouldn't hurt for iterdir). What do you guys think?
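The batch protocol above (values paired with a possible exception, ending the stream on `StopIteration`) can be exercised without trio. A length-limited-only sketch, with a hypothetical `iter_batches` helper in place of the threaded `run_batch` loop:

```python
# Pure-Python sketch of the batching logic (no trio, no time limit),
# showing how an exception captured inside a batch ends the stream.
def iter_batches(it, limit=3):
    it = iter(it)
    while True:
        batch = []
        while len(batch) < limit:
            try:
                batch.append((next(it), None))
            except Exception as exc:  # includes StopIteration
                batch.append((None, exc))
                break
        yield batch
        if batch[-1][1] is not None:
            return

results = []
for batch in iter_batches(range(5), limit=3):
    for value, exc in batch:
        if isinstance(exc, StopIteration):
            break
        if exc is not None:
            raise exc
        results.append(value)

assert results == [0, 1, 2, 3, 4]
```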

@belm0
Member

belm0 commented Nov 18, 2019

I'd suggest not using time: 1 ms has a different meaning for mechanical vs. solid state storage, and for a computer today vs. 20 years from now. Also, wall time is very difficult to deal with in automated tests (free CI tends to run on VMs and can stall arbitrarily).

As a compromise for not watching time, I'd use a batch size of 100. Being 100x faster than the naive implementation seems good enough.
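A fixed-size batch needs no time bookkeeping at all; a sketch using `itertools.islice` (a hypothetical `next_batch` helper: `islice` never propagates `StopIteration`, so an empty batch can signal exhaustion):

```python
import itertools

def next_batch(it, n=100):
    # islice stops cleanly at the end of the iterator; an empty list
    # therefore signals exhaustion, with no StopIteration juggling.
    return list(itertools.islice(it, n))

it = iter(range(250))
sizes = []
while True:
    batch = next_batch(it)
    if not batch:
        break
    sizes.append(len(batch))

assert sizes == [100, 100, 50]
```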

There might be some opportunity to use the outcome API? I haven't used it myself but it has some ability to capture function calls and send them to a generator. https://outcome.readthedocs.io/en/latest/api.html#api-reference
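For reference, `outcome.capture` essentially wraps a call's return value or exception into an object that can be unwrapped later. A hand-rolled sketch of that idea (simplified: the real library's outcomes can additionally only be unwrapped once):

```python
# Simplified sketch of the idea behind outcome.capture; the names Value,
# Error, and capture mirror the real API, but this is not the library.
class Value:
    def __init__(self, value):
        self.value = value

    def unwrap(self):
        return self.value


class Error:
    def __init__(self, error):
        self.error = error

    def unwrap(self):
        raise self.error


def capture(fn, *args, **kwargs):
    try:
        return Value(fn(*args, **kwargs))
    except BaseException as exc:
        return Error(exc)
```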

@vxgmichel
Contributor Author

vxgmichel commented Nov 19, 2019

I'd suggest not using time: 1 ms has different meaning for mechanical vs. solid state storage, and for a computer today vs. 20 years from now. Also very difficult to deal with wall time in automated tests (free CI tends to run on VM and can stall arbitrarily).

That makes sense. The reason I used both the time limit and the length limit is that I wanted it to be able to handle different kinds of iterators, such as range(x), but also:

def slow_range(*args):
    for x in range(*args):
        time.sleep(1)
        yield x

In this example the time-limited implementation would produce a batch of size 1 every second, instead of waiting 100 seconds before producing the first value. In the case of glob/rglob, this would be the equivalent of browsing a subtree in a slow file system where running os.listdir on a directory takes a second. You'd have to wait for a hundred files to be found before getting the first item on the trio side.

That being said, I agree that 1 ms is arbitrary. Ideally we'd want this threshold to be a few times the thread synchronization cost. On my machine the full round trip is about 200 µs, so a threshold of 1 ms made sense. In general it could be computed like this:

    start = time.time()
    await trio.to_thread.run_sync(lambda: None)
    threshold = 5 * (time.time() - start)

As a compromise for not watching time, I'd use a batch size of 100. Being 100x faster than the naive implementation seems good enough.

I agree, that should be enough for the case of iterdir, glob and rglob. Maybe the generic problem of wrapping a blocking iterator should go into a separate issue.

There might be some opportunity to use the outcome API? I haven't used it myself but it has some ability to capture function calls and send them to a generator. https://outcome.readthedocs.io/en/latest/api.html#api-reference

I thought about it but the captured result can only be used once. The problem is that if next raises an exception, we probably want to yield the batch with the exception instead of calling next over and over until the batch is full.

I'll wait for your feedback and make a PR if that's ok with you :)

@vxgmichel
Contributor Author

vxgmichel commented Dec 20, 2019

Fixed in #1341! I'll take my thoughts about wrapping blocking iterators to another issue. UPDATE: See #1344

vxgmichel added a commit to vxgmichel/trio that referenced this issue Dec 20, 2019
pquentin added a commit that referenced this issue Dec 20, 2019
Add missing newfragment for issue #1308