New API to wait for handler executions to complete and warnings on unfinished handler executions #556


Merged: 45 commits, Jun 26, 2024
Commits
ed5bd1a
Add failing test
dandavison Jun 19, 2024
f675216
Refactor test
dandavison Jun 19, 2024
c2a1961
Implement warning on unfinished signals and updates
dandavison Jun 19, 2024
8945ea6
Implement all_handlers_finished
dandavison Jun 19, 2024
8a5f432
Test waiting for all_handlers_finished
dandavison Jun 19, 2024
8a1c7e6
Run in same worker
dandavison Jun 19, 2024
8eef8a6
Refactor
dandavison Jun 19, 2024
ee4fe48
Refactor test
dandavison Jun 19, 2024
4c6bb27
Store policy with in-progress job
dandavison Jun 19, 2024
916cd72
Add new parameter to signal and update decorators
dandavison Jun 19, 2024
9c34eae
Store (job, defn)
dandavison Jun 19, 2024
6feed46
Format warning messages
dandavison Jun 19, 2024
e024ef2
Improve test
dandavison Jun 20, 2024
2f96acd
Clean up warning implementation
dandavison Jun 20, 2024
43387e4
s/finished/complete/
dandavison Jun 20, 2024
359ce60
Revert "s/finished/complete/"
dandavison Jun 20, 2024
bb2bf5c
Wrap signal handling without introducing new yield point
dandavison Jun 20, 2024
fd1a269
List default value first in enum
dandavison Jun 20, 2024
e218fe2
s/IntEnum/Enum/
dandavison Jun 20, 2024
866b9be
Maintain alphabetical method order
dandavison Jun 20, 2024
7d13e1d
Simplify overloads
dandavison Jun 21, 2024
ec318ae
Abbreviate decorator argument
dandavison Jun 21, 2024
8e1c30e
Drive-by removal of unused imports
dandavison Jun 21, 2024
1b42175
Use custom warnings in test
dandavison Jun 21, 2024
4acc28b
Convert unfinished handler warnings to errors during test suite
dandavison Jun 21, 2024
48fb622
Satisfy pydocstyle linter
dandavison Jun 21, 2024
67e5611
Use unfinished_policy=HandlerUnfinishedPolicy.XXX
dandavison Jun 21, 2024
f0b1739
Test that unfinished handlers cause exceptions in the test suite
dandavison Jun 21, 2024
572cf9a
Allow use of generics with asyncio.Future in Python 3.8
dandavison Jun 21, 2024
04373d8
Move warning classes to temporalio.workflow
dandavison Jun 21, 2024
b1ee630
Pluralize warning names and fix docstrings
dandavison Jun 21, 2024
3e2181f
Turn enum comments into docstrings
dandavison Jun 21, 2024
fcabe21
Use default timeout and interval
dandavison Jun 21, 2024
ea63920
Clean up imports
dandavison Jun 21, 2024
7063c17
Edit warning messages
dandavison Jun 24, 2024
8162e31
Refactor test
dandavison Jun 24, 2024
15ed853
Test cancellation
dandavison Jun 25, 2024
3f0b11c
Bug fix: include cancellation command in set of "completion" commands
dandavison Jun 25, 2024
d6ca246
Add coverage for workflow failure
dandavison Jun 25, 2024
5795269
Lint fixes
dandavison Jun 25, 2024
f769bc0
Skip update tests under Java time-skipping server
dandavison Jun 25, 2024
cf5b158
Remove unused imports
dandavison Jun 25, 2024
caed0ff
Try increasing timeout
dandavison Jun 25, 2024
97fc45f
Remove spurious asyncio.gather calls in test
dandavison Jun 25, 2024
b9e07cd
Skip test due to putative Java test server bug
dandavison Jun 26, 2024
7 changes: 6 additions & 1 deletion pyproject.toml
@@ -99,14 +99,19 @@ env = { TEMPORAL_INTEGRATION_TEST = "1" }
cmd = "pip uninstall temporalio -y"

[tool.pytest.ini_options]
addopts = "-p no:warnings"
asyncio_mode = "auto"
log_cli = true
log_cli_level = "INFO"
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
testpaths = ["tests"]
timeout = 600
timeout_func_only = true
filterwarnings = [
"error::temporalio.workflow.UnfinishedUpdateHandlersWarning",
"error::temporalio.workflow.UnfinishedSignalHandlersWarning",
"ignore::pytest.PytestDeprecationWarning",
"ignore::DeprecationWarning",
]

[tool.cibuildwheel]
# We only want the 3.8 64-bit build of each type. However, due to
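The `filterwarnings` entries above escalate the two new unfinished-handler warnings to errors while the test suite runs. A minimal, self-contained sketch of that mechanism follows; it uses a stand-in warning class rather than importing `temporalio.workflow.UnfinishedUpdateHandlersWarning`:

```python
# Sketch of what an "error::SomeWarning" pytest filterwarnings entry does:
# promote one specific warning class to an exception.
import warnings


class UnfinishedUpdateHandlersWarning(UserWarning):
    """Stand-in for the warning class added by this PR."""


def finish_workflow_with_unfinished_handler() -> None:
    # Simulates the warning the worker emits at workflow exit.
    warnings.warn(UnfinishedUpdateHandlersWarning("update 'my_update' is still running"))


with warnings.catch_warnings():
    # Equivalent in spirit to the "error::…UnfinishedUpdateHandlersWarning" line.
    warnings.simplefilter("error", UnfinishedUpdateHandlersWarning)
    try:
        finish_workflow_with_unfinished_handler()
        raised = False
    except UnfinishedUpdateHandlersWarning:
        raised = True
```

With the filter installed, the warning surfaces as a test failure instead of being silently printed, which is how the suite enforces that workflows under test drain their handlers.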
112 changes: 110 additions & 2 deletions temporalio/worker/_workflow_instance.py
@@ -6,11 +6,11 @@
import collections
import contextvars
import inspect
import json
import logging
import random
import sys
import traceback
import typing
import warnings
from abc import ABC, abstractmethod
from contextlib import contextmanager
@@ -25,6 +25,7 @@
Dict,
Generator,
Generic,
Iterable,
Iterator,
List,
Mapping,
@@ -240,6 +241,14 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
self._queries = dict(self._defn.queries)
self._updates = dict(self._defn.updates)

# We record in-progress signals and updates in order to support waiting for handlers to
# finish, and issuing warnings when the workflow exits with unfinished handlers. Since
# signals lack a unique per-invocation identifier, we introduce a sequence number for the
# purpose.
self._handled_signals_seq = 0
self._in_progress_signals: Dict[int, HandlerExecution] = {}
self._in_progress_updates: Dict[str, HandlerExecution] = {}

# Add stack trace handler
# TODO(cretz): Is it ok that this can be forcefully overridden by the
# workflow author? They could technically override in interceptor
@@ -406,12 +415,15 @@ def activate(
command.HasField("complete_workflow_execution")
or command.HasField("continue_as_new_workflow_execution")
or command.HasField("fail_workflow_execution")
or command.HasField("cancel_workflow_execution")
)
elif not command.HasField("respond_to_query"):
del self._current_completion.successful.commands[i]
continue
i += 1

if seen_completion:
self._warn_if_unfinished_handlers()
return self._current_completion

def _apply(
@@ -490,6 +502,9 @@ async def run_update() -> None:
f"Update handler for '{job.name}' expected but not found, and there is no dynamic handler. "
f"known updates: [{' '.join(known_updates)}]"
)
self._in_progress_updates[job.id] = HandlerExecution(
job.name, defn.unfinished_policy, job.id
)
args = self._process_handler_args(
job.name,
job.input,
@@ -572,6 +587,8 @@ async def run_update() -> None:
)
return
raise
finally:
self._in_progress_updates.pop(job.id, None)

self.create_task(
run_update(),
@@ -869,6 +886,9 @@ def _apply_update_random_seed(
#### _Runtime direct workflow call overrides ####
# These are in alphabetical order and all start with "workflow_".

def workflow_all_handlers_finished(self) -> bool:
return not self._in_progress_updates and not self._in_progress_signals

def workflow_continue_as_new(
self,
*args: Any,
@@ -1596,6 +1616,31 @@ def _is_workflow_failure_exception(self, err: BaseException) -> bool:
)
)

def _warn_if_unfinished_handlers(self) -> None:
def warnable(handler_executions: Iterable[HandlerExecution]):
return [
ex
for ex in handler_executions
if ex.unfinished_policy
== temporalio.workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON
]

warnable_updates = warnable(self._in_progress_updates.values())
if warnable_updates:
warnings.warn(
temporalio.workflow.UnfinishedUpdateHandlersWarning(
_make_unfinished_update_handler_message(warnable_updates)
)
)

warnable_signals = warnable(self._in_progress_signals.values())
if warnable_signals:
warnings.warn(
temporalio.workflow.UnfinishedSignalHandlersWarning(
_make_unfinished_signal_handler_message(warnable_signals)
)
)

def _next_seq(self, type: str) -> int:
seq = self._curr_seqs.get(type, 0) + 1
self._curr_seqs[type] = seq
@@ -1646,10 +1691,21 @@ def _process_signal_job(
input = HandleSignalInput(
signal=job.signal_name, args=args, headers=job.headers
)
self.create_task(

self._handled_signals_seq += 1
id = self._handled_signals_seq
self._in_progress_signals[id] = HandlerExecution(
job.signal_name, defn.unfinished_policy
)

def done_callback(f):
self._in_progress_signals.pop(id, None)

task = self.create_task(
self._run_top_level_workflow_function(self._inbound.handle_signal(input)),
name=f"signal: {job.signal_name}",
)
task.add_done_callback(done_callback)

def _register_task(
self,
@@ -2686,3 +2742,55 @@ def set(

class _WorkflowBeingEvictedError(BaseException):
pass


@dataclass
class HandlerExecution:
"""Information about an execution of a signal or update handler."""

name: str
unfinished_policy: temporalio.workflow.HandlerUnfinishedPolicy
id: Optional[str] = None


def _make_unfinished_update_handler_message(
handler_executions: List[HandlerExecution],
) -> str:
message = """
Workflow finished while update handlers are still running. This may have interrupted work that the
update handler was doing, and the client that sent the update will receive a 'workflow execution
already completed' RPCError instead of the update result. You can wait for all update and signal
handlers to complete by using `await workflow.wait_condition(lambda:
workflow.all_handlers_finished())`. Alternatively, if both you and the clients sending the update
are okay with interrupting running handlers when the workflow finishes, and causing clients to
receive errors, then you can disable this warning via the update handler decorator:
`@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`.
""".replace(
"\n", " "
).strip()
return (
f"{message} The following updates were unfinished (and warnings were not disabled for their handler): "
+ json.dumps([{"name": ex.name, "id": ex.id} for ex in handler_executions])
)


def _make_unfinished_signal_handler_message(
handler_executions: List[HandlerExecution],
) -> str:
message = """
Workflow finished while signal handlers are still running. This may have interrupted work that the
signal handler was doing. You can wait for all update and signal handlers to complete by using
`await workflow.wait_condition(lambda: workflow.all_handlers_finished())`. Alternatively, if both
you and the clients sending the signal are okay with interrupting running handlers when the workflow
finishes, and causing clients to receive errors, then you can disable this warning via the signal
handler decorator: `@workflow.signal(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`.
""".replace(
"\n", " "
).strip()
names = collections.Counter(ex.name for ex in handler_executions)
return (
f"{message} The following signals were unfinished (and warnings were not disabled for their handler): "
+ json.dumps(
[{"name": name, "count": count} for name, count in names.most_common()]
)
)
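Taken together, the diff tracks in-progress updates keyed by their protocol ID and in-progress signals keyed by a locally generated sequence number, since a signal delivery carries no unique per-invocation identifier. The following is a self-contained sketch of that bookkeeping (class and method names here are illustrative, not the SDK's internals):

```python
# Model of the in-progress handler tracking this PR adds to the workflow
# instance: updates keyed by update ID, signals keyed by a sequence number.
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional


class HandlerUnfinishedPolicy(Enum):
    WARN_AND_ABANDON = 1  # default: warn if still running at workflow exit
    ABANDON = 2  # silently abandon unfinished handlers


@dataclass
class HandlerExecution:
    name: str
    unfinished_policy: HandlerUnfinishedPolicy
    id: Optional[str] = None  # updates have an ID; signals do not


class HandlerTracker:
    def __init__(self) -> None:
        self._signal_seq = 0
        self._in_progress_signals: Dict[int, HandlerExecution] = {}
        self._in_progress_updates: Dict[str, HandlerExecution] = {}

    def start_update(self, update_id: str, name: str, policy: HandlerUnfinishedPolicy) -> None:
        self._in_progress_updates[update_id] = HandlerExecution(name, policy, update_id)

    def finish_update(self, update_id: str) -> None:
        self._in_progress_updates.pop(update_id, None)

    def start_signal(self, name: str, policy: HandlerUnfinishedPolicy) -> int:
        # Signals lack a unique ID, so assign a sequence number per invocation.
        self._signal_seq += 1
        self._in_progress_signals[self._signal_seq] = HandlerExecution(name, policy)
        return self._signal_seq

    def finish_signal(self, seq: int) -> None:
        self._in_progress_signals.pop(seq, None)

    def all_handlers_finished(self) -> bool:
        # Mirrors workflow_all_handlers_finished in the diff.
        return not self._in_progress_updates and not self._in_progress_signals

    def warnable(self) -> List[HandlerExecution]:
        # Only handlers left at the default WARN_AND_ABANDON policy warrant a warning.
        return [
            ex
            for ex in list(self._in_progress_updates.values())
            + list(self._in_progress_signals.values())
            if ex.unfinished_policy is HandlerUnfinishedPolicy.WARN_AND_ABANDON
        ]
```

In the real implementation, `finish_update` runs in a `finally` block of `run_update` and `finish_signal` runs in a task done-callback, so entries are removed whether the handler completes, fails, or is cancelled.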