[Bug] asyncio.wait is non-deterministic when used with coroutines instead of tasks #429

Closed
@ntessman-capsule

What are you really trying to do?

Split a list of incoming events into concurrently executed activity streams. The repro below is a stripped-down version of the original code structure.

Describe the bug

When asyncio.wait() is given a list of coroutines (async method calls), each of which runs a series of activity executions, later activities can occasionally receive the return values of activities from a different concurrent execution.

Here is a table to illustrate what I mean. Given an activity that executes f(x) -> x, if we pass the numbers 1-3 concurrently, we might see this:

Activity Input | Expected Output | Actual Output
1              | 1               | 1
2              | 2               | 3
3              | 3               | 2

This issue is intermittent and rare. The reproduction below is designed to maximize the chance of running into it.

Minimal Reproduction

Below is a self-contained reproduction of the issue. With this input the error shows up fairly consistently, a little less than once per run on average:

{"execution_iterations": 20, "activity_iterations": 5, "concurrency": 100, "wait_time": 0.1}
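To give a sense of the load this input generates, here is a small sketch that parses the JSON into a copy of the RaceConditionTestInput dataclass from the reproduction and works out how many echo activities one run schedules. The names raw_input, echo_calls, and min_seconds are illustrative only, and the time estimate assumes each stream's activities run strictly one after another with no scheduling overhead.

```python
import json
from dataclasses import dataclass


@dataclass
class RaceConditionTestInput:
    # Mirrors the dataclass used by the reproduction.
    execution_iterations: int
    activity_iterations: int
    concurrency: int
    wait_time: float


raw_input = (
    '{"execution_iterations": 20, "activity_iterations": 5, '
    '"concurrency": 100, "wait_time": 0.1}'
)
test_input = RaceConditionTestInput(**json.loads(raw_input))

# Each outer iteration starts `concurrency` streams, and every stream
# runs `activity_iterations` echo activities sequentially.
echo_calls = (
    test_input.execution_iterations
    * test_input.concurrency
    * test_input.activity_iterations
)

# Lower bound on wall-clock time: the outer iterations run one after
# another, and each waits for a stream of sequential sleeps to finish.
min_seconds = (
    test_input.execution_iterations
    * test_input.activity_iterations
    * test_input.wait_time
)

print(echo_calls)  # 10000
print(min_seconds)
```

So a single run schedules 10,000 echo activities, 100 at a time, and takes at least roughly 10 seconds. One mismatch in a little under one run puts the failure rate on the order of one in ten thousand activities.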

The Temporal setup I used to reproduce this issue is a vanilla Temporal server started with temporal server start-dev, plus two worker instances running the Python file below via python -m main. I primarily tested using Python 3.8.

When there is a mismatch, it is printed to the console.

import asyncio
from datetime import timedelta
from typing import List
from temporalio import workflow, activity
from temporalio.worker import Worker
from temporalio.client import Client

from dataclasses import dataclass

@dataclass
class EchoRequest:
    execution_iteration: int
    activity_iteration: int
    input: str
    wait_time: float

@dataclass
class EchoResponse:
    output: str

@activity.defn(name="echo")
async def echo(
    echo: EchoRequest
) -> EchoResponse:
    await asyncio.sleep(echo.wait_time)
    return EchoResponse(output=echo.input)


@dataclass
class RaceConditionIteration:
    execution_id: int
    iterations: int
    wait_time: float


@dataclass
class RaceConditionTestInput:
    execution_iterations: int
    activity_iterations: int
    concurrency: int
    wait_time: float


@activity.defn(name="race_condition_get_input_events")
async def race_condition_get_input_events(
    input: RaceConditionTestInput
) -> List[RaceConditionIteration]:
    return [
        RaceConditionIteration(
            execution_id=id,
            iterations=input.activity_iterations,
            wait_time=input.wait_time,
        )
        for id in range(input.concurrency)
    ]


@workflow.defn(name="RaceConditionTestWorkflow")
class RaceConditionTestWorkflow:
    @workflow.run
    async def run(self, event: RaceConditionTestInput) -> str:
        print('Starting workflow...')
        
        # Simulate a list of input events
        generated_input_data = await workflow.execute_activity(
            race_condition_get_input_events,
            event,
            start_to_close_timeout=timedelta(seconds=10),
            schedule_to_close_timeout=timedelta(seconds=60)
        )

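        for iteration in range(event.execution_iterations):
            # Bare coroutines (not tasks) are passed to asyncio.wait() here;
            # this is the pattern that triggers the mismatch on Python < 3.11.
            _, _ = await asyncio.wait(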
                [
                    self.execute_activities(
                        iteration=iteration,
                        input=input
                    )
                    for input in generated_input_data
                ],
                return_when=asyncio.ALL_COMPLETED
            )
        
        return "Done."

    async def execute_activities(
        self,
        iteration: int,
        input: RaceConditionIteration,
    ) -> None:
        for iter in range(input.iterations):
            iter_id = f"{iteration}.{input.execution_id}.{iter}"
            result = await workflow.execute_activity(
                echo,
                EchoRequest(
                    execution_iteration=iteration,
                    activity_iteration=iter,
                    input=iter_id,
                    wait_time=input.wait_time
                ),
                start_to_close_timeout=timedelta(seconds=10),
                schedule_to_close_timeout=timedelta(seconds=60)
            )

            if iter_id != result.output:
                print(f"Expected: {iter_id}, Actual: {result.output}")
            

async def main():
    client = await Client.connect("localhost:7233")

    worker = Worker(
        client,
        task_queue="race-condition",
        workflows=[RaceConditionTestWorkflow],
        activities=[echo, race_condition_get_input_events],
    )

    print('Starting worker...')

    await worker.run()

if __name__ == '__main__':
    asyncio.run(main())

Environment/Versions

  • OS and processor: Tested on M2 Mac, Linux
  • Temporal SDK Versions: Tested on 1.13, 1.14
  • Python: <3.11 (mitigated in 3.11 due to disallowing coroutines, see below)
  • Occurs within Kubernetes deployment as well as local install

Additional context

It appears that this bug occurs after this warning is printed to the worker console:

2023-11-15T05:38:09.387580Z  WARN temporal_sdk_core::worker::workflow: Task not found when completing error=status: NotFound, message: "Workflow task not found.", details: [8, 5, 18, 24, 87, 111, 114, 107, 102, 108, 111, 119, 32, 116, 97, 115, 107, 32, 110, 111, 116, 32, 102, 111, 117, 110, 100, 46, 26, 66, 10, 64, 116, 121, 112, 101, 46, 103, 111, 111, 103, 108, 101, 97, 112, 105, 115, 46, 99, 111, 109, 47, 116, 101, 109, 112, 111, 114, 97, 108, 46, 97, 112, 105, 46, 101, 114, 114, 111, 114, 100, 101, 116, 97, 105, 108, 115, 46, 118, 49, 46, 78, 111, 116, 70, 111, 117, 110, 100, 70, 97, 105, 108, 117, 114, 101], metadata: MetadataMap { headers: {"content-type": "application/grpc"} } run_id="eb9b8aed-c730-4fe6-a7e5-a772f6757be2"

I can't confirm whether this warning was also appearing when this issue was happening in the real code. It also appears to result in the workflow task restarting, which seems correlated with the determinism breakdown.
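The details array in the warning above is easier to read once decoded. The sketch below assumes the bytes are a serialized google.rpc.Status message (the usual payload for gRPC error details; this report does not confirm it) and hand-parses the top-level fields. The helper read_fields is hypothetical and only handles single-byte tags and lengths, which is enough for this payload.

```python
# Bytes copied from the `details` field of the warning.
details = bytes([
    8, 5, 18, 24, 87, 111, 114, 107, 102, 108, 111, 119, 32, 116, 97, 115,
    107, 32, 110, 111, 116, 32, 102, 111, 117, 110, 100, 46, 26, 66, 10, 64,
    116, 121, 112, 101, 46, 103, 111, 111, 103, 108, 101, 97, 112, 105, 115,
    46, 99, 111, 109, 47, 116, 101, 109, 112, 111, 114, 97, 108, 46, 97, 112,
    105, 46, 101, 114, 114, 111, 114, 100, 101, 116, 97, 105, 108, 115, 46,
    118, 49, 46, 78, 111, 116, 70, 111, 117, 110, 100, 70, 97, 105, 108, 117,
    114, 101,
])


def read_fields(buf: bytes) -> dict:
    """Parse top-level protobuf fields (single-byte tags and lengths only)."""
    fields = {}
    pos = 0
    while pos < len(buf):
        field_number, wire_type = buf[pos] >> 3, buf[pos] & 0x07
        pos += 1
        if wire_type == 0:  # varint, assumed to fit in one byte
            fields[field_number] = buf[pos]
            pos += 1
        elif wire_type == 2:  # length-delimited, length assumed < 128
            length = buf[pos]
            pos += 1
            fields[field_number] = buf[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
    return fields


status = read_fields(details)
code = status[1]                      # gRPC status code
message = status[2].decode("utf-8")   # human-readable message
type_url = read_fields(status[3])[1].decode("utf-8")  # type of the detail

print(code)      # 5
print(message)   # Workflow task not found.
print(type_url)  # type.googleapis.com/temporal.api.errordetails.v1.NotFoundFailure
```

Under that assumption the payload carries nothing beyond what the log line already states: gRPC code 5 (NOT_FOUND), the same message, and a NotFoundFailure detail type.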

Additionally, it appears that this issue has been indirectly mitigated in Python 3.11, as asyncio.wait() no longer allows passing coroutines directly. When the coroutine is wrapped in asyncio.create_task(), this issue disappears. The above warning is still printed, but it doesn't result in disordered activity results.

This code change in the workflow removes the issue:

  _, _ = await asyncio.wait(
      [
-         self.execute_activities(
+         asyncio.create_task(self.execute_activities(
              iteration=iteration,
              input=input
-         )
+         ))
          for input in generated_input_data
      ],
      return_when=asyncio.ALL_COMPLETED
  )
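The effect of the change can be illustrated outside Temporal with plain asyncio. One plausible explanation, which is an assumption and not verified in this report, is that older asyncio.wait() implementations collect the awaitables into a set before wrapping them in tasks, so the order in which they start is not tied to list order. Creating the tasks explicitly fixes the scheduling order up front. In this minimal sketch, worker and run_in_order are hypothetical names, and the default asyncio event loop is used instead of the workflow event loop.

```python
import asyncio
from typing import List


async def worker(index: int, started: List[int]) -> int:
    # Record the order in which the coroutines actually begin running.
    started.append(index)
    await asyncio.sleep(0)
    return index


async def run_in_order(count: int) -> List[int]:
    started: List[int] = []
    # Tasks are created in list order, so they are scheduled in list order.
    tasks = [asyncio.create_task(worker(i, started)) for i in range(count)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    assert not pending
    assert len(done) == count
    return started


start_order = asyncio.run(run_in_order(5))
print(start_order)  # [0, 1, 2, 3, 4]
```

Passing tasks also works on every Python version, whereas passing bare coroutines to asyncio.wait() was deprecated in 3.8 and removed in 3.11.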

Labels

bug (Something isn't working)
