Skip to content

Commit 4aef4bf

Browse files
authored
Stack trace on deadlock detection exception (#626)
1 parent 7af99d0 commit 4aef4bf

File tree

4 files changed

+157
-7
lines changed

4 files changed

+157
-7
lines changed

temporalio/worker/_workflow.py

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,22 @@
66
import concurrent.futures
77
import logging
88
import os
9+
import sys
910
from datetime import timezone
10-
from typing import Callable, Dict, List, MutableMapping, Optional, Sequence, Set, Type
11+
from threading import get_ident
12+
from types import TracebackType
13+
from typing import (
14+
Callable,
15+
Dict,
16+
List,
17+
Literal,
18+
MutableMapping,
19+
Optional,
20+
Sequence,
21+
Set,
22+
Tuple,
23+
Type,
24+
)
1125

1226
import temporalio.activity
1327
import temporalio.api.common.v1
@@ -250,9 +264,10 @@ async def _handle_activation(
250264
activate_task, self._deadlock_timeout_seconds
251265
)
252266
except asyncio.TimeoutError:
253-
raise RuntimeError(
254-
f"[TMPRL1101] Potential deadlock detected, workflow didn't yield within {self._deadlock_timeout_seconds} second(s)"
255-
)
267+
raise _DeadlockError.from_deadlocked_workflow(
268+
workflow, self._deadlock_timeout_seconds
269+
) from None
270+
256271
except Exception as err:
257272
# We cannot fail a cache eviction, we must just log and not complete
258273
# the activation (failed or otherwise). This should only happen in
@@ -268,6 +283,9 @@ async def _handle_activation(
268283
self._could_not_evict_count += 1
269284
return
270285

286+
if isinstance(err, _DeadlockError):
287+
err.swap_traceback()
288+
271289
logger.exception(
272290
"Failed handling activation on workflow with run ID %s", act.run_id
273291
)
@@ -421,3 +439,77 @@ def nondeterminism_as_workflow_fail_for_types(self) -> Set[str]:
421439
for typ in v.failure_exception_types
422440
)
423441
)
442+
443+
444+
class _DeadlockError(Exception):
    """Exception raised when a workflow is suspected of being deadlocked.

    Optionally carries a replacement traceback (built from the frames of the
    deadlocked thread) that can be swapped in for the default one.
    """

    def __init__(self, message: str, replacement_tb: Optional[TracebackType] = None):
        """Create a new deadlock error.

        Args:
            message: Message to be presented through the exception.
            replacement_tb: Optional traceback to be swapped in later by
                ``swap_traceback``.
        """
        super().__init__(message)
        self._new_tb = replacement_tb

    def swap_traceback(self) -> None:
        """Replace the current traceback with the one given at construction.

        Used to work around Python adding the current frame to the stack
        trace when the exception is raised. Does nothing when no replacement
        traceback is held; the replacement is released after the first swap.
        """
        if self._new_tb:
            self.__traceback__ = self._new_tb
            self._new_tb = None

    @classmethod
    def from_deadlocked_workflow(
        cls, workflow: "WorkflowInstance", timeout: Optional[int]
    ) -> "_DeadlockError":
        """Build a deadlock error for the given workflow.

        Args:
            workflow: Workflow instance that failed to yield. Its
                ``get_thread_id()`` is used to locate the thread to inspect.
            timeout: Deadlock timeout in seconds, interpolated into the
                message.

        Returns:
            An error carrying the thread's stack as replacement traceback
            when it can be captured, otherwise an error with only a message
            (annotated with the reason when frame capture was attempted).
        """
        msg = f"[TMPRL1101] Potential deadlock detected: workflow didn't yield within {timeout} second(s)."
        tid = workflow.get_thread_id()
        if not tid:
            return cls(msg)

        try:
            tb = cls._gen_tb_helper(tid)
            if tb:
                return cls(msg, tb)
            return cls(f"{msg} (no frames available)")
        except Exception as err:
            # Best effort only: never let stack capture hide the deadlock
            # report itself.
            return cls(f"{msg} (failed getting frames: {err})")

    @staticmethod
    def _gen_tb_helper(
        tid: int,
    ) -> Optional[TracebackType]:
        """Take a thread id and construct a stack trace.

        The returned traceback follows the interpreter's convention: its head
        is the outermost frame and ``tb_next`` leads toward the innermost
        (most recent) frame, so it formats correctly under
        "Traceback (most recent call last)".

        Args:
            tid: Identifier of the thread whose stack should be captured.

        Returns:
            The traceback that was constructed, None if the thread could not
            be found.
        """
        frame = sys._current_frames().get(tid)
        if not frame:
            return None

        # Not using traceback.extract_stack() because it obfuscates the frame
        # objects (specifically f_lasti).
        #
        # Walk from the innermost frame outward, prepending each level. The
        # first traceback created (innermost) ends up at the tail and the
        # last one (outermost) becomes the head.
        size = 0
        tb: Optional[TracebackType] = None
        current = frame
        while current is not None:
            tb = TracebackType(tb, current, current.f_lasti, current.f_lineno)
            size += sys.getsizeof(tb)
            current = current.f_back

        # Cap the (shallow) size of the chain. Dropping from the head discards
        # the outermost frames first, keeping those closest to the deadlock.
        while size > 200000 and tb:
            size -= sys.getsizeof(tb)
            tb = tb.tb_next

        return tb

