
Conversation

@NathanFlurry
Member

No description provided.

@railway-app

railway-app bot commented Jan 21, 2026

🚅 Deployed to the rivet-pr-3989 environment in rivet-frontend

Service            | Status                      | Web | Updated (UTC)
website            | 😴 Sleeping (View Logs)     | Web | Jan 22, 2026 at 5:56 pm
frontend-cloud     | ❌ Build Failed (View Logs) | Web | Jan 22, 2026 at 5:46 pm
frontend-inspector | ❌ Build Failed (View Logs) | Web | Jan 22, 2026 at 5:46 pm

Member Author

NathanFlurry commented Jan 21, 2026


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude

claude bot commented Jan 21, 2026

PR Review: Actor Messages & Queues

This PR implements a message queue system for Rivet Actors, adding the ability to send and receive named messages with persistence and timeout support. Overall, the implementation is solid with good test coverage and proper error handling. Here are my findings:

✅ Strengths

  1. Well-structured queue management: The QueueManager class properly separates concerns with clear initialization, message persistence, and waiter resolution logic
  2. Comprehensive test coverage: The test suite covers key scenarios including timeout behavior, abort handling, size limits, and concurrent operations
  3. Proper error handling: Good use of custom error classes (QueueFull, QueueMessageTooLarge, QueueMessageInvalid, ActorAborted) with appropriate metadata
  4. Clean API design: The proxy-based queue sender API (handle.queue.messageName.send()) provides excellent developer ergonomics (see the usage sketch after this list)
  5. Versioned serialization: Proper use of BARE schema versioning for forward/backward compatibility
  6. Abort signal handling: Correctly propagates actor abort signals to waiting receivers
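
For reference, a minimal usage sketch inferred from the API surface mentioned in this review (handle.queue.<name>.send() and c.queue.next()); the declarations for handle and c below are illustrative only, so the example is self-contained, and are not the PR's actual exports:

// Hypothetical usage sketch; `handle` stands in for a client handle and `c`
// for the actor context.
declare const handle: {
    queue: Record<string, { send(body: unknown): Promise<void> }>;
};
declare const c: {
    queue: {
        next(
            name: string,
            opts?: { timeout?: number },
        ): Promise<{ name: string; body: unknown } | undefined>;
    };
};

async function example(): Promise<void> {
    // Client side: send a named message through the proxy-based sender.
    await handle.queue.orderCreated.send({ orderId: 123 });

    // Actor side: wait up to 10s for the next matching message.
    const msg = await c.queue.next("orderCreated", { timeout: 10_000 });
    if (msg !== undefined) {
        // ...process msg.body...
    }
}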

🐛 Potential Issues

1. Race Condition in Metadata Updates (Critical)

Location: queue-manager.ts:129-138

const id = this.#metadata.nextId;
const messageKey = makeQueueMessageKey(id);

// Update metadata before writing so we can batch both writes
this.#metadata.nextId = id + 1n;
this.#metadata.size += 1;
const encodedMetadata = this.#serializeMetadata();

// Batch write message and metadata together
await this.#driver.kvBatchPut(this.#actor.id, [
    [messageKey, encodedMessage],
    [KEYS.QUEUE_METADATA, encodedMetadata],
]);

Issue: If two enqueue() calls run concurrently, they could both read the same nextId, leading to message key collisions. The in-memory metadata is updated before the write completes, but there's no locking mechanism to prevent concurrent access.

Recommendation: Either:

  • Add a mutex/lock around the enqueue operation (see the sketch below)
  • Move to an atomic increment pattern if the KV driver supports it
  • Document that the actor runtime ensures single-threaded execution (if that's the case)
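
A minimal sketch of the mutex option, assuming the runtime does not already guarantee single-writer semantics; QueueManagerSketch, #enqueueLock, and #writeMessage are illustrative names, not the PR's:

// Serialize enqueue() calls with a promise-chain lock so no two callers read
// the same nextId before the batched write lands.
class QueueManagerSketch {
    #enqueueLock: Promise<void> = Promise.resolve();

    async enqueue(name: string, body: Uint8Array): Promise<void> {
        const run = this.#enqueueLock.then(() => this.#writeMessage(name, body));
        // Keep the chain alive even if this particular enqueue rejects.
        this.#enqueueLock = run.then(
            () => undefined,
            () => undefined,
        );
        return run;
    }

    async #writeMessage(name: string, body: Uint8Array): Promise<void> {
        // ...read nextId, bump metadata, and kvBatchPut as in the snippet above...
    }
}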

2. Inefficient Message Loading

Location: queue-manager.ts:228-243

async #drainMessages(nameSet: Set<string>, count: number): Promise<QueueMessage[]> {
    if (this.#metadata.size === 0) {
        return [];
    }
    const entries = await this.#loadQueueMessages(); // Loads ALL messages
    const matched = entries.filter((entry) => nameSet.has(entry.name));
    // ...
}

Issue: #drainMessages loads ALL queue messages from storage on every receive call, even when only requesting 1 message. This is O(n) for every receive operation and could be expensive for large queues.

Recommendation: Consider:

  • Adding an in-memory index of message names → IDs (sketched below)
  • Using a prefix scan with early termination
  • Only loading full messages after finding matching keys
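
A rough sketch of the in-memory index idea; the shape and names are hypothetical, and the index would need to be rebuilt from storage when the actor starts:

// Track message IDs per name so a receive only fetches the keys it needs
// instead of loading every message from storage.
class QueueNameIndexSketch {
    #idsByName = new Map<string, bigint[]>();

    recordEnqueue(name: string, id: bigint): void {
        const ids = this.#idsByName.get(name) ?? [];
        ids.push(id);
        this.#idsByName.set(name, ids);
    }

    // Return up to `count` matching IDs, oldest first.
    peekIds(nameSet: Set<string>, count: number): bigint[] {
        const candidates: bigint[] = [];
        for (const [name, ids] of this.#idsByName) {
            if (nameSet.has(name)) candidates.push(...ids);
        }
        candidates.sort((a, b) => (a < b ? -1 : a > b ? 1 : 0));
        return candidates.slice(0, count);
    }
}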

3. Potential Memory Leak with Waiters

Location: queue-manager.ts:190-220

The abort event listeners are registered but only cleaned up when the waiter resolves/rejects. If the actor stops before a waiter times out, the timeout callback on lines 184-187 will still fire and try to resolve.

Recommendation: Clear the timeout handle in the onStop handler:

const onStop = () => {
    this.#waiters.delete(waiterId);
    if (waiter.timeoutHandle) {
        clearTimeout(waiter.timeoutHandle);  // Add this
    }
    reject(new errors.ActorAborted());
};

4. Sequential Waiter Resolution

Location: queue-manager.ts:300-325

async #maybeResolveWaiters() {
    if (this.#waiters.size === 0) { return; }
    const pending = [...this.#waiters.values()];
    for (const waiter of pending) {
        // ...
        const messages = await this.#drainMessages(waiter.nameSet, waiter.count);
        // ...
    }
}

Issue: Waiters are resolved sequentially, each potentially loading all messages from storage. If there are many waiters, this could be slow.

Recommendation:

  • Load messages once and distribute to waiters (sketched below)
  • Or process waiters in parallel when they don't conflict
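
A rough sketch of the single-load approach; the waiter shape and names are illustrative, not the PR's:

interface WaiterSketch {
    nameSet: Set<string>;
    count: number;
    resolve: (messages: { name: string; body: unknown }[]) => void;
}

// Load the queue once, then hand matching messages out to each waiter in
// order, instead of re-reading storage per waiter.
function distributeMessages(
    messages: { name: string; body: unknown }[],
    waiters: WaiterSketch[],
): void {
    const remaining = [...messages];
    for (const waiter of waiters) {
        const matched: { name: string; body: unknown }[] = [];
        for (let i = 0; i < remaining.length && matched.length < waiter.count; ) {
            if (waiter.nameSet.has(remaining[i].name)) {
                matched.push(remaining.splice(i, 1)[0]);
            } else {
                i++;
            }
        }
        if (matched.length > 0) {
            waiter.resolve(matched);
        }
    }
}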

⚠️ Minor Issues

5. Inconsistent Return Types

Location: queue.ts:31-73

The overloaded next() function returns QueueMessage | undefined for single names but QueueMessage[] | undefined for arrays. The single-message case returns undefined when no messages are available, but it could also return an empty array on timeout (line 68).

Recommendation: Consider making the return type more consistent or adding clear JSDoc comments explaining the behavior.
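
One way to make the two cases explicit, sketched with JSDoc'd overloads (hypothetical; the actual signatures and timeout behavior in queue.ts may differ):

interface QueueMessage {
    name: string;
    body: unknown;
}

/** Wait for one message with the given name; resolves `undefined` on timeout. */
function next(name: string, opts?: { timeout?: number }): Promise<QueueMessage | undefined>;
/** Wait for messages matching any of `names`; resolves `undefined` on timeout, never `[]`. */
function next(names: string[], opts?: { timeout?: number }): Promise<QueueMessage[] | undefined>;
function next(
    nameOrNames: string | string[],
    opts?: { timeout?: number },
): Promise<QueueMessage | QueueMessage[] | undefined> {
    // Implementation elided; only the documented contract matters here.
    void nameOrNames;
    void opts;
    return Promise.resolve(undefined);
}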

6. Magic Number for Timeout

Location: fixtures/driver-test-suite/queue.ts:57

await c.queue.next("abort", { timeout: 10_000 });

The test uses a 10-second timeout, which seems arbitrary. Consider extracting it into a named constant or explaining why this specific value was chosen.

7. Missing Validation

Location: queue-manager.ts:156-161

The receive() function accepts names: string[] but doesn't validate that it's non-empty. An empty array would create a waiter that never resolves.

Recommendation: Add validation:

if (names.length === 0) {
    throw new Error("At least one queue name must be specified");
}

8. Metadata Size Mismatch Silent Recovery

Location: queue-manager.ts:274-277

if (this.#metadata.size !== decoded.length) {
    this.#metadata.size = decoded.length;
    this.#actor.inspector.updateQueueSize(this.#metadata.size);
}

Size mismatches are silently corrected without logging. This could hide persistent corruption issues.

Recommendation: Add a warning log when size mismatches are detected.
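
A possible shape for that warning, applied to the snippet above (assuming rLog exposes a warn level alongside the error calls used elsewhere in the PR):

if (this.#metadata.size !== decoded.length) {
    // Log before correcting so persistent corruption shows up in traces.
    this.#actor.rLog.warn({
        msg: "queue metadata size mismatch, correcting from stored messages",
        expected: this.#metadata.size,
        actual: decoded.length,
    });
    this.#metadata.size = decoded.length;
    this.#actor.inspector.updateQueueSize(this.#metadata.size);
}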

🎯 Performance Considerations

  1. Batch Operations: Good use of kvBatchPut and kvBatchDelete to minimize round trips
  2. CBOR Encoding: Appropriate for binary efficiency, though it adds CPU overhead
  3. Inspector Updates: Queue size is properly updated on every change for observability

🔒 Security Considerations

  1. Size Limits: Properly enforced at both queue level and message level
  2. Serialization Safety: isCborSerializable check prevents unsafe types
  3. No Injection Risks: Message names are used as-is without string interpolation

📝 Code Style & Conventions

  1. ✅ Uses tracing with structured logging (e.g., this.#actor.rLog.error({ msg: "...", error }))
  2. ✅ Private fields use # prefix
  3. ✅ Error messages are lowercase as per CLAUDE.md
  4. ✅ Good TypeScript typing throughout
  5. ⚠️ Some comments could be more sentence-like (e.g., line 129: "Update metadata before writing so we can batch both writes" vs "Updates metadata before writing to batch both writes")

🧪 Test Coverage

The test suite covers:

  • ✅ Client-to-actor messaging
  • ✅ Actor-to-self messaging
  • ✅ Array name filtering
  • ✅ Timeout behavior
  • ✅ Abort handling
  • ✅ Queue size limits
  • ✅ Message size limits

Missing test scenarios:

  • Multiple concurrent waiters for the same message name
  • Queue persistence across actor restarts
  • Metadata corruption recovery via #rebuildMetadata()
  • Very large queue sizes (stress testing)

📖 Documentation

Consider adding:

  1. JSDoc comments explaining the queue ordering guarantees (FIFO per message name?)
  2. Examples of common patterns (request/response via queues, fan-out, etc.)
  3. Performance characteristics and recommended queue size limits

Summary

This is a well-implemented feature with good fundamentals. The main concerns are:

  1. Critical: Potential race condition in concurrent enqueue operations needs clarification or fixing
  2. Important: Performance of loading all messages on every receive could be optimized
  3. Minor: Several small improvements to error handling and validation

The test coverage is solid, error handling is comprehensive, and the API design is intuitive. Great work overall! 🎉
