Description
What are you really trying to do?
Split a list of incoming events into concurrently executed activity streams. The repro below mimics a stripped-down version of the original code structure.
Describe the bug
When using asyncio.wait() with a list of async method calls, each of which executes a series of activities, an error can occur that results in later activities receiving the return values of other concurrent executions.
Here is a table to illustrate what I mean. Given an activity that executes f(x) -> x, if we pass the numbers 1-3 concurrently, we might see this:
| Activity Input | Expected Output | Actual Output |
|---|---|---|
| 1 | 1 | 1 |
| 2 | 2 | 3 |
| 3 | 3 | 2 |
This issue is random and rare. The reproduction below is designed to maximize the chance of running into it.
Minimal Reproduction
Below is a self-contained reproduction of the issue. Running it with the following input should trigger the error fairly consistently, averaging a little less than one mismatch per run:
{"execution_iterations": 20,"activity_iterations": 5,"concurrency": 100,"wait_time": 0.1}
The Temporal setup I used to reproduce this issue is a vanilla Temporal server install running via temporal server start-dev, with two worker instances running the Python file below using python -m main. I primarily tested with Python 3.8.
When there is a mismatch, it is printed to the console.
```python
import asyncio
from datetime import timedelta
from typing import List
from temporalio import workflow, activity
from temporalio.worker import Worker
from temporalio.client import Client
from dataclasses import dataclass


@dataclass
class EchoRequest:
    execution_iteration: int
    activity_iteration: int
    input: str
    wait_time: float


@dataclass
class EchoResponse:
    output: str


@activity.defn(name="echo")
async def echo(
    echo: EchoRequest
) -> EchoResponse:
    await asyncio.sleep(echo.wait_time)
    return EchoResponse(output=echo.input)


@dataclass
class RaceConditionIteration:
    execution_id: int
    iterations: int
    wait_time: float


@dataclass
class RaceConditionTestInput:
    execution_iterations: int
    activity_iterations: int
    concurrency: int
    wait_time: float


@activity.defn(name="race_condition_get_input_events")
async def race_condition_get_input_events(
    input: RaceConditionTestInput
) -> List[RaceConditionIteration]:
    return [
        RaceConditionIteration(
            execution_id=id,
            iterations=input.activity_iterations,
            wait_time=input.wait_time,
        )
        for id in range(input.concurrency)
    ]


@workflow.defn(name="RaceConditionTestWorkflow")
class RaceConditionTestWorkflow:
    @workflow.run
    async def run(self, event: RaceConditionTestInput) -> str:
        print('Starting workflow...')
        # Simulate a list of input events
        generated_input_data = await workflow.execute_activity(
            race_condition_get_input_events,
            event,
            start_to_close_timeout=timedelta(seconds=10),
            schedule_to_close_timeout=timedelta(seconds=60)
        )
        for iteration in range(event.execution_iterations):
            _, _ = await asyncio.wait(
                [
                    self.execute_activities(
                        iteration=iteration,
                        input=input
                    )
                    for input in generated_input_data
                ],
                return_when=asyncio.ALL_COMPLETED
            )
        return "Done."

    async def execute_activities(
        self,
        iteration: int,
        input: RaceConditionIteration,
    ) -> None:
        for iter in range(input.iterations):
            iter_id = f"{iteration}.{input.execution_id}.{iter}"
            result = await workflow.execute_activity(
                echo,
                EchoRequest(
                    execution_iteration=iteration,
                    activity_iteration=iter,
                    input=iter_id,
                    wait_time=input.wait_time
                ),
                start_to_close_timeout=timedelta(seconds=10),
                schedule_to_close_timeout=timedelta(seconds=60)
            )
            if iter_id != result.output:
                print(f"Expected: {iter_id}, Actual: {result.output}")


async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="race-condition",
        workflows=[RaceConditionTestWorkflow],
        activities=[echo, race_condition_get_input_events],
    )
    print('Starting worker...')
    await worker.run()


if __name__ == '__main__':
    asyncio.run(main())
```
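Note that main() above only runs the worker; the workflow itself has to be started separately. Something along these lines should work (a sketch, not necessarily how I launched it; the workflow ID is arbitrary and it assumes the file above is saved as main.py):

```python
# Hypothetical starter script -- not part of the repro file above.
import asyncio
from temporalio.client import Client

from main import RaceConditionTestInput, RaceConditionTestWorkflow


async def start() -> None:
    client = await Client.connect("localhost:7233")
    # Kick off the workflow on the same task queue the worker listens on.
    result = await client.execute_workflow(
        RaceConditionTestWorkflow.run,
        RaceConditionTestInput(
            execution_iterations=20,
            activity_iterations=5,
            concurrency=100,
            wait_time=0.1,
        ),
        id="race-condition-test",  # arbitrary workflow ID
        task_queue="race-condition",
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(start())
```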
Environment/Versions
- OS and processor: Tested on M2 Mac, Linux
- Temporal SDK Versions: Tested on 1.13, 1.14
- Python: <3.11 (mitigated in 3.11, which disallows passing coroutines to asyncio.wait(); see below)
- Occurs within Kubernetes deployment as well as local install
Additional context
It appears that this bug occurs after this warning is printed to the worker console:
2023-11-15T05:38:09.387580Z WARN temporal_sdk_core::worker::workflow: Task not found when completing error=status: NotFound, message: "Workflow task not found.", details: [8, 5, 18, 24, 87, 111, 114, 107, 102, 108, 111, 119, 32, 116, 97, 115, 107, 32, 110, 111, 116, 32, 102, 111, 117, 110, 100, 46, 26, 66, 10, 64, 116, 121, 112, 101, 46, 103, 111, 111, 103, 108, 101, 97, 112, 105, 115, 46, 99, 111, 109, 47, 116, 101, 109, 112, 111, 114, 97, 108, 46, 97, 112, 105, 46, 101, 114, 114, 111, 114, 100, 101, 116, 97, 105, 108, 115, 46, 118, 49, 46, 78, 111, 116, 70, 111, 117, 110, 100, 70, 97, 105, 108, 117, 114, 101], metadata: MetadataMap { headers: {"content-type": "application/grpc"} } run_id="eb9b8aed-c730-4fe6-a7e5-a772f6757be2"
I can't confirm whether this warning also appeared when the issue was happening in the real code. It also appears to cause the workflow task to restart, which seems correlated with the determinism breakdown.
Additionally, this issue appears to have been indirectly mitigated in Python 3.11, as asyncio.wait() no longer allows passing coroutines directly. When each coroutine is wrapped in asyncio.create_task(), the issue disappears. The above warning is still printed, but no longer results in disordered activity results.
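To make the version difference concrete, here is a plain-asyncio sketch (no Temporal involved; the function names are made up for the example) of the two ways of calling asyncio.wait(). The explicit create_task() form is the shape the workaround below uses:

```python
# Plain-asyncio illustration of how asyncio.wait() treats coroutines vs. tasks.
import asyncio
import sys


async def work(n: int) -> int:
    await asyncio.sleep(0)
    return n


async def demo() -> None:
    if sys.version_info < (3, 11):
        # Python 3.8-3.10: bare coroutines are accepted (with a
        # DeprecationWarning) and asyncio.wait() wraps them in Tasks itself.
        done, _ = await asyncio.wait([work(1), work(2)])
        print(sorted(t.result() for t in done))

    # Python 3.11+: asyncio.wait() rejects bare coroutines, so the caller
    # must create the Tasks explicitly before waiting on them.
    tasks = [asyncio.create_task(work(n)) for n in (1, 2)]
    done, _ = await asyncio.wait(tasks)
    print(sorted(t.result() for t in done))


asyncio.run(demo())
```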
This code change in the workflow removes the issue:
```diff
             _, _ = await asyncio.wait(
                 [
-                    self.execute_activities(
+                    asyncio.create_task(self.execute_activities(
                         iteration=iteration,
                         input=input
-                    )
+                    ))
                     for input in generated_input_data
                 ],
                 return_when=asyncio.ALL_COMPLETED
             )
```
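For reference, with that change applied the call inside run() reads:

```python
            _, _ = await asyncio.wait(
                [
                    # Wrapping each coroutine in a Task up front is what
                    # Python 3.11+ requires anyway, and it avoids the issue.
                    asyncio.create_task(self.execute_activities(
                        iteration=iteration,
                        input=input
                    ))
                    for input in generated_input_data
                ],
                return_when=asyncio.ALL_COMPLETED
            )
```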