Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions scripts/gen_payload_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ def walk(self, desc: Descriptor) -> bool:
if key in self.generated:
return self.generated[key]
if key in self.in_progress:
# Break cycles; if another path proves this node needed, we'll revisit
return False
# Break cycles; Assume the child will be needed (Used by Failure -> Cause)
return True

has_payload = False
self.in_progress.add(key)
Expand Down
2 changes: 2 additions & 0 deletions temporalio/bridge/_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ async def _visit_temporal_api_failure_v1_ResetWorkflowFailureInfo(self, fs, o):
async def _visit_temporal_api_failure_v1_Failure(self, fs, o):
if o.HasField("encoded_attributes"):
await self._visit_temporal_api_common_v1_Payload(fs, o.encoded_attributes)
if o.HasField("cause"):
await self._visit_temporal_api_failure_v1_Failure(fs, o.cause)
if o.HasField("application_failure_info"):
await self._visit_temporal_api_failure_v1_ApplicationFailureInfo(
fs, o.application_failure_info
Expand Down
57 changes: 54 additions & 3 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
import typing
import uuid
from abc import ABC, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import IntEnum
from functools import partial
from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Mapping,
Expand Down Expand Up @@ -8383,7 +8381,7 @@ async def test_search_attribute_codec(client: Client, env_type: str):
result = await client.execute_workflow(
SearchAttributeCodecParentWorkflow.run,
"Temporal",
id=f"encryption-workflow-id",
id="encryption-workflow-id",
task_queue=worker.task_queue,
search_attributes=TypedSearchAttributes(
[
Expand All @@ -8393,3 +8391,56 @@ async def test_search_attribute_codec(client: Client, env_type: str):
]
),
)


@activity.defn
async def activity_that_fails_with_details() -> str:
"""Activity that raises an ApplicationError with custom details."""
raise ApplicationError(
"Activity failed intentionally",
"detail1",
{"error_code": "NOT_FOUND", "id": "test-123"},
non_retryable=True,
)


@workflow.defn
class WorkflowWithFailingActivityAndCodec:
@workflow.run
async def run(self) -> str:
try:
return await workflow.execute_activity(
activity_that_fails_with_details,
schedule_to_close_timeout=timedelta(seconds=3),
retry_policy=RetryPolicy(maximum_attempts=1),
)
except ActivityError as err:
assert isinstance(err.cause, ApplicationError)
assert err.cause.message == "Activity failed intentionally"
assert len(err.cause.details) == 2
assert err.cause.details[0] == "detail1"
assert err.cause.details[1] == {"error_code": "NOT_FOUND", "id": "test-123"}
return "Handled encrypted failure successfully"


async def test_activity_failure_with_encoded_payload_is_decoded_in_workflow(
client: Client,
):
config = client.config()
config["data_converter"] = dataclasses.replace(
temporalio.converter.default(), payload_codec=EncryptionCodec()
)
client = Client(**config)

async with new_worker(
client,
WorkflowWithFailingActivityAndCodec,
activities=[activity_that_fails_with_details],
) as worker:
result = await client.execute_workflow(
WorkflowWithFailingActivityAndCodec.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
run_timeout=timedelta(seconds=5),
)
assert result == "Handled encrypted failure successfully"
Loading