Skip to content

Fix cancel before run #94

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion temporalio/worker/workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,10 @@ def _apply_cancel_workflow(
self._cancel_requested = True
# TODO(cretz): Details or cancel message or whatever?
if self._primary_task:
self._primary_task.cancel()
# The primary task may not have started yet and we want to give the
# workflow the ability to receive the cancellation, so we must defer
# this cancellation to the next iteration of the event loop.
self.call_soon(self._primary_task.cancel)

def _apply_fire_timer(
self, job: temporalio.bridge.proto.workflow_activation.FireTimer
Expand Down
25 changes: 25 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,31 @@ async def started() -> bool:
assert isinstance(err.value.cause, CancelledError)


@workflow.defn
class TrapCancelWorkflow:
@workflow.run
async def run(self) -> str:
try:
await asyncio.Future()
raise RuntimeError("should not get here")
except asyncio.CancelledError:
return "cancelled"


async def test_workflow_cancel_before_run(client: Client):
# Start the workflow _and_ send cancel before even starting the workflow
task_queue = str(uuid.uuid4())
handle = await client.start_workflow(
TrapCancelWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=task_queue,
)
await handle.cancel()
# Start worker and wait for result
async with new_worker(client, TrapCancelWorkflow, task_queue=task_queue):
assert "cancelled" == await handle.result()


@activity.defn
async def wait_forever() -> NoReturn:
await asyncio.Future()
Expand Down