Added EnhancedTaskGroup, amap() and as_completed() #890
@@ -156,3 +180,248 @@ def create_task_group() -> TaskGroup:

    """
    return get_async_backend().create_task_group()


class TaskHandle(Generic[T]):
It looks like a TaskHandle could be used to implement a create_future() on the EnhancedTaskGroup, to have something like asyncio's Future. What do you think?
What exact addition are you suggesting?
I was going to suggest something like this:
class EnhancedTaskGroup:
    def create_future(self) -> TaskHandle[T]:
        handle = TaskHandle[T]()
        return handle
But now I realize a Future is just a TaskHandle, so there's nothing to do?
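To illustrate the point that a handle with no task behind it already behaves like a future, here is a minimal sketch. It does not use the PR's TaskHandle: `SketchHandle`, `set_result()` and `wait()` are hypothetical names invented for this example, and plain asyncio is used as a stand-in for the backend.

```python
import asyncio
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class SketchHandle(Generic[T]):
    """Hypothetical stand-in for a handle; NOT the TaskHandle from this PR."""

    def __init__(self) -> None:
        self._done = asyncio.Event()
        self._value: Optional[T] = None

    def set_result(self, value: T) -> None:
        # Store the value and wake up everyone waiting on the handle.
        self._value = value
        self._done.set()

    async def wait(self) -> T:
        # Block until some other task has supplied a result.
        await self._done.wait()
        return self._value  # type: ignore[return-value]


async def main() -> int:
    handle: SketchHandle[int] = SketchHandle()

    async def producer() -> None:
        await asyncio.sleep(0)  # give the waiter a chance to start waiting
        handle.set_result(42)

    task = asyncio.create_task(producer())
    result = await handle.wait()
    await task
    return result


result = asyncio.run(main())
```

Nothing in the sketch ties the handle to a task, which is why a separate create_future() may add little beyond constructing the handle directly.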
A bit far-fetched, but what do you think of a free function
        kwargs: Mapping[str, Any] | None = None,
    ) -> TaskHandle[T]:
        handle = TaskHandle[T]()
        handle._start_value = await self._task_group.start(
If we want to be able to await the start value in the TaskHandle, I guess we should wrap start() with a create_task()?
I don't understand what you mean. The start value will already be available in the TaskHandle once start() returns.
Ah indeed 👍
I thought that the TaskHandle was returned immediately, but it's returned only once the task has started. This means we cannot, for example, cancel the task before it has started. Not sure if that should be allowed?
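The ordering described in this thread (the caller only gets its handle back after the child has signalled that it started) can be sketched as follows. This is an asyncio emulation under stated assumptions, not the PR's code: the `start()` helper, the `server()` child and the `started` future are hypothetical names used only for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple

events: List[str] = []


async def start(
    coro_fn: Callable[["asyncio.Future[Any]"], Awaitable[Any]],
) -> Tuple["asyncio.Task[Any]", Any]:
    # Hypothetical helper: returns only after the child reports its start value.
    started: "asyncio.Future[Any]" = asyncio.get_running_loop().create_future()
    task = asyncio.ensure_future(coro_fn(started))
    start_value = await started  # the caller is suspended until the child is ready
    return task, start_value


async def server(started: "asyncio.Future[Any]") -> str:
    events.append("child: setting up")
    await asyncio.sleep(0)
    started.set_result("port 8000")  # signal readiness and hand over the start value
    await asyncio.sleep(0)
    return "done"


async def main() -> Tuple[Any, str]:
    task, start_value = await start(server)
    # Only now does the caller hold something it could cancel or await.
    events.append("parent: got start value")
    return start_value, await task


start_value, result = asyncio.run(main())
```

Because `start()` suspends the caller until the child reports in, there is no window in which the caller holds a handle to a task that has not started yet. Note that this sketch would wait forever if the child failed before setting the start value; a real implementation has to handle that case.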
If you really need to do that, you can just cancel an outer cancel scope.
On the other hand, that's the point of passing down the taskgroup -- when it's not there we know that the method we're calling won't start things it doesn't wait for. If you want to circumvent that, for whatever reason, there's already a fairly-high-performance way to do it -- set a contextvar. So IMHO "explicit is better than implicit" and thus we shouldn't support that natively.
6a9bec2 to 33f6919
I'm -1 on an implicit task group.
The one thing I'm after is a nursery/TaskGroup where I can start tasks and then iterate over the results asynchronously as soon as they become available so that I can interleave work and don't have to wait for the slowest task to start the next part of the pipeline. With this API,
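For comparison, the "process results as soon as they become available" pattern described above looks like this with the standard library's `asyncio.as_completed()`. This is only a stand-in to make the requirement concrete; it does not show the API added by this PR, and `fetch()` is a hypothetical workload.

```python
import asyncio
from typing import List


async def fetch(name: str, delay: float) -> str:
    # Hypothetical workload: pretend each job takes `delay` seconds.
    await asyncio.sleep(delay)
    return name


async def main() -> List[str]:
    order: List[str] = []
    jobs = [fetch("slow", 0.1), fetch("fast", 0.0), fetch("medium", 0.02)]
    # as_completed() yields awaitables in completion order, not submission order,
    # so the next pipeline stage can begin before the slowest job finishes.
    for next_done in asyncio.as_completed(jobs):
        order.append(await next_done)
    return order


order = asyncio.run(main())
```

The results arrive in completion order, so downstream work on the fast result does not wait for the slow one.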
And how do we deal with exceptions occurring in the child tasks?
In my implementation I actually return It would of course be nice to have this built-in, so I'm lurking here to see if I can replace my own custom solution. For me it's important to be able to process tasks as soon as they're finished and to not have to wait for the slowest.
Ok, so help me understand. What do you suggest
I think that depends on the use-case. If Given the
this doesn't help with the refcycles
Changes
Adds an enhanced version of the task group that allows task-by-task cancellation as well as awaiting on the results of individual tasks. Two other convenience functions are also provided:
- amap(): calls the given one-parameter coroutine function with each item from the given iterable of arguments and runs them concurrently in a task group
- race(): launches all given coroutines as tasks in a task group and returns the return value of whichever task completes first

Concurrency and rate limiting is provided by both functions.
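The behavior described for the two helpers can be approximated with plain asyncio as below. This is a rough sketch under stated assumptions, not the implementation in this PR: `amap_sketch()`, `race_sketch()` and the `max_concurrency` parameter are hypothetical, and returning a list from the map helper is an assumption about the result shape.

```python
import asyncio
from typing import Any, Awaitable, Callable, Coroutine, Iterable, List, TypeVar

A = TypeVar("A")
R = TypeVar("R")


async def amap_sketch(
    func: Callable[[A], Awaitable[R]], args: Iterable[A], max_concurrency: int = 2
) -> List[R]:
    # Hypothetical: run func(arg) for every arg, at most max_concurrency at a time.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(arg: A) -> R:
        async with semaphore:
            return await func(arg)

    return list(await asyncio.gather(*(run_one(arg) for arg in args)))


async def race_sketch(*coros: Coroutine[Any, Any, R]) -> R:
    # Hypothetical: return the first finished result and cancel the remaining tasks.
    tasks = [asyncio.ensure_future(coro) for coro in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Wait for the cancelled tasks to finish so none are left dangling.
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()


async def double(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2


async def delayed(value: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return value


async def main() -> Any:
    doubled = await amap_sketch(double, [1, 2, 3])
    winner = await race_sketch(delayed("slow", 0.2), delayed("fast", 0.01))
    return doubled, winner


doubled, winner = asyncio.run(main())
```

The semaphore only limits how many calls run at once; rate limiting (calls per unit of time) would need a separate mechanism that this sketch does not attempt.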
Checklist
If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):
- tests (in tests/) added which would fail without your patch
- documentation (in docs/, in case of behavior changes or new features)
- a changelog entry (in docs/versionhistory.rst)

If this is a trivial change, like a typo fix or a code reformatting, then you can ignore
these instructions.
Updating the changelog
If there are no entries after the last release, use **UNRELEASED** as the version.
If, say, your patch fixes issue #123, the entry should look like this:
If there's no issue linked, just link to your pull request instead by updating the
changelog after you've created the PR.