Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/long-bags-argue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": patch
---

Report actual HTTP status codes in connectWs instead of generic connection error
101 changes: 101 additions & 0 deletions agents/src/inference/utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import http from 'node:http';
import { type AddressInfo } from 'node:net';
import { afterAll, beforeAll, describe, expect, it } from 'vitest';
import { APIConnectionError, APIStatusError } from '../_exceptions.js';
import { connectWs } from './utils.js';

/**
* Spins up a throwaway HTTP server that responds to WebSocket upgrade requests
* with a configurable status code instead of completing the handshake.
*/
function createRejectServer(statusCode: number): Promise<http.Server> {
return new Promise((resolve) => {
const server = http.createServer((req, res) => {
res.writeHead(statusCode);
res.end();
});
// Also handle upgrade requests to ensure ws gets the rejection
server.on('upgrade', (req, socket) => {
socket.write(
`HTTP/1.1 ${statusCode} ${http.STATUS_CODES[statusCode] || 'Unknown'}\r\n\r\n`,
);
socket.destroy();
});
server.listen(0, '127.0.0.1', () => resolve(server));
});
}

function serverUrl(server: http.Server): string {
const addr = server.address() as AddressInfo;
return `ws://127.0.0.1:${addr.port}`;
}

describe('connectWs', () => {
const servers: http.Server[] = [];

afterAll(() => {
for (const s of servers) {
s.close();
}
});

it('rejects with APIStatusError(429) for rate-limited responses', async () => {
const server = await createRejectServer(429);
servers.push(server);

const err = await connectWs(serverUrl(server), {}, 5000).catch((e) => e);

expect(err).toBeInstanceOf(APIStatusError);
expect(err.statusCode).toBe(429);
expect(err.message).toBe('LiveKit gateway quota exceeded');
expect(err.retryable).toBe(true);
});

it('rejects with APIStatusError for 401 Unauthorized', async () => {
const server = await createRejectServer(401);
servers.push(server);

const err = await connectWs(serverUrl(server), {}, 5000).catch((e) => e);

expect(err).toBeInstanceOf(APIStatusError);
expect(err.statusCode).toBe(401);
expect(err.message).toMatch(/Unexpected server response: 401/);
expect(err.retryable).toBe(false);
});

it('rejects with APIStatusError for 500 Internal Server Error', async () => {
const server = await createRejectServer(500);
servers.push(server);

const err = await connectWs(serverUrl(server), {}, 5000).catch((e) => e);

expect(err).toBeInstanceOf(APIStatusError);
expect(err.statusCode).toBe(500);
expect(err.message).toMatch(/Unexpected server response: 500/);
expect(err.retryable).toBe(true);
});

it('rejects with APIConnectionError preserving original message for network errors', async () => {
// Connect to a port where nothing is listening
const err = await connectWs('ws://127.0.0.1:1', {}, 5000).catch((e) => e);

expect(err).toBeInstanceOf(APIConnectionError);
expect(err.message).toMatch(/ECONNREFUSED/);
expect(err.retryable).toBe(true);
});

it('rejects with APIConnectionError on timeout', async () => {
// Create a server that never responds to the upgrade
const server = http.createServer();
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
servers.push(server);

const err = await connectWs(serverUrl(server), {}, 100).catch((e) => e);

expect(err).toBeInstanceOf(APIConnectionError);
expect(err.message).toMatch(/Timeout/);
});
});
24 changes: 19 additions & 5 deletions agents/src/inference/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
import { AccessToken } from 'livekit-server-sdk';
import { WebSocket } from 'ws';
import { APIConnectionError, APIStatusError } from '../index.js';
import { APIConnectionError, APIStatusError } from '../_exceptions.js';

export type AnyString = string & NonNullable<unknown>;

Expand Down Expand Up @@ -35,20 +35,32 @@ export async function connectWs(
resolve(socket);
};

const onError = (err: unknown) => {
const onUnexpectedResponse = (_req: unknown, res: { statusCode: number }) => {
clearTimeout(timeout);
if (err && typeof err === 'object' && 'code' in err && (err as any).code === 429) {
socket.close();
if (res.statusCode === 429) {
reject(
new APIStatusError({
message: 'LiveKit gateway quota exceeded',
options: { statusCode: 429 },
options: { statusCode: 429, retryable: true },
}),
);
} else {
reject(new APIConnectionError({ message: 'Error connecting to LiveKit WebSocket' }));
reject(
new APIStatusError({
message: `Unexpected server response: ${res.statusCode}`,
options: { statusCode: res.statusCode },
}),
);
}
};

const onError = (err: unknown) => {
clearTimeout(timeout);
const message = err instanceof Error ? err.message : 'Error connecting to LiveKit WebSocket';
reject(new APIConnectionError({ message }));
};

const onClose = (code: number) => {
clearTimeout(timeout);
if (code !== 1000) {
Expand All @@ -59,7 +71,9 @@ export async function connectWs(
);
}
};

socket.once('open', onOpen);
socket.once('unexpected-response', onUnexpectedResponse);
socket.once('error', onError);
socket.once('close', onClose);
});
Expand Down
Loading