Skip to content

Commit b2043bd

Browse files
committed
Implement Workflow.new_untyped_activity_stub().
1 parent 56e761f commit b2043bd

File tree

4 files changed

+84
-0
lines changed

4 files changed

+84
-0
lines changed

temporal/activity_method.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,21 @@ def fill_execute_activity_parameters(self, execute_parameters: ExecuteActivityPa
9494
execute_parameters.heartbeat_timeout = self.heartbeat_timeout
9595
if self.task_queue is not None:
9696
execute_parameters.task_queue = self.task_queue
97+
98+
99+
@dataclass
100+
class UntypedActivityStub:
101+
_decision_context: object = None
102+
_retry_parameters: RetryParameters = None
103+
_activity_options: ActivityOptions = None
104+
105+
async def execute(self, activity_name: str, *args):
106+
f = await self.execute_async(activity_name, *args)
107+
return await f.wait_for_result()
108+
109+
async def execute_async(self, activity_name: str, *args):
110+
from .async_activity import Async
111+
execute_parameters = ExecuteActivityParameters()
112+
execute_parameters.activity_type = ActivityType()
113+
execute_parameters.activity_type.name = activity_name
114+
return Async.call(self, execute_parameters, args)

temporal/async_activity.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ def function_with_self(method, self, *args):
1717
assert self._decision_context
1818
assert method._execute_parameters
1919
parameters: ExecuteActivityParameters = copy.deepcopy(method._execute_parameters)
20+
return Async.call(self, parameters, args)
21+
22+
@staticmethod
23+
def call(self, parameters, args: List[object]):
2024
if hasattr(self, "_activity_options") and self._activity_options:
2125
self._activity_options.fill_execute_activity_parameters(parameters)
2226
if self._retry_parameters:

temporal/workflow.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,19 @@ def new_activity_stub(activities_cls, retry_parameters: RetryParameters = None,
4141
cls._activity_options = activity_options
4242
return cls
4343

44+
@staticmethod
45+
def new_untyped_activity_stub(retry_parameters: RetryParameters = None,
46+
activity_options: ActivityOptions = None):
47+
from .decision_loop import ITask
48+
from .activity_method import UntypedActivityStub
49+
task: ITask = ITask.current()
50+
assert task
51+
cls = UntypedActivityStub()
52+
cls._decision_context = task.decider.decision_context
53+
cls._retry_parameters = retry_parameters # type: ignore
54+
cls._activity_options = activity_options
55+
return cls
56+
4457
@staticmethod
4558
async def await_till(c: Callable, timeout_seconds: int = 0) -> bool:
4659
from .decision_loop import ITask
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import pytest
2+
from datetime import timedelta
3+
4+
from temporal.workflow import workflow_method, WorkflowClient, Workflow
5+
from temporal.activity_method import activity_method, ActivityOptions
6+
7+
TASK_QUEUE = "test_workflow_untyped_activity_stub_tq"
8+
NAMESPACE = "default"
9+
10+
11+
class GreetingActivities:
12+
@activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=1000))
13+
async def compose_greeting(self, arg1) -> str:
14+
raise NotImplementedError
15+
16+
17+
class GreetingActivitiesImpl:
18+
19+
def compose_greeting(self, arg1):
20+
return "from-activity: " + arg1
21+
22+
23+
class GreetingWorkflow:
24+
@workflow_method(task_queue=TASK_QUEUE)
25+
async def get_greeting(self) -> None:
26+
raise NotImplementedError
27+
28+
29+
class GreetingWorkflowImpl(GreetingWorkflow):
30+
31+
def __init__(self):
32+
self.stub = Workflow.new_untyped_activity_stub(
33+
activity_options=ActivityOptions(task_queue=TASK_QUEUE,
34+
schedule_to_close_timeout=timedelta(seconds=1000))
35+
)
36+
37+
async def get_greeting(self):
38+
return await self.stub.execute("GreetingActivities::compose_greeting", "blah")
39+
40+
41+
@pytest.mark.asyncio
42+
@pytest.mark.worker_config(NAMESPACE, TASK_QUEUE, activities=[(GreetingActivitiesImpl(), "GreetingActivities")],
43+
workflows=[GreetingWorkflowImpl])
44+
async def test(worker):
45+
client = WorkflowClient.new_client(namespace=NAMESPACE)
46+
greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow)
47+
ret_value = await greeting_workflow.get_greeting()
48+
49+
assert ret_value == "from-activity: blah"

0 commit comments

Comments
 (0)