Description
Is your feature request related to a problem? Please describe.
Currently, activities running on a multi-processed worker cannot be cancelled, either voluntarily by the user/workflow or involuntarily, e.g. by a missed heartbeat (or, I think, an activity timeout).
This leads to wasted resources and potential pool starvation, because the activity runs to completion anyway, only to get the error below, and is then retried:
Activity not found on completion. This may happen if the activity has already been cancelled but completed anyway.
Additionally, in the case of missed heartbeats / timeouts, the server marks the activity as failed and retries it while the original attempt may still be running. Users might not expect or protect against this behaviour, which can lead to data corruption (e.g. if the activity writes to a specific file) or duplicate side effects (e.g. if the activity does something at the end, like sending an email).
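One common way to guard a side effect like the email against a retried or still-running earlier attempt is an exclusive claim: the first attempt to create a marker for a given key performs the side effect, and later attempts skip it. This is a minimal stdlib-only sketch, not a Temporal SDK API; `run_side_effect_once`, the marker directory and the idempotency key (for example, something derived from the workflow and activity IDs) are hypothetical, and it assumes a local filesystem where `O_EXCL` creation is atomic.

```python
import os
from pathlib import Path
from typing import Callable


def run_side_effect_once(
    marker_dir: Path, idempotency_key: str, side_effect: Callable[[], None]
) -> bool:
    """Run `side_effect` only for the first attempt that claims `idempotency_key`.

    Hypothetical helper (not a Temporal SDK API). Returns True if this call
    performed the side effect, False if an earlier attempt already claimed it.
    """
    marker_dir.mkdir(parents=True, exist_ok=True)
    marker = marker_dir / f"{idempotency_key}.claimed"
    try:
        # O_CREAT | O_EXCL fails if the marker already exists, so at most one
        # attempt (original or retry) can win the claim for this key.
        fd = os.open(marker, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    os.close(fd)
    # The claim is recorded before the side effect runs: at-most-once, not exactly-once.
    side_effect()
    return True
```

The trade-off is deliberate: if the process dies after claiming the key but before the side effect finishes, no later attempt will run it, so this gives at-most-once rather than exactly-once behaviour.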
Reproduction: hello_cancellation.py shows a sync, multi-processed activity running forever in a while loop, preventing the worker from shutting down after the workflow was cancelled.
Describe the solution you'd like
- Cancellation should be best-effort in sync, multi-processed workers, as it is for async and multithreaded workers, with the known expectation that a cancellation cannot be received unless the activity is instrumented with heartbeats.
- Provide clearer documentation in Interrupt a Workflow Execution and/or Python SDK sync-vs-async stating that sync, multi-process workers do not currently support cancellation.
- Provide guidance to avoid problematic side effects in such activities, e.g.:
  - ensure output is written to a unique location, or that some kind of resource lock is acquired by the activity;
  - trigger "only-once" side effects, e.g. sending an email, in subsequent activities in the workflow (and note that this requires a separate async or multithreaded worker / task queue);
  - etc.
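For the "unique location" sub-point above, a minimal stdlib-only sketch: each attempt writes to its own uniquely named temporary file and only then swaps it into place with `os.replace`, so a retry and a still-running earlier attempt never interleave writes in the final file. `write_output_atomically` is a hypothetical helper, not part of the Temporal SDK, and the sketch assumes a POSIX filesystem where a rename within one directory is atomic.

```python
import contextlib
import os
import tempfile
from pathlib import Path


def write_output_atomically(final_path: Path, data: bytes) -> None:
    """Write `data` to `final_path` without ever exposing a partial file.

    Hypothetical helper (not a Temporal SDK API). Each call writes to its own
    uniquely named temp file in the target directory, then renames it into place.
    """
    final_path.parent.mkdir(parents=True, exist_ok=True)
    # mkstemp creates a new, uniquely named file, so concurrent attempts never share one.
    fd, tmp_name = tempfile.mkstemp(
        dir=final_path.parent, prefix=f".{final_path.name}.", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "wb") as tmp_file:
            tmp_file.write(data)
            tmp_file.flush()
            os.fsync(tmp_file.fileno())
        # Atomic on POSIX when source and target are on the same filesystem.
        os.replace(tmp_name, final_path)
    except BaseException:
        # Remove only this attempt's temp file, then re-raise the original error.
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_name)
        raise
```

This only prevents torn or interleaved output. Whichever attempt calls `os.replace` last still wins, so on its own it does not stop a stale attempt from overwriting a newer result; that still needs a per-attempt path or a lock.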
Additional context
Other relevant work/issues:
- Support raising cancellation in sync multithreaded activities #217
- [Feature Request] Set OpenTelemetry span status to failed for heartbeat timeout cancellations #1047
Root cause:
- Activity cancellation upon a missed heartbeat or timeout happens here in _ActivityWorker.
- This sets various cancellation properties/events in _RunningActivity.
- For sync, multi-processed activities, the cancelled_event is initialised with a multiprocessing Manager event that can communicate across the process boundary.
- Multi-threaded activities initialise the _ThreadExceptionRaiser in _execute_sync_activity with the thread ID, but this isn't done for the multi-processed case (I imagine it doesn't work the same way).
- So it looks like the cancelled_event is set and can be observed within the child process via _Context, but there is currently no built-in way for the user activity implementation to react to this event.
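The mechanism described above can be illustrated with the standard library alone. A `multiprocessing.Manager` event is a picklable proxy that can be passed to a process-pool worker (a plain `multiprocessing.Event` generally can only be inherited, not sent to a pool task), and setting it in the parent only affects the child if the child code polls it. In this sketch `long_running_task` stands in for a sync activity body and `cancelled_event` for the cross-process event the SDK sets; the names, iteration counts and timings are hypothetical, and a real activity would also heartbeat at each check.

```python
import concurrent.futures
import multiprocessing
import time


def long_running_task(cancelled_event, max_iterations: int) -> str:
    """Stand-in for a sync activity body executing in a child process.

    `cancelled_event` plays the role of the cross-process cancellation event;
    a real activity would also heartbeat at each check (hypothetical setup).
    """
    for i in range(max_iterations):
        # Setting the event in the parent has no effect unless the child polls it.
        if cancelled_event.is_set():
            return f"cancelled after {i} iterations"
        time.sleep(0.01)  # simulate one unit of work
    return "completed"


def run_demo() -> str:
    """Start the task in a process pool, then cancel it from the parent."""
    with multiprocessing.Manager() as manager:
        # A Manager event is a picklable proxy, so it can be sent to a pool worker.
        cancelled_event = manager.Event()
        with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
            future = executor.submit(long_running_task, cancelled_event, 2_000)
            time.sleep(0.2)
            # Simulate the worker reacting to a missed heartbeat or timeout.
            cancelled_event.set()
            return future.result(timeout=30)
```

Because it starts child processes, `run_demo()` should be called from under an `if __name__ == "__main__":` guard. It should return a string of the form `cancelled after N iterations`, with `N` depending on process start-up time. If `long_running_task` never checked the event, it would run all iterations and return `"completed"`, which mirrors the behaviour described in this issue.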
I also assume this affects "cancelled due to activity pause" as well, at least once the activity has already started?
Out of interest, how does _handle_start_activity_task work in this case? Does the _ActivityWorker keep polling for tasks for an activity it has already picked up, as the cancel task seems to suggest? Does the server somehow ensure these cancel tasks are sticky, so that the worker instance that picked up the start task can process them? I'm assuming this mechanism only works for newly picked-up activities whose start was never triggered. If not, why isn't this mechanism also used for heartbeat failures?
Cheers,
I hope this assessment is accurate 🙏🏻