Skip to content

Instantiate interceptors later in workflow instance construction #887

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,24 +301,6 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
str, List[temporalio.bridge.proto.workflow_activation.SignalWorkflow]
] = {}

# Create interceptors. We do this with our runtime on the loop just in
# case they want to access info() during init().
temporalio.workflow._Runtime.set_on_loop(asyncio.get_running_loop(), self)
try:
root_inbound = _WorkflowInboundImpl(self)
self._inbound: WorkflowInboundInterceptor = root_inbound
for interceptor_class in reversed(list(det.interceptor_classes)):
self._inbound = interceptor_class(self._inbound)
# During init we set ourselves on the current loop
self._inbound.init(_WorkflowOutboundImpl(self))
self._outbound = root_inbound._outbound
finally:
# Remove our runtime from the loop
temporalio.workflow._Runtime.set_on_loop(asyncio.get_running_loop(), None)

# Set ourselves on our own loop
temporalio.workflow._Runtime.set_on_loop(self, self)

# When we evict, we have to mark the workflow as deleting so we don't
# add any commands and we swallow exceptions on tear down
self._deleting = False
Expand All @@ -342,6 +324,24 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
Sequence[type[BaseException]]
] = None

# Create interceptors. We do this with our runtime on the loop just in
# case they want to access info() during init(). This should remain at the end of the constructor so that variables are defined during interceptor creation
temporalio.workflow._Runtime.set_on_loop(asyncio.get_running_loop(), self)
try:
root_inbound = _WorkflowInboundImpl(self)
self._inbound: WorkflowInboundInterceptor = root_inbound
for interceptor_class in reversed(list(det.interceptor_classes)):
self._inbound = interceptor_class(self._inbound)
# During init we set ourselves on the current loop
self._inbound.init(_WorkflowOutboundImpl(self))
self._outbound = root_inbound._outbound
finally:
# Remove our runtime from the loop
temporalio.workflow._Runtime.set_on_loop(asyncio.get_running_loop(), None)

# Set ourselves on our own loop
temporalio.workflow._Runtime.set_on_loop(self, self)

def get_thread_id(self) -> Optional[int]:
return self._current_thread_id

Expand Down
30 changes: 30 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7973,3 +7973,33 @@ async def test_quick_activity_swallows_cancellation(client: Client):
assert cause.message == "Workflow cancelled"

temporalio.worker._workflow_instance._raise_on_cancelling_completed_activity_override = False


class SignalInterceptor(temporalio.worker.Interceptor):
def workflow_interceptor_class(
self, input: temporalio.worker.WorkflowInterceptorClassInput
) -> Type[SignalInboundInterceptor]:
return SignalInboundInterceptor


class SignalInboundInterceptor(temporalio.worker.WorkflowInboundInterceptor):
def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None:
def unblock() -> None:
return None

workflow.set_signal_handler("my_random_signal", unblock)
super().init(outbound)


async def test_signal_handler_in_interceptor(client: Client):
async with new_worker(
client,
HelloWorkflow,
interceptors=[SignalInterceptor()],
) as worker:
await client.execute_workflow(
HelloWorkflow.run,
"Temporal",
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
Loading