-
Notifications
You must be signed in to change notification settings - Fork 147
feat: actor messages & queues #3989
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
🚅 Deployed to the rivet-pr-3989 environment in rivet-frontend
|
How to use the Graphite Merge QueueAdd the label merge-queue to this PR to add it to the merge queue. You must have a Graphite account in order to use the merge queue. Sign up using this link. An organization admin has enabled the Graphite Merge Queue in this repository. Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue. This stack of pull requests is managed by Graphite. Learn more about stacking. |
PR Review: Actor Messages & QueuesThis PR implements a message queue system for Rivet Actors, adding the ability to send and receive named messages with persistence and timeout support. Overall, the implementation is solid with good test coverage and proper error handling. Here are my findings: ✅ Strengths
🐛 Potential Issues1. Race Condition in Metadata Updates (Critical)Location: const id = this.#metadata.nextId;
const messageKey = makeQueueMessageKey(id);
// Update metadata before writing so we can batch both writes
this.#metadata.nextId = id + 1n;
this.#metadata.size += 1;
const encodedMetadata = this.#serializeMetadata();
// Batch write message and metadata together
await this.#driver.kvBatchPut(this.#actor.id, [
[messageKey, encodedMessage],
[KEYS.QUEUE_METADATA, encodedMetadata],
]);Issue: If two Recommendation: Either:
2. Inefficient Message LoadingLocation: async #drainMessages(nameSet: Set<string>, count: number): Promise<QueueMessage[]> {
if (this.#metadata.size === 0) {
return [];
}
const entries = await this.#loadQueueMessages(); // Loads ALL messages
const matched = entries.filter((entry) => nameSet.has(entry.name));
// ...
}Issue: Recommendation: Consider:
3. Potential Memory Leak with WaitersLocation: The abort event listeners are registered but only cleaned up when the waiter resolves/rejects. If the actor stops before a waiter times out, the timeout callback on line 184-187 will still fire and try to resolve. Recommendation: Clear the timeout handle in the const onStop = () => {
this.#waiters.delete(waiterId);
if (waiter.timeoutHandle) {
clearTimeout(waiter.timeoutHandle); // Add this
}
reject(new errors.ActorAborted());
};4. Sequential Waiter ResolutionLocation: async #maybeResolveWaiters() {
if (this.#waiters.size === 0) { return; }
const pending = [...this.#waiters.values()];
for (const waiter of pending) {
// ...
const messages = await this.#drainMessages(waiter.nameSet, waiter.count);
// ...
}
}Issue: Waiters are resolved sequentially, each potentially loading all messages from storage. If there are many waiters, this could be slow. Recommendation:
|
20d84dd to
ed996b2
Compare
ed996b2 to
11d2f22
Compare
11d2f22 to
266ebbd
Compare
266ebbd to
2dd6f17
Compare

No description provided.