-
Notifications
You must be signed in to change notification settings - Fork 144
Move to proper async heartbeat queuing #14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
bergundy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I managed to skim this, overall lgtm
| # context back on the activity and calls heartbeat, then another | ||
| # call schedules it. | ||
| def heartbeat_with_context(*details: Any) -> None: | ||
| async def heartbeat_with_context(*details: Any) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did this become async?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I could use asyncio.run_coroutine_threadsafe which returns a future instead of loop.call_soon_threadsafe (which it uses internally anyways). It also helps enforce that an event loop is available.
| # We only give the queue a few seconds to have enough room | ||
| self._heartbeat_queue.put( | ||
| (task_token, _multiprocess_heartbeat_complete), True, 30 | ||
| (task_token, _multiprocess_heartbeat_complete), True, 5 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: instead of this string you can use object() to create a unique empty object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My original version had that, but I didn't want to change the type of the queue
| logger, activity, task_token, *details | ||
| # Put on queue and schedule a task. We will let the queue-full error | ||
| # be thrown here | ||
| activity.pending_heartbeats.put_nowait(details) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably wrap this error with a meaningful error so users can understand this happens because they heartbeat too fast.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered it, but I was under the impression wrapping was less common in Python. I can do so on my next PR (which will be moving these files to a completely separate place).
What was changed
Heartbeating using a queue and properly doing an async unregister
Why?
We're using queuing just to keep the task-count low for repeated heartbeats. We were getting deadlocks on unregister so we properly made that async.
Checklist