Skip to content

Commit 89b721a

Browse files
committed
Serialization context changes in anticipation of Standalone Activity
1 parent cde3427 commit 89b721a

File tree

4 files changed

+62
-44
lines changed

4 files changed

+62
-44
lines changed

temporalio/converter.py

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -93,51 +93,50 @@ class SerializationContext(ABC):
9393

9494

9595
@dataclass(frozen=True)
96-
class BaseWorkflowSerializationContext(SerializationContext):
97-
"""Base serialization context shared by workflow and activity serialization contexts."""
98-
99-
namespace: str
100-
workflow_id: str
101-
102-
103-
@dataclass(frozen=True)
104-
class WorkflowSerializationContext(BaseWorkflowSerializationContext):
96+
class WorkflowSerializationContext(SerializationContext):
10597
"""Serialization context for workflows.
10698
10799
See :py:class:`SerializationContext` for more details.
108-
109-
Attributes:
110-
namespace: The namespace the workflow is running in.
111-
workflow_id: The ID of the workflow. Note that this is the ID of the workflow of which the
112-
payload being operated on is an input or output. Note also that when creating/describing
113-
schedules, this may be the workflow ID prefix as configured, not the final workflow ID
114-
when the workflow is created by the schedule.
115100
"""
116101

117-
pass
102+
namespace: str
103+
"""Namespace used by the worker executing the workflow."""
104+
105+
workflow_id: Optional[str]
106+
"""Workflow ID.
107+
108+
Note that this is the ID of the workflow of which the payload being operated on is an input or
109+
output. When creating/describing schedules, this may be the workflow ID prefix as configured,
110+
not the final workflow ID when the workflow is created by the schedule."""
118111

119112

120113
@dataclass(frozen=True)
121-
class ActivitySerializationContext(BaseWorkflowSerializationContext):
114+
class ActivitySerializationContext(SerializationContext):
122115
"""Serialization context for activities.
123116
124117
See :py:class:`SerializationContext` for more details.
125-
126-
Attributes:
127-
namespace: Workflow/activity namespace.
128-
workflow_id: Workflow ID. Note, when creating/describing schedules,
129-
this may be the workflow ID prefix as configured, not the final workflow ID when the
130-
workflow is created by the schedule.
131-
workflow_type: Workflow Type.
132-
activity_type: Activity Type.
133-
activity_task_queue: Activity task queue.
134-
is_local: Whether the activity is a local activity.
135118
"""
136119

137-
workflow_type: str
120+
namespace: str
121+
"""Namespace used by the worker executing the activity."""
122+
123+
activity_id: Optional[str]
124+
"""Activity ID."""
125+
138126
activity_type: str
127+
"""Activity type."""
128+
139129
activity_task_queue: str
130+
"""Activity task queue."""
131+
132+
workflow_id: Optional[str]
133+
"""Workflow ID."""
134+
135+
workflow_type: Optional[str]
136+
"""Workflow type."""
137+
140138
is_local: bool
139+
"""Whether the activity is a local activity."""
141140

142141

143142
class WithSerializationContext(ABC):

