Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions tests/worker/test_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from temporalio import activity, workflow
from temporalio.client import Client, WorkflowUpdateFailedError
from temporalio.exceptions import ApplicationError
from temporalio.exceptions import ApplicationError, NexusOperationError
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import (
ActivityInboundInterceptor,
Expand All @@ -27,6 +27,8 @@
WorkflowInterceptorClassInput,
WorkflowOutboundInterceptor,
)
from temporalio.worker._interceptor import StartNexusOperationInput
from tests.helpers.nexus import create_nexus_endpoint, make_nexus_endpoint_name

# Passing through because Python 3.9 has an import bug at
# https://github.com/python/cpython/issues/91351
Expand Down Expand Up @@ -127,6 +129,12 @@ def start_local_activity(
interceptor_traces.append(("workflow.start_local_activity", input))
return super().start_local_activity(input)

async def start_nexus_operation(
self, input: StartNexusOperationInput
) -> workflow.NexusOperationHandle:
interceptor_traces.append(("workflow.start_nexus_operation", input))
return await super().start_nexus_operation(input)


@activity.defn
async def intercepted_activity(param: str) -> str:
Expand Down Expand Up @@ -169,6 +177,24 @@ async def run(self, style: str) -> None:
)
await child_handle

nexus_client = workflow.create_nexus_client(
endpoint=make_nexus_endpoint_name(workflow.info().task_queue),
service="non-existent-nexus-service",
)
try:
await nexus_client.start_operation(
operation="non-existent-nexus-operation",
input={"test": "data"},
schedule_to_close_timeout=timedelta(microseconds=1),
)
raise Exception("unreachable")
except NexusOperationError:
# The test requires only that the workflow attempts to schedule the nexus operation.
# Instead of setting up a nexus service, we deliberately schedule a call to a
# non-existent nexus operation with an insufficiently long timeout, and expect this
# error.
pass

await self.finish.wait()
workflow.continue_as_new("continue-as-new")

Expand Down Expand Up @@ -200,7 +226,9 @@ async def test_worker_interceptor(client: Client, env: WorkflowEnvironment):
pytest.skip(
"Java test server: https://github.com/temporalio/sdk-java/issues/1424"
)
task_queue = f"task_queue_{uuid.uuid4()}"
task_queue = f"task-queue-{uuid.uuid4()}"
await create_nexus_endpoint(task_queue, client)

async with Worker(
client,
task_queue=task_queue,
Expand Down Expand Up @@ -276,6 +304,8 @@ def pop_trace(name: str, filter: Optional[Callable[[Any], bool]] = None) -> Any:
"workflow.signal_external_workflow",
lambda v: v.args[0] == "external-signal-val",
)
assert pop_trace("workflow.info")
assert pop_trace("workflow.start_nexus_operation")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Interceptor Missing Method, Duplicate Trace Assertion

The test has two assertion failures related to interceptor traces:

  1. TracingWorkflowOutboundInterceptor lacks a start_nexus_operation method, preventing interception of nexus_client.start_operation and causing the pop_trace("workflow.start_nexus_operation") assertion to fail.
  2. A duplicate pop_trace("workflow.info") assertion exists. The first call consumes the trace, leading to the second assertion's failure.

Fix in Cursor Fix in Web

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a valid finding; I'd acidentally lost some of the commit in a git rebasing operation.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Trace Assertion Order Mismatch

The test's trace assertions for workflow.info (during Nexus client creation) and workflow.start_nexus_operation are ordered incorrectly. These events happen after the external workflow signal is delivered, but the assertions check for them before the external signal trace.

Fix in Cursor Fix in Web

assert pop_trace(
"workflow.signal", lambda v: v.args[0] == "external-signal-val"
)
Expand Down