Skip to content

Commit 6e3112a

Browse files
authored
add activity loop retry
1 parent 6c1b344 commit 6e3112a

File tree

2 files changed

+7
-1
lines changed

2 files changed

+7
-1
lines changed

temporal/activity_loop.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from temporal.activity import ActivityContext, ActivityTask, complete_exceptionally, complete
1010
from temporal.api.taskqueue.v1 import TaskQueue, TaskQueueMetadata
1111
from temporal.converter import get_fn_args_type_hints
12-
from temporal.retry import retry
12+
from temporal.retry import RetryException, retry
1313
from temporal.service_helpers import get_identity
1414
from temporal.worker import Worker, StopRequestedException
1515
from temporal.api.workflowservice.v1 import WorkflowServiceStub as WorkflowService, PollActivityTaskQueueRequest, \
@@ -93,3 +93,4 @@ async def activity_task_loop_func(worker: Worker):
9393
finally:
9494
worker.notify_thread_stopped()
9595
logger.info("Activity loop ended")
96+
raise RetryException('sleep')

temporal/retry.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
MAX_DELAY_SECONDS = 5 * 60
88
RESET_DELAY_AFTER_SECONDS = 10 * 60
99

10+
class RetryException(Exception):
11+
pass
1012

1113
def retry(logger=None):
1214
def wrapper(fp):
@@ -17,6 +19,9 @@ async def retry_loop(*args, **kwargs):
1719
await fp(*args, **kwargs)
1820
logger.debug("@retry decorated function %s exited, ending retry loop", fp.__name__)
1921
break
22+
except RetryException:
23+
logger.info('sleeping...')
24+
await asyncio.sleep(INITIAL_DELAY_SECONDS)
2025
except Exception as ex:
2126
now = calendar.timegm(time.gmtime())
2227
if last_failed_time == -1 or (now - last_failed_time) > RESET_DELAY_AFTER_SECONDS:

0 commit comments

Comments
 (0)