Skip to content

Adding trace identification numbers #888

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 10, 2025
8 changes: 6 additions & 2 deletions temporalio/worker/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,15 @@ async def _run_activity(
):
# Downgrade log level to DEBUG for BENIGN application errors.
temporalio.activity.logger.debug(
"Completing activity as failed", exc_info=True
"Completing activity as failed",
exc_info=True,
extra={"__temporal_error_identifier": "ActivityFailure"},
)
else:
temporalio.activity.logger.warning(
"Completing activity as failed", exc_info=True
"Completing activity as failed",
exc_info=True,
extra={"__temporal_error_identifier": "ActivityFailure"},
)
await self._data_converter.encode_failure(
err, completion.result.failed.failure
Expand Down
5 changes: 4 additions & 1 deletion temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,10 @@ def activate(
logger.warning(
f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}",
exc_info=activation_err,
extra={"temporal_workflow": self._info._logger_details()},
extra={
"temporal_workflow": self._info._logger_details(),
"__temporal_error_identifier": "WorkflowTaskFailure",
},
)
# Set completion failure
self._current_completion.failed.failure.SetInParent()
Expand Down
37 changes: 37 additions & 0 deletions tests/worker/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -1379,3 +1379,40 @@ def assert_activity_application_error(
ret = assert_activity_error(err)
assert isinstance(ret, ApplicationError)
return ret


class CustomLogHandler(logging.Handler):
def __init__(self):
super().__init__()
self._trace_identifiers = 0

def emit(self, record: logging.LogRecord) -> None:
if (
hasattr(record, "__temporal_error_identifier")
and getattr(record, "__temporal_error_identifier") == "ActivityFailure"
):
assert record.msg.startswith("Completing activity as failed")
self._trace_identifiers += 1
return None


async def test_activity_failure_trace_identifier(
client: Client, worker: ExternalWorker
):
@activity.defn
async def raise_error():
raise RuntimeError("oh no!")

handler = CustomLogHandler()
activity.logger.base_logger.addHandler(handler)

try:
with pytest.raises(WorkflowFailureError) as err:
await _execute_workflow_with_activity(client, worker, raise_error)
assert (
str(assert_activity_application_error(err.value)) == "RuntimeError: oh no!"
)
assert handler._trace_identifiers == 1

finally:
activity.logger.base_logger.removeHandler(CustomLogHandler())
35 changes: 34 additions & 1 deletion tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1961,8 +1961,13 @@ def logs_captured(self, *loggers: logging.Logger):
l.setLevel(prev_levels[i])

def find_log(self, starts_with: str) -> Optional[logging.LogRecord]:
return self.find(lambda l: l.message.startswith(starts_with))

def find(
self, pred: Callable[[logging.LogRecord], bool]
) -> Optional[logging.LogRecord]:
for record in cast(List[logging.LogRecord], self.log_queue.queue):
if record.message.startswith(starts_with):
if pred(record):
return record
return None

Expand Down Expand Up @@ -2058,6 +2063,7 @@ async def run(self) -> None:
if not task_fail_once_workflow_has_failed:
task_fail_once_workflow_has_failed = True
raise RuntimeError("Intentional workflow task failure")
task_fail_once_workflow_has_failed = False

# Execute activity that will fail once
await workflow.execute_activity(
Expand Down Expand Up @@ -7975,6 +7981,33 @@ async def test_quick_activity_swallows_cancellation(client: Client):
temporalio.worker._workflow_instance._raise_on_cancelling_completed_activity_override = False


async def test_workflow_logging_trace_identifier(client: Client):
with LogCapturer().logs_captured(
temporalio.worker._workflow_instance.logger
) as capturer:
async with new_worker(
client,
TaskFailOnceWorkflow,
activities=[task_fail_once_activity],
) as worker:
await client.execute_workflow(
TaskFailOnceWorkflow.run,
id=f"workflow_failure_trace_identifier",
task_queue=worker.task_queue,
)

def workflow_failure(l: logging.LogRecord):
if (
hasattr(l, "__temporal_error_identifier")
and getattr(l, "__temporal_error_identifier") == "WorkflowTaskFailure"
):
assert l.msg.startswith("Failed activation on workflow")
return True
return False

assert capturer.find(workflow_failure) is not None


@activity.defn
def use_in_workflow() -> bool:
return workflow.in_workflow()
Expand Down
Loading