Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 42 additions & 38 deletions backend/InMemoryStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,53 @@ const EVICTION_TIME = 5 * 60 * 1000;
const EVICTION_CLOCK_TIME = 1 * 60 * 1000;

export class InMemoryStore {
private static store: InMemoryStore;
private store: Record<string, {
messages: Message[],
evictionTime: number
}>;

private clock: NodeJS.Timeout;

private constructor() {
this.store = {};
this.clock = setInterval(() => {
Object.entries(this.store).forEach(([key, {evictionTime}]) => {
if (evictionTime > Date.now()) {
delete this.store[key]
}
});
}, EVICTION_CLOCK_TIME);
private static store: InMemoryStore;
private store: Record<
string,
{
messages: Message[];
evictionTime: number;
}
>;

public destroy() {
clearInterval(this.clock)
}
private clock: NodeJS.Timeout;

static getInstance() {
if (!InMemoryStore.store) {
InMemoryStore.store = new InMemoryStore()
private constructor() {
this.store = {};
this.clock = setInterval(() => {
Object.entries(this.store).forEach(([key, { evictionTime }]) => {
if (evictionTime > Date.now()) {
delete this.store[key];
}
return InMemoryStore.store;
}
});
}, EVICTION_CLOCK_TIME);
}

public destroy() {
clearInterval(this.clock);
}

get(conversationId: string): Message[] {
return this.store[conversationId]?.messages ?? []
static getInstance() {
if (!InMemoryStore.store) {
InMemoryStore.store = new InMemoryStore();
}
return InMemoryStore.store;
}

get(conversationId: string): Message[] {
return this.store[conversationId]?.messages ?? [];
}

add(conversationId: string, message: Message) {
if (!this.store[conversationId]) {
this.store[conversationId] = {
messages: [],
evictionTime: Date.now() + EVICTION_TIME,
};
}

add(conversationId: string, message: Message) {
if (!this.store[conversationId]) {
this.store[conversationId] = {
messages: [],
evictionTime: Date.now() + EVICTION_TIME
}
}
this.store[conversationId]?.messages?.push(message);
this.store[conversationId].evictionTime = Date.now() + EVICTION_TIME;
}
}

this.store[conversationId]?.messages?.push(message);
this.store[conversationId].evictionTime = Date.now() + EVICTION_TIME;
}
}
160 changes: 86 additions & 74 deletions backend/openrouter.ts
Original file line number Diff line number Diff line change
@@ -1,84 +1,96 @@
import type { Message, MODEL, SUPPORTER_MODELS } from "./types";
import { Role, type Message, type MODEL, type SUPPORTER_MODELS } from "./types";
const OPENROUTER_KEY = process.env.OPENROUTER_KEY!;
const MAX_TOKEN_ITERATIONS = 1000;

type CreateCompletionOptions = {
plugins?: { id: string }[];
};

export const createCompletion = async (
messages: Message[],
model: MODEL,
cb: (chunk: string) => void,
systemPrompt?: string
messages: Message[],
model: MODEL,
cb: (chunk: string) => void,
systemPrompt?: string,
options?: CreateCompletionOptions,
) => {
return new Promise<void>(async (resolve, reject) => {
const response = await fetch('https://openrouter.ai/api/v1/chat/completions', {
method: 'POST',
headers: {
Authorization: `Bearer ${OPENROUTER_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
model,
messages: messages,
stream: true,
system: systemPrompt,
}),
});

const reader = response.body?.getReader();
if (!reader) {
throw new Error('Response body is not readable');
return new Promise<void>(async (resolve, reject) => {
const response = await fetch(
"https://openrouter.ai/api/v1/chat/completions",
{
method: "POST",
headers: {
Authorization: `Bearer ${OPENROUTER_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model,
messages: [
...(systemPrompt
? [{ role: Role.System, content: systemPrompt }]
: []),
...messages,
],
stream: true,
system: systemPrompt,
plugins: options?.plugins,
}),
},
);

const reader = response.body?.getReader();
if (!reader) {
throw new Error("Response body is not readable");
}

const decoder = new TextDecoder();
let buffer = "";

try {
let tokenIterations = 0;
while (true) {
tokenIterations++;
if (tokenIterations > MAX_TOKEN_ITERATIONS) {
console.log("max token iterations");
resolve();
return;
}
const { done, value } = await reader.read();
if (done) break;

// Append new chunk to buffer
buffer += decoder.decode(value, { stream: true });

// Process complete lines from buffer
while (true) {
const lineEnd = buffer.indexOf("\n");
if (lineEnd === -1) {
console.log("max token iterations 2");
break;
}

const decoder = new TextDecoder();
let buffer = '';

try {
let tokenIterations = 0;
while (true) {

tokenIterations++;
if (tokenIterations > MAX_TOKEN_ITERATIONS) {
console.log("max token iterations");
resolve()
return;
}
const { done, value } = await reader.read();
if (done) break;

// Append new chunk to buffer
buffer += decoder.decode(value, { stream: true });

// Process complete lines from buffer
while (true) {
const lineEnd = buffer.indexOf('\n');
if (lineEnd === -1) {
console.log("max token iterations 2");
break
};

const line = buffer.slice(0, lineEnd).trim();
buffer = buffer.slice(lineEnd + 1);

if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') break;

try {
const parsed = JSON.parse(data);
const content = parsed.choices?.[0]?.delta?.content;
if (content) {
cb(content);
}
} catch (e) {
// Ignore invalid JSON - this is common in SSE streams
console.warn("Failed to parse SSE data:", data, e);
}
}
const line = buffer.slice(0, lineEnd).trim();
buffer = buffer.slice(lineEnd + 1);

if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data === "[DONE]") break;

try {
const parsed = JSON.parse(data);
const content = parsed.choices?.[0]?.delta?.content;
if (content) {
cb(content);
}
} catch (e) {
// Ignore invalid JSON - this is common in SSE streams
console.warn("Failed to parse SSE data:", data, e);
}
} finally {
resolve()
reader.cancel();
}
})
}

}
}
} finally {
resolve();
reader.cancel();
}
});
};
Loading