Skip to content

Added EnhancedTaskGroup, amap() and as_completed() #890

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from

Conversation

agronholm
Copy link
Owner

Changes

Adds an enhanced version of the task group that allows task-by-task cancellation as well as awaiting on the results of individual tasks. Two other convenience functions are also provided:

  • amap(): calls the given one-parameter coroutine function with each item from the given iterable of arguments and runs them concurrently in a task group
  • race() launches all given coroutines as tasks in a task group and returns the return value of whichever task completes first

Concurrency and rate limiting is provided by both functions.

Checklist

If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):

  • You've added tests (in tests/) added which would fail without your patch
  • You've updated the documentation (in docs/, in case of behavior changes or new
    features)
  • You've added a new changelog entry (in docs/versionhistory.rst).

If this is a trivial change, like a typo fix or a code reformatting, then you can ignore
these instructions.

Updating the changelog

If there are no entries after the last release, use **UNRELEASED** as the version.
If, say, your patch fixes issue #123, the entry should look like this:

- Fix big bad boo-boo in task groups
  (`#123 <https://github.com/agronholm/anyio/issues/123>`_; PR by @yourgithubaccount)

If there's no issue linked, just link to your pull request instead by updating the
changelog after you've created the PR.

@@ -156,3 +180,248 @@ def create_task_group() -> TaskGroup:

"""
return get_async_backend().create_task_group()


class TaskHandle(Generic[T]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like a TaskHandle could be used to implement a create_future() on the EnhancedTaskGroup, to have something like asyncio's Future.
What do you think?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exact addition are you suggesting?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to say something like that:

class EnhancedTaskGroup:
    def create_future(self) -> TaskHandle[T]:
        handle = TaskHandle[T]()
        return handle

But now I realize a Future is just a TaskHandle, so there's nothing to do?

@davidbrochart
Copy link
Contributor

A bit far-fetched, but what do you think of a free function create_task() that would use the current EnhancedTaskGroup if there is one, and errors out otherwise?
On one hand this is going against structured concurrency, but on the other hand it removes the need to pass a task group down the stack, when you know there must be one.

kwargs: Mapping[str, Any] | None = None,
) -> TaskHandle[T]:
handle = TaskHandle[T]()
handle._start_value = await self._task_group.start(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to be able to await the start value in the TaskHandle, I guess we should wrap start() with a create_task()?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you mean. The start value will already be available in the TaskHandle once start() returns.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah indeed 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that the TaskHandle was returned immediately, but it's returned only when the task has started. Which means we cannot e.g. cancel the task before it has started. Not sure if that should be allowed?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you really need to do that, you can just cancel an outer cancel scope.

@smurfix
Copy link
Collaborator

smurfix commented Mar 19, 2025

it removes the need to pass a task group down the stack,

On the other hand, that's the point of passing down the taskgroup -- when it's not there we know that the method we're calling won't start things it doesn't wait for.

If you want to circumvent that, for whatever reason, there's already a fairly-high-performance way to do it -- set a contextvar. So IMHO "explicit is better than implicit" and thus we shouldn't support that natively.

@agronholm agronholm changed the title Added EnhancedTaskGroup, amap() and race() Added EnhancedTaskGroup, amap() and as_completed() Mar 20, 2025
@agronholm agronholm force-pushed the enhanced-taskgroup branch from 6a9bec2 to 33f6919 Compare March 20, 2025 17:27
@agronholm
Copy link
Owner Author

A bit far-fetched, but what do you think of a free function create_task() that would use the current EnhancedTaskGroup if there is one, and errors out otherwise? On one hand this is going against structured concurrency, but on the other hand it removes the need to pass a task group down the stack, when you know there must be one.

I'm -1 on an implicit task group.

@dhirschfeld
Copy link

The one thing I'm after is a nursery/TaskGroup where I can start tasks and then iterate over the results asynchronously as soon as they become available so that I can interleave work and don't have to wait for the slowest task to start the next part of the pipeline.

With this API, race is trivial to implement - you just cancel the TaskGroup as soon as the first task is made available. gather is just collect all results in a list of length n_inputs in the order they were provided then return the results list when the TaskGroup exits.

@agronholm
Copy link
Owner Author

With this API, race is trivial to implement

And how do we deal with exceptions occurring in the child tasks?

@dhirschfeld
Copy link

dhirschfeld commented Apr 15, 2025

With this API, race is trivial to implement

And how do we deal with exceptions occurring in the child tasks?

In my implementation I actually return Outcome instances, so the user processes the Outcome's as they're made available. It's then up to the user to decide what to do with any errors. If they blindly unwrap an error it will raise and cancel the TaskGroup.

It would of course be nice to have this built-in, so I'm lurking here to see if I can replace my own custom solution. For me it's important to be able to process tasks as soon as they're finished and to not have to wait for the slowest.

@agronholm
Copy link
Owner Author

Ok, so help me understand. What do you suggest race() returns then? What if the first result from a child task is an exception? Do you want to return the Outcome (or equivalent) of that?

@dhirschfeld
Copy link

I think that depends on the use-case. If amap returned an asynchronous iterator that returned Outcome wrapped results as soon as they were ready I'd leave it at that and let users implement race themselves with the semantics that made sense for their problem.

Given the amap primitive, race becomes trivial to implement however you want so there's no need to provide an implementation which may not be ideal for all use-cases.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants