Skip to content

Commit 2824863

Browse files
spencermarxruvnet
andcommitted
feat(dashboard/server): per-execution event journal + events API
Each `command_executions` row gets one JSONL file at `.ocr/data/events/<execution_id>.jsonl`. command-runner appends one StreamEvent per line; the dashboard's `GET /api/commands/:id/events` route reads the file back for live-reload rehydration and history replay. The journal is also load-bearing for the resume-recovery primitive: when relational state misses a `vendor_session_id` capture, the SessionCaptureService walks the JSONL and backfills. Append-only, atomic-rename-friendly, malformed-line tolerant. The file is read-only on the recovery path — writers go through the SQL primitives in `@open-code-review/cli/db`; readers may consult JSONL but never write it as authoritative state. Co-Authored-By: claude-flow <ruv@ruv.net>
1 parent 861275e commit 2824863

3 files changed

Lines changed: 252 additions & 1 deletion

File tree

packages/dashboard/src/server/routes/commands.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { Router } from 'express'
66
import type { Database } from 'sql.js'
77
import { getCommandHistory } from '../db.js'
88
import { getActiveCommands } from '../socket/command-runner.js'
9+
import { readEventJournal } from '../services/event-journal.js'
910

1011
type CommandDefinition = {
1112
name: string
@@ -44,7 +45,7 @@ const AVAILABLE_COMMANDS: CommandDefinition[] = [
4445
},
4546
]
4647

47-
export function createCommandsRouter(db: Database): Router {
48+
export function createCommandsRouter(db: Database, ocrDir: string): Router {
4849
const router = Router()
4950

5051
// GET /api/commands — List available commands with descriptions
@@ -80,5 +81,32 @@ export function createCommandsRouter(db: Database): Router {
8081
}
8182
})
8283

