Skip to content

Conversation

@cretz
Copy link
Member

@cretz cretz commented Mar 21, 2022

What was changed

Heartbeating using a queue and properly doing an async unregister

Why?

We're using queuing just to keep the task-count low for repeated heartbeats. We were getting deadlocks on unregister so we properly made that async.

Checklist

  1. Closes Multiprocess heartbeat and cancel improvement research #12

@cretz cretz requested a review from a team March 21, 2022 20:51
@cretz cretz marked this pull request as ready for review March 21, 2022 20:51
Copy link
Member

@bergundy bergundy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I managed to skim this, overall lgtm

# context back on the activity and calls heartbeat, then another
# call schedules it.
def heartbeat_with_context(*details: Any) -> None:
async def heartbeat_with_context(*details: Any) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this become async?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I could use asyncio.run_coroutine_threadsafe which returns a future instead of loop.call_soon_threadsafe (which it uses internally anyways). It also helps enforce that an event loop is available.

# We only give the queue a few seconds to have enough room
self._heartbeat_queue.put(
(task_token, _multiprocess_heartbeat_complete), True, 30
(task_token, _multiprocess_heartbeat_complete), True, 5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: instead of this string you can use object() to create a unique empty object.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original version had that, but I didn't want to change the type of the queue

logger, activity, task_token, *details
# Put on queue and schedule a task. We will let the queue-full error
# be thrown here
activity.pending_heartbeats.put_nowait(details)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably wrap this error with a meaningful error so users can understand this happens because they heartbeat too fast.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered it, but I was under the impression wrapping was less common in Python. I can do so on my next PR (which will be moving these files to a completely separate place).

@cretz cretz merged commit fc528ee into temporalio:main Mar 22, 2022
@cretz cretz deleted the heartbeat-queue branch March 22, 2022 13:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Multiprocess heartbeat and cancel improvement research

2 participants