Skip to content

Commit 1199192

Browse files
authored
Merge pull request #1 from firdaus/master
take rebase from main repo
2 parents b119d92 + 6b35da3 commit 1199192

13 files changed

+491
-77
lines changed

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,22 @@ This should be considered EXPERIMENTAL at the moment. At the moment, all I can s
4242
- [x] Workflow client - invoking queries
4343

4444
1.1
45-
- [ ] ActivityStub and Workflow.newUntypedActivityStub
46-
- [ ] Classes as arguments and return values to/from activity and workflow methods
45+
- [x] ActivityStub and Workflow.newUntypedActivityStub
46+
- [ ] Classes as arguments and return values to/from activity and workflow methods (DataConverter)
4747
- [ ] WorkflowStub and WorkflowClient.newUntypedWorkflowStub
4848
- [ ] Custom workflow ids through start() and new_workflow_stub()
4949
- [ ] ContinueAsNew
50+
- [ ] Parallel activity execution (STATUS: there's a working but not finalized API).
5051
- [ ] Compatibility with Java client
5152
- [ ] Compatibility with Golang client
53+
- [ ] Remove threading, use coroutines for everything all concurrency
5254

5355
2.0
5456
- [ ] Sticky workflows
5557

5658
Post 2.0:
5759
- [ ] sideEffect/mutableSideEffect
5860
- [ ] Local activity
59-
- [ ] Parallel activity execution
6061
- [ ] Timers
6162
- [ ] Cancellation Scopes
6263
- [ ] Child Workflows

temporal/activity.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ async def complete_exceptionally(service, task_token, ex: Exception):
144144
respond.task_token = task_token
145145
respond.identity = get_identity()
146146
respond.failure = serialize_exception(ex)
147-
# TODO: error handling
148147
await service.respond_activity_task_failed(request=respond)
149148

150149

@@ -153,5 +152,4 @@ async def complete(service, task_token, return_value: object):
153152
respond.task_token = task_token
154153
respond.result = to_payloads([return_value])
155154
respond.identity = get_identity()
156-
# TODO: error handling
157155
await service.respond_activity_task_completed(request=respond)

temporal/activity_loop.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
import inspect
66
from typing import List
77

8+
from grpclib import GRPCError
9+
810
from temporal.activity import ActivityContext, ActivityTask, complete_exceptionally, complete
911
from temporal.api.taskqueue.v1 import TaskQueue, TaskQueueMetadata
1012
from temporal.conversions import from_payloads
13+
from temporal.retry import retry
1114
from temporal.service_helpers import create_workflow_service, get_identity
1215
from temporal.worker import Worker, StopRequestedException
1316
from temporal.api.workflowservice.v1 import WorkflowServiceStub as WorkflowService, PollActivityTaskQueueRequest, \
@@ -20,6 +23,7 @@ def activity_task_loop(worker: Worker):
2023
asyncio.run(activity_task_loop_func(worker))
2124

2225

26+
@retry(logger=logger)
2327
async def activity_task_loop_func(worker: Worker):
2428
service: WorkflowService = create_workflow_service(worker.host, worker.port, timeout=worker.get_timeout())
2529
worker.manage_service(service)
@@ -43,14 +47,9 @@ async def activity_task_loop_func(worker: Worker):
4347
logger.debug("PollActivityTaskQueue: %dms", (polling_end - polling_start).total_seconds() * 1000)
4448
except StopRequestedException:
4549
return
46-
except Exception as ex:
47-
logger.error("PollActivityTaskQueue error: %s", ex)
50+
except GRPCError as ex:
51+
logger.error("Error invoking poll_activity_task_queue: %s", ex, exc_info=True)
4852
continue
49-
# -----
50-
# if err:
51-
# logger.error("PollActivityTaskQueue failed: %s", err)
52-
# continue
53-
# -----
5453
task_token = task.task_token
5554
if not task_token:
5655
logger.debug("PollActivityTaskQueue has no task_token (expected): %s", task)
@@ -78,20 +77,20 @@ async def activity_task_loop_func(worker: Worker):
7877
if activity_context.do_not_complete:
7978
logger.info(f"Not completing activity {task.activity_type.name}({str(args)[1:-1]})")
8079
continue
81-
await complete(service, task_token, return_value)
82-
# -----
83-
# if error:
84-
# logger.error("Error invoking RespondActivityTaskCompleted: %s", error)
85-
# -----
80+
8681
logger.info(
8782
f"Activity {task.activity_type.name}({str(args)[1:-1]}) returned {json.dumps(return_value)}")
83+
84+
try:
85+
await complete(service, task_token, return_value)
86+
except GRPCError as ex:
87+
logger.error("Error invoking respond_activity_task_completed: %s", ex, exc_info=True)
8888
except Exception as ex:
8989
logger.error(f"Activity {task.activity_type.name} failed: {type(ex).__name__}({ex})", exc_info=True)
90-
await complete_exceptionally(service, task_token, ex)
91-
# -----
92-
# if error:
93-
# logger.error("Error invoking RespondActivityTaskFailed: %s", error)
94-
# -----
90+
try:
91+
await complete_exceptionally(service, task_token, ex)
92+
except GRPCError as ex2:
93+
logger.error("Error invoking respond_activity_task_failed: %s", ex2, exc_info=True)
9594
finally:
9695
ActivityContext.set(None)
9796
process_end = datetime.datetime.now()
@@ -103,3 +102,4 @@ async def activity_task_loop_func(worker: Worker):
103102
except Exception:
104103
logger.warning("service.close() failed", exc_info=True)
105104
worker.notify_thread_stopped()
105+
logger.info("Activity loop ended")

temporal/activity_method.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
import copy
21
import inspect
32
from dataclasses import dataclass, field
43
from datetime import timedelta
54
from typing import Callable, List
65

76
from temporal.api.common.v1 import RetryPolicy, ActivityType, Payloads
8-
from temporal.conversions import to_payloads
7+
98

109

1110
def get_activity_method_name(method: Callable):
@@ -49,17 +48,10 @@ def activity_method(func: Callable = None, name: str = "", schedule_to_close_tim
4948
def wrapper(fn: Callable):
5049
# noinspection PyProtectedMember
5150
async def stub_activity_fn(self, *args):
52-
assert self._decision_context
53-
assert stub_activity_fn._execute_parameters
54-
parameters: ExecuteActivityParameters = copy.deepcopy(stub_activity_fn._execute_parameters)
55-
if hasattr(self, "_activity_options") and self._activity_options:
56-
self._activity_options.fill_execute_activity_parameters(parameters)
57-
if self._retry_parameters:
58-
parameters.retry_parameters = self._retry_parameters
59-
parameters.input = to_payloads(args)
60-
from temporal.decision_loop import DecisionContext
61-
decision_context: DecisionContext = self._decision_context
62-
return await decision_context.schedule_activity_task(parameters=parameters)
51+
from .async_activity import Async
52+
from .decision_loop import ActivityFuture
53+
future: ActivityFuture = Async.function_with_self(stub_activity_fn, self, *args)
54+
return await future.wait_for_result()
6355

6456
if not task_queue:
6557
raise Exception("task_queue parameter is mandatory")
@@ -102,3 +94,21 @@ def fill_execute_activity_parameters(self, execute_parameters: ExecuteActivityPa
10294
execute_parameters.heartbeat_timeout = self.heartbeat_timeout
10395
if self.task_queue is not None:
10496
execute_parameters.task_queue = self.task_queue
97+
98+
99+
@dataclass
100+
class UntypedActivityStub:
101+
_decision_context: object = None
102+
_retry_parameters: RetryParameters = None
103+
_activity_options: ActivityOptions = None
104+
105+
async def execute(self, activity_name: str, *args):
106+
f = await self.execute_async(activity_name, *args)
107+
return await f.wait_for_result()
108+
109+
async def execute_async(self, activity_name: str, *args):
110+
from .async_activity import Async
111+
execute_parameters = ExecuteActivityParameters()
112+
execute_parameters.activity_type = ActivityType()
113+
execute_parameters.activity_type.name = activity_name
114+
return Async.call(self, execute_parameters, args)

temporal/async_activity.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import copy
2+
from asyncio import Future
3+
from typing import List, Union
4+
5+
from temporal.activity_method import ExecuteActivityParameters
6+
from temporal.conversions import to_payloads
7+
from temporal.decision_loop import ActivityFuture
8+
9+
10+
class Async:
11+
@staticmethod
12+
def function(method, *args) -> ActivityFuture:
13+
return Async.function_with_self(method, method.__self__, *args)
14+
15+
@staticmethod
16+
def function_with_self(method, self, *args):
17+
assert self._decision_context
18+
assert method._execute_parameters
19+
parameters: ExecuteActivityParameters = copy.deepcopy(method._execute_parameters)
20+
return Async.call(self, parameters, args)
21+
22+
@staticmethod
23+
def call(self, parameters, args: List[object]):
24+
if hasattr(self, "_activity_options") and self._activity_options:
25+
self._activity_options.fill_execute_activity_parameters(parameters)
26+
if self._retry_parameters:
27+
parameters.retry_parameters = self._retry_parameters
28+
parameters.input = to_payloads(args)
29+
from temporal.decision_loop import DecisionContext
30+
decision_context: DecisionContext = self._decision_context
31+
return decision_context.schedule_activity_task(parameters=parameters)
32+
33+
@staticmethod
34+
async def any_of(futures: List[Union[ActivityFuture, Future]], timeout_seconds=0):
35+
done, pending = [], []
36+
37+
def condition():
38+
done[:] = []
39+
pending[:] = []
40+
for f in futures:
41+
if f.done():
42+
done.append(f)
43+
else:
44+
pending.append(f)
45+
if done:
46+
return True
47+
else:
48+
return False
49+
50+
await Workflow.await_till(condition, timeout_seconds=timeout_seconds)
51+
return done, pending
52+
53+
@staticmethod
54+
async def all_of(futures: List[Union[ActivityFuture, Future]], timeout_seconds=0):
55+
56+
def condition():
57+
for f in futures:
58+
if not f.done():
59+
return False
60+
return True
61+
62+
await Workflow.await_till(condition, timeout_seconds=timeout_seconds)
63+
64+
65+
from temporal.workflow import Workflow

0 commit comments

Comments
 (0)