Run event loop after job application #717

Closed
wants to merge 5 commits into from
94 changes: 55 additions & 39 deletions temporalio/worker/_workflow_instance.py
@@ -208,6 +208,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
self._worker_level_failure_exception_types = (
det.worker_level_failure_exception_types
)
self._primary_task_initter: Optional[Callable[[], None]] = None
self._primary_task: Optional[asyncio.Task[None]] = None
self._time_ns = 0
self._cancel_requested = False
@@ -356,39 +357,38 @@ def activate(
self._current_thread_id = threading.get_ident()
activation_err: Optional[Exception] = None
try:
# Split into job sets with patches, then signals + updates, then
# non-queries, then queries
start_job = None
job_sets: List[
List[temporalio.bridge.proto.workflow_activation.WorkflowActivationJob]
] = [[], [], [], []]
# Apply every job, running the loop afterward
no_queries = True
for job in act.jobs:
if job.HasField("notify_has_patch"):
job_sets[0].append(job)
elif job.HasField("signal_workflow") or job.HasField("do_update"):
job_sets[1].append(job)
elif not job.HasField("query_workflow"):
if job.HasField("initialize_workflow"):
start_job = job.initialize_workflow
job_sets[2].append(job)
else:
job_sets[3].append(job)

if start_job:
self._workflow_input = self._make_workflow_input(start_job)

# Apply every job set, running after each set
for index, job_set in enumerate(job_sets):
if not job_set:
continue
for job in job_set:
# Let errors bubble out of these to the caller to fail the task
self._apply(job)

# Run one iteration of the loop. We do not allow conditions to
# be checked in patch jobs (first index) or query jobs (last
# index).
self._run_once(check_conditions=index == 1 or index == 2)
if job.HasField("initialize_workflow"):
self._workflow_input = self._make_workflow_input(
job.initialize_workflow
)
elif job.HasField("query_workflow"):
no_queries = False
# Let errors bubble out of these to the caller to fail the task
self._apply(job)

# Conditions are not checked on query activations. Query activations always come without
# any other jobs.
first_task = False
try:
self._run_once(check_conditions=no_queries)
finally:
# Ensure the main workflow function task is initialized after a first run of the
# event loop, which might execute before-start signals/updates. This is behind
# a finally because if those handlers fail, we still need the main routine
# to be initialized in order to fail tasks properly.
Comment on lines +378 to +381 (Member Author):

If it weren't for this bit we would've ended up with net-less code here. A bit too bad, but, I think still nicer than grouping things into batches.

if (
self._primary_task_initter is not None
and self._primary_task is None
):
self._primary_task_initter()
first_task = True
# Because we want any before-start signals/updates to fully process before the main
# routine runs for the first time, run the loop again if the primary task was just created.
if first_task:
self._run_once(check_conditions=True)
except Exception as err:
# We want some errors during activation, like those that can happen
# during payload conversion, to be able to fail the workflow not the
@@ -508,6 +508,17 @@ def _apply_cancel_workflow(
# workflow the ability to receive the cancellation, so we must defer
# this cancellation to the next iteration of the event loop.
self.call_soon(self._primary_task.cancel)
elif self._primary_task_initter:
# If we're being cancelled before ever being started, we need to run the cancel
# after initialization
old_initter = self._primary_task_initter

def init_then_cancel():
old_initter()
if self._primary_task:
self.call_soon(self._primary_task.cancel)

self._primary_task_initter = init_then_cancel

def _apply_do_update(
self, job: temporalio.bridge.proto.workflow_activation.DoUpdate
@@ -885,14 +896,19 @@ async def run_workflow(input: ExecuteWorkflowInput) -> None:
return
raise

if not self._workflow_input:
raise RuntimeError(
"Expected workflow input to be set. This is an SDK Python bug."
def primary_initter():
if not self._workflow_input:
raise RuntimeError(
"Expected workflow input to be set. This is an SDK Python bug."
)
self._primary_task = self.create_task(
self._run_top_level_workflow_function(
run_workflow(self._workflow_input)
),
name="run",
)
self._primary_task = self.create_task(
self._run_top_level_workflow_function(run_workflow(self._workflow_input)),
name="run",
)

self._primary_task_initter = primary_initter

def _apply_update_random_seed(
self, job: temporalio.bridge.proto.workflow_activation.UpdateRandomSeed
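
The change above defers creation of the primary workflow task behind an "initter" callback, and a cancellation that arrives before the task exists wraps that callback so the cancel is requested immediately after initialization. Below is a minimal standalone sketch of that pattern; it is not SDK code, and every name in it is illustrative.

import asyncio
from typing import Callable, Coroutine, Optional


class DeferredPrimary:
    """Toy illustration of deferring task creation behind an initter callback."""

    def __init__(self) -> None:
        self._initter: Optional[Callable[[], None]] = None
        self._task: Optional[asyncio.Task] = None

    def schedule(self, coro: Coroutine) -> None:
        # Instead of creating the task now, remember how to create it later.
        def initter() -> None:
            self._task = asyncio.get_running_loop().create_task(coro)

        self._initter = initter

    def initialize(self) -> None:
        # Called after the first pass of the event loop (a running loop is assumed).
        if self._initter is not None and self._task is None:
            self._initter()

    def cancel(self) -> None:
        if self._task:
            self._task.cancel()
        elif self._initter:
            # Cancelled before ever being started: wrap the original initter so
            # the cancel is requested right after the task finally exists.
            old_initter = self._initter

            def init_then_cancel() -> None:
                old_initter()
                assert self._task is not None
                self._task.cancel()

            self._initter = init_then_cancel
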
99 changes: 97 additions & 2 deletions tests/worker/test_replayer.py
@@ -4,16 +4,25 @@
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from typing import Dict
from typing import Any, Dict, Optional, Type

import pytest

from temporalio import activity, workflow
from temporalio.client import Client, WorkflowFailureError, WorkflowHistory
from temporalio.exceptions import ApplicationError
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Replayer, Worker
from temporalio.worker import (
ExecuteWorkflowInput,
Interceptor,
Replayer,
Worker,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)
from tests.helpers import assert_eq_eventually
from tests.worker.test_workflow import (
ActivityAndSignalsWhileWorkflowDown,
SignalsActivitiesTimersUpdatesTracingWorkflow,
)


@activity.defn
@@ -385,3 +394,89 @@ async def test_replayer_command_reordering_backward_compatibility() -> None:
await Replayer(workflows=[UpdateCompletionAfterWorkflowReturn]).replay_workflow(
WorkflowHistory.from_json("fake", history)
)


workflow_res = None


class WorkerWorkflowResultInterceptor(Interceptor):
def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[Type[WorkflowInboundInterceptor]]:
return WorkflowResultInterceptor


class WorkflowResultInterceptor(WorkflowInboundInterceptor):
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
global workflow_res
res = await super().execute_workflow(input)
workflow_res = res
return res


async def test_replayer_async_ordering() -> None:
"""
This test verifies that the order in which asyncio tasks/coroutines are woken up matches the
order from before the change to apply all jobs and then run the event loop, where previously
the event loop was run after each "batch" of jobs.
"""
histories_and_expecteds = [
(
"test_replayer_event_tracing.json",
[
"sig-before-sync",
"sig-before-1",
"sig-before-2",
"timer-sync",
"act-sync",
"act-1",
"act-2",
"sig-1-sync",
"sig-1-1",
"sig-1-2",
"update-1-sync",
"update-1-1",
"update-1-2",
"timer-1",
"timer-2",
],
),
(
"test_replayer_event_tracing_double_sig_at_start.json",
[
"sig-before-sync",
"sig-before-1",
"sig-1-sync",
"sig-1-1",
"sig-before-2",
"sig-1-2",
"timer-sync",
"act-sync",
"update-1-sync",
"update-1-1",
"update-1-2",
"act-1",
"act-2",
"timer-1",
"timer-2",
],
),
]
for history, expected in histories_and_expecteds:
with Path(__file__).with_name(history).open() as f:
history = f.read()
await Replayer(
workflows=[SignalsActivitiesTimersUpdatesTracingWorkflow],
interceptors=[WorkerWorkflowResultInterceptor()],
).replay_workflow(WorkflowHistory.from_json("fake", history))
assert workflow_res == expected


async def test_replayer_alternate_async_ordering() -> None:
with Path(__file__).with_name("test_replayer_event_tracing_alternate.json").open() as f:
history = f.read()
await Replayer(
workflows=[ActivityAndSignalsWhileWorkflowDown],
interceptors=[WorkerWorkflowResultInterceptor()],
).replay_workflow(WorkflowHistory.from_json("fake", history))
assert workflow_res == ["act-start", "sig-1", "sig-2", "counter-2", "act-done"]
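
For intuition about why the expected orderings above start with the before-start handlers' synchronous prefixes: handler tasks created while jobs are applied get one pass of the event loop before the primary task is created, so their synchronous code runs first. The following is a small self-contained asyncio demo of that scheduling, not SDK code; all names are made up for illustration.

import asyncio

events = []


async def signal_handler(name: str) -> None:
    events.append(f"{name}-sync")  # synchronous prefix, runs on the first wake-up
    await asyncio.sleep(0)
    events.append(f"{name}-1")


async def primary() -> None:
    events.append("primary-start")
    await asyncio.sleep(0)
    events.append("primary-done")


async def main() -> None:
    # "Apply jobs": schedule a before-start signal handler.
    handler = asyncio.create_task(signal_handler("sig-before"))
    # First pass of the loop: the handler's synchronous prefix runs here.
    await asyncio.sleep(0)
    # Only now create the primary task, then run everything to completion.
    main_task = asyncio.create_task(primary())
    await asyncio.gather(handler, main_task)


asyncio.run(main())
print(events)  # expected: ['sig-before-sync', 'sig-before-1', 'primary-start', 'primary-done']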