Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[discussion] Mechanism for "sharing" tasks between different branches of the task tree #266

Open
njsmith opened this issue Aug 4, 2017 · 11 comments

Comments

@njsmith
Copy link
Member

njsmith commented Aug 4, 2017

Use case 1: there's an expensive idempotent operation that multiple tasks might want to call. Examples within trio itself would include getaddrinfo for a particular host, or WaitForSingleObject on a single object (as needed for e.g. the windows version of Popen.wait). If two tasks make the same call at the same time, then we'd like to coalesce those into a single underlying call and then report the result back to both callers.

Use case 2: there are multiple user-level objects that underneath are multiplexed onto a single object. (For example: multiple HTTP/2 channels on top of a single TCP connection.) Now you need the invariant: whenever at least one task is blocked in await channel.receive_some(), there needs to be a task reading from the underlying TCP connection -- but there should never be more than one task receiving from the underlying TCP connection. In some cases the right solution to this is probably to create one background task to handle receiving from the TCP connection. For example, in HTTP/2 it's expected that you can respond to pings at any time, even if all the logical channels are paused, so HTTP/2 isn't really a good example here -- the TCP receiver needs to be running constantly for as long as the connection is open. But in other protocols you might want to only receive on the TCP channel when there are tasks waiting for data. (IIRC something like this shows up in ZeroMQ's API, or the SSH protocol has logical channels but I don't think it has any pings.) In this case it's possible in principle to have the first task that arrives in channel.receive_some initiate a tcp_transport.receive_some, and later tasks block waiting for it, and then if the first task gets cancelled then it hands off the job of calling tcp_transport.receive_some to someone else.... but this is super complicated. It would be nice if there were some standard way for all the tasks in channel.receive_some to simply "share" a single call to tcp_transport.receive_some (or more realistically: protocol_object.pump_underlying_transport).

The interesting thing about these is that they don't fit well into trio's nursery system, BUT they're still possible to do without violating the invariants that the nursery system was created to enforce -- in particular, if the shared call crashes, then we have somewhere to report that. (Though there might be a minor issue with throwing the same exception into multiple receivers and still getting a sensible traceback -- maybe Error.unwrap should copy.copy the exception object before raising it?)

I was originally thinking about something like allowing tasks to be in a special "coparented" state, where multiple nurseries supervise them at the same time. But this gets complicated quickly: you need special cases in the nursery code, and there are issues like... if this is a background task providing a service to the code in this nursery (and also other nurseries), then do we need to make sure that it stays alive as long as the non-shared tasks? So should we shield it from cancellation until after all the other tasks exit? That seems tough.

I think a better idea is: don't model it as a task, just model it as a call (and it might be combined with other identical calls). This is exactly what you want for "use case 1" anyway, not a background task. For "use case 2", there's some subtlety because when the protocol pump receives some data on channel 7, it probably shouldn't exit immediately and wake everyone up; instead it just wants to wake up the task that was waiting for data on channel 7. But this can be done by having that task spawn one task to run the pump, and a second task to wait on a Queue or whatever, and then when the second task gets woken by the pump it cancels the first task, which either leaves the shared task to keep pumping on behalf of other tasks, or else if this was the only task waiting on a channel then it cancels the pump task. (So obviously the pump task needs to be cancel-safe, but that's probably needed anyway.)

Maybe:

call = CoalescableCall(afn, *args)
await call.run()

Attributes: call.started, call.finished, call.cancelled (I guess we want to not hold onto the result, and make run an error if finished or cancelled?). And we punt to the user to figure out how they want to pass these things between tasks, what key to use to look it up, etc.

CoalescableCall is a terrible name. CombinedRun?

Bonus: doing it this way should actually be pretty straightforward using spawn_system_task. (It's sort of like run_in_trio_thread.)

Prior art / inspiration: Flatt & Findler (2004), Kill-safe synchronization abstractions. They're worried about cases like a queue that has a worker task attached to it, and then is accessed from multiple tasks; if a task that's doing queue.put gets cancelled, you don't want this to kill the worker thread or allow the queue object to be gc'ed, because it might leave the object in an inconsistent state. OTOH if both the queue.put and the queue.get tasks get cancelled, then it's ok (in fact mandatory) to kill the worker thread and gc the queue object. So they define a kind of co-parenting primitive.

@njsmith
Copy link
Member Author

njsmith commented Aug 10, 2017

@njsmith
Copy link
Member Author

njsmith commented Aug 16, 2017

Something this API will want to think about: how to handle TaskLocal inheritance. Probably the rule is just that these tasks don't inherit locals. See also #289.

njsmith added a commit to njsmith/trio that referenced this issue Aug 21, 2017
See python-triogh-266

This is surprisingly interesting and tricky.
njsmith added a commit to njsmith/trio that referenced this issue Aug 21, 2017
See python-triogh-266

This is surprisingly interesting and tricky.
@cjerdonek
Copy link
Contributor

Haha, found a rare datetime bug :)
github

