
Commit 53462cc

Ensure compatibility with old approach & add tests
1 parent a901b4d commit 53462cc

5 files changed: +1001 −31 lines


temporalio/worker/_workflow_instance.py (19 additions, 6 deletions)

@@ -358,23 +358,36 @@ def activate(
         activation_err: Optional[Exception] = None
         try:
             # Apply every job, running the loop afterward
-            is_query = False
+            no_queries = True
             for job in act.jobs:
                 if job.HasField("initialize_workflow"):
                     self._workflow_input = self._make_workflow_input(
                         job.initialize_workflow
                     )
                 elif job.HasField("query_workflow"):
-                    is_query = True
+                    no_queries = False
                 # Let errors bubble out of these to the caller to fail the task
                 self._apply(job)

             # Conditions are not checked on query activations. Query activations always come without
             # any other jobs.
-            self._run_once(check_conditions=not is_query)
-            # Ensure the main workflow function is called on first task, and called last.
-            if self._primary_task_initter is not None and self._primary_task is None:
-                self._primary_task_initter()
+            first_task = False
+            try:
+                self._run_once(check_conditions=no_queries)
+            finally:
+                # Ensure the main workflow function task is initialized after a first run of the
+                # event loop, which might execute before-start signals/updates. This is behind
+                # a finally because if those handlers fail, we still need the main routine
+                # to be initialized in order to fail tasks properly.
+                if (
+                    self._primary_task_initter is not None
+                    and self._primary_task is None
+                ):
+                    self._primary_task_initter()
+                    first_task = True
+            # Because we want any before-start signals/updates to fully process before running
+            # the main routine for the first time, we run the loop again if this is the first time.
+            if first_task:
                 self._run_once(check_conditions=True)
         except Exception as err:
             # We want some errors during activation, like those that can happen
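For context, the behavior this change preserves can be illustrated with a small workflow. The sketch below is hypothetical and not part of this commit; it assumes only the public temporalio Python SDK decorators (@workflow.defn, @workflow.signal, @workflow.run). The point it demonstrates: a signal delivered in the same activation that starts the workflow is handled before the main routine first runs, so the main routine observes its effect.

from temporalio import workflow


# Illustrative workflow; the class and handler names are made up for this example.
@workflow.defn
class SignalBeforeStartWorkflow:
    def __init__(self) -> None:
        self.events: list[str] = []

    @workflow.signal
    def record(self, value: str) -> None:
        # If this signal arrives in the activation that starts the workflow,
        # the handler runs during the first event-loop pass, before run() starts.
        self.events.append(value)

    @workflow.run
    async def run(self) -> list[str]:
        # With the change above, before-start signals have fully processed by the
        # time this routine first runs, so self.events already holds their values.
        return self.events

A signal-with-start from a client would exercise this path in a live worker; the test below drives the same ordering through the replayer instead.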

tests/worker/test_replayer.py (86 additions, 2 deletions)

@@ -4,16 +4,24 @@
 from dataclasses import dataclass
 from datetime import timedelta
 from pathlib import Path
-from typing import Dict
+from typing import Any, Dict, Optional, Type

 import pytest

 from temporalio import activity, workflow
 from temporalio.client import Client, WorkflowFailureError, WorkflowHistory
 from temporalio.exceptions import ApplicationError
 from temporalio.testing import WorkflowEnvironment
-from temporalio.worker import Replayer, Worker
+from temporalio.worker import (
+    ExecuteWorkflowInput,
+    Interceptor,
+    Replayer,
+    Worker,
+    WorkflowInboundInterceptor,
+    WorkflowInterceptorClassInput,
+)
 from tests.helpers import assert_eq_eventually
+from tests.worker.test_workflow import SignalsActivitiesTimersUpdatesTracingWorkflow


 @activity.defn
@@ -385,3 +393,79 @@ async def test_replayer_command_reordering_backward_compatibility() -> None:
     await Replayer(workflows=[UpdateCompletionAfterWorkflowReturn]).replay_workflow(
         WorkflowHistory.from_json("fake", history)
     )
+
+
+workflow_res = None
+
+
+class WorkerWorkflowResultInterceptor(Interceptor):
+    def workflow_interceptor_class(
+        self, input: WorkflowInterceptorClassInput
+    ) -> Optional[Type[WorkflowInboundInterceptor]]:
+        return WorkflowResultInterceptor
+
+
+class WorkflowResultInterceptor(WorkflowInboundInterceptor):
+    async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
+        global workflow_res
+        res = await super().execute_workflow(input)
+        workflow_res = res
+        return res
+
+
+async def test_replayer_async_ordering() -> None:
+    """
+    This test verifies that the order in which asyncio tasks/coroutines are woken up matches
+    the order they were in before the change to apply all jobs and then run the event loop,
+    where previously the event loop was run after each "batch" of jobs.
+    """
+    histories_and_expecteds = [
+        (
+            "test_replayer_event_tracing.json",
+            [
+                "sig-before-sync",
+                "sig-before-1",
+                "sig-before-2",
+                "timer-sync",
+                "act-sync",
+                "act-1",
+                "act-2",
+                "sig-1-sync",
+                "sig-1-1",
+                "sig-1-2",
+                "update-1-sync",
+                "update-1-1",
+                "update-1-2",
+                "timer-1",
+                "timer-2",
+            ],
+        ),
+        (
+            "test_replayer_event_tracing_double_sig_at_start.json",
+            [
+                "sig-before-sync",
+                "sig-before-1",
+                "sig-1-sync",
+                "sig-1-1",
+                "sig-before-2",
+                "sig-1-2",
+                "timer-sync",
+                "act-sync",
+                "update-1-sync",
+                "update-1-1",
+                "update-1-2",
+                "act-1",
+                "act-2",
+                "timer-1",
+                "timer-2",
+            ],
+        ),
+    ]
+    for history, expected in histories_and_expecteds:
+        with Path(__file__).with_name(history).open() as f:
+            history = f.read()
+        await Replayer(
+            workflows=[SignalsActivitiesTimersUpdatesTracingWorkflow],
+            interceptors=[WorkerWorkflowResultInterceptor()],
+        ).replay_workflow(WorkflowHistory.from_json("fake", history))
+        assert workflow_res == expected
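The expected lists above are the workflow's own return value: the order in which handlers in SignalsActivitiesTimersUpdatesTracingWorkflow (defined in tests/worker/test_workflow.py, not shown in this commit) append trace labels. Purely for illustration, a tracing workflow of that general shape might look like the hypothetical sketch below; the real test workflow is more involved, and every name here is made up.

import asyncio

from temporalio import workflow


# Illustrative only: shows the "<name>-sync" / "<name>-1" / "<name>-2" tracing pattern,
# where the sync label is recorded when a handler starts and the numbered labels after
# each yield point, so their interleaving reflects event-loop wake-up order.
@workflow.defn
class TracingWorkflowSketch:
    def __init__(self) -> None:
        self.events: list[str] = []

    async def _trace(self, name: str) -> None:
        self.events.append(f"{name}-sync")
        await asyncio.sleep(0)  # yield to the workflow event loop, no timer
        self.events.append(f"{name}-1")
        await asyncio.sleep(0)
        self.events.append(f"{name}-2")

    @workflow.signal
    async def sig_1(self) -> None:
        await self._trace("sig-1")

    @workflow.run
    async def run(self) -> list[str]:
        await self._trace("timer")
        return self.events

Replaying a recorded history against such a workflow with the result-capturing interceptor above then lets the test assert on the exact wake-up order without a running server.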
