-
Notifications
You must be signed in to change notification settings - Fork 145
Move to proper async heartbeat queuing #14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -370,8 +370,9 @@ async def _run_activities(self) -> None: | |
|
|
||
| if task.HasField("start"): | ||
| # Cancelled event and sync field will be updated inside | ||
| # _run_activity when the activity function is obtained | ||
| activity = _RunningActivity() | ||
| # _run_activity when the activity function is obtained. Max | ||
| # size of 1000 should be plenty for the heartbeat queue. | ||
| activity = _RunningActivity(pending_heartbeats=asyncio.Queue(1000)) | ||
| activity.task = asyncio.create_task( | ||
| self._run_activity(task.task_token, task.start, activity) | ||
| ) | ||
|
|
@@ -409,22 +410,27 @@ def _heartbeat_activity(self, task_token: bytes, *details: Any) -> None: | |
| logger = temporalio.activity.logger | ||
| activity = self._running_activities.get(task_token) | ||
| if activity and not activity.done: | ||
| # Just set as next pending if one is already running | ||
| coro = self._heartbeat_activity_async( | ||
| logger, activity, task_token, *details | ||
| # Put on queue and schedule a task. We will let the queue-full error | ||
| # be thrown here | ||
| activity.pending_heartbeats.put_nowait(details) | ||
| activity.last_heartbeat_task = asyncio.create_task( | ||
| self._heartbeat_activity_async(logger, activity, task_token) | ||
| ) | ||
| if activity.current_heartbeat_task: | ||
| activity.pending_heartbeat = coro | ||
| else: | ||
| activity.current_heartbeat_task = asyncio.create_task(coro) | ||
|
|
||
| async def _heartbeat_activity_async( | ||
| self, | ||
| logger: logging.LoggerAdapter, | ||
| activity: _RunningActivity, | ||
| task_token: bytes, | ||
| *details: Any, | ||
| ) -> None: | ||
| # Drain the queue, only taking the last value to actually heartbeat | ||
| details: Optional[Iterable[Any]] = None | ||
| while not activity.pending_heartbeats.empty(): | ||
| details = activity.pending_heartbeats.get_nowait() | ||
| if details is None: | ||
| return | ||
|
|
||
| # Perform the heartbeat | ||
| try: | ||
| heartbeat = temporalio.bridge.proto.ActivityHeartbeat(task_token=task_token) | ||
| if details: | ||
|
|
@@ -437,16 +443,7 @@ async def _heartbeat_activity_async( | |
| ) | ||
| logger.debug("Recording heartbeat with details %s", details) | ||
| self._bridge_worker.record_activity_heartbeat(heartbeat) | ||
| # If there is one pending, schedule it | ||
| if activity.pending_heartbeat: | ||
| activity.current_heartbeat_task = asyncio.create_task( | ||
| activity.pending_heartbeat | ||
| ) | ||
| activity.pending_heartbeat = None | ||
| else: | ||
| activity.current_heartbeat_task = None | ||
| except Exception as err: | ||
| activity.current_heartbeat_task = None | ||
| # If the activity is done, nothing we can do but log | ||
| if activity.done: | ||
| logger.exception( | ||
|
|
@@ -696,12 +693,12 @@ async def _run_activity( | |
|
|
||
| # Do final completion | ||
| try: | ||
| # We mark the activity as done and let the currently running (and next | ||
| # pending) heartbeat task finish | ||
| # We mark the activity as done and let the currently running | ||
| # heartbeat task finish | ||
| running_activity.done = True | ||
| while running_activity.current_heartbeat_task: | ||
| if running_activity.last_heartbeat_task: | ||
| try: | ||
| await running_activity.current_heartbeat_task | ||
| await running_activity.last_heartbeat_task | ||
| except: | ||
| # Should never happen because it's trapped in-task | ||
| temporalio.activity.logger.exception( | ||
|
|
@@ -749,12 +746,12 @@ class _ActivityDefinition: | |
|
|
||
| @dataclass | ||
| class _RunningActivity: | ||
| pending_heartbeats: asyncio.Queue[Iterable[Any]] | ||
| # Most of these optional values are set before use | ||
| info: Optional[temporalio.activity.Info] = None | ||
| task: Optional[asyncio.Task] = None | ||
| cancelled_event: Optional[temporalio.activity._CompositeEvent] = None | ||
| pending_heartbeat: Optional[Coroutine] = None | ||
| current_heartbeat_task: Optional[asyncio.Task] = None | ||
| last_heartbeat_task: Optional[asyncio.Task] = None | ||
| sync: bool = False | ||
| done: bool = False | ||
| cancelled_by_request: bool = False | ||
|
|
@@ -895,19 +892,16 @@ async def execute_activity(self, input: ExecuteActivityInput) -> Any: | |
| # loop (even though it's sync). So we need a call that puts the | ||
| # context back on the activity and calls heartbeat, then another | ||
| # call schedules it. | ||
| def heartbeat_with_context(*details: Any) -> None: | ||
| async def heartbeat_with_context(*details: Any) -> None: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did this become async?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So I could use |
||
| temporalio.activity._Context.set(ctx) | ||
| assert orig_heartbeat | ||
| orig_heartbeat(*details) | ||
|
|
||
| def thread_safe_heartbeat(*details: Any) -> None: | ||
| # TODO(cretz): Final heartbeat can be flaky if we don't wait on | ||
| # result here, but waiting on result of | ||
| # asyncio.run_coroutine_threadsafe times out in rare cases. | ||
| # Need more investigation: https://github.com/temporalio/sdk-python/issues/12 | ||
| loop.call_soon_threadsafe(heartbeat_with_context, *details) | ||
|
|
||
| ctx.heartbeat = thread_safe_heartbeat | ||
| # Invoke the async heartbeat waiting a max of 10 seconds for | ||
| # accepting | ||
| ctx.heartbeat = lambda *details: asyncio.run_coroutine_threadsafe( | ||
| heartbeat_with_context(*details), loop | ||
| ).result(10) | ||
|
|
||
| # For heartbeats, we use the existing heartbeat callable for thread | ||
| # pool executors or a multiprocessing queue for others | ||
|
|
@@ -917,7 +911,7 @@ def thread_safe_heartbeat(*details: Any) -> None: | |
| # Should always be present in worker, pre-checked on init | ||
| shared_manager = input._worker._config["shared_state_manager"] | ||
| assert shared_manager | ||
| heartbeat = shared_manager.register_heartbeater( | ||
| heartbeat = await shared_manager.register_heartbeater( | ||
| info.task_token, ctx.heartbeat | ||
| ) | ||
|
|
||
|
|
@@ -935,7 +929,7 @@ def thread_safe_heartbeat(*details: Any) -> None: | |
| ) | ||
| finally: | ||
| if shared_manager: | ||
| shared_manager.unregister_heartbeater(info.task_token) | ||
| await shared_manager.unregister_heartbeater(info.task_token) | ||
|
|
||
| # Otherwise for async activity, just run | ||
| return await input.fn(*input.args) | ||
|
|
@@ -1032,7 +1026,7 @@ def new_event(self) -> threading.Event: | |
| raise NotImplementedError | ||
|
|
||
| @abstractmethod | ||
| def register_heartbeater( | ||
| async def register_heartbeater( | ||
| self, task_token: bytes, heartbeat: Callable[..., None] | ||
| ) -> SharedHeartbeatSender: | ||
| """Register a heartbeat function. | ||
|
|
@@ -1048,7 +1042,7 @@ def register_heartbeater( | |
| raise NotImplementedError | ||
|
|
||
| @abstractmethod | ||
| def unregister_heartbeater(self, task_token: bytes) -> None: | ||
| async def unregister_heartbeater(self, task_token: bytes) -> None: | ||
| """Unregisters a previously registered heartbeater for the task | ||
| token. This should also flush any pending heartbeats. | ||
| """ | ||
|
|
@@ -1084,12 +1078,12 @@ def __init__( | |
| 1000 | ||
| ) | ||
| self._heartbeats: Dict[bytes, Callable[..., None]] = {} | ||
| self._heartbeat_completions: Dict[bytes, Callable[[], None]] = {} | ||
| self._heartbeat_completions: Dict[bytes, Callable] = {} | ||
|
|
||
| def new_event(self) -> threading.Event: | ||
| return self._mgr.Event() | ||
|
|
||
| def register_heartbeater( | ||
| async def register_heartbeater( | ||
| self, task_token: bytes, heartbeat: Callable[..., None] | ||
| ) -> SharedHeartbeatSender: | ||
| self._heartbeats[task_token] = heartbeat | ||
|
|
@@ -1098,17 +1092,19 @@ def register_heartbeater( | |
| self._queue_poller_executor.submit(self._heartbeat_processor) | ||
| return _MultiprocessingSharedHeartbeatSender(self._heartbeat_queue) | ||
|
|
||
| def unregister_heartbeater(self, task_token: bytes) -> None: | ||
| # Put a completion on the queue and wait for it to happen | ||
| flush_complete = threading.Event() | ||
| self._heartbeat_completions[task_token] = flush_complete.set | ||
| async def unregister_heartbeater(self, task_token: bytes) -> None: | ||
| # Put a callback on the queue and wait for it to happen | ||
| loop = asyncio.get_running_loop() | ||
| finish_event = asyncio.Event() | ||
| self._heartbeat_completions[task_token] = lambda: loop.call_soon_threadsafe( | ||
| finish_event.set | ||
| ) | ||
| try: | ||
| # 30 seconds to put complete, 30 to get notified should be plenty | ||
| # We only give the queue a few seconds to have enough room | ||
| self._heartbeat_queue.put( | ||
| (task_token, _multiprocess_heartbeat_complete), True, 30 | ||
| (task_token, _multiprocess_heartbeat_complete), True, 5 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: instead of this string you can use
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My original version had that, but I didn't want to change the type of the queue |
||
| ) | ||
| if not flush_complete.wait(30): | ||
| raise RuntimeError("Timeout waiting for heartbeat flush") | ||
| await finish_event.wait() | ||
| finally: | ||
| del self._heartbeat_completions[task_token] | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably wrap this error with a meaningful error so users can understand this happens because they heartbeat too fast.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered it, but I was under the impression wrapping was less common in Python. I can do so on my next PR (which will be moving these files to a completely separate place).