diff --git a/js/src/client.ts b/js/src/client.ts index 5643530d6..5f891a5ba 100644 --- a/js/src/client.ts +++ b/js/src/client.ts @@ -220,6 +220,20 @@ function assertUuid(str: string): void { } } +const handle429 = async (response?: Response) => { + if (response?.status === 429) { + const retryAfter = + parseInt(response.headers.get("retry-after") ?? "30", 10) * 1000; + if (retryAfter > 0) { + await new Promise((resolve) => setTimeout(resolve, retryAfter)); + // Return directly after calling this check + return true; + } + } + // Fall back to existing status checks + return false; +}; + export class Queue { items: [T, () => void][] = []; @@ -261,6 +275,8 @@ export class Client { private caller: AsyncCaller; + private batchIngestCaller: AsyncCaller; + private timeout_ms: number; private _tenantId: string | null = null; @@ -296,6 +312,10 @@ export class Client { this.webUrl = trimQuotes(config.webUrl ?? defaultConfig.webUrl); this.timeout_ms = config.timeout_ms ?? 12_000; this.caller = new AsyncCaller(config.callerOptions ?? {}); + this.batchIngestCaller = new AsyncCaller({ + ...(config.callerOptions ?? {}), + onFailedResponseHook: handle429, + }); this.hideInputs = config.hideInputs ?? defaultConfig.hideInputs; this.hideOutputs = config.hideOutputs ?? defaultConfig.hideOutputs; this.autoBatchTracing = config.autoBatchTracing ?? this.autoBatchTracing; @@ -696,7 +716,7 @@ export class Client { "Content-Type": "application/json", Accept: "application/json", }; - const response = await this.caller.call( + const response = await this.batchIngestCaller.call( fetch, `${this.apiUrl}/runs/batch`, { diff --git a/js/src/tests/batch_client.test.ts b/js/src/tests/batch_client.test.ts index 7a0e1797c..8ed9d8216 100644 --- a/js/src/tests/batch_client.test.ts +++ b/js/src/tests/batch_client.test.ts @@ -10,7 +10,7 @@ describe("Batch client tracing", () => { autoBatchTracing: true, }); const callSpy = jest - .spyOn((client as any).caller, "call") + .spyOn((client as any).batchIngestCaller, "call") .mockResolvedValue({ ok: true, text: () => "", @@ -98,7 +98,7 @@ describe("Batch client tracing", () => { autoBatchTracing: true, }); const callSpy = jest - .spyOn((client as any).caller, "call") + .spyOn((client as any).batchIngestCaller, "call") .mockResolvedValue({ ok: true, text: () => "", @@ -167,7 +167,7 @@ describe("Batch client tracing", () => { autoBatchTracing: true, }); const callSpy = jest - .spyOn((client as any).caller, "call") + .spyOn((client as any).batchIngestCaller, "call") .mockResolvedValue({ ok: true, text: () => "", @@ -279,7 +279,7 @@ describe("Batch client tracing", () => { autoBatchTracing: true, }); const callSpy = jest - .spyOn((client as any).caller, "call") + .spyOn((client as any).batchIngestCaller, "call") .mockResolvedValue({ ok: true, text: () => "", diff --git a/js/src/tests/run_trees.test.ts b/js/src/tests/run_trees.test.ts index fac33dc89..45c7ea737 100644 --- a/js/src/tests/run_trees.test.ts +++ b/js/src/tests/run_trees.test.ts @@ -15,7 +15,7 @@ test("Should work with manually set API key", async () => { apiKey: key, }); const callSpy = jest - .spyOn((langchainClient as any).caller, "call") + .spyOn((langchainClient as any).batchIngestCaller, "call") .mockResolvedValue({ ok: true, text: () => "", diff --git a/js/src/utils/async_caller.ts b/js/src/utils/async_caller.ts index 731909891..01da72d84 100644 --- a/js/src/utils/async_caller.ts +++ b/js/src/utils/async_caller.ts @@ -15,6 +15,8 @@ const STATUS_IGNORE = [ 409, // Conflict ]; +type ResponseCallback = (response?: Response) => Promise; + export interface AsyncCallerParams { /** * The maximum number of concurrent calls that can be made. @@ -26,6 +28,8 @@ export interface AsyncCallerParams { * with an exponential backoff between each attempt. Defaults to 6. */ maxRetries?: number; + + onFailedResponseHook?: ResponseCallback; } export interface AsyncCallerCallOptions { @@ -52,12 +56,15 @@ export class AsyncCaller { private queue: typeof import("p-queue")["default"]["prototype"]; + private onFailedResponseHook?: ResponseCallback; + constructor(params: AsyncCallerParams) { this.maxConcurrency = params.maxConcurrency ?? Infinity; this.maxRetries = params.maxRetries ?? 6; const PQueue = "default" in PQueueMod ? PQueueMod.default : PQueueMod; this.queue = new PQueue({ concurrency: this.maxConcurrency }); + this.onFailedResponseHook = params?.onFailedResponseHook; } // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -65,11 +72,12 @@ export class AsyncCaller { callable: T, ...args: Parameters ): Promise>> { + const onFailedResponseHook = this.onFailedResponseHook; return this.queue.add( () => pRetry( () => - callable(...args).catch((error) => { + callable(...(args as Parameters)).catch((error) => { // eslint-disable-next-line no-instanceof/no-instanceof if (error instanceof Error) { throw error; @@ -78,7 +86,7 @@ export class AsyncCaller { } }), { - onFailedAttempt(error) { + async onFailedAttempt(error) { if ( error.message.startsWith("Cancel") || error.message.startsWith("TimeoutError") || @@ -91,13 +99,17 @@ export class AsyncCaller { throw error; } // eslint-disable-next-line @typescript-eslint/no-explicit-any - const status = (error as any)?.response?.status; + const response: Response | undefined = (error as any)?.response; + const status = response?.status; if (status) { if (STATUS_NO_RETRY.includes(+status)) { throw error; } else if (STATUS_IGNORE.includes(+status)) { return; } + if (onFailedResponseHook) { + await onFailedResponseHook(response); + } } }, // If needed we can change some of the defaults here,