temporalio/worker/_activity.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -256,11 +256,12 @@ async def _heartbeat_async(
256256
if activity.info:
257257
context = temporalio.converter.ActivitySerializationContext(
258258
namespace=activity.info.workflow_namespace,
259-
workflow_id=activity.info.workflow_id,
260-
workflow_type=activity.info.workflow_type,
259+
activity_id=activity.info.activity_id,
261260
activity_type=activity.info.activity_type,
262261
activity_task_queue=self._task_queue,
263262
is_local=activity.info.is_local,
263+
workflow_id=activity.info.workflow_id,
264+
workflow_type=activity.info.workflow_type,
264265
)
265266
data_converter = data_converter.with_context(context)
266267

@@ -308,11 +309,12 @@ async def _handle_start_activity_task(
308309
# Create serialization context for the activity
309310
context = temporalio.converter.ActivitySerializationContext(
310311
namespace=start.workflow_namespace,
311-
workflow_id=start.workflow_execution.workflow_id,
312-
workflow_type=start.workflow_type,
312+
activity_id=start.activity_id,
313313
activity_type=start.activity_type,
314314
activity_task_queue=self._task_queue,
315315
is_local=start.is_local,
316+
workflow_id=start.workflow_execution.workflow_id,
317+
workflow_type=start.workflow_type,
316318
)
317319
data_converter = self._data_converter.with_context(context)
318320
try:

temporalio/worker/_workflow_instance.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -788,15 +788,16 @@ def _apply_resolve_activity(
788788
raise RuntimeError(f"Failed finding activity handle for sequence {job.seq}")
789789
activity_context = temporalio.converter.ActivitySerializationContext(
790790
namespace=self._info.namespace,
791-
workflow_id=self._info.workflow_id,
792-
workflow_type=self._info.workflow_type,
791+
activity_id=handle._input.activity_id,
793792
activity_type=handle._input.activity,
794793
activity_task_queue=(
795794
handle._input.task_queue or self._info.task_queue
796795
if isinstance(handle._input, StartActivityInput)
797796
else self._info.task_queue
798797
),
799798
is_local=isinstance(handle._input, StartLocalActivityInput),
799+
workflow_id=self._info.workflow_id,
800+
workflow_type=self._info.workflow_type,
800801
)
801802
payload_converter = self._payload_converter_with_context(activity_context)
802803
failure_converter = self._failure_converter_with_context(activity_context)
@@ -2127,8 +2128,7 @@ def get_serialization_context(
21272128
activity_handle = self._pending_activities[command_info.command_seq]
21282129
return temporalio.converter.ActivitySerializationContext(
21292130
namespace=self._info.namespace,
2130-
workflow_id=self._info.workflow_id,
2131-
workflow_type=self._info.workflow_type,
2131+
activity_id=activity_handle._input.activity_id,
21322132
activity_type=activity_handle._input.activity,
21332133
activity_task_queue=(
21342134
activity_handle._input.task_queue
@@ -2137,6 +2137,8 @@ def get_serialization_context(
21372137
else self._info.task_queue
21382138
),
21392139
is_local=isinstance(activity_handle._input, StartLocalActivityInput),
2140+
workflow_id=self._info.workflow_id,
2141+
workflow_type=self._info.workflow_type,
21402142
)
21412143

21422144
elif (
@@ -2921,15 +2923,16 @@ def __init__(
29212923
self._payload_converter = self._instance._payload_converter_with_context(
29222924
temporalio.converter.ActivitySerializationContext(
29232925
namespace=self._instance._info.namespace,
2924-
workflow_id=self._instance._info.workflow_id,
2925-
workflow_type=self._instance._info.workflow_type,
2926+
activity_id=self._input.activity_id,
29262927
activity_type=self._input.activity,
29272928
activity_task_queue=(
29282929
self._input.task_queue or self._instance._info.task_queue
29292930
if isinstance(self._input, StartActivityInput)
29302931
else self._instance._info.task_queue
29312932
),
29322933
is_local=isinstance(self._input, StartLocalActivityInput),
2934+
workflow_id=self._instance._info.workflow_id,
2935+
workflow_type=self._instance._info.workflow_type,
29332936
)
29342937
)
29352938

tests/test_serialization_context.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ async def run(self, data: TraceData) -> TraceData:
179179
data,
180180
start_to_close_timeout=timedelta(seconds=10),
181181
heartbeat_timeout=timedelta(seconds=2),
182+
activity_id="activity-id",
182183
)
183184
data = await workflow.execute_child_workflow(
184185
EchoWorkflow.run, data, id=f"{workflow.info().workflow_id}_child"
@@ -231,6 +232,7 @@ async def test_payload_conversion_calls_follow_expected_sequence_and_contexts(
231232
workflow_id=workflow_id,
232233
workflow_type=PayloadConversionWorkflow.__name__,
233234
activity_type=passthrough_activity.__name__,
235+
activity_id="activity-id",
234236
activity_task_queue=task_queue,
235237
is_local=False,
236238
)
@@ -328,6 +330,7 @@ async def run(self) -> TraceData:
328330
initial_interval=timedelta(milliseconds=100),
329331
maximum_attempts=2,
330332
),
333+
activity_id="activity-id",
331334
)
332335

333336

@@ -370,6 +373,7 @@ async def test_heartbeat_details_payload_conversion(client: Client):
370373
workflow_id=workflow_id,
371374
workflow_type=HeartbeatDetailsSerializationContextTestWorkflow.__name__,
372375
activity_type=activity_with_heartbeat_details.__name__,
376+
activity_id="activity-id",
373377
activity_task_queue=task_queue,
374378
is_local=False,
375379
)
@@ -419,6 +423,7 @@ async def run(self, data: TraceData) -> TraceData:
419423
local_activity,
420424
data,
421425
start_to_close_timeout=timedelta(seconds=10),
426+
activity_id="activity-id",
422427
)
423428

424429

@@ -459,6 +464,7 @@ async def test_local_activity_payload_conversion(client: Client):
459464
workflow_id=workflow_id,
460465
workflow_type=LocalActivityWorkflow.__name__,
461466
activity_type=local_activity.__name__,
467+
activity_id="activity-id",
462468
activity_task_queue=task_queue,
463469
is_local=True,
464470
)
@@ -504,7 +510,7 @@ async def test_local_activity_payload_conversion(client: Client):
504510

505511

506512
@workflow.defn
507-
class EventWorkflow:
513+
class WaitForSignalWorkflow:
508514
# Like a global asyncio.Event()
509515

510516
def __init__(self) -> None:
@@ -521,10 +527,11 @@ def signal(self) -> None:
521527

522528
@activity.defn
523529
async def async_activity() -> TraceData:
530+
# Notify test that the activity has started and is ready to be completed manually
524531
await (
525532
activity.client()
526533
.get_workflow_handle("activity-started-wf-id")
527-
.signal(EventWorkflow.signal)
534+
.signal(WaitForSignalWorkflow.signal)
528535
)
529536
activity.raise_complete_async()
530537

@@ -558,7 +565,7 @@ async def test_async_activity_completion_payload_conversion(
558565
task_queue=task_queue,
559566
workflows=[
560567
AsyncActivityCompletionSerializationContextTestWorkflow,
561-
EventWorkflow,
568+
WaitForSignalWorkflow,
562569
],
563570
activities=[async_activity],
564571
workflow_runner=UnsandboxedWorkflowRunner(), # so that we can use isinstance
@@ -572,12 +579,13 @@ async def test_async_activity_completion_payload_conversion(
572579
workflow_id=workflow_id,
573580
workflow_type=AsyncActivityCompletionSerializationContextTestWorkflow.__name__,
574581
activity_type=async_activity.__name__,
582+
activity_id="async-activity-id",
575583
activity_task_queue=task_queue,
576584
is_local=False,
577585
)
578586

579587
act_started_wf_handle = await client.start_workflow(
580-
EventWorkflow.run,
588+
WaitForSignalWorkflow.run,
581589
id="activity-started-wf-id",
582590
task_queue=task_queue,
583591
)
@@ -644,6 +652,7 @@ def test_subclassed_async_activity_handle(client: Client):
644652
workflow_id="workflow-id",
645653
workflow_type="workflow-type",
646654
activity_type="activity-type",
655+
activity_id="activity-id",
647656
activity_task_queue="activity-task-queue",
648657
is_local=False,
649658
)
@@ -1058,11 +1067,12 @@ async def run(self) -> Never:
10581067
failing_activity,
10591068
start_to_close_timeout=timedelta(seconds=10),
10601069
retry_policy=RetryPolicy(maximum_attempts=1),
1070+
activity_id="activity-id",
10611071
)
10621072
raise Exception("Unreachable")
10631073

10641074

1065-
test_traces: dict[str, list[TraceItem]] = defaultdict(list)
1075+
test_traces: dict[Optional[str], list[TraceItem]] = defaultdict(list)
10661076

10671077

10681078
class FailureConverterWithContext(DefaultFailureConverter, WithSerializationContext):
@@ -1154,6 +1164,7 @@ async def test_failure_converter_with_context(client: Client):
11541164
workflow_id=workflow_id,
11551165
workflow_type=FailureConverterTestWorkflow.__name__,
11561166
activity_type=failing_activity.__name__,
1167+
activity_id="activity-id",
11571168
activity_task_queue=task_queue,
11581169
is_local=False,
11591170
)
@@ -1322,6 +1333,7 @@ async def run(self, data: str) -> str:
13221333
codec_test_local_activity,
13231334
data,
13241335
start_to_close_timeout=timedelta(seconds=10),
1336+
activity_id="activity-id",
13251337
)
13261338

13271339

@@ -1360,6 +1372,7 @@ async def test_local_activity_codec_with_context(client: Client):
13601372
workflow_id=workflow_id,
13611373
workflow_type=LocalActivityCodecTestWorkflow.__name__,
13621374
activity_type=codec_test_local_activity.__name__,
1375+
activity_id="activity-id",
13631376
activity_task_queue=task_queue,
13641377
is_local=True,
13651378
)
@@ -1593,6 +1606,7 @@ async def run(self, data: str) -> str:
15931606
payload_encryption_activity,
15941607
"outbound",
15951608
start_to_close_timeout=timedelta(seconds=10),
1609+
activity_id="activity-id",
15961610
),
15971611
workflow.execute_child_workflow(
15981612
PayloadEncryptionChildWorkflow.run,

0 commit comments

Comments
 (0)