Skip to content

Commit

Permalink
Add 429 retries (#465)
Browse files Browse the repository at this point in the history
  • Loading branch information
hinthornw authored Feb 22, 2024
1 parent 79a763e commit 5d95f8a
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 9 deletions.
22 changes: 21 additions & 1 deletion js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,20 @@ function assertUuid(str: string): void {
}
}

const handle429 = async (response?: Response) => {
if (response?.status === 429) {
const retryAfter =
parseInt(response.headers.get("retry-after") ?? "30", 10) * 1000;
if (retryAfter > 0) {
await new Promise((resolve) => setTimeout(resolve, retryAfter));
// Return directly after calling this check
return true;
}
}
// Fall back to existing status checks
return false;
};

export class Queue<T> {
items: [T, () => void][] = [];

Expand Down Expand Up @@ -261,6 +275,8 @@ export class Client {

private caller: AsyncCaller;

private batchIngestCaller: AsyncCaller;

private timeout_ms: number;

private _tenantId: string | null = null;
Expand Down Expand Up @@ -296,6 +312,10 @@ export class Client {
this.webUrl = trimQuotes(config.webUrl ?? defaultConfig.webUrl);
this.timeout_ms = config.timeout_ms ?? 12_000;
this.caller = new AsyncCaller(config.callerOptions ?? {});
this.batchIngestCaller = new AsyncCaller({
...(config.callerOptions ?? {}),
onFailedResponseHook: handle429,
});
this.hideInputs = config.hideInputs ?? defaultConfig.hideInputs;
this.hideOutputs = config.hideOutputs ?? defaultConfig.hideOutputs;
this.autoBatchTracing = config.autoBatchTracing ?? this.autoBatchTracing;
Expand Down Expand Up @@ -696,7 +716,7 @@ export class Client {
"Content-Type": "application/json",
Accept: "application/json",
};
const response = await this.caller.call(
const response = await this.batchIngestCaller.call(
fetch,
`${this.apiUrl}/runs/batch`,
{
Expand Down
8 changes: 4 additions & 4 deletions js/src/tests/batch_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe("Batch client tracing", () => {
autoBatchTracing: true,
});
const callSpy = jest
.spyOn((client as any).caller, "call")
.spyOn((client as any).batchIngestCaller, "call")
.mockResolvedValue({
ok: true,
text: () => "",
Expand Down Expand Up @@ -98,7 +98,7 @@ describe("Batch client tracing", () => {
autoBatchTracing: true,
});
const callSpy = jest
.spyOn((client as any).caller, "call")
.spyOn((client as any).batchIngestCaller, "call")
.mockResolvedValue({
ok: true,
text: () => "",
Expand Down Expand Up @@ -167,7 +167,7 @@ describe("Batch client tracing", () => {
autoBatchTracing: true,
});
const callSpy = jest
.spyOn((client as any).caller, "call")
.spyOn((client as any).batchIngestCaller, "call")
.mockResolvedValue({
ok: true,
text: () => "",
Expand Down Expand Up @@ -279,7 +279,7 @@ describe("Batch client tracing", () => {
autoBatchTracing: true,
});
const callSpy = jest
.spyOn((client as any).caller, "call")
.spyOn((client as any).batchIngestCaller, "call")
.mockResolvedValue({
ok: true,
text: () => "",
Expand Down
2 changes: 1 addition & 1 deletion js/src/tests/run_trees.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ test("Should work with manually set API key", async () => {
apiKey: key,
});
const callSpy = jest
.spyOn((langchainClient as any).caller, "call")
.spyOn((langchainClient as any).batchIngestCaller, "call")
.mockResolvedValue({
ok: true,
text: () => "",
Expand Down
18 changes: 15 additions & 3 deletions js/src/utils/async_caller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const STATUS_IGNORE = [
409, // Conflict
];

type ResponseCallback = (response?: Response) => Promise<boolean>;

export interface AsyncCallerParams {
/**
* The maximum number of concurrent calls that can be made.
Expand All @@ -26,6 +28,8 @@ export interface AsyncCallerParams {
* with an exponential backoff between each attempt. Defaults to 6.
*/
maxRetries?: number;

onFailedResponseHook?: ResponseCallback;
}

export interface AsyncCallerCallOptions {
Expand All @@ -52,24 +56,28 @@ export class AsyncCaller {

private queue: typeof import("p-queue")["default"]["prototype"];

private onFailedResponseHook?: ResponseCallback;

constructor(params: AsyncCallerParams) {
this.maxConcurrency = params.maxConcurrency ?? Infinity;
this.maxRetries = params.maxRetries ?? 6;

const PQueue = "default" in PQueueMod ? PQueueMod.default : PQueueMod;
this.queue = new PQueue({ concurrency: this.maxConcurrency });
this.onFailedResponseHook = params?.onFailedResponseHook;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
call<A extends any[], T extends (...args: A) => Promise<any>>(
callable: T,
...args: Parameters<T>
): Promise<Awaited<ReturnType<T>>> {
const onFailedResponseHook = this.onFailedResponseHook;
return this.queue.add(
() =>
pRetry(
() =>
callable(...args).catch((error) => {
callable(...(args as Parameters<T>)).catch((error) => {
// eslint-disable-next-line no-instanceof/no-instanceof
if (error instanceof Error) {
throw error;
Expand All @@ -78,7 +86,7 @@ export class AsyncCaller {
}
}),
{
onFailedAttempt(error) {
async onFailedAttempt(error) {
if (
error.message.startsWith("Cancel") ||
error.message.startsWith("TimeoutError") ||
Expand All @@ -91,13 +99,17 @@ export class AsyncCaller {
throw error;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const status = (error as any)?.response?.status;
const response: Response | undefined = (error as any)?.response;
const status = response?.status;
if (status) {
if (STATUS_NO_RETRY.includes(+status)) {
throw error;
} else if (STATUS_IGNORE.includes(+status)) {
return;
}
if (onFailedResponseHook) {
await onFailedResponseHook(response);
}
}
},
// If needed we can change some of the defaults here,
Expand Down

0 comments on commit 5d95f8a

Please sign in to comment.