forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hello_exception.py
92 lines (76 loc) · 2.96 KB
/
hello_exception.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import asyncio
import logging
from dataclasses import dataclass
from datetime import timedelta
from typing import NoReturn, Optional
from temporalio import activity, workflow
from temporalio.client import Client, WorkflowFailureError
from temporalio.common import RetryPolicy
from temporalio.exceptions import FailureError
from temporalio.worker import Worker
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> NoReturn:
# Always raise exception
raise RuntimeError(f"Greeting exception: {input.greeting}, {input.name}!")
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
# We'll only retry once
retry_policy=RetryPolicy(maximum_attempts=2),
)
async def main():
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-exception-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
# While the worker is running, use the client to run the workflow and
# print out its result. Note, in many production setups, the client
# would be in a completely separate process from the worker.
#
# This will raise a WorkflowFailureError with cause ActivityError with
# cause ApplicationError with the error message and stack trace.
try:
await client.execute_workflow(
GreetingWorkflow.run,
"World",
id="hello-exception-workflow-id",
task_queue="hello-exception-task-queue",
)
except WorkflowFailureError as err:
# Python does not support deserializing the traceback and putting it
# back on the exception so we cannot repopulate the stack. Instead,
# users can use the helper below to append the stack to the message.
#
# See https://github.com/temporalio/sdk-python/issues/58.
append_temporal_stack(err)
# Log the exception
logging.exception("Got workflow failure")
# This is an example of appending the stack to every Temporal failure error
def append_temporal_stack(exc: Optional[BaseException]) -> None:
while exc:
# Only append if it doesn't appear already there
if (
isinstance(exc, FailureError)
and exc.failure
and exc.failure.stack_trace
and len(exc.args) == 1
and "\nStack:\n" not in exc.args[0]
):
exc.args = (f"{exc}\nStack:\n{exc.failure.stack_trace.rstrip()}",)
exc = exc.__cause__
if __name__ == "__main__":
asyncio.run(main())