Skip to content

Add deterministic alternatives for asyncio.wait and asyncio.as_completed #533

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ informal introduction to the features and their implementation.
- [Invoking Child Workflows](#invoking-child-workflows)
- [Timers](#timers)
- [Conditions](#conditions)
- [Asyncio and Cancellation](#asyncio-and-cancellation)
- [Asyncio and Determinism](#asyncio-and-determinism)
- [Asyncio Cancellation](#asyncio-cancellation)
- [Workflow Utilities](#workflow-utilities)
- [Exceptions](#exceptions)
- [External Workflows](#external-workflows)
Expand Down Expand Up @@ -550,8 +551,9 @@ Some things to note about the above workflow code:
* This workflow continually updates the queryable current greeting when signalled and can complete with the greeting on
a different signal
* Workflows are always classes and must have a single `@workflow.run` which is an `async def` function
* Workflow code must be deterministic. This means no threading, no randomness, no external calls to processes, no
network IO, and no global state mutation. All code must run in the implicit `asyncio` event loop and be deterministic.
* Workflow code must be deterministic. This means no `set` iteration, threading, no randomness, no external calls to
processes, no network IO, and no global state mutation. All code must run in the implicit `asyncio` event loop and be
deterministic. Also see the [Asyncio and Determinism](#asyncio-and-determinism) section later.
* `@activity.defn` is explained in a later section. For normal simple string concatenation, this would just be done in
the workflow. The activity is for demonstration purposes only.
* `workflow.execute_activity(create_greeting_activity, ...` is actually a typed signature, and MyPy will fail if the
Expand Down Expand Up @@ -678,16 +680,26 @@ Some things to note about the above code:
* A `timeout` can optionally be provided which will throw a `asyncio.TimeoutError` if reached (internally backed by
`asyncio.wait_for` which uses a timer)

#### Asyncio and Cancellation
#### Asyncio and Determinism

Workflows are backed by a custom [asyncio](https://docs.python.org/3/library/asyncio.html) event loop. This means many
of the common `asyncio` calls work as normal. Some asyncio features are disabled such as:
Workflows must be deterministic. Workflows are backed by a custom
[asyncio](https://docs.python.org/3/library/asyncio.html) event loop. This means many of the common `asyncio` calls work
as normal. Some asyncio features are disabled such as:

* Thread related calls such as `to_thread()`, `run_coroutine_threadsafe()`, `loop.run_in_executor()`, etc
* Calls that alter the event loop such as `loop.close()`, `loop.stop()`, `loop.run_forever()`,
`loop.set_task_factory()`, etc
* Calls that use anything external such as networking, subprocesses, disk IO, etc

Also, there are some `asyncio` utilities that internally use `set()` which can make them non-deterministic from one
worker to the next. Therefore the following `asyncio` functions have `workflow`-module alternatives that are
deterministic:

* `asyncio.as_completed()` - use `workflow.as_completed()`
* `asyncio.wait()` - use `workflow.wait()`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering about naming these workflow.asyncio_wait() andworkflow.asyncio_as_completed().

The main reason is that workflow.wait() will be confused with workflow.wait_condition(). It may just be a hiccup that they'll get over, but it would be nice not to cause users that hiccup.

Another reason is that workflow.asyncio_xxx() is just directly saying exactly what it is: a shim that is basically exactly the same as the utility from asyncio. It makes it clear that they are utilities for dealing with arbitrary tasks/coroutines rather than being a special sort of as_completed or wait that has to do with waiting or completing Temporal-specific entities.

Copy link
Member Author

@cretz cretz May 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered it, but we don't have asyncio_sleep, just sleep. I think these don't need to necessarily be seen as asyncio utilities, they can be seen as workflow utilities that just so happen to be in asyncio as well. I think they work well as workflow utilities just like wait_condition. (same for uuid4() and time() and random() and other Python copies we don't prefix with their originating module name + _)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered it, but we don't have asyncio_sleep, just sleep

I don't really see the analogy there: we don't have workflow.sleep(), instead users just use asyncio.sleep(). Here, users would call asyncio.xxx() but they shouldn't, so we point them to workflow.asyncio_xxx().

workflow.wait_condition() is an extremely important API and I really worry that introducing workflow.wait() will confuse users and damage DX. It's a very similar name to workflow.Await() / Workflow.await() that users may have come across in the Go and Java SDKs, and IDEs will offer it first when users are exploring the workflow namespace. I'd like to wait for other team opinions on this.

Copy link
Member Author

@cretz cretz May 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really see the analogy there

Sorry, correct, I was thinking of other SDKs that do have sleep and don't prefix it. In Python we are able to reuse sleep. We do have time(), time_ns(), random(), and uuid4() though that are copies of other Python module functions and we don't prefix those.

workflow.wait_condition() is an extremely important API and I really worry that introducing workflow.wait() will confuse users and damage DX

Python has wait and wait_for which are different things too and if they can expect users to understand the difference I think we can too.

I'd like to wait for other team opinions on this

👍


#### Asyncio Cancellation

Cancellation is done the same way as `asyncio`. Specifically, a task can be requested to be cancelled but does not
necessarily have to respect that cancellation immediately. This also means that `asyncio.shield()` can be used to
protect against cancellation. The following tasks, when cancelled, perform a Temporal cancellation:
Expand Down
172 changes: 172 additions & 0 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
Callable,
Dict,
Generic,
Iterable,
Iterator,
List,
Mapping,
Expand All @@ -31,6 +32,7 @@
Sequence,
Tuple,
Type,
TypeVar,
Union,
cast,
overload,
Expand Down Expand Up @@ -4361,6 +4363,176 @@ def set_dynamic_update_handler(
_Runtime.current().workflow_set_update_handler(None, handler, validator)


def as_completed(
fs: Iterable[Awaitable[AnyType]], *, timeout: Optional[float] = None
) -> Iterator[Awaitable[AnyType]]:
"""Return an iterator whose values are coroutines.

This is a deterministic version of :py:func:`asyncio.as_completed`. This
function should be used instead of that one in workflows.
"""
# Taken almost verbatim from
# https://github.com/python/cpython/blob/v3.12.3/Lib/asyncio/tasks.py#L584
# but the "set" is changed out for a "list" and fixed up some typing/format

if asyncio.isfuture(fs) or asyncio.iscoroutine(fs):
raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")

done = asyncio.Queue[Optional[asyncio.Future]]()

loop = asyncio.get_event_loop()
todo: List[asyncio.Future] = [asyncio.ensure_future(f, loop=loop) for f in list(fs)]
timeout_handle = None

def _on_timeout():
for f in todo:
f.remove_done_callback(_on_completion)
done.put_nowait(None) # Queue a dummy value for _wait_for_one().
todo.clear() # Can't do todo.remove(f) in the loop.

def _on_completion(f):
if not todo:
return # _on_timeout() was here first.
todo.remove(f)
done.put_nowait(f)
if not todo and timeout_handle is not None:
timeout_handle.cancel()

async def _wait_for_one():
f = await done.get()
if f is None:
# Dummy value from _on_timeout().
raise asyncio.TimeoutError
return f.result() # May raise f.exception().

for f in todo:
f.add_done_callback(_on_completion)
if todo and timeout is not None:
timeout_handle = loop.call_later(timeout, _on_timeout)
for _ in range(len(todo)):
yield _wait_for_one()


if TYPE_CHECKING:
_FT = TypeVar("_FT", bound=asyncio.Future[Any])
else:
_FT = TypeVar("_FT", bound=asyncio.Future)


@overload
async def wait( # type: ignore[misc]
fs: Iterable[_FT],
*,
timeout: Optional[float] = None,
return_when: str = asyncio.ALL_COMPLETED,
) -> Tuple[List[_FT], List[_FT]]:
...


@overload
async def wait(
fs: Iterable[asyncio.Task[AnyType]],
*,
timeout: Optional[float] = None,
return_when: str = asyncio.ALL_COMPLETED,
) -> Tuple[List[asyncio.Task[AnyType]], set[asyncio.Task[AnyType]]]:
...


async def wait(
fs: Iterable,
*,
timeout: Optional[float] = None,
return_when: str = asyncio.ALL_COMPLETED,
) -> Tuple:
"""Wait for the Futures or Tasks given by fs to complete.

This is a deterministic version of :py:func:`asyncio.wait`. This function
should be used instead of that one in workflows.
"""
# Taken almost verbatim from
# https://github.com/python/cpython/blob/v3.12.3/Lib/asyncio/tasks.py#L435
# but the "set" is changed out for a "list" and fixed up some typing/format

if asyncio.isfuture(fs) or asyncio.iscoroutine(fs):
raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
if not fs:
raise ValueError("Set of Tasks/Futures is empty.")
if return_when not in (
asyncio.FIRST_COMPLETED,
asyncio.FIRST_EXCEPTION,
asyncio.ALL_COMPLETED,
):
raise ValueError(f"Invalid return_when value: {return_when}")

fs = list(fs)

if any(asyncio.iscoroutine(f) for f in fs):
raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")

loop = asyncio.get_running_loop()
return await _wait(fs, timeout, return_when, loop)


async def _wait(
fs: Iterable[Union[asyncio.Future, asyncio.Task]],
timeout: Optional[float],
return_when: str,
loop: asyncio.AbstractEventLoop,
) -> Tuple[List, List]:
# Taken almost verbatim from
# https://github.com/python/cpython/blob/v3.12.3/Lib/asyncio/tasks.py#L522
# but the "set" is changed out for a "list" and fixed up some typing/format

assert fs, "Set of Futures is empty."
waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs) # type: ignore[arg-type]

def _on_completion(f):
nonlocal counter
counter -= 1
if (
counter <= 0
or return_when == asyncio.FIRST_COMPLETED
or return_when == asyncio.FIRST_EXCEPTION
and (not f.cancelled() and f.exception() is not None)
):
if timeout_handle is not None:
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)

for f in fs:
f.add_done_callback(_on_completion)

try:
await waiter
finally:
if timeout_handle is not None:
timeout_handle.cancel()
for f in fs:
f.remove_done_callback(_on_completion)

done, pending = [], []
for f in fs:
if f.done():
done.append(f)
else:
pending.append(f)
return done, pending


def _release_waiter(waiter: asyncio.Future[Any], *args) -> None:
# Taken almost verbatim from
# https://github.com/python/cpython/blob/v3.12.3/Lib/asyncio/tasks.py#L467

if not waiter.done():
waiter.set_result(None)


def _is_unbound_method_on_cls(fn: Callable[..., Any], cls: Type) -> bool:
# Python 3 does not make this easy, ref https://stackoverflow.com/questions/3589311
return (
Expand Down
81 changes: 81 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4759,3 +4759,84 @@ async def any_task_completed(handle: WorkflowHandle) -> bool:
# Terminate both
await handle1.terminate()
await handle2.terminate()


@activity.defn(dynamic=True)
async def return_name_activity(args: Sequence[RawValue]) -> str:
return activity.info().activity_type


@workflow.defn
class AsCompletedWorkflow:
@workflow.run
async def run(self) -> List[str]:
# Lazily start 10 different activities and wait for each completed
tasks = [
workflow.execute_activity(
f"my-activity-{i}", start_to_close_timeout=timedelta(seconds=1)
)
for i in range(10)
]

# Using asyncio.as_completed like below almost always fails with
# non-determinism error because it uses sets internally, but we can't
# assert on that because it could _technically_ pass though unlikely:
# return [await task for task in asyncio.as_completed(tasks)]

return [await task for task in workflow.as_completed(tasks)]


async def test_workflow_as_completed_utility(client: Client):
# Disable cache to force replay
async with new_worker(
client,
AsCompletedWorkflow,
activities=[return_name_activity],
max_cached_workflows=0,
) as worker:
# This would fail if we used asyncio.as_completed in the workflow
result = await client.execute_workflow(
AsCompletedWorkflow.run,
id=f"wf-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
assert len(result) == 10


@workflow.defn
class WaitWorkflow:
@workflow.run
async def run(self) -> List[str]:
# Create 10 tasks that return activity names, wait on them, then execute
# the activities
async def new_activity_name(index: int) -> str:
return f"my-activity-{index}"

name_tasks = [asyncio.create_task(new_activity_name(i)) for i in range(10)]

# Using asyncio.wait like below almost always fails with non-determinism
# error because it returns sets, but we can't assert on that because it
# could _technically_ pass though unlikely:
# done, _ = await asyncio.wait(name_tasks)

done, _ = await workflow.wait(name_tasks)
return [
await workflow.execute_activity(
await activity_name, start_to_close_timeout=timedelta(seconds=1)
)
for activity_name in done
]


async def test_workflow_wait_utility(client: Client):
# Disable cache to force replay
async with new_worker(
client, WaitWorkflow, activities=[return_name_activity], max_cached_workflows=0
) as worker:
# This would fail if we used asyncio.wait in the workflow
result = await client.execute_workflow(
WaitWorkflow.run,
id=f"wf-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
assert len(result) == 10