Skip to content

Add __enhanced_stack_trace query to workers #537

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2d623b8
enhanced stack trace:
twin-drill May 31, 2024
4acf6ab
Refactored `@dataclass`es to `temporalio.common`.
twin-drill May 31, 2024
9579582
Added integration test for external file, via added file (externalsta…
twin-drill Jun 3, 2024
3cfcce0
Reformatted with `poe format`.
twin-drill Jun 3, 2024
c72f9d0
Fix typing issues (_FileLocation to _StackTrace) and adjust integ tes…
twin-drill Jun 3, 2024
d9a930a
Fix typing errors in new integ test (Forgot an External-)
twin-drill Jun 3, 2024
16cad2c
refactor dataclasses into `_workflow_instance.py`; use `typing` packa…
twin-drill Jun 4, 2024
21acbda
Merge branch 'main' into main
twin-drill Jun 4, 2024
80bd0e5
Merge branch 'main' of https://github.com/twin-drill/temporal-sdk-python
twin-drill Jun 4, 2024
3e5037b
multi-file stack traces not working atm
twin-drill Jun 7, 2024
ae4f4a9
delete extra file
twin-drill Jun 10, 2024
428fb7f
Merge branch 'main' into main
twin-drill Jun 12, 2024
0c5f175
remove in-file classes
twin-drill Jun 12, 2024
0380f30
update submodule
twin-drill Jun 12, 2024
37b8b33
refactor with proto classes
twin-drill Jun 14, 2024
c8902c7
changed `test_workflow_enhanced_stack_trace` and `test_workflow_exter…
twin-drill Jun 14, 2024
a2c0803
Merge branch 'main' into main
twin-drill Jun 17, 2024
393ca37
Make `FileSlice` a copy of the entire source file, with `line_offset`…
twin-drill Jun 17, 2024
91584e3
add `Cargo.lock`
twin-drill Jun 17, 2024
2440ea8
Skip `test_replayer_workflow_complete` for Python versions < 3.12 -- …
twin-drill Jun 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,36 @@
WorkflowOutboundInterceptor,
)

from temporalio.service import __version__

@dataclass
class SDKInfo:
name : str
version : str

@dataclass
class FileSlice:
content : str
lineOffset : int

@dataclass
class FileLocation:
filePath : str
line : Optional[int] = -1
column : Optional[int] = -1
functionName : Optional[str] = None

@dataclass
class StackTrace:
locations : list[FileLocation]

@dataclass
class EnhancedStackTrace:
sdk : SDKInfo
sources : dict[str, FileSlice]
stacks : list[StackTrace]


logger = logging.getLogger(__name__)

# Set to true to log all cases where we're ignoring things during delete
Expand Down Expand Up @@ -250,6 +280,14 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
ret_type=str,
)

self._queries["__enhanced_stack_trace"] = temporalio.workflow._QueryDefinition(
name="__enhanced_stack_trace",
fn=self._enhanced_stack_trace,
is_method=False,
arg_types=[],
ret_type=EnhancedStackTrace,
)

# Maintain buffered signals for later-added dynamic handlers
self._buffered_signals: Dict[
str, List[temporalio.bridge.proto.workflow_activation.SignalWorkflow]
Expand Down Expand Up @@ -1790,6 +1828,38 @@ def _stack_trace(self) -> str:
+ "\n".join(traceback.format_list(frames))
)
return "\n\n".join(stacks)

def _enhanced_stack_trace(self) -> EnhancedStackTrace:
sdk = SDKInfo("sdk-python", __version__)

sources = dict()
stacks = []

for task in list(self._tasks):
for frame in task.get_stack():
filename = frame.f_code.co_filename
line_number = frame.f_lineno
func_name = frame.f_code.co_name

try:
source = inspect.getsourcelines(frame)
code = ''.join(source[0])
line_number = int(source[1])
except OSError as ose:
code = "Cannot access code.\n---\n%s" % ose.strerror
# TODO possibly include sentinel/property for success of src scrape? work out with ui
except Exception:
code = "Generic Error.\n\n%s" % traceback.format_exc()

file_slice = FileSlice(code, line_number)
file_location = FileLocation(filename, line = line_number, functionName = func_name)

sources["%s %d" % (filename, line_number)] = file_slice
stacks.append(file_location)


est = EnhancedStackTrace(sdk, sources, stacks)
return est

#### asyncio.AbstractEventLoop function impls ####
# These are in the order defined in CPython's impl of the base class. Many
Expand Down
28 changes: 26 additions & 2 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
Runtime,
TelemetryConfig,
)
from temporalio.service import RPCError, RPCStatusCode
from temporalio.service import RPCError, RPCStatusCode, __version__
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import (
UnsandboxedWorkflowRunner,
Expand Down Expand Up @@ -494,7 +494,7 @@ async def test_workflow_signal_and_query_errors(client: Client):
await handle.query("non-existent query")
assert str(rpc_err.value) == (
"Query handler for 'non-existent query' expected but not found,"
" known queries: [__stack_trace bad_query other_query]"
" known queries: [__enhanced_stack_trace __stack_trace bad_query other_query]"
)


Expand Down Expand Up @@ -2097,6 +2097,30 @@ async def status() -> str:
# TODO(cretz): Do more specific checks once we clean up traces
assert "never_completing_coroutine" in trace

async def test_workflow_enhanced_stack_trace(client: Client):
async with new_worker(
client, StackTraceWorkflow, LongSleepWorkflow, activities=[wait_cancel]
) as worker:
handle = await client.start_workflow(
StackTraceWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

# Wait until waiting
async def status() -> str:
return await handle.query(StackTraceWorkflow.status)

await assert_eq_eventually("waiting", status)

# Send stack trace query
trace = await handle.query("__enhanced_stack_trace")

assert "never_completing_coroutine" in [ stack['functionName'] for stack in trace['stacks'] ]
# first line of never_completing_coroutine
assert 'self._status = "waiting"' in str(trace['sources'])
assert trace['sdk']['version'] == __version__


@dataclass
class MyDataClass:
Expand Down