fix(maxun-core): robot browser recording crashes #811
Conversation
Walkthrough

Adds a public abort-state getter in the Interpreter, introduces batched persistence with timers/retries in WorkflowInterpreter, bounds Airtable/GSheet processing loops, hardens browser init/cleanup error handling, adds per-call .catch handlers for integration updates, and extends server pool and global process error handlers.
Sequence Diagram(s)

sequenceDiagram
autonumber
actor Runner
participant WI as WorkflowInterpreter
participant DB as Database
Runner->>WI: interpret(items...)
WI->>WI: addToPersistenceBatch(item)
WI->>WI: scheduleBatchFlush(timer/size)
opt Batch trigger (timeout or size)
WI->>DB: flushPersistenceBuffer() (transaction)
alt success
DB-->>WI: committed
else failure
WI->>WI: retry with backoff
WI->>DB: re-attempt
end
end
Runner->>WI: clearState() / stop()
WI->>DB: flush remaining buffer
WI-->>Runner: stopped
sequenceDiagram
autonumber
actor Orchestrator
participant Ctrl as BrowserController
participant Session as BrowserSession
Orchestrator->>Ctrl: createRemoteBrowserForRun()
Ctrl->>Session: initializeBrowserAsync()
alt init OK
Session-->>Ctrl: initialized
else init fails
Ctrl->>Session: switchOff() (attempt cleanup)
note right of Ctrl: log cleanup result
Ctrl-->>Orchestrator: rethrow original error
end
Orchestrator->>Ctrl: destroyRemoteBrowser()
Ctrl->>Ctrl: try { cleanup namespace } catch (err) { log error }
sequenceDiagram
autonumber
actor Trigger
participant Scheduler as Scheduler/API/Worker
participant Airtable as processAirtableUpdates()
participant GSheet as processGoogleSheetUpdates()
Trigger->>Scheduler: triggerIntegrationUpdates()
Scheduler->>Airtable: processAirtableUpdates().catch(log)
Scheduler->>GSheet: processGoogleSheetUpdates().catch(log)
note right of Airtable: runs bounded window (60s)
note right of GSheet: runs bounded window (60s)
Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60–75 minutes
Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
Actionable comments posted: 3
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
maxun-core/src/interpret.ts (1 hunks)
server/src/api/record.ts (1 hunks)
server/src/browser-management/controller.ts (3 hunks)
server/src/pgboss-worker.ts (1 hunks)
server/src/server.ts (2 hunks)
server/src/workflow-management/classes/Interpreter.ts (5 hunks)
server/src/workflow-management/integrations/airtable.ts (1 hunks)
server/src/workflow-management/integrations/gsheet.ts (3 hunks)
server/src/workflow-management/scheduler/index.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
server/src/workflow-management/scheduler/index.ts (2)
  server/src/workflow-management/integrations/airtable.ts (1): processAirtableUpdates (456-500)
  server/src/workflow-management/integrations/gsheet.ts (1): processGoogleSheetUpdates (288-330)
server/src/api/record.ts (2)
  server/src/workflow-management/integrations/airtable.ts (1): processAirtableUpdates (456-500)
  server/src/workflow-management/integrations/gsheet.ts (1): processGoogleSheetUpdates (288-330)
server/src/browser-management/controller.ts (1)
  server/src/server.ts (2): browserPool (91-91), io (86-86)
server/src/workflow-management/classes/Interpreter.ts (1)
  server/src/db/models/index.js (1): sequelize (16-16)
server/src/pgboss-worker.ts (2)
  server/src/workflow-management/integrations/airtable.ts (1): processAirtableUpdates (456-500)
  server/src/workflow-management/integrations/gsheet.ts (1): processGoogleSheetUpdates (288-330)
🔇 Additional comments (4)

maxun-core/src/interpret.ts (1)

126-131: Getter addition looks good.
Thanks for exposing the abort flag cleanly; this keeps callers from poking at private state while preserving existing semantics.
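For readers skimming the diff, the shape of such a getter is roughly the sketch below; the actual field and getter names in interpret.ts may differ.

```typescript
class Interpreter {
  // Private abort flag flipped when a stop is requested (names assumed for illustration).
  private aborted = false;

  // Public read-only view so callers can check abort state without reaching into private fields.
  public get isAborted(): boolean {
    return this.aborted;
  }

  public stop(): void {
    this.aborted = true;
  }
}
```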
server/src/server.ts (2)

40-45: Stronger pool configuration.
Nice to see the pool tuned with sensible caps and timeouts; this should curb connection leaks during spikes.
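For context, a Sequelize pool tuned this way typically looks like the sketch below; the exact values at server.ts lines 40-45 may differ.

```typescript
import { Sequelize } from 'sequelize';

// Illustrative pool settings only, not the actual values in server/src/server.ts.
const sequelize = new Sequelize(process.env.DATABASE_URL ?? '', {
  pool: {
    max: 10,        // cap concurrent connections so spikes can't exhaust Postgres
    min: 0,         // let the pool drain completely when idle
    acquire: 30000, // ms to wait for a free connection before throwing
    idle: 10000,    // ms before an idle connection is released
  },
});
```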
224-238: Appreciate the global fail-safes.
Catching unhandled rejections/exceptions here gives us at least one clean log line before restarting, and the delayed exit in production is a good compromise.
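The general pattern (handler bodies here are a sketch, not the exact code at lines 224-238) is:

```typescript
// Log once, then exit after a short delay in production so the process manager can restart cleanly.
process.on('unhandledRejection', (reason) => {
  console.error('Unhandled rejection:', reason);
});

process.on('uncaughtException', (error) => {
  console.error('Uncaught exception:', error);
  if (process.env.NODE_ENV === 'production') {
    setTimeout(() => process.exit(1), 1000);
  }
});
```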
server/src/workflow-management/integrations/gsheet.ts (1)

289-330: Bounded loop plus cleanup FTW.
The 60‑second window, retry pruning, and auto-purging of terminal states should keep this worker from pinning the event loop forever while still giving pending tasks a few chances.
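In spirit, the bounded window behaves like this sketch; the task shape, helper name, and constants are assumptions rather than the actual gsheet.ts code.

```typescript
type TaskStatus = 'pending' | 'completed' | 'failed';
interface UpdateTask { status: TaskStatus; retries: number; }

const MAX_RETRIES = 3;
const WINDOW_MS = 60_000;

// Stand-in for the real Google Sheets push; only here so the sketch compiles.
async function pushUpdate(runId: string): Promise<void> {
  console.log(`would push update for ${runId}`);
}

async function processUpdatesBounded(tasks: Record<string, UpdateTask>): Promise<void> {
  const deadline = Date.now() + WINDOW_MS;

  while (Date.now() < deadline) {
    const pending = Object.entries(tasks).filter(([, t]) => t.status === 'pending');
    if (pending.length === 0) break; // nothing left to do inside this window

    for (const [runId, task] of pending) {
      try {
        await pushUpdate(runId);
        task.status = 'completed';
      } catch {
        task.retries += 1;
        if (task.retries >= MAX_RETRIES) task.status = 'failed';
      }
    }

    // Purge terminal states so the task map cannot grow without bound.
    for (const [runId, task] of Object.entries(tasks)) {
      if (task.status !== 'pending') delete tasks[runId];
    }
  }
}
```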
try {
  const namespace = io.of(id);
  namespace.removeAllListeners();
  namespace.disconnectSockets(true);
  logger.log('debug', `Cleaned up socket namespace for browser ${id}`);
} catch (namespaceCleanupError: any) {
  logger.log('warn', `Error cleaning up socket namespace for browser ${id}: ${namespaceCleanupError.message}`);
}
Clean up namespace registry as well.

removeAllListeners() plus disconnectSockets(true) stops activity, but the namespace entry stays in io._nsps, so every run leaves a UUID‑named namespace hanging around. Given we create fresh IDs per run, that map grows without bound and retains adapters/listeners, undermining the resource-leak fix this PR is aiming for. Please drop the namespace from the server registry once cleanup succeeds (Socket.IO 4 exposes the map at io._nsps). One straightforward approach:

  const namespace = io.of(id);
  namespace.removeAllListeners();
  namespace.disconnectSockets(true);
  logger.log('debug', `Cleaned up socket namespace for browser ${id}`);
+ const namespaces = (io as any)._nsps as Map<string, unknown> | undefined;
+ namespaces?.delete(namespace.name);

That keeps us aligned with the recommended disconnect‑then‑delete flow for dynamic namespaces and prevents the process from accumulating stale entries over long uptimes. Based on learnings.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
try {
  const namespace = io.of(id);
  namespace.removeAllListeners();
  namespace.disconnectSockets(true);
  logger.log('debug', `Cleaned up socket namespace for browser ${id}`);
  const namespaces = (io as any)._nsps as Map<string, unknown> | undefined;
  namespaces?.delete(namespace.name);
} catch (namespaceCleanupError: any) {
  logger.log('warn', `Error cleaning up socket namespace for browser ${id}: ${namespaceCleanupError.message}`);
}
🤖 Prompt for AI Agents
In server/src/browser-management/controller.ts around lines 118 to 125, after
successfully removing listeners and disconnecting sockets for the namespace,
also remove the namespace entry from the server registry (io._nsps) to avoid
accumulating stale UUID namespaces: after namespace.disconnectSockets(true)
check that the registry entry exists and call the appropriate delete method
(io._nsps.delete(id) for a Map or delete io._nsps[id] for a plain object) so the
namespace is removed only on successful cleanup, and keep this inside the try
block before logging the cleanup complete message.
private async flushPersistenceBuffer(): Promise<void> {
  if (this.persistenceBuffer.length === 0 || this.persistenceInProgress || !this.currentRunId) {
    return;
  }

  if (this.persistenceTimer) {
    clearTimeout(this.persistenceTimer);
    this.persistenceTimer = null;
  }

  this.persistenceInProgress = true;
  const batchToProcess = [...this.persistenceBuffer];
  this.persistenceBuffer = [];

  try {
    const sequelize = require('../../storage/db').default;
    await sequelize.transaction(async (transaction: any) => {
      const { Run } = require('../../models');
      const run = await Run.findOne({
        where: { runId: this.currentRunId! },
        transaction
      });

      if (!run) {
        logger.log('warn', `Run not found for batched persistence: ${this.currentRunId}`);
        return;
      }

      const currentSerializableOutput = run.serializableOutput ?
        JSON.parse(JSON.stringify(run.serializableOutput)) :
        { scrapeSchema: [], scrapeList: [] };

      let hasUpdates = false;

      for (const item of batchToProcess) {
        if (item.actionType === 'scrapeSchema') {
          const newSchemaData = Array.isArray(item.data) ? item.data : [item.data];
          currentSerializableOutput.scrapeSchema = newSchemaData;
          hasUpdates = true;
        } else if (item.actionType === 'scrapeList' && typeof item.listIndex === 'number') {
          if (!Array.isArray(currentSerializableOutput.scrapeList)) {
            currentSerializableOutput.scrapeList = [];
          }
          currentSerializableOutput.scrapeList[item.listIndex] = item.data;
          hasUpdates = true;
        }
      }

      if (hasUpdates) {
        await run.update({
          serializableOutput: currentSerializableOutput
        }, { transaction });

        logger.log('debug', `Batched persistence: Updated run ${this.currentRunId} with ${batchToProcess.length} items`);
      }
    });

    this.persistenceRetryCount = 0;

  } catch (error: any) {
    logger.log('error', `Failed to flush persistence buffer for run ${this.currentRunId}: ${error.message}`);

    if (!this.persistenceRetryCount) {
      this.persistenceRetryCount = 0;
    }

    if (this.persistenceRetryCount < 3) {
      this.persistenceBuffer.unshift(...batchToProcess);
      this.persistenceRetryCount++;

      const backoffDelay = Math.min(5000 * Math.pow(2, this.persistenceRetryCount), 30000);
      setTimeout(async () => {
        await this.flushPersistenceBuffer();
      }, backoffDelay);

      logger.log('warn', `Scheduling persistence retry ${this.persistenceRetryCount}/3 in ${backoffDelay}ms`);
    } else {
      logger.log('error', `Max persistence retries exceeded for run ${this.currentRunId}, dropping ${batchToProcess.length} items`);
      this.persistenceRetryCount = 0;
    }
  } finally {
    this.persistenceInProgress = false;
  }
};
Fix batching deadlock when a flush overlaps new writes.

If new data lands during an in-flight flushPersistenceBuffer, persistDataToDatabase skips scheduleBatchFlush() because persistenceInProgress is still true. Once the current flush completes, nothing wakes up the remaining buffer, so those items sit in memory indefinitely and never reach the DB. Please queue another flush after the current transaction ends.
Apply this diff to reschedule once the flush finishes:

  } finally {
    this.persistenceInProgress = false;
+
+   if (this.persistenceBuffer.length > 0 && !this.persistenceTimer) {
+     this.scheduleBatchFlush();
+   }
  }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
private async flushPersistenceBuffer(): Promise<void> {
  if (this.persistenceBuffer.length === 0 || this.persistenceInProgress || !this.currentRunId) {
    return;
  }
  if (this.persistenceTimer) {
    clearTimeout(this.persistenceTimer);
    this.persistenceTimer = null;
  }
  this.persistenceInProgress = true;
  const batchToProcess = [...this.persistenceBuffer];
  this.persistenceBuffer = [];
  try {
    const sequelize = require('../../storage/db').default;
    await sequelize.transaction(async (transaction: any) => {
      const { Run } = require('../../models');
      const run = await Run.findOne({
        where: { runId: this.currentRunId! },
        transaction
      });
      if (!run) {
        logger.log('warn', `Run not found for batched persistence: ${this.currentRunId}`);
        return;
      }
      const currentSerializableOutput = run.serializableOutput
        ? JSON.parse(JSON.stringify(run.serializableOutput))
        : { scrapeSchema: [], scrapeList: [] };
      let hasUpdates = false;
      for (const item of batchToProcess) {
        if (item.actionType === 'scrapeSchema') {
          const newSchemaData = Array.isArray(item.data) ? item.data : [item.data];
          currentSerializableOutput.scrapeSchema = newSchemaData;
          hasUpdates = true;
        } else if (item.actionType === 'scrapeList' && typeof item.listIndex === 'number') {
          if (!Array.isArray(currentSerializableOutput.scrapeList)) {
            currentSerializableOutput.scrapeList = [];
          }
          currentSerializableOutput.scrapeList[item.listIndex] = item.data;
          hasUpdates = true;
        }
      }
      if (hasUpdates) {
        await run.update(
          { serializableOutput: currentSerializableOutput },
          { transaction }
        );
        logger.log(
          'debug',
          `Batched persistence: Updated run ${this.currentRunId} with ${batchToProcess.length} items`
        );
      }
    });
    this.persistenceRetryCount = 0;
  } catch (error: any) {
    logger.log(
      'error',
      `Failed to flush persistence buffer for run ${this.currentRunId}: ${error.message}`
    );
    if (!this.persistenceRetryCount) {
      this.persistenceRetryCount = 0;
    }
    if (this.persistenceRetryCount < 3) {
      this.persistenceBuffer.unshift(...batchToProcess);
      this.persistenceRetryCount++;
      const backoffDelay = Math.min(
        5000 * Math.pow(2, this.persistenceRetryCount),
        30000
      );
      setTimeout(async () => {
        await this.flushPersistenceBuffer();
      }, backoffDelay);
      logger.log(
        'warn',
        `Scheduling persistence retry ${this.persistenceRetryCount}/3 in ${backoffDelay}ms`
      );
    } else {
      logger.log(
        'error',
        `Max persistence retries exceeded for run ${this.currentRunId}, dropping ${batchToProcess.length} items`
      );
      this.persistenceRetryCount = 0;
    }
  } finally {
    this.persistenceInProgress = false;
    // If new items arrived during the flush, ensure we schedule another flush
    if (this.persistenceBuffer.length > 0 && !this.persistenceTimer) {
      this.scheduleBatchFlush();
    }
  }
};
🤖 Prompt for AI Agents
In server/src/workflow-management/classes/Interpreter.ts around lines 611-694,
the finally block of flushPersistenceBuffer clears persistenceInProgress but
doesn't reschedule a flush if new items were added while the flush ran, causing
items to be left in memory; after setting this.persistenceInProgress = false add
logic to check if this.persistenceBuffer.length > 0 and there is no existing
persistenceTimer, and if so call this.scheduleBatchFlush() (or otherwise
schedule a timeout) to queue another flush; ensure you don't double-schedule by
checking persistenceTimer or a similar flag before scheduling.
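For context on the interplay the comment above relies on, the producer side is assumed to behave roughly like this sketch; field and method names mirror the review thread, not the actual WorkflowInterpreter implementation.

```typescript
interface PersistenceItem {
  actionType: 'scrapeSchema' | 'scrapeList';
  data: unknown;
  listIndex?: number;
}

class BatchingSketch {
  private persistenceBuffer: PersistenceItem[] = [];
  private persistenceInProgress = false;
  private persistenceTimer: NodeJS.Timeout | null = null;

  // Producer: buffers an item and schedules a flush unless one is already in flight,
  // which is exactly why the finally block must reschedule when items arrive mid-flush.
  persistDataToDatabase(item: PersistenceItem): void {
    this.persistenceBuffer.push(item);
    if (!this.persistenceInProgress) {
      this.scheduleBatchFlush();
    }
  }

  private scheduleBatchFlush(): void {
    if (this.persistenceTimer) return; // avoid double-scheduling
    this.persistenceTimer = setTimeout(() => {
      this.persistenceTimer = null;
      void this.flushPersistenceBuffer();
    }, 1000); // batching delay chosen for illustration
  }

  private async flushPersistenceBuffer(): Promise<void> {
    // See the method under review above for the real flush logic.
  }
}
```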
if (task.retries < MAX_RETRIES) {
  airtableUpdateTasks[runId].retries += 1;
  console.log(`Retrying task for runId: ${runId}, attempt: ${task.retries + 1}`);
} else {
  airtableUpdateTasks[runId].status = 'failed';
  console.log(`Max retries reached for runId: ${runId}. Marking task as failed.`);
  logger.log('error', `Permanent failure for run ${runId}: ${error.message}`);
  console.log(`Max retries reached for runId: ${runId}. Removing task.`);
  delete airtableUpdateTasks[runId];
}
Fix retry counter logic to match new MAX_RETRIES guard.

Each scheduled task is still seeded with retries: 5 in triggerIntegrationUpdates (see server/src/pgboss-worker.ts, Lines 91-103). Because this block now checks task.retries < MAX_RETRIES and increments upward, freshly scheduled tasks immediately fail the guard (5 < 3 is false), so the first transient Airtable error is treated as “max retries reached.” That’s a regression from prior behavior and drops resiliency for all Airtable runs. Please align the counter semantics: the easiest fix is to initialize both Airtable and Google Sheets tasks with retries: 0 so the counter reflects attempts used, or otherwise adjust the conditional to work with the existing seed value. Example patch for the initializer:

- retries: 5,
+ retries: 0,

Apply the same change for the Google Sheets task to keep behavior consistent. After that, this guard will actually allow the intended three retry attempts before capping out.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In server/src/workflow-management/integrations/airtable.ts around lines 477-483
the retry guard treats freshly scheduled tasks as already maxed out because
tasks are seeded with retries: 5 elsewhere; fix by updating the task seeding in
server/src/pgboss-worker.ts (around lines 91-103) to initialize retries: 0 for
both the Airtable and Google Sheets tasks so the existing check task.retries <
MAX_RETRIES works as intended; apply the same initialization change for the
Google Sheets task to keep behavior consistent.
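A possible shape for the corrected initializer is sketched below; apart from status and retries, which the guard above actually reads, the field names are assumptions.

```typescript
interface IntegrationUpdateTask {
  status: 'pending' | 'completed' | 'failed';
  retries: number; // counts attempts used, so it starts at 0
}

const airtableUpdateTasks: Record<string, IntegrationUpdateTask> = {};
const googleSheetUpdateTasks: Record<string, IntegrationUpdateTask> = {};

function triggerIntegrationUpdates(runId: string): void {
  // Seed both tasks with retries: 0 so `task.retries < MAX_RETRIES` permits
  // the intended three retry attempts before capping out.
  airtableUpdateTasks[runId] = { status: 'pending', retries: 0 };
  googleSheetUpdateTasks[runId] = { status: 'pending', retries: 0 };
}
```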
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
maxun-core/src/interpret.ts (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
maxun-core/src/interpret.ts (1)
  maxun-core/src/browserSide/scraper.js (1): selector (28-28)
let args = step.args;

if (Array.isArray(args) && args.length === 1) {
  args = [args[0], { timeout: 30000 }];
} else if (!Array.isArray(args)) {
  args = [args, { timeout: 30000 }];
}
await executeAction(invokee, methodName, step.args);
Restore the 30 s timeout for waitForLoadState.

We build the normalized args array but still call executeAction with the original step.args, so the timeout never actually takes effect. The interpreter can still hang indefinitely, defeating the purpose of this change. Please pass the normalized args through.

- let args = step.args;
+ let args = step.args;
  if (Array.isArray(args) && args.length === 1) {
    args = [args[0], { timeout: 30000 }];
  } else if (!Array.isArray(args)) {
    args = [args, { timeout: 30000 }];
  }
- await executeAction(invokee, methodName, step.args);
+ await executeAction(invokee, methodName, args);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let args = step.args;
if (Array.isArray(args) && args.length === 1) {
  args = [args[0], { timeout: 30000 }];
} else if (!Array.isArray(args)) {
  args = [args, { timeout: 30000 }];
}
await executeAction(invokee, methodName, args);
🤖 Prompt for AI Agents
In maxun-core/src/interpret.ts around lines 620 to 627, the code builds a
normalized args array with a 30s timeout but still calls executeAction with the
original step.args, so the timeout is never applied; update the executeAction
call to pass the normalized args variable (e.g., args) instead of step.args so
the timeout is used, preserving the existing normalization logic and behavior.
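Once the normalized args are passed through, the dispatched call is effectively equivalent to a bounded Playwright wait like the one below (URL and load state chosen purely for illustration):

```typescript
import { chromium } from 'playwright';

async function demo(): Promise<void> {
  const browser = await chromium.launch();
  const page = await browser.newPage();
  await page.goto('https://example.com');

  // Equivalent effect of executeAction(page, 'waitForLoadState', args):
  // the wait is capped at 30 s instead of hanging indefinitely.
  await page.waitForLoadState('networkidle', { timeout: 30000 });

  await browser.close();
}

demo().catch(console.error);
```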
What does this PR do?
Fixes #798
Fixes #799
Fixes #800
Fixes #801
Fixes #802
Summary by CodeRabbit
New Features
Bug Fixes
Chores