Description
What are you really trying to do?
I am writing a Temporal workflow and want to handle exceptions raised during its execution: the workflow should catch any unexpected error and fail the entire workflow in a non-retryable way.
Describe the bug
I catch every exception raised during the workflow run and re-raise it as a non-retryable ApplicationError. This works as expected, except when the exception is thrown while encoding the data passed to an activity.
Running a workflow that sends an unserializable object to an activity results in the following history (which contains no failure other than the 2s timeout):
and the following warning in the logs:
WARN temporal_sdk_core::worker::workflow: Error while completing workflow activation error=status: InvalidArgument, message: "invalid TaskQueue on ScheduleActivityTaskCommand: missing task queue name. ActivityId=1 ActivityType=some_activity", details: [], metadata: MetadataMap { headers: {"content-type": "application/grpc"} }
I would expect the workflow to fail because of the exception (and record it in the history) before the timeout occurs, the same way it does when an exception is thrown in the middle of the workflow execution.
Minimal Reproduction
import asyncio
import datetime
import uuid

import temporalio.activity
import temporalio.client
import temporalio.common
import temporalio.exceptions
import temporalio.worker
import temporalio.workflow


#### activities
class SomeObj:  # not serializable by default, so sending it to activities should fail
    def __init__(self, value: str):
        self.value = value


@temporalio.activity.defn
async def some_activity(obj: SomeObj) -> str:
    return obj.value


#### workflow
@temporalio.workflow.defn
class FailingWorkflowExceptionDuringActivitySend:
    @temporalio.workflow.run
    async def run(self) -> str:
        try:
            obj = SomeObj("value")
            value = await temporalio.workflow.execute_activity(
                some_activity,
                obj,
                start_to_close_timeout=datetime.timedelta(seconds=10),
            )
            return value
        except Exception as e:
            # Re-raise anything unexpected as a non-retryable failure
            raise temporalio.exceptions.ApplicationError(
                "Failed during activity send as expected", non_retryable=True
            ) from e


####
async def _main():
    client = await temporalio.client.Client.connect(
        target_host="localhost:7233",
    )
    task_queue = "task-queue-name"
    workflow = FailingWorkflowExceptionDuringActivitySend
    async with temporalio.worker.Worker(
        client,
        task_queue=task_queue,
        workflows=[workflow],
        activities=[some_activity],
    ):
        try:
            handle = await client.start_workflow(
                workflow.run,
                id=f"{workflow.__name__}-{uuid.uuid4().hex}",
                task_queue=task_queue,
                run_timeout=datetime.timedelta(seconds=2),
                execution_timeout=datetime.timedelta(seconds=2),
                retry_policy=temporalio.common.RetryPolicy(
                    maximum_attempts=1,
                ),
            )
            result = await handle.result()
            print(f"Result: {result}")
        except Exception as e:
            print(f"Workflow: {workflow.__name__} failed:{e.__cause__}")


if __name__ == "__main__":
    asyncio.run(_main())
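For context on why `SomeObj` is described as non-serializable, here is a minimal standard-library sketch. It assumes the default data converter ends up JSON-encoding plain Python values, which this report does not itself confirm. `SomeObjData` and `try_json_encode` are hypothetical names used only for illustration and are not part of the SDK.

```python
import dataclasses
import json


class SomeObj:
    """Plain class from the reproduction; json has no encoder for it."""

    def __init__(self, value: str):
        self.value = value


@dataclasses.dataclass
class SomeObjData:
    """Hypothetical dataclass stand-in carrying the same field."""

    value: str


def try_json_encode(obj: object) -> str:
    """Return the JSON text for obj, or the name of the exception raised."""
    try:
        return json.dumps(obj)
    except TypeError as err:
        return type(err).__name__


# The plain class cannot be encoded; the dataclass can once converted to a dict.
plain_result = try_json_encode(SomeObj("value"))
dataclass_result = try_json_encode(dataclasses.asdict(SomeObjData("value")))
print(plain_result)
print(dataclass_result)
```

The sketch only shows the encoding failure in isolation. It does not explain why that failure escapes the `try`/`except` in the workflow above, which is the behavior this issue is about.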
Environment/Versions
- OS and processor: Linux
- Temporal Version: SDK temporalio==1.6.0; Temporal CLI (temporal --version): temporal version 0.12.0 (server 1.23.0) (ui 2.26.2)
- Are you using Docker or Kubernetes or building Temporal from source? No