Skip to content

Commit 2c1ac54

Browse files
authored
New API to wait for handler executions to complete and warnings on unfinished handler executions (#556)
* Implement warning on unfinished signals and updates * Implement all_handlers_finished
1 parent 2331aa4 commit 2c1ac54

File tree

4 files changed

+570
-27
lines changed

4 files changed

+570
-27
lines changed

pyproject.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,19 @@ env = { TEMPORAL_INTEGRATION_TEST = "1" }
9999
cmd = "pip uninstall temporalio -y"
100100

101101
[tool.pytest.ini_options]
102-
addopts = "-p no:warnings"
103102
asyncio_mode = "auto"
104103
log_cli = true
105104
log_cli_level = "INFO"
106105
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
107106
testpaths = ["tests"]
108107
timeout = 600
109108
timeout_func_only = true
109+
filterwarnings = [
110+
"error::temporalio.workflow.UnfinishedUpdateHandlersWarning",
111+
"error::temporalio.workflow.UnfinishedSignalHandlersWarning",
112+
"ignore::pytest.PytestDeprecationWarning",
113+
"ignore::DeprecationWarning",
114+
]
110115

111116
[tool.cibuildwheel]
112117
# We only want the 3.8 64-bit build of each type. However, due to

temporalio/worker/_workflow_instance.py

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
import collections
77
import contextvars
88
import inspect
9+
import json
910
import logging
1011
import random
1112
import sys
1213
import traceback
13-
import typing
1414
import warnings
1515
from abc import ABC, abstractmethod
1616
from contextlib import contextmanager
@@ -25,6 +25,7 @@
2525
Dict,
2626
Generator,
2727
Generic,
28+
Iterable,
2829
Iterator,
2930
List,
3031
Mapping,
@@ -240,6 +241,14 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
240241
self._queries = dict(self._defn.queries)
241242
self._updates = dict(self._defn.updates)
242243

244+
# We record in-progress signals and updates in order to support waiting for handlers to
245+
# finish, and issuing warnings when the workflow exits with unfinished handlers. Since
246+
# signals lack a unique per-invocation identifier, we introduce a sequence number for the
247+
# purpose.
248+
self._handled_signals_seq = 0
249+
self._in_progress_signals: Dict[int, HandlerExecution] = {}
250+
self._in_progress_updates: Dict[str, HandlerExecution] = {}
251+
243252
# Add stack trace handler
244253
# TODO(cretz): Is it ok that this can be forcefully overridden by the
245254
# workflow author? They could technically override in interceptor
@@ -406,12 +415,15 @@ def activate(
406415
command.HasField("complete_workflow_execution")
407416
or command.HasField("continue_as_new_workflow_execution")
408417
or command.HasField("fail_workflow_execution")
418+
or command.HasField("cancel_workflow_execution")
409419
)
410420
elif not command.HasField("respond_to_query"):
411421
del self._current_completion.successful.commands[i]
412422
continue
413423
i += 1
414424

425+
if seen_completion:
426+
self._warn_if_unfinished_handlers()
415427
return self._current_completion
416428

417429
def _apply(
@@ -490,6 +502,9 @@ async def run_update() -> None:
490502
f"Update handler for '{job.name}' expected but not found, and there is no dynamic handler. "
491503
f"known updates: [{' '.join(known_updates)}]"
492504
)
505+
self._in_progress_updates[job.id] = HandlerExecution(
506+
job.name, defn.unfinished_policy, job.id
507+
)
493508
args = self._process_handler_args(
494509
job.name,
495510
job.input,
@@ -572,6 +587,8 @@ async def run_update() -> None:
572587
)
573588
return
574589
raise
590+
finally:
591+
self._in_progress_updates.pop(job.id, None)
575592

576593
self.create_task(
577594
run_update(),
@@ -869,6 +886,9 @@ def _apply_update_random_seed(
869886
#### _Runtime direct workflow call overrides ####
870887
# These are in alphabetical order and all start with "workflow_".
871888

889+
def workflow_all_handlers_finished(self) -> bool:
890+
return not self._in_progress_updates and not self._in_progress_signals
891+
872892
def workflow_continue_as_new(
873893
self,
874894
*args: Any,
@@ -1596,6 +1616,31 @@ def _is_workflow_failure_exception(self, err: BaseException) -> bool:
15961616
)
15971617
)
15981618

1619+
def _warn_if_unfinished_handlers(self) -> None:
1620+
def warnable(handler_executions: Iterable[HandlerExecution]):
1621+
return [
1622+
ex
1623+
for ex in handler_executions
1624+
if ex.unfinished_policy
1625+
== temporalio.workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON
1626+
]
1627+
1628+
warnable_updates = warnable(self._in_progress_updates.values())
1629+
if warnable_updates:
1630+
warnings.warn(
1631+
temporalio.workflow.UnfinishedUpdateHandlersWarning(
1632+
_make_unfinished_update_handler_message(warnable_updates)
1633+
)
1634+
)
1635+
1636+
warnable_signals = warnable(self._in_progress_signals.values())
1637+
if warnable_signals:
1638+
warnings.warn(
1639+
temporalio.workflow.UnfinishedSignalHandlersWarning(
1640+
_make_unfinished_signal_handler_message(warnable_signals)
1641+
)
1642+
)
1643+
15991644
def _next_seq(self, type: str) -> int:
16001645
seq = self._curr_seqs.get(type, 0) + 1
16011646
self._curr_seqs[type] = seq
@@ -1646,10 +1691,21 @@ def _process_signal_job(
16461691
input = HandleSignalInput(
16471692
signal=job.signal_name, args=args, headers=job.headers
16481693
)
1649-
self.create_task(
1694+
1695+
self._handled_signals_seq += 1
1696+
id = self._handled_signals_seq
1697+
self._in_progress_signals[id] = HandlerExecution(
1698+
job.signal_name, defn.unfinished_policy
1699+
)
1700+
1701+
def done_callback(f):
1702+
self._in_progress_signals.pop(id, None)
1703+
1704+
task = self.create_task(
16501705
self._run_top_level_workflow_function(self._inbound.handle_signal(input)),
16511706
name=f"signal: {job.signal_name}",
16521707
)
1708+
task.add_done_callback(done_callback)
16531709

16541710
def _register_task(
16551711
self,
@@ -2686,3 +2742,55 @@ def set(
26862742

26872743
class _WorkflowBeingEvictedError(BaseException):
26882744
pass
2745+
2746+
2747+
@dataclass
2748+
class HandlerExecution:
2749+
"""Information about an execution of a signal or update handler."""
2750+
2751+
name: str
2752+
unfinished_policy: temporalio.workflow.HandlerUnfinishedPolicy
2753+
id: Optional[str] = None
2754+
2755+
2756+
def _make_unfinished_update_handler_message(
2757+
handler_executions: List[HandlerExecution],
2758+
) -> str:
2759+
message = """
2760+
Workflow finished while update handlers are still running. This may have interrupted work that the
2761+
update handler was doing, and the client that sent the update will receive a 'workflow execution
2762+
already completed' RPCError instead of the update result. You can wait for all update and signal
2763+
handlers to complete by using `await workflow.wait_condition(lambda:
2764+
workflow.all_handlers_finished())`. Alternatively, if both you and the clients sending the update
2765+
are okay with interrupting running handlers when the workflow finishes, and causing clients to
2766+
receive errors, then you can disable this warning via the update handler decorator:
2767+
`@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`.
2768+
""".replace(
2769+
"\n", " "
2770+
).strip()
2771+
return (
2772+
f"{message} The following updates were unfinished (and warnings were not disabled for their handler): "
2773+
+ json.dumps([{"name": ex.name, "id": ex.id} for ex in handler_executions])
2774+
)
2775+
2776+
2777+
def _make_unfinished_signal_handler_message(
2778+
handler_executions: List[HandlerExecution],
2779+
) -> str:
2780+
message = """
2781+
Workflow finished while signal handlers are still running. This may have interrupted work that the
2782+
signal handler was doing. You can wait for all update and signal handlers to complete by using
2783+
`await workflow.wait_condition(lambda: workflow.all_handlers_finished())`. Alternatively, if both
2784+
you and the clients sending the signal are okay with interrupting running handlers when the workflow
2785+
finishes, and causing clients to receive errors, then you can disable this warning via the signal
2786+
handler decorator: `@workflow.signal(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`.
2787+
""".replace(
2788+
"\n", " "
2789+
).strip()
2790+
names = collections.Counter(ex.name for ex in handler_executions)
2791+
return (
2792+
f"{message} The following signals were unfinished (and warnings were not disabled for their handler): "
2793+
+ json.dumps(
2794+
[{"name": name, "count": count} for name, count in names.most_common()]
2795+
)
2796+
)

0 commit comments

Comments
 (0)