Skip to content

Properly handle uncaught child/activity cancel during workflow cancel #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions temporalio/worker/workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
self._info = det.info
self._primary_task: Optional[asyncio.Task[None]] = None
self._time = 0.0
self._cancel_requested = False
self._current_history_length = 0
# Handles which are ready to run on the next event loop iteration
self._ready: Deque[asyncio.Handle] = collections.deque()
Expand Down Expand Up @@ -345,6 +346,7 @@ def _apply(
def _apply_cancel_workflow(
self, job: temporalio.bridge.proto.workflow_activation.CancelWorkflow
) -> None:
self._cancel_requested = True
# TODO(cretz): Details or cancel message or whatever?
if self._primary_task:
self._primary_task.cancel()
Expand Down Expand Up @@ -1126,6 +1128,20 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
f"Workflow raised failure with run ID {self._info.run_id}",
exc_info=True,
)
# If a cancel was requested, and the failure is from an activity or
# child, and its cause was a cancellation, we want to use that cause
# instead because it means a cancel bubbled up while waiting on an
# activity or child.
if (
self._cancel_requested
and (
isinstance(err, temporalio.exceptions.ActivityError)
or isinstance(err, temporalio.exceptions.ChildWorkflowError)
)
and isinstance(err.cause, temporalio.exceptions.CancelledError)
):
Comment on lines +1135 to +1142
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might make sense to just pull the part inside the first and clause out into a var to make it a bit easier to read

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the auto-formatter we are forced to use makes this look ugly. It gets worse with a var:

            activity_or_child = isinstance(
                err, temporalio.exceptions.ActivityError
            ) or isinstance(err, temporalio.exceptions.ChildWorkflowError)
            if (
                self._cancel_requested
                and activity_or_child
                and isinstance(err.cause, temporalio.exceptions.CancelledError)
            ):
                err = err.cause

The Python SDK looks much uglier because of this formatter, but it's the industry norm. It's also why few-hundred line files look like thousands.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to not loving black but it is the standard now so 🤷

err = err.cause

command = self._add_command()
command.fail_workflow_execution.failure.SetInParent()
try:
Expand Down
80 changes: 72 additions & 8 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@
)
from temporalio.bridge.proto.workflow_activation import WorkflowActivation
from temporalio.bridge.proto.workflow_completion import WorkflowActivationCompletion
from temporalio.client import Client, WorkflowFailureError, WorkflowHandle
from temporalio.client import (
Client,
RPCError,
RPCStatusCode,
WorkflowFailureError,
WorkflowHandle,
)
from temporalio.common import RetryPolicy, SearchAttributes
from temporalio.converter import DataConverter, PayloadCodec, decode_search_attributes
from temporalio.exceptions import (
Expand Down Expand Up @@ -690,6 +696,58 @@ async def started() -> bool:
assert isinstance(err.value.cause, CancelledError)


@activity.defn
async def wait_forever() -> NoReturn:
await asyncio.Future()
raise RuntimeError("Unreachable")


@workflow.defn
class UncaughtCancelWorkflow:
@workflow.run
async def run(self, activity: bool) -> NoReturn:
self._started = True
# Wait forever on activity or child workflow
if activity:
await workflow.execute_activity(
wait_forever, start_to_close_timeout=timedelta(seconds=1000)
)
else:
await workflow.execute_child_workflow(
UncaughtCancelWorkflow.run,
True,
id=f"{workflow.info().workflow_id}_child",
)

@workflow.query
def started(self) -> bool:
return self._started


@pytest.mark.parametrize("activity", [True, False])
async def test_workflow_uncaught_cancel(client: Client, activity: bool):
async with new_worker(
client, UncaughtCancelWorkflow, activities=[wait_forever]
) as worker:
# Start workflow waiting on activity or child workflow, cancel it, and
# confirm the workflow is shown as cancelled
handle = await client.start_workflow(
UncaughtCancelWorkflow.run,
activity,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

async def started() -> bool:
return await handle.query(UncaughtCancelWorkflow.started)

await assert_eq_eventually(True, started)
await handle.cancel()
with pytest.raises(WorkflowFailureError) as err:
await handle.result()
assert isinstance(err.value.cause, CancelledError)


@workflow.defn
class CancelChildWorkflow:
def __init__(self) -> None:
Expand Down Expand Up @@ -732,13 +790,19 @@ async def test_workflow_cancel_child_started(client: Client, use_execute: bool):
)
# Wait until child started
async def child_started() -> bool:
return await handle.query(
CancelChildWorkflow.ready
) and await client.get_workflow_handle_for(
LongSleepWorkflow.run, workflow_id=f"{handle.id}_child"
).query(
LongSleepWorkflow.started
)
try:
return await handle.query(
CancelChildWorkflow.ready
) and await client.get_workflow_handle_for(
LongSleepWorkflow.run, workflow_id=f"{handle.id}_child"
).query(
LongSleepWorkflow.started
)
except RPCError as err:
# Ignore not-found because child may not have started yet
if err.status == RPCStatusCode.NOT_FOUND:
return False
raise

await assert_eq_eventually(True, child_started)
# Send cancel signal and wait on the handle
Expand Down