
Question: Logging of uncaught exceptions when running under a threadpool executor #677

Closed
@sarnikowski

Description


When you want to override how uncaught exceptions are written in Python, you normally set sys.excepthook.
Below is an example of how to do this with Temporal:

import asyncio
import datetime as dt
import logging
import sys
import traceback

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

logging.basicConfig(
    level=logging.ERROR,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

logger = logging.getLogger(__name__)

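def log_exception(exc_type, exc_value, exc_traceback):
    # Replacement for sys.excepthook: format the full traceback and send it
    # through the logger instead of letting Python print it to stderr.
    tb_lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
    tb_text = "".join(tb_lines)
    logger.error("Uncaught exception:\n%s", tb_text)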

@activity.defn
async def say_hello(name: str) -> str:
    raise Exception("ohh no, I could not say hello")

@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello,
            name,
            start_to_close_timeout=dt.timedelta(seconds=10),
            retry_policy=RetryPolicy(
                initial_interval=dt.timedelta(seconds=0),
                maximum_attempts=1,
            ),
        )

async def start_temporal_server():
    return await WorkflowEnvironment.start_local(namespace="default", dev_server_log_level="error")

async def run_workflow(client: Client):
    async with Worker(
        client,
        task_queue="hello-task-queue",
        workflows=[GreetingWorkflow],
        activities=[say_hello],
    ):
        result = await client.execute_workflow(
            GreetingWorkflow.run,
            "World",
            id="greeting-workflow",
            task_queue="hello-task-queue",
        )
        print(f"Workflow result: {result}")

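async def main():
    # Route uncaught exceptions in the main thread through logging.
    sys.excepthook = log_exception
    # Use the environment as an async context manager so the local dev server
    # is shut down on exit, even when the workflow fails.
    async with await start_temporal_server() as server:
        await run_workflow(server.client)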

if __name__ == "__main__":
    asyncio.run(main())

However, if the workers run activities in, for example, a thread pool executor, this does not work, because sys.excepthook is not invoked for exceptions raised in those threads (ref: python/cpython#13515).

In my application we do not want exceptions and their stack traces printed to stderr; instead, we want all uncaught exceptions to be logged. For threads, the usual approach is to catch the exception and log it instead of letting it propagate. However, if I want the Temporal workflow to register that an activity has failed, I believe I cannot swallow the exception like this. So, in essence, I would like to:

  1. Log all uncaught exceptions instead of printing them to stderr.
  2. Have Temporal capture the exceptions, stack traces, etc., so that they are readable on the workflow, for example in the UI.
  3. Have activities and workflows behave as usual, with errors triggering retries, etc.

Is there a way to achieve this?
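A minimal sketch of one possible approach, assuming that an exception raised out of a Temporal activity is what the SDK records as the activity failure (message and stack trace) and what drives retries: log inside the activity, then re-raise instead of swallowing. log_and_reraise is a hypothetical helper, not part of the Temporal SDK; it wraps both async def activities and synchronous ones that would run on a thread pool executor.

```python
import functools
import inspect
import logging

logger = logging.getLogger(__name__)


def log_and_reraise(fn):
    """Hypothetical helper: log any exception raised by fn, then re-raise it unchanged.

    Re-raising (rather than swallowing) keeps the failure visible to the caller,
    which for an activity is what lets the worker report the attempt as failed.
    """
    if inspect.iscoroutinefunction(fn):
        # Keep async activities async so callers still see a coroutine function.
        @functools.wraps(fn)
        async def async_wrapper(*args, **kwargs):
            try:
                return await fn(*args, **kwargs)
            except Exception:
                # logger.exception logs at ERROR level with the current traceback.
                logger.exception("Activity %s failed", fn.__name__)
                raise

        return async_wrapper

    @functools.wraps(fn)
    def sync_wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            logger.exception("Activity %s failed", fn.__name__)
            raise

    return sync_wrapper
```

It would be stacked under the activity decorator, i.e. @activity.defn on top of @log_and_reraise on top of def say_hello(name: str) -> str. functools.wraps keeps the function's name and annotations, but how activity.defn treats a wrapped function is an assumption worth checking. Because the exception still propagates, the worker may log the failure as well, so duplicate log lines are possible.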
