Skip to content

Commit 540573d

Browse files
Websocket followup (#49)
* websocket followup added * websocket followup added
1 parent b98b344 commit 540573d

File tree

6 files changed

+202
-36
lines changed

6 files changed

+202
-36
lines changed

packages/agent/src/agent/BaseAgent.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,19 @@ export abstract class BaseAgent {
132132
*/
133133
abstract destroy(): Promise<void>;
134134

135+
/**
136+
* Abort current execution
137+
* Triggers the abort signal to stop the current task
138+
* Must be implemented by concrete agent classes
139+
*/
140+
abstract abort(): void;
141+
142+
/**
143+
* Check if agent is currently executing
144+
* Must be implemented by concrete agent classes
145+
*/
146+
abstract isExecuting(): boolean;
147+
135148
/**
136149
* Get current agent metadata
137150
*/

packages/agent/src/agent/ClaudeSDKAgent.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,9 @@ export class ClaudeSDKAgent extends BaseAgent {
191191
logger.info(
192192
'⚠️ Agent execution aborted (caught during iterator wait)',
193193
);
194-
// Cleanup iterator
194+
// Cleanup iterator (fire-and-forget to avoid blocking)
195195
if (iterator.return) {
196-
await iterator.return(undefined).catch(() => {});
196+
iterator.return(undefined).catch(() => {});
197197
}
198198
return;
199199
}
@@ -375,6 +375,26 @@ export class ClaudeSDKAgent extends BaseAgent {
375375
}
376376
}
377377

378+
/**
379+
* Abort current execution
380+
* Triggers abort signal to stop the current task gracefully
381+
*/
382+
abort(): void {
383+
if (this.abortController) {
384+
logger.info('🛑 Aborting ClaudeSDKAgent execution');
385+
this.abortController.abort();
386+
} else {
387+
logger.warn('⚠️ Cancel not fully supported - no active execution');
388+
}
389+
}
390+
391+
/**
392+
* Check if agent is currently executing
393+
*/
394+
isExecuting(): boolean {
395+
return this.metadata.state === 'executing' && this.abortController !== null;
396+
}
397+
378398
/**
379399
* Cleanup agent resources
380400
*

packages/agent/src/agent/CodexSDKAgent.ts

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import {accessSync, constants as fsConstants} from 'node:fs';
77
import {dirname, join} from 'node:path';
88

9-
import {Codex, type McpServerConfig} from '@browseros/codex-sdk-ts';
9+
import {Codex, Thread, type McpServerConfig} from '@browseros/codex-sdk-ts';
1010
import {logger} from '@browseros/common';
1111
import type {ControllerBridge} from '@browseros/controller-server';
1212
import {allControllerTools} from '@browseros/tools/controller-based';
@@ -66,6 +66,7 @@ export class CodexSDKAgent extends BaseAgent {
6666
private codex: Codex | null = null;
6767
private codexExecutablePath: string | null = null;
6868
private codexConfigPath: string | null = null;
69+
private currentThread: Thread | null = null;
6970

7071
constructor(config: AgentConfig, _controllerBridge: ControllerBridge) {
7172
const mcpServerConfig = buildMcpServerConfig(config);
@@ -338,7 +339,17 @@ export class CodexSDKAgent extends BaseAgent {
338339
});
339340
}
340341

341-
const thread = this.codex.startThread(threadOptions);
342+
// Reuse existing thread for follow-up messages, or create new one
343+
// CRITICAL: Check both existence AND thread ID (ID is null if cancelled before thread.started event)
344+
if (!this.currentThread || !this.currentThread.id) {
345+
this.currentThread = this.codex.startThread(threadOptions);
346+
logger.info('🆕 Created new thread for session');
347+
} else {
348+
logger.info('♻️ Reusing existing thread for follow-up message', {
349+
threadId: this.currentThread.id,
350+
});
351+
}
352+
const thread = this.currentThread;
342353

343354
// Get streaming events from thread
344355
const messages: Array<{type: 'text'; text: string}> = [];
@@ -368,6 +379,9 @@ export class CodexSDKAgent extends BaseAgent {
368379
logger.info(
369380
'⚠️ Agent execution aborted by client (breaking loop)',
370381
);
382+
// Clear thread - next message will create fresh thread
383+
this.currentThread = null;
384+
logger.debug('🔄 Cleared thread reference due to abort');
371385
break;
372386
}
373387

@@ -454,9 +468,14 @@ export class CodexSDKAgent extends BaseAgent {
454468
}
455469
} finally {
456470
// CRITICAL: Close iterator to trigger SIGKILL in forked SDK's finally block
471+
// Fire-and-forget to avoid blocking markIdle() - subprocess cleanup can happen async
457472
if (iterator.return) {
458473
logger.debug('🔒 Closing iterator to terminate Codex subprocess');
459-
await iterator.return(undefined);
474+
iterator.return(undefined).catch((error) => {
475+
logger.warn('⚠️ Iterator cleanup error (non-fatal)', {
476+
error: error instanceof Error ? error.message : String(error),
477+
});
478+
});
460479
}
461480
}
462481

@@ -469,6 +488,10 @@ export class CodexSDKAgent extends BaseAgent {
469488
duration: Date.now() - this.executionStartTime,
470489
});
471490
} catch (error) {
491+
// Clear thread on error - next call will create fresh thread
492+
this.currentThread = null;
493+
logger.debug('🔄 Cleared thread reference due to error');
494+
472495
// Mark execution error
473496
this.errorExecution(
474497
error instanceof Error ? error : new Error(String(error)),
@@ -486,6 +509,24 @@ export class CodexSDKAgent extends BaseAgent {
486509
}
487510
}
488511

512+
/**
513+
* Abort current execution
514+
* Triggers abort signal to stop the current task gracefully
515+
*/
516+
abort(): void {
517+
if (this.abortController) {
518+
logger.info('🛑 Aborting CodexSDKAgent execution');
519+
this.abortController.abort();
520+
}
521+
}
522+
523+
/**
524+
* Check if agent is currently executing
525+
*/
526+
isExecuting(): boolean {
527+
return this.metadata.state === 'executing' && this.abortController !== null;
528+
}
529+
489530
/**
490531
* Cleanup agent resources
491532
*
@@ -500,6 +541,9 @@ export class CodexSDKAgent extends BaseAgent {
500541

501542
this.markDestroyed();
502543

544+
// Clear thread reference
545+
this.currentThread = null;
546+
503547
// Trigger abort controller for cleanup
504548
if (this.abortController) {
505549
this.abortController.abort();

packages/agent/src/session/SessionManager.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,45 @@ export class SessionManager {
260260
logger.debug('Session marked as idle', {sessionId});
261261
}
262262

263+
/**
264+
* Cancel current execution for a session
265+
* Triggers abort on the agent if it's executing
266+
* CRITICAL: Does NOT mark session as idle - let processMessage() handle that
267+
*
268+
* @param sessionId - Session ID
269+
* @returns true if cancel was triggered, false if not executing or agent not found
270+
*/
271+
cancelExecution(sessionId: string): boolean {
272+
const agent = this.agents.get(sessionId);
273+
if (!agent) {
274+
logger.warn('⚠️ Cancel requested but no agent found', {sessionId});
275+
return false;
276+
}
277+
278+
// Defensive: check abort support
279+
if (typeof agent.abort !== 'function') {
280+
logger.warn('⚠️ Agent does not support cancel', {
281+
sessionId,
282+
agentType: agent.getMetadata().type,
283+
});
284+
return false;
285+
}
286+
287+
if (!agent.isExecuting()) {
288+
logger.debug('⚠️ Cancel requested but agent not executing', {sessionId});
289+
return false;
290+
}
291+
292+
logger.info('🛑 Cancelling execution', {sessionId});
293+
agent.abort();
294+
295+
// CRITICAL: Do NOT mark idle here!
296+
// Let the original processMessage() call mark idle when it completes
297+
// Otherwise we get race condition: new messages can start while execute() is still in finally block
298+
299+
return true;
300+
}
301+
263302
/**
264303
* Delete a session and its agent
265304
*

packages/agent/src/websocket/protocol.ts

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,29 @@ import {z} from 'zod';
1717
// ============================================================================
1818

1919
/**
20-
* Message sent from client to server
20+
* Regular message from client
2121
*/
22-
export const ClientMessageSchema = z.object({
22+
export const ClientRegularMessageSchema = z.object({
2323
type: z.literal('message'),
2424
content: z.string().min(1, 'Message content cannot be empty'),
2525
});
2626

27+
/**
28+
* Cancel message from client
29+
*/
30+
export const ClientCancelMessageSchema = z.object({
31+
type: z.literal('cancel'),
32+
sessionId: z.string().optional(),
33+
});
34+
35+
/**
36+
* Discriminated union of all client message types
37+
*/
38+
export const ClientMessageSchema = z.discriminatedUnion('type', [
39+
ClientRegularMessageSchema,
40+
ClientCancelMessageSchema,
41+
]);
42+
2743
export type ClientMessage = z.infer<typeof ClientMessageSchema>;
2844

2945
// ============================================================================
@@ -63,6 +79,17 @@ export const AgentEventSchema = z.object({
6379

6480
export type AgentEvent = z.infer<typeof AgentEventSchema>;
6581

82+
/**
83+
* Cancelled event (acknowledgment of cancel request)
84+
*/
85+
export const CancelledEventSchema = z.object({
86+
type: z.literal('cancelled'),
87+
sessionId: z.string(),
88+
message: z.string().optional(),
89+
});
90+
91+
export type CancelledEvent = z.infer<typeof CancelledEventSchema>;
92+
6693
/**
6794
* Error event
6895
*/
@@ -80,6 +107,7 @@ export type ErrorEvent = z.infer<typeof ErrorEventSchema>;
80107
export const ServerEventSchema = z.union([
81108
ConnectionEventSchema,
82109
AgentEventSchema,
110+
CancelledEventSchema,
83111
ErrorEventSchema,
84112
]);
85113

packages/agent/src/websocket/server.ts

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -94,23 +94,23 @@ export function createServer(
9494
// Track WebSocket connections (needed to close idle sessions)
9595
const wsConnections = new Map<string, ServerWebSocket<WebSocketData>>();
9696

97-
// Cleanup idle sessions callback (now async)
98-
const cleanupIdle = async () => {
99-
const idleSessionIds = sessionManager.findIdleSessions();
100-
101-
for (const sessionId of idleSessionIds) {
102-
const ws = wsConnections.get(sessionId);
103-
if (ws) {
104-
logger.info('🧹 Closing idle session', {sessionId});
105-
ws.close(1001, 'Idle timeout');
106-
wsConnections.delete(sessionId);
107-
}
108-
await sessionManager.deleteSession(sessionId);
109-
}
110-
};
111-
112-
// Run cleanup check with the timer
113-
setInterval(cleanupIdle, 60000);
97+
// Cleanup idle sessions callback (now async) -> commenting out for now as we should let BrowserOS agent handle this
98+
// const cleanupIdle = async () => {
99+
// const idleSessionIds = sessionManager.findIdleSessions();
100+
101+
// for (const sessionId of idleSessionIds) {
102+
// const ws = wsConnections.get(sessionId);
103+
// if (ws) {
104+
// logger.info('🧹 Closing idle session', {sessionId});
105+
// ws.close(1001, 'Idle timeout');
106+
// wsConnections.delete(sessionId);
107+
// }
108+
// await sessionManager.deleteSession(sessionId);
109+
// }
110+
// };
111+
112+
// // Run cleanup check with the timer
113+
// setInterval(cleanupIdle, 60000);
114114

115115
const server = Bun.serve<WebSocketData>({
116116
port: config.port,
@@ -244,15 +244,6 @@ export function createServer(
244244
return;
245245
}
246246

247-
// Try to mark session as processing (reject if already processing)
248-
if (!sessionManager.markProcessing(sessionId)) {
249-
sendError(
250-
ws,
251-
'Session is already processing a message. Please wait.',
252-
);
253-
return;
254-
}
255-
256247
// Parse message
257248
const messageStr =
258249
typeof message === 'string'
@@ -266,11 +257,40 @@ export function createServer(
266257
const clientMessage = tryParseClientMessage(parsedData);
267258

268259
if (!clientMessage) {
269-
sessionManager.markIdle(sessionId); // Mark idle before returning
270260
sendError(ws, 'Invalid message format');
271261
return;
272262
}
273263

264+
// Handle cancel message
265+
if (clientMessage.type === 'cancel') {
266+
logger.info('🛑 Cancel request received', {sessionId});
267+
268+
const success = sessionManager.cancelExecution(sessionId);
269+
270+
// Send cancelled acknowledgment
271+
const cancelledEvent = {
272+
type: 'cancelled',
273+
sessionId,
274+
message: success
275+
? 'Execution cancelled'
276+
: 'No active execution to cancel',
277+
};
278+
ws.send(JSON.stringify(cancelledEvent));
279+
280+
logger.info(success ? '✅ Cancel successful' : '⚠️ Nothing to cancel', {sessionId});
281+
return;
282+
}
283+
284+
// Handle regular message
285+
// Try to mark session as processing (reject if already processing)
286+
if (!sessionManager.markProcessing(sessionId)) {
287+
sendError(
288+
ws,
289+
'Session is already processing a message. Please wait.',
290+
);
291+
return;
292+
}
293+
274294
// Update stats
275295
stats.messagesProcessed++;
276296

@@ -408,8 +428,10 @@ async function processMessage(
408428
try {
409429
result = await Promise.race([iterator.next(), timeoutPromise]);
410430
} catch (timeoutError) {
411-
// Cleanup iterator
412-
if (iterator.return) await iterator.return(undefined);
431+
// Cleanup iterator (fire-and-forget - session will be deleted anyway)
432+
if (iterator.return) {
433+
iterator.return(undefined).catch(() => {});
434+
}
413435
throw new Error(
414436
`Event gap timeout: No activity for ${config.eventGapTimeoutMs / 1000}s`,
415437
);

0 commit comments

Comments
 (0)