Closed
Description
Describe the bug
It appears that during GC, `finally` clauses in Python can end up running on a different workflow's event loop. This is very bad. Here is a replication:
import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from uuid import uuid4
from temporalio import activity, workflow
from temporalio.client import Client, WorkflowHandle
from temporalio.worker import Worker
@activity.defn
def waiting_activity() -> str:
    """Block the calling thread for one second, then report completion.

    Returns:
        The literal string ``"done"``.
    """
    pause_seconds = 1
    time.sleep(pause_seconds)
    return "done"
@activity.defn
def unrelated_activity() -> str:
    """Return a marker string identifying the only workflow meant to run this.

    Returns:
        A fixed message naming ``FinallyWorkflow`` as the sole expected caller.
    """
    marker = "I should only be in FinallyWorkflow"
    return marker
@workflow.defn
class DummyWorkflow:
    """Workflow that runs ``waiting_activity`` twice in sequence and nothing else."""

    @workflow.run
    async def run(self) -> None:
        """Await two back-to-back ``waiting_activity`` executions."""
        activity_timeout = timedelta(seconds=10)
        # Two sequential executions; each is awaited before the next starts.
        for _ in range(2):
            await workflow.start_activity(
                waiting_activity, start_to_close_timeout=activity_timeout
            )
@workflow.defn
class FinallyWorkflow:
    """Workflow whose ``finally`` clause schedules ``unrelated_activity``."""

    @workflow.run
    async def run(self) -> None:
        """Run ``waiting_activity`` twice, then ``unrelated_activity`` on exit.

        The ``finally`` clause executes however the ``try`` body is left, so
        ``unrelated_activity`` is scheduled on every exit path of this
        coroutine.
        """
        activity_timeout = timedelta(seconds=10)
        try:
            # Two sequential executions; each is awaited before the next.
            for _ in range(2):
                await workflow.start_activity(
                    waiting_activity, start_to_close_timeout=activity_timeout
                )
        finally:
            await workflow.start_activity(
                unrelated_activity, start_to_close_timeout=activity_timeout
            )
async def main() -> None:
    """Run the reproduction: start a worker and 1000 workflow pairs, then wait.

    Connects to a Temporal server at ``localhost:7233``, starts a worker on a
    freshly named task queue, and starts 1000 ``DummyWorkflow`` /
    ``FinallyWorkflow`` pairs. It then awaits the result of every
    ``DummyWorkflow`` handle while the worker is still running.
    """
    logging.basicConfig(level=logging.INFO)
    task_queue = f"tq-{uuid4()}"
    logging.info("Starting on task queue %s", task_queue)

    # Connect
    client = await Client.connect("localhost:7233")

    # Run with worker
    with ThreadPoolExecutor(100) as activity_executor:
        async with Worker(
            client=client,
            task_queue=task_queue,
            workflows=[FinallyWorkflow, DummyWorkflow],
            activities=[waiting_activity, unrelated_activity],
            activity_executor=activity_executor,
            max_concurrent_workflow_tasks=5,
            # Cache is far smaller than the number of workflows started below.
            max_cached_workflows=10,
        ):
            # Create 1000 dummy and finally workflows
            dummy_handles: list[WorkflowHandle] = []
            logging.info("Starting dummy and finally workflows")
            for _ in range(1000):
                dummy_handles.append(
                    await client.start_workflow(
                        DummyWorkflow.run,
                        id=f"dummy-{uuid4()}",
                        task_queue=task_queue,
                    )
                )
                # Only the dummy handles are kept; finally workflows are
                # started and not awaited.
                await client.start_workflow(
                    FinallyWorkflow.run,
                    id=f"finally-{uuid4()}",
                    task_queue=task_queue,
                )

            # Wait on every dummy handle, which basically waits forever if/when
            # one hits non-determinism
            logging.info("Checking dummy and finally workflows")
            for handle in dummy_handles:
                logging.info("Checking dummy result for %s", handle.id)
                await handle.result()
            logging.info("No dummy workflows had finally activities")
if __name__ == "__main__":
    # Script entry point: drive the async reproduction to completion.
    asyncio.run(main())
Running that against a localhost server will start to spit out non-determinism errors. After a few seconds, if you look at the still-running `DummyWorkflow`s, you'll see they run `unrelated_activity`, which is not even in their code. It is suspected that this is caused by `GeneratorExit` triggering the `finally` block upon cache eviction, and that this is being interleaved in the same thread as another workflow that has set its loop on that thread via `asyncio._set_running_loop`.
We may need to implement a special async gen capture/finalizer, see https://peps.python.org/pep-0525/#finalization.