Skip to content

Commit 39f8e84

Browse files
tconley1428cretz
andauthored
Adding trace identification numbers (#888)
* Adding trace identification numbers to select log statements to allow for easier categorization in log handlers * Reformat * Fix import formatting * Remove key and enum definitions * Fix copy paste bug * Switch workflow test to logcapturer * Remove test artifact * Fixing test * Update temporalio/types.py Co-authored-by: Chad Retz <chad@temporal.io> --------- Co-authored-by: Chad Retz <chad@temporal.io>
1 parent be4e099 commit 39f8e84

File tree

4 files changed

+81
-4
lines changed

4 files changed

+81
-4
lines changed

temporalio/worker/_activity.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,11 +351,15 @@ async def _run_activity(
351351
):
352352
# Downgrade log level to DEBUG for BENIGN application errors.
353353
temporalio.activity.logger.debug(
354-
"Completing activity as failed", exc_info=True
354+
"Completing activity as failed",
355+
exc_info=True,
356+
extra={"__temporal_error_identifier": "ActivityFailure"},
355357
)
356358
else:
357359
temporalio.activity.logger.warning(
358-
"Completing activity as failed", exc_info=True
360+
"Completing activity as failed",
361+
exc_info=True,
362+
extra={"__temporal_error_identifier": "ActivityFailure"},
359363
)
360364
await self._data_converter.encode_failure(
361365
err, completion.result.failed.failure

temporalio/worker/_workflow_instance.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,10 @@ def activate(
447447
logger.warning(
448448
f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}",
449449
exc_info=activation_err,
450-
extra={"temporal_workflow": self._info._logger_details()},
450+
extra={
451+
"temporal_workflow": self._info._logger_details(),
452+
"__temporal_error_identifier": "WorkflowTaskFailure",
453+
},
451454
)
452455
# Set completion failure
453456
self._current_completion.failed.failure.SetInParent()

tests/worker/test_activity.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1379,3 +1379,40 @@ def assert_activity_application_error(
13791379
ret = assert_activity_error(err)
13801380
assert isinstance(ret, ApplicationError)
13811381
return ret
1382+
1383+
1384+
class CustomLogHandler(logging.Handler):
1385+
def __init__(self):
1386+
super().__init__()
1387+
self._trace_identifiers = 0
1388+
1389+
def emit(self, record: logging.LogRecord) -> None:
1390+
if (
1391+
hasattr(record, "__temporal_error_identifier")
1392+
and getattr(record, "__temporal_error_identifier") == "ActivityFailure"
1393+
):
1394+
assert record.msg.startswith("Completing activity as failed")
1395+
self._trace_identifiers += 1
1396+
return None
1397+
1398+
1399+
async def test_activity_failure_trace_identifier(
1400+
client: Client, worker: ExternalWorker
1401+
):
1402+
@activity.defn
1403+
async def raise_error():
1404+
raise RuntimeError("oh no!")
1405+
1406+
handler = CustomLogHandler()
1407+
activity.logger.base_logger.addHandler(handler)
1408+
1409+
try:
1410+
with pytest.raises(WorkflowFailureError) as err:
1411+
await _execute_workflow_with_activity(client, worker, raise_error)
1412+
assert (
1413+
str(assert_activity_application_error(err.value)) == "RuntimeError: oh no!"
1414+
)
1415+
assert handler._trace_identifiers == 1
1416+
1417+
finally:
1418+
activity.logger.base_logger.removeHandler(CustomLogHandler())

tests/worker/test_workflow.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1961,8 +1961,13 @@ def logs_captured(self, *loggers: logging.Logger):
19611961
l.setLevel(prev_levels[i])
19621962

19631963
def find_log(self, starts_with: str) -> Optional[logging.LogRecord]:
1964+
return self.find(lambda l: l.message.startswith(starts_with))
1965+
1966+
def find(
1967+
self, pred: Callable[[logging.LogRecord], bool]
1968+
) -> Optional[logging.LogRecord]:
19641969
for record in cast(List[logging.LogRecord], self.log_queue.queue):
1965-
if record.message.startswith(starts_with):
1970+
if pred(record):
19661971
return record
19671972
return None
19681973

@@ -2058,6 +2063,7 @@ async def run(self) -> None:
20582063
if not task_fail_once_workflow_has_failed:
20592064
task_fail_once_workflow_has_failed = True
20602065
raise RuntimeError("Intentional workflow task failure")
2066+
task_fail_once_workflow_has_failed = False
20612067

20622068
# Execute activity that will fail once
20632069
await workflow.execute_activity(
@@ -7975,6 +7981,33 @@ async def test_quick_activity_swallows_cancellation(client: Client):
79757981
temporalio.worker._workflow_instance._raise_on_cancelling_completed_activity_override = False
79767982

79777983

7984+
async def test_workflow_logging_trace_identifier(client: Client):
7985+
with LogCapturer().logs_captured(
7986+
temporalio.worker._workflow_instance.logger
7987+
) as capturer:
7988+
async with new_worker(
7989+
client,
7990+
TaskFailOnceWorkflow,
7991+
activities=[task_fail_once_activity],
7992+
) as worker:
7993+
await client.execute_workflow(
7994+
TaskFailOnceWorkflow.run,
7995+
id=f"workflow_failure_trace_identifier",
7996+
task_queue=worker.task_queue,
7997+
)
7998+
7999+
def workflow_failure(l: logging.LogRecord):
8000+
if (
8001+
hasattr(l, "__temporal_error_identifier")
8002+
and getattr(l, "__temporal_error_identifier") == "WorkflowTaskFailure"
8003+
):
8004+
assert l.msg.startswith("Failed activation on workflow")
8005+
return True
8006+
return False
8007+
8008+
assert capturer.find(workflow_failure) is not None
8009+
8010+
79788011
@activity.defn
79798012
def use_in_workflow() -> bool:
79808013
return workflow.in_workflow()

0 commit comments

Comments
 (0)