fix: socket conn handling #814
Walkthrough

Adds persistence flush scheduling in the Interpreter's finally block. Expands the queue socket API with onRunStarted/onRunRecovered/onRunScheduled callbacks and wires the corresponding listeners. MainPage integrates the new handlers and updates its effect usage. Removes dialog-specific logic from clientSelectorGenerator's deepest-element selection.
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
participant S as Server: Queue
participant WS as WebSocket
participant C as Client: Socket Context
participant M as MainPage
Note over S,WS: Run lifecycle events broadcast
S-->>WS: run-scheduled
WS-->>C: run-scheduled
C->>M: onRunScheduled(data)
S-->>WS: run-started
WS-->>C: run-started
C->>M: onRunStarted(data)
S-->>WS: run-recovered
WS-->>C: run-recovered
C->>M: onRunRecovered(data)
S-->>WS: run-completed
WS-->>C: run-completed
C->>M: onRunCompleted(data)
Note over C: connectToQueueSocket(onRunStarted, onRunRecovered, onRunScheduled, onRunCompleted)
```
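A minimal client-side sketch of the wiring the diagram implies, assuming socket.io-client and the event names shown above; the real signature of connectToQueueSocket in src/context/socket.tsx may differ:

```typescript
import { io, Socket } from "socket.io-client";

// Hypothetical payload shape; the real payloads carry at least a runId.
type RunEvent = { runId: string };

function connectToQueueSocket(
  userId: string,
  onRunCompleted?: (d: RunEvent) => void,
  onRunStarted?: (d: RunEvent) => void,
  onRunRecovered?: (d: RunEvent) => void,
  onRunScheduled?: (d: RunEvent) => void
): Socket {
  const socket = io(`/queue-${userId}`); // namespace is an assumption
  // Each broadcast event maps straight onto its optional callback.
  socket.on("run-scheduled", (d: RunEvent) => onRunScheduled?.(d));
  socket.on("run-started", (d: RunEvent) => onRunStarted?.(d));
  socket.on("run-recovered", (d: RunEvent) => onRunRecovered?.(d));
  socket.on("run-completed", (d: RunEvent) => onRunCompleted?.(d));
  return socket;
}
```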
```mermaid
sequenceDiagram
autonumber
participant I as Interpreter
participant PB as persistenceBuffer
participant T as Persistence Timer
I->>I: Execute workflow (try)
alt any outcome
I->>I: finally
I->>PB: Check buffer length
I->>T: Check active timer
opt buffer has data AND no timer
I->>T: Schedule batched flush
end
end
```
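A minimal sketch of the finally-block behavior the diagram describes, using the field and method names that appear in the review excerpts (persistenceBuffer, persistenceTimer, scheduleBatchFlush); this is an illustration, not the actual Interpreter code:

```typescript
class InterpreterSketch {
  private persistenceBuffer: unknown[] = [];
  private persistenceTimer: NodeJS.Timeout | null = null;

  private scheduleBatchFlush(): void {
    // Stand-in for the real batched DB flush.
    this.persistenceTimer = setTimeout(() => {
      this.persistenceTimer = null;
      this.persistenceBuffer = [];
    }, 500);
  }

  async run(workflow: () => Promise<void>): Promise<void> {
    try {
      await workflow();
    } finally {
      // The new guard: if items accumulated while no flush was pending,
      // schedule one so the buffer is not stranded after the run ends.
      if (this.persistenceBuffer.length > 0 && !this.persistenceTimer) {
        this.scheduleBatchFlush();
      }
    }
  }
}
```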
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Pre-merge checks: ✅ 3 passed
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
server/src/workflow-management/classes/Interpreter.ts (1)
611-696: Race: currentRunId can change mid‑flush, leading to a wrong DB lookup.

flushPersistenceBuffer dereferences this.currentRunId inside the transaction. clearState later sets currentRunId = null; if that happens mid‑flush, Run.findOne may query with null and drop updates. Pin the runId at the start of the flush and use that local variable.
```diff
 async flushPersistenceBuffer(): Promise<void> {
   if (this.persistenceBuffer.length === 0 || this.persistenceInProgress || !this.currentRunId) {
     return;
   }
+  // Pin the runId to avoid races with clearState() mutating this.currentRunId
+  const runId = this.currentRunId as string;
   if (this.persistenceTimer) {
     clearTimeout(this.persistenceTimer);
     this.persistenceTimer = null;
   }
   this.persistenceInProgress = true;
   const batchToProcess = [...this.persistenceBuffer];
   this.persistenceBuffer = [];
   try {
     const sequelize = require('../../storage/db').default;
     await sequelize.transaction(async (transaction: any) => {
-      const run = await Run.findOne({
-        where: { runId: this.currentRunId! },
+      const run = await Run.findOne({
+        where: { runId },
         transaction
       });
       ...
-      logger.log('debug', `Batched persistence: Updated run ${this.currentRunId} with ${batchToProcess.length} items`);
+      logger.log('debug', `Batched persistence: Updated run ${runId} with ${batchToProcess.length} items`);
     });
     this.persistenceRetryCount = 0;
   } catch (error: any) {
-    logger.log('error', `Failed to flush persistence buffer for run ${this.currentRunId}: ${error.message}`);
+    logger.log('error', `Failed to flush persistence buffer for run ${runId}: ${error.message}`);
   } finally {
     this.persistenceInProgress = false;
     if (this.persistenceBuffer.length > 0 && !this.persistenceTimer) {
       this.scheduleBatchFlush();
     }
   }
 }
```
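For illustration, a tiny self-contained repro of the race (hypothetical shape, not the real Interpreter): without pinning, the awaited work can observe currentRunId after clearState has nulled it.

```typescript
class RaceDemo {
  currentRunId: string | null = "run-1";

  async flushUnpinned(): Promise<string | null> {
    await new Promise((r) => setTimeout(r, 10)); // transaction in flight
    return this.currentRunId; // may already be null
  }

  async flushPinned(): Promise<string> {
    const runId = this.currentRunId!; // pin before any await
    await new Promise((r) => setTimeout(r, 10));
    return runId; // still "run-1"
  }

  clearState(): void {
    this.currentRunId = null;
  }
}

const demo = new RaceDemo();
const pending = demo.flushUnpinned();
demo.clearState(); // runs before the awaited timeout resolves
pending.then((v) => console.log(v)); // logs null — the wrong-lookup bug
```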
🧹 Nitpick comments (3)
server/src/workflow-management/classes/Interpreter.ts (1)
692-695: Good finally-block addition to avoid starvation.

Scheduling a flush when new items arrive during an in‑flight flush is correct and prevents starvation. One refinement: retries use an ad‑hoc setTimeout not tracked by persistenceTimer, so pending retries cannot be cancelled on shutdown. Consider storing the retry timer too and clearing it in clearState.
```diff
+ private persistenceRetryTimer: NodeJS.Timeout | null = null;
```
And where you schedule retries:
```diff
- setTimeout(async () => {
+ this.persistenceRetryTimer = setTimeout(async () => {
    await this.flushPersistenceBuffer();
  }, backoffDelay);
```

In clearState():
```diff
+ if (this.persistenceRetryTimer) {
+   clearTimeout(this.persistenceRetryTimer);
+   this.persistenceRetryTimer = null;
+ }
```
src/pages/MainPage.tsx (2)

247-261: Avoid a stale closure when removing from queuedRuns.

The outer queuedRuns.has(...) check can be stale. Do the check inside the functional state update.

```diff
-        if (queuedRuns.has(recoveredData.runId)) {
-          setQueuedRuns(prev => {
-            const newSet = new Set(prev);
-            newSet.delete(recoveredData.runId);
-            return newSet;
-          });
-        }
+        setQueuedRuns(prev => {
+          if (!prev.has(recoveredData.runId)) return prev;
+          const next = new Set(prev);
+          next.delete(recoveredData.runId);
+          return next;
+        });
```

Also, confirm the UX: notifying interpretation_failed on "recovered" is intentional.
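As a side benefit, doing the membership check inside the updater preserves referential equality on the no-op path (returning prev unchanged), so React can skip the re-render entirely.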
271-275: Trim effect deps to prevent unnecessary reconnects.

queuedRuns and setQueuedRuns in the dependency array cause the effect to reconnect the socket whenever the set changes. Since the handlers use functional updates, remove them from the dependency array.

```diff
- }, [user?.id, connectToQueueSocket, disconnectQueueSocket, t, setRerenderRuns, queuedRuns, setQueuedRuns]);
+ }, [user?.id, connectToQueueSocket, disconnectQueueSocket, t, setRerenderRuns]);
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- server/src/workflow-management/classes/Interpreter.ts (1 hunks)
- src/context/socket.tsx (5 hunks)
- src/helpers/clientSelectorGenerator.ts (0 hunks)
- src/pages/MainPage.tsx (2 hunks)
💤 Files with no reviewable changes (1)
- src/helpers/clientSelectorGenerator.ts
🔇 Additional comments (7)
src/pages/MainPage.tsx (3)
218-225: run-started handler wiring looks good.

Notifies the user and refreshes runs; aligns with the new socket event.
263-269: run-scheduled handler is fine.

Refreshing caches on schedule events is sufficient; an optional toast could be noisy.
268-269: Socket hookup OK; watch for duplicate "run started" toasts.

You already toast on immediate starts; this additional run-started event might duplicate notifications for non-queued runs.
src/context/socket.tsx (4)
12-12: API extension reads well.

The connectToQueueSocket signature with optional lifecycle callbacks is clear and backwards compatible.
32-35: Refs for the new callbacks are appropriate.

useRef avoids re-renders and keeps the latest handlers.
51-56: Callback assignment is correct.

The latest handlers are captured before the socket connects.
78-97: New event listeners are wired correctly.

Handlers invoke the refs safely; the logs aid diagnostics.
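A hedged sketch of the ref indirection these comments approve of, with assumed names and namespace; the point is that the socket listener stays stable while always calling the newest handler:

```tsx
import { useCallback, useRef } from "react";
import { io, Socket } from "socket.io-client";

type RunEvent = { runId: string };

export function useQueueSocket() {
  const runStartedCallbackRef = useRef<((d: RunEvent) => void) | null>(null);

  const connectToQueueSocket = useCallback(
    (onRunStarted?: (d: RunEvent) => void): Socket => {
      // Capture the latest handler before connecting.
      runStartedCallbackRef.current = onRunStarted ?? null;
      const socket = io("/queue"); // namespace is an assumption
      // The listener reads the ref at call time, so it never goes stale and
      // the effect that connects doesn't need the handler in its deps.
      socket.on("run-started", (d: RunEvent) => runStartedCallbackRef.current?.(d));
      return socket;
    },
    []
  );

  return { connectToQueueSocket };
}
```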
```tsx
    runCompletedCallbackRef.current = null;
    runRecoveredCallbackRef.current = null;
    runScheduledCallbackRef.current = null;
  }, []);
```
🧩 Analysis chain
Missed reset for runStarted callback on disconnect.
runStartedCallbackRef isn’t cleared, unlike the others. Reset it to avoid stale references.
```diff
   socketStore.queueSocket = null;
   runCompletedCallbackRef.current = null;
+  runStartedCallbackRef.current = null;
   runRecoveredCallbackRef.current = null;
   runScheduledCallbackRef.current = null;
```
You can also quickly scan for event-name consistency:
🏁 Script executed:

```bash
#!/bin/bash
rg -n "run-(completed|started|recovered|scheduled)" -C2
```

Length of output: 3870

🏁 Script executed:

```bash
sed -n '100,140p' src/context/socket.tsx
```

Length of output: 876

🏁 Script executed:

```bash
rg -n "runStartedCallbackRef" -C3 src/context/socket.tsx
```

Length of output: 1227
Reset runStartedCallbackRef in disconnectQueueSocket

Add runStartedCallbackRef.current = null alongside the other callback resets to avoid retaining a stale "run-started" handler.
```diff
--- a/src/context/socket.tsx
+++ b/src/context/socket.tsx
@@ const disconnectQueueSocket = useCallback(() => {…, []);
-  socketStore.queueSocket = null;
-  runCompletedCallbackRef.current = null;
-  runRecoveredCallbackRef.current = null;
-  runScheduledCallbackRef.current = null;
+  socketStore.queueSocket = null;
+  runStartedCallbackRef.current = null;
+  runCompletedCallbackRef.current = null;
+  runRecoveredCallbackRef.current = null;
+  runScheduledCallbackRef.current = null;
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```tsx
  const disconnectQueueSocket = useCallback(() => {
    socketStore.queueSocket = null;
    runStartedCallbackRef.current = null;
    runCompletedCallbackRef.current = null;
    runRecoveredCallbackRef.current = null;
    runScheduledCallbackRef.current = null;
  }, []);
```
🤖 Prompt for AI Agents
```
In src/context/socket.tsx around lines 118 to 121, disconnectQueueSocket resets
several callback refs but omits runStartedCallbackRef, which can retain a stale
run-started handler; add a line setting runStartedCallbackRef.current = null
alongside runCompletedCallbackRef.current = null,
runRecoveredCallbackRef.current = null, and runScheduledCallbackRef.current =
null so all run-* callback refs are cleared on disconnect.
```
Summary by CodeRabbit

New Features
- The queue socket now surfaces run lifecycle events (run-scheduled, run-started, run-recovered) through optional callbacks, and MainPage reacts to them.

Bug Fixes
- The Interpreter schedules a batched persistence flush in its finally block so buffered run updates are not left stranded.
- Removed dialog-specific logic from clientSelectorGenerator's deepest-element selection.