84+
// GET /api/commands/:id/events — Replay the per-execution event stream.
85+
//
86+
// Returns the contents of `.ocr/data/events/<id>.jsonl` parsed back into
87+
// a StreamEvent[]. Used by the client for two paths:
88+
// 1. Rehydration when a tab reloads mid-run — the live socket
89+
// subscription only sees events from now on; this fills in the gap.
90+
// 2. History replay — expanding a completed command in the history
91+
// list lazy-fetches its events to render the timeline.
92+
//
93+
// Returns an empty array (not 404) when no journal exists. Non-AI
94+
// commands and rows that predate the events feature have no journal —
95+
// the client treats empty as "use the legacy raw output instead."
96+
router.get('/:id/events', (req, res) => {
97+
const id = parseInt(req.params['id'] ?? '', 10)
98+
if (!Number.isFinite(id) || id <= 0) {
99+
res.status(400).json({ error: 'Invalid execution id' })
100+
return
101+
}
102+
try {
103+
const events = readEventJournal(ocrDir, id)
104+
res.json({ execution_id: id, events })
105+
} catch (err) {
106+
console.error(`Failed to read events for execution ${id}:`, err)
107+
res.status(500).json({ error: 'Failed to read event journal' })
108+
}
109+
})
110+
83111
return router
84112
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/**
2+
* Event journal — round-trip + edge-case tests for the JSONL persistence
3+
* helper that backs `command:event` rehydration.
4+
*/
5+
6+
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
7+
import { mkdtempSync, readFileSync, rmSync, writeFileSync } from 'node:fs'
8+
import { tmpdir } from 'node:os'
9+
import { join } from 'node:path'
10+
import {
11+
EventJournalAppender,
12+
eventJournalPath,
13+
readEventJournal,
14+
} from '../event-journal.js'
15+
import type { StreamEvent } from '../ai-cli/types.js'
16+
17+
let workspace: string
18+
let ocrDir: string
19+
20+
beforeEach(() => {
21+
workspace = mkdtempSync(join(tmpdir(), 'ocr-events-'))
22+
ocrDir = join(workspace, '.ocr')
23+
})
24+
25+
afterEach(() => {
26+
rmSync(workspace, { recursive: true, force: true })
27+
})
28+
29+
function makeEvent(seq: number, overrides: Partial<StreamEvent> = {}): StreamEvent {
30+
return {
31+
type: 'text_delta',
32+
text: `chunk ${seq}`,
33+
executionId: 1,
34+
agentId: 'orchestrator',
35+
timestamp: new Date(2026, 0, 1, 0, 0, seq).toISOString(),
36+
seq,
37+
...overrides,
38+
} as StreamEvent
39+
}
40+
41+
describe('event-journal', () => {
42+
it('appends each event as one JSON line and reads them back in order', async () => {
43+
const appender = new EventJournalAppender(ocrDir, 1)
44+
appender.append(makeEvent(1))
45+
appender.append(makeEvent(2, { type: 'message', text: 'final', executionId: 1 } as never))
46+
await appender.close()
47+
48+
const path = eventJournalPath(ocrDir, 1)
49+
const raw = readFileSync(path, 'utf-8')
50+
const lines = raw.trim().split('\n')
51+
expect(lines).toHaveLength(2)
52+
53+
const events = readEventJournal(ocrDir, 1)
54+
expect(events).toHaveLength(2)
55+
expect(events[0]?.seq).toBe(1)
56+
expect(events[1]?.seq).toBe(2)
57+
})
58+
59+
it('returns an empty array when no journal exists', () => {
60+
expect(readEventJournal(ocrDir, 999)).toEqual([])
61+
})
62+
63+
it('skips malformed lines rather than throwing', async () => {
64+
// Initialize directory by appending one valid event, then close.
65+
const appender = new EventJournalAppender(ocrDir, 7)
66+
appender.append(makeEvent(1))
67+
await appender.close()
68+
// Inject a malformed line at the end of the file.
69+
const path = eventJournalPath(ocrDir, 7)
70+
const original = readFileSync(path, 'utf-8')
71+
writeFileSync(path, original + '{this is not json}\n', 'utf-8')
72+
73+
const events = readEventJournal(ocrDir, 7)
74+
expect(events).toHaveLength(1)
75+
expect(events[0]?.seq).toBe(1)
76+
})
77+
78+
it('append after close is a no-op rather than throwing', () => {
79+
const appender = new EventJournalAppender(ocrDir, 11)
80+
appender.close()
81+
expect(() => appender.append(makeEvent(1))).not.toThrow()
82+
})
83+
84+
it('lazily creates the events directory on first appender', async () => {
85+
// The appender's constructor should have created the directory; the
86+
// path is what we care about.
87+
const appender = new EventJournalAppender(ocrDir, 42)
88+
appender.append(makeEvent(1))
89+
await appender.close()
90+
const path = eventJournalPath(ocrDir, 42)
91+
expect(readFileSync(path, 'utf-8').length).toBeGreaterThan(0)
92+
})
93+
})
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/**
2+
* Event journal — JSONL persistence for live command streams.
3+
*
4+
* Each command_executions row gets one journal file at
5+
* `.ocr/data/events/<execution_id>.jsonl`. The command-runner appends one
6+
* `StreamEvent` per JSON line as the AI CLI emits them; the dashboard's
7+
* `GET /api/commands/:id/events` route reads the file back for rehydration
8+
* (page reload mid-run) and history-replay.
9+
*
10+
* Why JSONL on disk rather than a sqlite table:
11+
* 1. Append-only writes avoid the sql.js merge-before-write rename dance
12+
* under high event throughput
13+
* 2. The format is trivially `tail -f`-able for humans debugging a run
14+
* 3. Event volume per execution is bounded but non-trivial (hundreds to
15+
* low-thousands per active review) — keeping it out of the DB keeps
16+
* the in-memory sql.js DB small
17+
* 4. No schema migration needed if the event union evolves
18+
*
19+
* Writes are best-effort and intentionally non-blocking — if the journal
20+
* write fails, the live socket emit still happens, and the user just loses
21+
* the ability to replay/reload-rehydrate that one event. The command itself
22+
* does NOT fail because of a journal error.
23+
*/
24+
25+
import {
26+
createWriteStream,
27+
existsSync,
28+
mkdirSync,
29+
readFileSync,
30+
type WriteStream,
31+
} from 'node:fs'
32+
import { join } from 'node:path'
33+
import type { StreamEvent } from './ai-cli/types.js'
34+
35+
/**
36+
* Resolves the directory where event journals live for a given workspace.
37+
* Lazily creates the directory so first-run installs work without setup.
38+
*/
39+
export function eventsDir(ocrDir: string): string {
40+
const dir = join(ocrDir, 'data', 'events')
41+
if (!existsSync(dir)) {
42+
mkdirSync(dir, { recursive: true })
43+
}
44+
return dir
45+
}
46+
47+
/**
48+
* Resolves the journal file path for a single execution.
49+
* The file may or may not exist yet — appendEvent creates it on first write.
50+
*/
51+
export function eventJournalPath(ocrDir: string, executionId: number): string {
52+
return join(eventsDir(ocrDir), `${executionId}.jsonl`)
53+
}
54+
55+
/**
56+
* Per-execution append handle. Keeps a write stream open for the lifetime
57+
* of the execution so we don't pay the open/close cost on every event.
58+
*
59+
* Call `close()` when the execution finishes. Idempotent.
60+
*/
61+
export class EventJournalAppender {
62+
private stream: WriteStream | null
63+
readonly path: string
64+
65+
constructor(ocrDir: string, executionId: number) {
66+
this.path = eventJournalPath(ocrDir, executionId)
67+
// 'a' = append, creates if missing
68+
this.stream = createWriteStream(this.path, { flags: 'a' })
69+
// Errors on the stream are logged but don't crash the runner — this is
70+
// a best-effort journal, not a load-bearing path.
71+
this.stream.on('error', (err) => {
72+
console.error(`[event-journal] write error for ${this.path}:`, err)
73+
this.stream = null
74+
})
75+
}
76+
77+
append(event: StreamEvent): void {
78+
if (!this.stream) return
79+
this.stream.write(JSON.stringify(event) + '\n')
80+
}
81+
82+
/**
83+
* Close the underlying write stream. Returns a promise that resolves
84+
* once the OS has flushed all pending writes, so callers that need
85+
* to read the file back synchronously (tests, the events route on
86+
* a just-finished execution) can await this.
87+
*
88+
* Idempotent — calling close after the stream is already closed is
89+
* a no-op that resolves immediately.
90+
*/
91+
close(): Promise<void> {
92+
if (!this.stream) return Promise.resolve()
93+
const stream = this.stream
94+
this.stream = null
95+
return new Promise<void>((resolve) => {
96+
stream.end(() => resolve())
97+
})
98+
}
99+
}
100+
101+
/**
102+
* Reads all events for a given execution. Returns an empty array when no
103+
* journal exists yet (pre-AI command, journal write failed, or execution
104+
* predates the event-stream feature).
105+
*
106+
* The events are returned in write order. Malformed lines are skipped with
107+
* a warning rather than throwing — partial recovery is more useful than
108+
* an all-or-nothing failure for a debug surface.
109+
*/
110+
export function readEventJournal(ocrDir: string, executionId: number): StreamEvent[] {
111+
const path = eventJournalPath(ocrDir, executionId)
112+
if (!existsSync(path)) return []
113+
let raw: string
114+
try {
115+
raw = readFileSync(path, 'utf-8')
116+
} catch {
117+
return []
118+
}
119+
const events: StreamEvent[] = []
120+
const lines = raw.split('\n')
121+
for (const line of lines) {
122+
if (!line.trim()) continue
123+
try {
124+
events.push(JSON.parse(line) as StreamEvent)
125+
} catch (err) {
126+
console.warn(`[event-journal] malformed line in ${path}:`, err)
127+
}
128+
}
129+
return events
130+
}

0 commit comments

Comments
 (0)