[Feature Request] Support raising cancellation in sync multiprocessed activities #1048

@gregbrowndev

Is your feature request related to a problem? Please describe.

Currently, activities running on a multi-processed worker cannot be cancelled, whether voluntarily by the user/workflow or involuntarily, e.g. by a missed heartbeat (or, I think, an activity timeout).

This wastes resources and risks pool starvation: the activity runs to completion anyway, only to get the error below, and is subsequently retried:

Activity not found on completion. This may happen if the activity has already been cancelled but completed anyway.

Additionally, in the case of missed heartbeats or timeouts, the server will show the activity as failed and retry it, while the original attempt may still be running. Users might not expect or guard against this behaviour, which can lead to data corruption (e.g. if the activity writes to a specific file) or duplicate side effects (e.g. if the activity does something at the end, such as sending an email).

Reproduction: hello_cancellation.py shows a sync, multi-processed activity running forever in a while loop, preventing the worker from shutting down after the workflow was cancelled.

Describe the solution you'd like

  • Cancellation should be best-effort in sync, multi-processed workers, as it is for async and multithreaded workers, on the understanding that a cancellation cannot be received unless the activity is instrumented with heartbeats.

  • Provide clearer documentation in "Interrupt a Workflow Execution" and/or "Python SDK sync-vs-async" stating that sync, multiprocess workers do not currently support cancellation.

  • Provide guidance to avoid problematic side effects in such activities, e.g.

    • ensure output is written to a unique location or some kind of resource lock is acquired by the activity,
    • trigger "only-once" side effects in subsequent activities in the workflow, e.g. sending an email (and that this requires a separate async or multithreaded worker / task queue)
    • etc.
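As a minimal sketch of the first guidance bullet (stdlib only, not an SDK feature), an activity could write each attempt's output to its own uniquely named temp file and then atomically rename it into place with `os.replace`, so a still-running "zombie" attempt and its retry never interleave writes in one file. `write_output_atomically` is a hypothetical helper; the rename is atomic on POSIX only when the temp file and the destination are on the same filesystem, hence `dir=dest.parent`.

```python
import os
import tempfile
from pathlib import Path


def write_output_atomically(dest: Path, data: bytes) -> Path:
    # Hypothetical helper. Each call writes to its own uniquely named temp file
    # next to the destination, so concurrent attempts never share a file.
    fd, tmp_name = tempfile.mkstemp(dir=dest.parent, prefix=dest.name + ".", suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        # Move the finished file into place in one step: readers see either the
        # previous complete output or the new complete output, never a partial mix.
        os.replace(tmp_name, dest)
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return dest
```

Note that the last attempt to finish still wins: this prevents corrupted output, not duplicate work, so truly once-only side effects still belong in a subsequent activity as suggested above.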

Additional context

Other relevant work/issues:

Root cause:

  • Activity cancellation upon a missed heartbeat or timeout is triggered in _ActivityWorker
  • This sets various cancellation properties/events in _RunningActivity
  • For sync, multiprocessed activities, the cancelled_event is initialised with a multiprocessing Manager event that can communicate across the process boundary.
  • Multi-threaded activities initialise the _ThreadExceptionRaiser in _execute_sync_activity with the thread ID, but this isn't done for the multi-processed case (presumably because raising an exception into a thread doesn't work across a process boundary).
  • So it looks like the cancelled_event is set and can be observed within the child process via _Context, but there is currently no built-in way for the user activity implementation to react to this event.
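To make the root cause concrete, here is a minimal stdlib-only sketch (not SDK code) of the mechanism described above: a Manager event created in the parent is passed to a child process in a `ProcessPoolExecutor`, the parent sets it, and the child only notices because its own code checks the event. The `heartbeat` hook, `ActivityCancelled`, `long_running_activity` and `run_and_cancel` are all hypothetical names; the sketch assumes the POSIX "fork" start method so it runs as a plain script. It illustrates where a best-effort raise at heartbeat time could live, not how the SDK implements it.

```python
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor
from typing import Optional, Union


class ActivityCancelled(Exception):
    """Hypothetical stand-in for the SDK's cancellation error."""


def heartbeat(cancelled_event) -> None:
    # Hypothetical heartbeat hook: the point where a sync activity in a child
    # process checks the shared event and raises (best effort).
    if cancelled_event.is_set():
        raise ActivityCancelled("activity was cancelled")


def long_running_activity(cancelled_event, max_iterations: int) -> int:
    # Stand-in for a user activity that heartbeats on every iteration.
    for _ in range(max_iterations):
        heartbeat(cancelled_event)
        time.sleep(0.05)
    return max_iterations


def run_and_cancel(cancel_after: Optional[float], max_iterations: int = 200) -> Union[int, str]:
    # "fork" is assumed (POSIX only) so the child processes don't
    # re-import the main module.
    ctx = multiprocessing.get_context("fork")
    with ctx.Manager() as manager, ProcessPoolExecutor(max_workers=1, mp_context=ctx) as pool:
        # A Manager event proxy can cross the process boundary, like the
        # cancelled_event described above for _RunningActivity.
        cancelled_event = manager.Event()
        future = pool.submit(long_running_activity, cancelled_event, max_iterations)
        if cancel_after is not None:
            time.sleep(cancel_after)
            # Parent side: e.g. a missed heartbeat or a cancel request.
            cancelled_event.set()
        try:
            return future.result()
        except ActivityCancelled:
            # The exception raised in the child is re-raised here by the future.
            return "cancelled"
```

With `cancel_after=0.3` the child stops at its next heartbeat and the parent gets `"cancelled"`; with `cancel_after=None` it returns `max_iterations`. Code between heartbeats runs uninterrupted, and an activity that never heartbeats never sees the event, which is the limitation noted in the first bullet of the proposed solution.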

I assume this also affects cancellation due to activity pause, at least once the activity has started?

Out of interest, how does _handle_start_activity_task work in this case? Does the _ActivityWorker continue to poll for tasks for an activity it has already picked up, as the cancel task seems to suggest? Does the server somehow ensure these cancel tasks are sticky, so that the worker instance that picked up the start task can process them? I'm assuming this mechanism only works for activities that were picked up but never started. If not, though, why isn't this mechanism also used for heartbeat failures?

Cheers,

I hope this assessment is accurate 🙏🏻

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Development: No branches or pull requests