Commit 55a92f7

webui: MCP client with low coupling to current codebase
1 parent 2c453c6 · commit 55a92f7

File tree

20 files changed, +2337 −5 lines changed
Lines changed: 190 additions & 0 deletions

@@ -0,0 +1,190 @@
import type {
  ApiChatCompletionToolCall,
  ApiChatCompletionToolCallDelta,
  ApiChatCompletionStreamChunk
} from '$lib/types/api';
import type { ChatMessagePromptProgress, ChatMessageTimings } from '$lib/types/chat';
import { mergeToolCallDeltas, extractModelName } from '$lib/utils/chat-stream';
import type { AgenticChatCompletionRequest } from './types';

export type OpenAISseCallbacks = {
  onChunk?: (chunk: string) => void;
  onReasoningChunk?: (chunk: string) => void;
  onToolCallChunk?: (serializedToolCalls: string) => void;
  onModel?: (model: string) => void;
  onFirstValidChunk?: () => void;
  onProcessingUpdate?: (timings?: ChatMessageTimings, progress?: ChatMessagePromptProgress) => void;
};

export type OpenAISseTurnResult = {
  content: string;
  reasoningContent?: string;
  toolCalls: ApiChatCompletionToolCall[];
  finishReason?: string | null;
  timings?: ChatMessageTimings;
};

export type OpenAISseClientOptions = {
  url: string;
  buildHeaders?: () => Record<string, string>;
};

// Minimal SSE client for an OpenAI-compatible chat-completions endpoint:
// posts one streaming request and aggregates content, reasoning and tool
// calls for a single assistant turn.
export class OpenAISseClient {
  constructor(private readonly options: OpenAISseClientOptions) {}

  async stream(
    request: AgenticChatCompletionRequest,
    callbacks: OpenAISseCallbacks = {},
    abortSignal?: AbortSignal
  ): Promise<OpenAISseTurnResult> {
    const response = await fetch(this.options.url, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        ...(this.options.buildHeaders?.() ?? {})
      },
      body: JSON.stringify(request),
      signal: abortSignal
    });

    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(errorText || `LLM request failed (${response.status})`);
    }

    const reader = response.body?.getReader();
    if (!reader) {
      throw new Error('LLM response stream is not available');
    }

    return this.consumeStream(reader, callbacks, abortSignal);
  }

  private async consumeStream(
    reader: ReadableStreamDefaultReader<Uint8Array>,
    callbacks: OpenAISseCallbacks,
    abortSignal?: AbortSignal
  ): Promise<OpenAISseTurnResult> {
    const decoder = new TextDecoder();
    let buffer = '';
    let aggregatedContent = '';
    let aggregatedReasoning = '';
    let aggregatedToolCalls: ApiChatCompletionToolCall[] = [];
    let hasOpenToolCallBatch = false;
    let toolCallIndexOffset = 0;
    let finishReason: string | null | undefined;
    let lastTimings: ChatMessageTimings | undefined;
    let modelEmitted = false;
    let firstValidChunkEmitted = false;

    // Close the current run of tool-call deltas; later deltas start a new
    // batch whose indices are offset past the calls already collected.
    const finalizeToolCallBatch = () => {
      if (!hasOpenToolCallBatch) return;
      toolCallIndexOffset = aggregatedToolCalls.length;
      hasOpenToolCallBatch = false;
    };

    const processToolCalls = (toolCalls?: ApiChatCompletionToolCallDelta[]) => {
      if (!toolCalls || toolCalls.length === 0) {
        return;
      }
      aggregatedToolCalls = mergeToolCallDeltas(aggregatedToolCalls, toolCalls, toolCallIndexOffset);
      if (aggregatedToolCalls.length === 0) {
        return;
      }
      hasOpenToolCallBatch = true;
    };

    try {
      while (true) {
        if (abortSignal?.aborted) {
          throw new DOMException('Aborted', 'AbortError');
        }

        const { done, value } = await reader.read();
        if (done) break;

        // SSE framing: split on newlines and keep a trailing partial line in
        // the buffer until the next read completes it.
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? '';

        for (const line of lines) {
          if (!line.startsWith('data: ')) {
            continue;
          }

          const payload = line.slice(6);
          // Skip the end-of-stream sentinel and empty keep-alive payloads.
          if (payload === '[DONE]' || payload.trim().length === 0) {
            continue;
          }

          let chunk: ApiChatCompletionStreamChunk;
          try {
            chunk = JSON.parse(payload) as ApiChatCompletionStreamChunk;
          } catch (error) {
            console.error('[Agentic][SSE] Failed to parse chunk:', error);
            continue;
          }

          if (!firstValidChunkEmitted && chunk.object === 'chat.completion.chunk') {
            firstValidChunkEmitted = true;
            callbacks.onFirstValidChunk?.();
          }

          const choice = chunk.choices?.[0];
          const delta = choice?.delta;
          finishReason = choice?.finish_reason ?? finishReason;

          if (!modelEmitted) {
            const chunkModel = extractModelName(chunk);
            if (chunkModel) {
              modelEmitted = true;
              callbacks.onModel?.(chunkModel);
            }
          }

          if (chunk.timings || chunk.prompt_progress) {
            callbacks.onProcessingUpdate?.(chunk.timings, chunk.prompt_progress);
            if (chunk.timings) {
              lastTimings = chunk.timings;
            }
          }

          if (delta?.content) {
            finalizeToolCallBatch();
            aggregatedContent += delta.content;
            callbacks.onChunk?.(delta.content);
          }

          if (delta?.reasoning_content) {
            finalizeToolCallBatch();
            aggregatedReasoning += delta.reasoning_content;
            callbacks.onReasoningChunk?.(delta.reasoning_content);
          }

          processToolCalls(delta?.tool_calls);
        }
      }

      finalizeToolCallBatch();
    } catch (error) {
      if ((error as Error).name === 'AbortError') {
        throw error;
      }
      throw error instanceof Error ? error : new Error('LLM stream error');
    } finally {
      reader.releaseLock();
    }

    return {
      content: aggregatedContent,
      reasoningContent: aggregatedReasoning || undefined,
      toolCalls: aggregatedToolCalls,
      finishReason,
      timings: lastTimings
    };
  }
}
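
For context, a minimal usage sketch of the new client. This is a hypothetical call site, not part of the commit: the module path, endpoint URL, auth header, and request fields below are assumptions; only the OpenAISseClient API shown in the diff is taken from the source.

import { OpenAISseClient } from './openai-sse-client'; // hypothetical module path
import type { AgenticChatCompletionRequest } from './types';

const client = new OpenAISseClient({
  url: '/v1/chat/completions', // assumed OpenAI-compatible endpoint
  buildHeaders: () => ({ Authorization: 'Bearer <token>' }) // hypothetical auth header
});

const controller = new AbortController();

const result = await client.stream(
  // Assumed request shape; the concrete fields are defined by AgenticChatCompletionRequest.
  { messages: [{ role: 'user', content: 'Hello' }], stream: true } as AgenticChatCompletionRequest,
  {
    onChunk: (text) => console.log('content:', text),
    onModel: (model) => console.log('model:', model),
    onProcessingUpdate: (timings) => console.log('timings:', timings)
  },
  controller.signal // abort the turn via controller.abort()
);

console.log('finish reason:', result.finishReason, 'tool calls:', result.toolCalls.length);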