@cjerdonek
Copy link
Contributor

cjerdonek commented Nov 5, 2017

Would use of this feature (at least for use case 1) imply that the return value is cached for some (configurable) period of time? It would seem a bit arbitrary to me if a call takes exactly 1 second, and if a second call comes 0.99 seconds later, it would return the value for the first call, whereas if it came 1.01 seconds later, it would trigger a new call. If 1.01 seconds is long enough to want a new call, it seems like you'd want the one 0.99 seconds later to trigger a new call as well. Basically, I'm suggesting that the "staleness" threshold be provided by the user rather than using however long the first call takes, which could vary.

@njsmith
Copy link
Member Author

njsmith commented Dec 14, 2017

@cjerdonek Whoops, sorry I missed this comment! My assumption was that if you also want some traditional caching, that's something that's pretty easy to implement given a basic coalsed-call primitive? And there are tons of possible caching strategies you might want (fixed size with some replacement policy like LRU or ARC, fixed time like in your comment, ...), so better if our primitive doesn't bake it in? Really I'm not sure though -- I don't know how people will use actually use this if we make it :-).

@njsmith
Copy link
Member Author

njsmith commented Apr 26, 2018

Sudden insight:

In the simplest case, nurseries are a way to make tasks lexically scoped.

If you pass nursery objects around (the "escape hatch"), then this gives you arena-scoped tasks (where the arena itself has to be lexically scoped).

What we're talking about in this issue is reference-counted tasks (where the references have to be lexically scoped).

So basically, all the standard ways to deterministically manage object lifetime also apply to tasks, with the extra proviso that they need to be stack-rooted.

Are there any other deterministic heap management strategies I'm forgetting about?

Is there any value in having "strong" vs "weak" task sharing?

@N-Coder
Copy link

N-Coder commented May 20, 2018

Taking a completely different perspective on this issue, there is a library doing something similar for sync IO, dogpile.cache:

dogpile provides the concept of a “dogpile lock”, a control structure which allows a single thread of execution to be selected as the “creator” of some resource, while allowing other threads of execution to refer to the previous version of this resource as the creation proceeds; if there is no previous version, then those threads block until the object is available.

dogpile.cache is a caching API which provides a generic interface to caching backends of any variety, and additionally provides API hooks which integrate these cache backends with the locking mechanism of dogpile.