temporalio/worker/_workflow_instance.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import logging
1111
import random
1212
import sys
13+
import threading
1314
import traceback
1415
import warnings
1516
from abc import ABC, abstractmethod
@@ -158,6 +159,16 @@ def activate(
158159
"""
159160
raise NotImplementedError
160161

162+
def get_thread_id(self) -> Optional[int]:
    """Report the identifier of the thread this workflow is running on.

    Deliberately not an abstractmethod, since implementing it is optional;
    this default implementation always reports ``None``. Used primarily for
    getting the frames of a deadlocked thread.

    Returns:
        Thread ID if the workflow is running, None if not.
    """
    return None
171+
161172

162173
class UnsandboxedWorkflowRunner(WorkflowRunner):
163174
"""Workflow runner that does not do any sandboxing."""
@@ -300,6 +311,12 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
300311
# We only create the metric meter lazily
301312
self._metric_meter: Optional[_ReplaySafeMetricMeter] = None
302313

314+
# For tracking the thread this workflow is running on (primarily for deadlock situations)
315+
self._current_thread_id: Optional[int] = None
316+
317+
def get_thread_id(self) -> Optional[int]:
    """Return the thread ID tracked for this workflow instance.

    Returns:
        The value held in ``_current_thread_id``, which is None when no
        thread ID has been recorded.
    """
    thread_id = self._current_thread_id
    return thread_id
319+
303320
#### Activation functions ####
304321
# These are in alphabetical order and besides "activate", all other calls
305322
# are "_apply_" + the job field name.
@@ -320,6 +337,7 @@ def activate(
320337
self._time_ns = act.timestamp.ToNanoseconds()
321338
self._is_replaying = act.is_replaying
322339

340+
self._current_thread_id = threading.get_ident()
323341
activation_err: Optional[Exception] = None
324342
try:
325343
# Split into job sets with patches, then signals + updates, then

temporalio/worker/workflow_sandbox/_runner.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66

77
from __future__ import annotations
88

9+
import threading
910
from datetime import datetime, timedelta, timezone
10-
from typing import Any, Sequence, Type
11+
from typing import Any, Optional, Sequence, Type
1112

1213
import temporalio.bridge.proto.workflow_activation
1314
import temporalio.bridge.proto.workflow_completion
@@ -112,6 +113,8 @@ def __init__(
112113
self.runner_class = runner_class
113114
self.importer = Importer(restrictions, RestrictionContext())
114115

116+
self._current_thread_id: Optional[int] = None
117+
115118
# Create the instance
116119
self.globals_and_locals = {
117120
"__file__": "workflow_sandbox.py",
@@ -169,8 +172,13 @@ def _run_code(self, code: str, **extra_globals: Any) -> None:
169172
self.globals_and_locals[k] = v
170173
try:
171174
temporalio.workflow.unsafe._set_in_sandbox(True)
175+
self._current_thread_id = threading.get_ident()
172176
exec(code, self.globals_and_locals, self.globals_and_locals)
173177
finally:
174178
temporalio.workflow.unsafe._set_in_sandbox(False)
179+
self._current_thread_id = None
175180
for k, v in extra_globals.items():
176181
self.globals_and_locals.pop(k, None)
182+
183+
def get_thread_id(self) -> Optional[int]:
    """Return the thread ID tracked for this sandboxed instance.

    Returns:
        The value held in ``_current_thread_id``, which is None when no
        thread ID is currently recorded.
    """
    thread_id = self._current_thread_id
    return thread_id

tests/worker/test_workflow.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2108,6 +2108,38 @@ async def status() -> str:
21082108

21092109

21102110
async def test_workflow_enhanced_stack_trace(client: Client):
2111+
"""Expected format of __enhanced_stack_trace:
2112+
2113+
EnhancedStackTrace : {
2114+
2115+
sdk (StackTraceSDKInfo) : {
2116+
name: string,
2117+
version: string
2118+
},
2119+
2120+
sources (map<string, StackTraceFileSlice>) : {
2121+
filename: (StackTraceFileSlice) {
2122+
line_offset: int,
2123+
content: string
2124+
},
2125+
...
2126+
},
2127+
2128+
stacks (StackTrace[]) : [
2129+
(StackTraceFileLocation) {
2130+
file_path: string,
2131+
line: int,
2132+
column: int,
2133+
function_name: string,
2134+
internal_code: bool
2135+
},
2136+
...
2137+
]
2138+
}
2139+
2140+
More details available in API repository: temporal/api/sdk/v1/enhanced_stack_trace.proto
2141+
"""
2142+
21112143
async with new_worker(
21122144
client, StackTraceWorkflow, LongSleepWorkflow, activities=[wait_cancel]
21132145
) as worker:
@@ -2570,7 +2602,7 @@ async def last_history_task_failure() -> str:
25702602

25712603
try:
25722604
await assert_eq_eventually(
2573-
"[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)",
2605+
"[TMPRL1101] Potential deadlock detected: workflow didn't yield within 1 second(s).",
25742606
last_history_task_failure,
25752607
timeout=timedelta(seconds=5),
25762608
interval=timedelta(seconds=1),
@@ -2627,7 +2659,7 @@ async def last_history_task_failure() -> str:
26272659
return "<no failure>"
26282660

26292661
await assert_eq_eventually(
2630-
"[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)",
2662+
"[TMPRL1101] Potential deadlock detected: workflow didn't yield within 1 second(s).",
26312663
last_history_task_failure,
26322664
timeout=timedelta(seconds=5),
26332665
interval=timedelta(seconds=1),

0 commit comments

Comments
 (0)