Skip to content

Commit 1741e49

Browse files
committed
aggregate instance SSE streams through server bus so UI uses single connection
1 parent 8577b3d commit 1741e49

File tree

6 files changed

+253
-118
lines changed

6 files changed

+253
-118
lines changed

packages/server/src/api-types.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,14 @@ export interface InstanceData {
111111
agentModelSelections: AgentModelSelection
112112
}
113113

114+
export type InstanceStreamStatus = "connecting" | "connected" | "error" | "disconnected"
115+
116+
export interface InstanceStreamEvent {
117+
type: string
118+
properties?: Record<string, unknown>
119+
[key: string]: unknown
120+
}
121+
114122
export interface BinaryRecord {
115123
id: string
116124
path: string
@@ -157,6 +165,8 @@ export type WorkspaceEventType =
157165
| "config.appChanged"
158166
| "config.binariesChanged"
159167
| "instance.dataChanged"
168+
| "instance.event"
169+
| "instance.eventStatus"
160170

161171
export type WorkspaceEventPayload =
162172
| { type: "workspace.created"; workspace: WorkspaceDescriptor }
@@ -167,6 +177,8 @@ export type WorkspaceEventPayload =
167177
| { type: "config.appChanged"; config: AppConfig }
168178
| { type: "config.binariesChanged"; binaries: BinaryRecord[] }
169179
| { type: "instance.dataChanged"; instanceId: string; data: InstanceData }
180+
| { type: "instance.event"; instanceId: string; event: InstanceStreamEvent }
181+
| { type: "instance.eventStatus"; instanceId: string; status: InstanceStreamStatus; reason?: string }
170182

171183
export interface ServerMeta {
172184
/** Base URL clients should target for REST calls (useful for Electron embedding). */

packages/server/src/events/bus.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ export class EventBus extends EventEmitter {
88
}
99

1010
publish(event: WorkspaceEventPayload): boolean {
11-
this.logger?.debug({ event }, "Publishing workspace event")
11+
if (event.type !== "instance.event" && event.type !== "instance.eventStatus") {
12+
this.logger?.debug({ event }, "Publishing workspace event")
13+
}
1214
return super.emit(event.type, event)
1315
}
1416

@@ -22,6 +24,8 @@ export class EventBus extends EventEmitter {
2224
this.on("config.appChanged", handler)
2325
this.on("config.binariesChanged", handler)
2426
this.on("instance.dataChanged", handler)
27+
this.on("instance.event", handler)
28+
this.on("instance.eventStatus", handler)
2529
return () => {
2630
this.off("workspace.created", handler)
2731
this.off("workspace.started", handler)
@@ -31,6 +35,8 @@ export class EventBus extends EventEmitter {
3135
this.off("config.appChanged", handler)
3236
this.off("config.binariesChanged", handler)
3337
this.off("instance.dataChanged", handler)
38+
this.off("instance.event", handler)
39+
this.off("instance.eventStatus", handler)
3440
}
3541
}
3642
}

packages/server/src/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ import { FileSystemBrowser } from "./filesystem/browser"
1414
import { EventBus } from "./events/bus"
1515
import { ServerMeta } from "./api-types"
1616
import { InstanceStore } from "./storage/instance-store"
17+
import { InstanceEventBridge } from "./workspaces/instance-events"
1718
import { createLogger } from "./logger"
1819
import { launchInBrowser } from "./launcher"
1920

2021
const require = createRequire(import.meta.url)
22+
2123
const packageJson = require("../package.json") as { version: string }
2224
const __filename = fileURLToPath(import.meta.url)
2325
const __dirname = path.dirname(__filename)
@@ -121,6 +123,11 @@ async function main() {
121123
})
122124
const fileSystemBrowser = new FileSystemBrowser({ rootDir: options.rootDir, unrestricted: options.unrestrictedRoot })
123125
const instanceStore = new InstanceStore()
126+
const instanceEventBridge = new InstanceEventBridge({
127+
workspaceManager,
128+
eventBus,
129+
logger: logger.child({ component: "instance-events" }),
130+
})
124131

125132
const serverMeta: ServerMeta = {
126133
httpBaseUrl: `http://${options.host}:${options.port}`,
@@ -169,6 +176,7 @@ async function main() {
169176
}
170177

171178
try {
179+
instanceEventBridge.shutdown()
172180
await workspaceManager.shutdown()
173181
logger.info("Workspace manager shutdown complete")
174182
} catch (error) {
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
import { fetch } from "undici"
2+
import { EventBus } from "../events/bus"
3+
import { Logger } from "../logger"
4+
import { WorkspaceManager } from "./manager"
5+
import { InstanceStreamEvent, InstanceStreamStatus } from "../api-types"
6+
7+
const INSTANCE_HOST = "127.0.0.1"
8+
const RECONNECT_DELAY_MS = 1000
9+
10+
interface InstanceEventBridgeOptions {
11+
workspaceManager: WorkspaceManager
12+
eventBus: EventBus
13+
logger: Logger
14+
}
15+
16+
interface ActiveStream {
17+
controller: AbortController
18+
task: Promise<void>
19+
}
20+
21+
export class InstanceEventBridge {
22+
private readonly streams = new Map<string, ActiveStream>()
23+
24+
constructor(private readonly options: InstanceEventBridgeOptions) {
25+
const bus = this.options.eventBus
26+
bus.on("workspace.started", (event) => this.startStream(event.workspace.id))
27+
bus.on("workspace.stopped", (event) => this.stopStream(event.workspaceId))
28+
bus.on("workspace.error", (event) => this.stopStream(event.workspace.id))
29+
}
30+
31+
shutdown() {
32+
for (const [id, active] of this.streams) {
33+
active.controller.abort()
34+
this.publishStatus(id, "disconnected")
35+
}
36+
this.streams.clear()
37+
}
38+
39+
private startStream(workspaceId: string) {
40+
if (this.streams.has(workspaceId)) {
41+
return
42+
}
43+
44+
const controller = new AbortController()
45+
const task = this.runStream(workspaceId, controller.signal)
46+
.catch((error) => {
47+
if (!controller.signal.aborted) {
48+
this.options.logger.warn({ workspaceId, err: error }, "Instance event stream failed")
49+
this.publishStatus(workspaceId, "error", error instanceof Error ? error.message : String(error))
50+
}
51+
})
52+
.finally(() => {
53+
const active = this.streams.get(workspaceId)
54+
if (active?.controller === controller) {
55+
this.streams.delete(workspaceId)
56+
}
57+
})
58+
59+
this.streams.set(workspaceId, { controller, task })
60+
}
61+
62+
private stopStream(workspaceId: string) {
63+
const active = this.streams.get(workspaceId)
64+
if (!active) {
65+
return
66+
}
67+
active.controller.abort()
68+
this.streams.delete(workspaceId)
69+
this.publishStatus(workspaceId, "disconnected")
70+
}
71+
72+
private async runStream(workspaceId: string, signal: AbortSignal) {
73+
while (!signal.aborted) {
74+
const port = this.options.workspaceManager.getInstancePort(workspaceId)
75+
if (!port) {
76+
await this.delay(RECONNECT_DELAY_MS, signal)
77+
continue
78+
}
79+
80+
this.publishStatus(workspaceId, "connecting")
81+
82+
try {
83+
await this.consumeStream(workspaceId, port, signal)
84+
} catch (error) {
85+
if (signal.aborted) {
86+
break
87+
}
88+
this.options.logger.warn({ workspaceId, err: error }, "Instance event stream disconnected")
89+
this.publishStatus(workspaceId, "error", error instanceof Error ? error.message : String(error))
90+
await this.delay(RECONNECT_DELAY_MS, signal)
91+
}
92+
}
93+
}
94+
95+
private async consumeStream(workspaceId: string, port: number, signal: AbortSignal) {
96+
const url = `http://${INSTANCE_HOST}:${port}/event`
97+
const response = await fetch(url, {
98+
headers: { Accept: "text/event-stream" },
99+
signal,
100+
})
101+
102+
if (!response.ok || !response.body) {
103+
throw new Error(`Instance event stream unavailable (${response.status})`)
104+
}
105+
106+
this.publishStatus(workspaceId, "connected")
107+
108+
const reader = response.body.getReader()
109+
const decoder = new TextDecoder()
110+
let buffer = ""
111+
112+
while (!signal.aborted) {
113+
const { done, value } = await reader.read()
114+
if (done || !value) {
115+
break
116+
}
117+
buffer += decoder.decode(value, { stream: true })
118+
buffer = this.flushEvents(buffer, workspaceId)
119+
}
120+
}
121+
122+
private flushEvents(buffer: string, workspaceId: string) {
123+
let separatorIndex = buffer.indexOf("\n\n")
124+
125+
while (separatorIndex >= 0) {
126+
const chunk = buffer.slice(0, separatorIndex)
127+
buffer = buffer.slice(separatorIndex + 2)
128+
this.processChunk(chunk, workspaceId)
129+
separatorIndex = buffer.indexOf("\n\n")
130+
}
131+
132+
return buffer
133+
}
134+
135+
private processChunk(chunk: string, workspaceId: string) {
136+
const lines = chunk.split(/\r?\n/)
137+
const dataLines: string[] = []
138+
139+
for (const line of lines) {
140+
if (line.startsWith(":")) {
141+
continue
142+
}
143+
if (line.startsWith("data:")) {
144+
dataLines.push(line.slice(5).trimStart())
145+
}
146+
}
147+
148+
if (dataLines.length === 0) {
149+
return
150+
}
151+
152+
const payload = dataLines.join("\n").trim()
153+
if (!payload) {
154+
return
155+
}
156+
157+
try {
158+
const event = JSON.parse(payload) as InstanceStreamEvent
159+
this.options.eventBus.publish({ type: "instance.event", instanceId: workspaceId, event })
160+
} catch (error) {
161+
this.options.logger.warn({ workspaceId, chunk: payload, err: error }, "Failed to parse instance SSE payload")
162+
}
163+
}
164+
165+
private publishStatus(instanceId: string, status: InstanceStreamStatus, reason?: string) {
166+
this.options.eventBus.publish({ type: "instance.eventStatus", instanceId, status, reason })
167+
}
168+
169+
private delay(duration: number, signal: AbortSignal) {
170+
if (duration <= 0) {
171+
return Promise.resolve()
172+
}
173+
return new Promise<void>((resolve) => {
174+
const timeout = setTimeout(() => {
175+
signal.removeEventListener("abort", onAbort)
176+
resolve()
177+
}, duration)
178+
179+
const onAbort = () => {
180+
clearTimeout(timeout)
181+
resolve()
182+
}
183+
184+
signal.addEventListener("abort", onAbort, { once: true })
185+
})
186+
}
187+
}

0 commit comments

Comments
 (0)