The concept of this Lock (even though I think it's kind of a misnomer) might be interesting here, as it provides some abstraction of the sharing of tasks without being biased towards any caching it would be used for. The Lock takes a creator function returning one specific value (with possible arguments already applied as partial) and a cache retrieval function for reacquiring a previous value, without imposing any restrictions on the implementation details of the 'cache' (for more details see the linked code).

A CacheRegion then combines methods for wrapping function calls, generating keys from arguments, managing the per-key Locks and controlling the execution of tasks together with interchangeable cache backends.

It might be helpful to take a more detailed look at its architecture and design, as it provides a very good abstraction of the problem with the presence of a multitude of different caching paradigms.
If we were to write an async equivalent of the Lock it would probably be very easy to async-ify the rest of dogpile.cache and combine it with backends from e.g. aiocache (or something trio-compatible). This could then cover a lot of different use-cases, including the two you listed. If somebody needs caching behaviour that isn't covered by our version of dogpile.cache, it would still be easy to implement that on top of the generic Lock.

The only thing I'm missing in dogpile.cache (in addition to async support) is the possibility to customize the behaviour when a second execution comes in when a first one is already running: to either return the old value, wait for the current execution, raise a marker exception etc.

@njsmith
Copy link
Member Author

njsmith commented May 20, 2018

@N-Coder oh awesome, thanks for the link – looking at prior art is super helpful for this kind of discussion

@njsmith
Copy link
Member Author

njsmith commented May 25, 2018

Another real use case that came up in chat today: an async version of itertools.tee would want to use this when advancing the underlying iterator, because cancelling one of the tee'd iterators shouldn't break the underlying iterator.

@N-Coder
Copy link

N-Coder commented May 27, 2018

I made my own attempt at implementing this functionality as generic as possible and ended up with the following snippet, which is somewhat in between the dogpile.cache Lock and the SharedTask in your PR:

@attr.s()
class CacheValue(object):
    value = attr.ib()  # type: Result
    creator = attr.ib()  # type: Callable
    create_lock = attr.ib()  # type: Lock

    def is_set(self):
        return self.value is not None

    def is_creating(self):
        return self.create_lock.locked()

    async def get_or_create(self):
        if not self.is_set():
            async with self.create_lock:
                if not self.is_set():
                    self.value = await Result.acapture(self.creator())

        return self.value.unwrap()

    async def reset(self):
        async with self.create_lock:
            self.value = None

Basically, you need some variant of the following functionality:

  • some kind of per-value synchronization primitives (i.e. Event or Lock, possibly created lazily using a global Lock, but possibly also inter-process synchronization)
  • a way of capturing / caching a single outcome (e.g. Result.acapture), with various customizations:
    • immediately discarding certain results, e.g, (certain) exceptions, invalid values, etc.
    • storing the result externally (e.g. DBM file, redis, memcached,...)
    • expiring values immediately after creation (so that only concurrent calls will be coalesced) or after some timeout after its creation or last access (resulting in a time or size limited cache)
    • retrying on invalid results
    • further cache-related features... (compare e.g. the Guava Cache for Java)
  • cancellation handling (propagate the exception to all waiters / only the first one and let the second one retry / ...)
  • a way of mapping function calls (i.e. function + self + *args + **kwargs) to cache keys

With all the different use-cases and requirements that may arise I'm unsure whether a really generic implementation in the trio library would be that easy to make. Maybe some examples for basic functionality (like my snippet above, with some more thoughts and explanations put into cancellation) plus libraries for more targeted use-cases (like aiocache, which actually provides a synchronized cached_stampede decorator -- which still seems to have cancellation problems though) would be a better approach.

For my use-case, I'll probably go with some combination of pyfailsafe and the aiocache cached_stampede, using the in-memory backend.

@njsmith
Copy link
Member Author

njsmith commented Sep 2, 2018

A note on exception handling: #303 (comment)

Further note: injected cancellation exceptions will need some careful handling – we don't want to convert them into SharedTaskErrors, but we also don't want to let them propagate – we want to convert them into the local kind of cancellation. I guess try: checkpoint() except BaseException as exc: exc.__cause__ = shared_task_exc; raise? Or maybe this is another of those cases where it would be better to let Cancelleds move between scopes, as long as both are cancelled? Though KeyboardInterrupt is tricky as usual. Actually, this is identical to the situation in #606... What we want to do is somehow hand off the cancellation to the system task, and then I guess if we're going with wrapped exceptions to handle the traceback corruption problem, then we'd special-case Cancelled and KeyboardInterrupt to pass through without being wrapped, which is OK since in the case where we've handed off cancellation, we know that we're the only one waiting for this task to finish, so unwrapped exceptions won't get mangled. ...Though KeyboardInterrupt is a problem for the wrapping approach in general, because KeyboardInterrupt can legitimately happen when there are multiple tasks waiting for the outcome of a shared task, and you don't really want to convert KeyboardInterrupt into a SharedTaskException! Maybe copy.copy is the lesser evil...?


An interesting feature of dogpile.cache is that if a task is already working on refreshing a key, then new tasks looking for that key simply get the old key. (And if there is no old key, of course, then they all wait for it.) This suggests that there should be a way to query whether a task is currently running to do something.

(Of course, in real life dogpile.cache coordinates the refresh using an out-of-process cache, so only one process needs to try to recreate the value, and it wouldn't use this feature at all.)

The core thing that makes this API tricky is that it's fundamentally a reference-counted API, so we need some way to be clear about where we acquire and release references. And when you release a reference, that might need to block waiting for the task to be cancelled. A simple CoalescableCall object like in my first comment is not great, because you could check call.running and it says True so you're like cool let me join in! call.run() → Error it already finished. TOCTTOU. Or for the dogpile.cache case, a task sees that there's already a task working on it... but whoops actually that task got cancelled. Though I guess in that particular case it would actually be OK because it just means one extra task returns a stale value, and then the next task that comes along will refresh it. OK, the worse case for dogpile.cache is: two tasks both observe that no task is running, so they both try to start one.

So any implementation needs some way to atomically: find any other instance of the task that we want to join up with, and if one exists then join it, or otherwise create one and register it for others to find. And we need to atomically unregister it when the last task cancels out. So by far the most natural way to do this is to have a single object that holds a dict internally, and combines the (dict lookup, dict mutation, actually running the task) steps into single operations. Which is what #303 does. But maybe it also needs a way to query whether a task is currently running under a given key and then... what? For dogpile it's either launch a task or not. I guess we could have a run(..., only_if_not_already_running=True) operation?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants