[Bug] Exception in converter encoding doesn't propagate as expected #540

Closed
@assafz-q

Description

What are you really trying to do?

I am writing a Temporal workflow and want to handle exceptions that occur during execution. The workflow should catch any unexpected error and fail the entire workflow in a non-retryable way.

Describe the bug

I catch all exceptions during the workflow run and re-raise them as non-retryable ApplicationErrors. Everything works as expected, except when the exception is thrown while encoding data that is sent to an activity.

Running a workflow that sends an unserializable object to an activity results in the following history, which contains no failure other than the timeout (2s):
[image: workflow event history]

and the following warning in the logs:

WARN temporal_sdk_core::worker::workflow: Error while completing workflow activation error=status: InvalidArgument, message: "invalid TaskQueue on ScheduleActivityTaskCommand: missing task queue name. ActivityId=1 ActivityType=some_activity", details: [], metadata: MetadataMap { headers: {"content-type": "application/grpc"} }

I would expect the workflow to fail because of the exception (and record it in the history) before the timeout occurs, the same way it does when an exception is thrown in the middle of workflow execution.

Minimal Reproduction

import asyncio
import datetime
import uuid

import temporalio.activity
import temporalio.client
import temporalio.common
import temporalio.exceptions
import temporalio.worker
import temporalio.workflow


#### activities
class SomeObj:  # not serializable by default, so sending it to activities should fail
    def __init__(self, value: str):
        self.value = value


@temporalio.activity.defn
async def some_activity(obj: SomeObj) -> str:
    return obj.value


#### workflow
@temporalio.workflow.defn
class FailingWorkflowExceptionDuringActivitySend:
    @temporalio.workflow.run
    async def run(self) -> str:
        try:
            obj = SomeObj("value")
            value = await temporalio.workflow.execute_activity(
                some_activity,
                obj,
                start_to_close_timeout=datetime.timedelta(seconds=10),
            )
            return value
        except Exception as e:
            raise temporalio.exceptions.ApplicationError(
                "Failed during activity send as expected", non_retryable=True
            ) from e


####


async def _main():
    client = await temporalio.client.Client.connect(
        target_host="localhost:7233",
    )

    task_queue = "task-queue-name"
    workflow = FailingWorkflowExceptionDuringActivitySend

    async with temporalio.worker.Worker(
        client,
        task_queue=task_queue,
        workflows=[workflow],
        activities=[some_activity],
    ):
        try:
            handle = await client.start_workflow(
                workflow.run,
                id=f"{workflow.__name__}-{uuid.uuid4().hex}",
                task_queue=task_queue,
                run_timeout=datetime.timedelta(seconds=2),
                execution_timeout=datetime.timedelta(seconds=2),
                retry_policy=temporalio.common.RetryPolicy(
                    maximum_attempts=1,
                ),
            )

            result = await handle.result()
            print(f"Result: {result}")
        except Exception as e:
            print(f"Workflow: {workflow.__name__} failed:{e.__cause__}")


if __name__ == "__main__":
    asyncio.run(_main())
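
The encoding failure at the center of this reproduction can be seen in isolation, without a Temporal server. The sketch below uses only the standard library and assumes that the default converter ends up relying on JSON encoding for a plain class like SomeObj; it is a stand-in for illustration, not the SDK's own converter. SomeData and is_json_encodable are hypothetical names introduced here.

```python
import dataclasses
import json


class SomeObj:
    """Same shape as the class in the reproduction: a plain class, not a dataclass."""

    def __init__(self, value: str):
        self.value = value


@dataclasses.dataclass
class SomeData:
    """Hypothetical dataclass variant of SomeObj, used only for comparison."""

    value: str


def is_json_encodable(obj: object) -> bool:
    """Hypothetical pre-flight check: True if the stdlib JSON encoder accepts obj."""
    try:
        json.dumps(obj)
        return True
    except (TypeError, ValueError):
        return False


# A plain class instance is rejected by the stdlib JSON encoder.
plain_ok = is_json_encodable(SomeObj("value"))

# A dataclass converted to a dict is made of plain JSON types and is accepted.
dict_ok = is_json_encodable(dataclasses.asdict(SomeData("value")))

print(plain_ok, dict_ok)
```

A check along these lines could be run inside the workflow's try block before the activity is scheduled, so that a bad argument is turned into an ApplicationError by workflow code rather than surfacing later during encoding. Whether that matches the converter's real behavior in every case is an assumption and is not verified here.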

Environment/Versions

  • OS and processor: Linux
  • Temporal Version: SDK temporalio==1.6.0; Temporal CLI (temporal --version): temporal version 0.12.0 (server 1.23.0) (ui 2.26.2)
  • Are you using Docker or Kubernetes or building Temporal from source? No
