diff --git a/pkg/http.test.ts b/pkg/http.test.ts index 976945b1..3ed1c76a 100644 --- a/pkg/http.test.ts +++ b/pkg/http.test.ts @@ -1,4 +1,4 @@ -import { expect, test } from "bun:test"; +import { describe, expect, test } from "bun:test"; import { HttpClient } from "./http"; import { newHttpClient } from "./test-utils"; @@ -8,8 +8,8 @@ test("remove trailing slash from urls", () => { expect(client.baseUrl).toEqual("https://example.com"); }); -test(new URL("", import.meta.url).pathname, () => { - test("when the request is invalid", () => { +describe(new URL("", import.meta.url).pathname, () => { + describe("when the request is invalid", () => { test("throws", async () => { const client = newHttpClient(); let hasThrown = false; @@ -20,7 +20,7 @@ test(new URL("", import.meta.url).pathname, () => { }); }); - test("whithout authorization", () => { + describe("whithout authorization", () => { test("throws", async () => { const client = newHttpClient(); client.headers = {}; @@ -32,3 +32,19 @@ test(new URL("", import.meta.url).pathname, () => { }); }); }); + +describe("Abort", () => { + test("should abort the request", async () => { + const controller = new AbortController(); + const signal = controller.signal; + + const client = newHttpClient(); + client.options.signal = signal; + const body = client.request({ + body: ["set", "name", "hezarfen"], + }); + controller.abort("Abort works!"); + + expect((await body).result).toEqual("Abort works!"); + }); +}); diff --git a/pkg/http.ts b/pkg/http.ts index 722edbbb..594a3f4c 100644 --- a/pkg/http.ts +++ b/pkg/http.ts @@ -19,7 +19,9 @@ export type UpstashRequest = { export type UpstashResponse = { result?: TResult; error?: string }; export interface Requester { - request: (req: UpstashRequest) => Promise>; + request: ( + req: UpstashRequest + ) => Promise>; } type ResultError = { @@ -93,6 +95,7 @@ export type HttpClientConfig = { options?: Options; retry?: RetryConfig; agent?: any; + signal?: AbortSignal; } & RequesterConfig; export class HttpClient implements Requester { @@ -101,6 +104,7 @@ export class HttpClient implements Requester { public readonly options: { backend?: string; agent: any; + signal?: AbortSignal; responseEncoding?: false | "base64"; cache?: CacheSetting; }; @@ -116,6 +120,7 @@ export class HttpClient implements Requester { agent: config.agent, responseEncoding: config.responseEncoding ?? "base64", // default to base64 cache: config.cache, + signal: config.signal, }; this.baseUrl = config.baseUrl.replace(/\/$/, ""); @@ -138,7 +143,8 @@ export class HttpClient implements Requester { } else { this.retry = { attempts: config?.retry?.retries ?? 5, - backoff: config?.retry?.backoff ?? ((retryCount) => Math.exp(retryCount) * 50), + backoff: + config?.retry?.backoff ?? ((retryCount) => Math.exp(retryCount) * 50), }; } } @@ -147,7 +153,7 @@ export class HttpClient implements Requester { function merge( obj: Record, key: string, - value?: string, + value?: string ): Record { if (!value) { return obj; @@ -160,12 +166,22 @@ export class HttpClient implements Requester { return obj; } - this.headers = merge(this.headers, "Upstash-Telemetry-Runtime", telemetry.runtime); - this.headers = merge(this.headers, "Upstash-Telemetry-Platform", telemetry.platform); + this.headers = merge( + this.headers, + "Upstash-Telemetry-Runtime", + telemetry.runtime + ); + this.headers = merge( + this.headers, + "Upstash-Telemetry-Platform", + telemetry.platform + ); this.headers = merge(this.headers, "Upstash-Telemetry-Sdk", telemetry.sdk); } - public async request(req: UpstashRequest): Promise> { + public async request( + req: UpstashRequest + ): Promise> { const requestOptions: RequestInit & { backend?: string; agent?: any } = { cache: this.options.cache, method: "POST", @@ -173,6 +189,7 @@ export class HttpClient implements Requester { body: JSON.stringify(req.body), keepalive: true, agent: this.options?.agent, + signal: this.options.signal, /** * Fastly specific @@ -184,9 +201,23 @@ export class HttpClient implements Requester { let error: Error | null = null; for (let i = 0; i <= this.retry.attempts; i++) { try { - res = await fetch([this.baseUrl, ...(req.path ?? [])].join("/"), requestOptions); + res = await fetch( + [this.baseUrl, ...(req.path ?? [])].join("/"), + requestOptions + ); break; } catch (err) { + if (this.options.signal?.aborted) { + const myBlob = new Blob([ + JSON.stringify({ result: this.options.signal.reason ?? "Aborted" }), + ]); + const myOptions = { + status: 200, + statusText: this.options.signal.reason ?? "Aborted", + }; + res = new Response(myBlob, myOptions); + break; + } error = err as Error; await new Promise((r) => setTimeout(r, this.retry.backoff(i))); } @@ -197,7 +228,9 @@ export class HttpClient implements Requester { const body = (await res.json()) as UpstashResponse; if (!res.ok) { - throw new UpstashError(`${body.error}, command was: ${JSON.stringify(req.body)}`); + throw new UpstashError( + `${body.error}, command was: ${JSON.stringify(req.body)}` + ); } if (this.options?.responseEncoding === "base64") { @@ -251,7 +284,11 @@ function decode(raw: ResultError["result"]): ResultError["result"] { case "object": { if (Array.isArray(raw)) { result = raw.map((v) => - typeof v === "string" ? base64decode(v) : Array.isArray(v) ? v.map(decode) : v, + typeof v === "string" + ? base64decode(v) + : Array.isArray(v) + ? v.map(decode) + : v ); } else { // If it's not an array it must be null diff --git a/platforms/cloudflare.ts b/platforms/cloudflare.ts index ff158a91..5e740901 100644 --- a/platforms/cloudflare.ts +++ b/platforms/cloudflare.ts @@ -22,6 +22,11 @@ export type RedisConfigCloudflare = { * UPSTASH_REDIS_REST_TOKEN */ token: string; + /** + * The signal will allow aborting requests on the fly. + * For more check: https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal + */ + signal?: AbortSignal; } & core.RedisOptions & RequesterConfig & Env; @@ -42,11 +47,23 @@ export class Redis extends core.Redis { * ``` */ constructor(config: RedisConfigCloudflare, env?: Env) { - if (config.url.startsWith(" ") || config.url.endsWith(" ") || /\r|\n/.test(config.url)) { - console.warn("The redis url contains whitespace or newline, which can cause errors!"); + if ( + config.url.startsWith(" ") || + config.url.endsWith(" ") || + /\r|\n/.test(config.url) + ) { + console.warn( + "The redis url contains whitespace or newline, which can cause errors!" + ); } - if (config.token.startsWith(" ") || config.token.endsWith(" ") || /\r|\n/.test(config.token)) { - console.warn("The redis token contains whitespace or newline, which can cause errors!"); + if ( + config.token.startsWith(" ") || + config.token.endsWith(" ") || + /\r|\n/.test(config.token) + ) { + console.warn( + "The redis token contains whitespace or newline, which can cause errors!" + ); } const client = new HttpClient({ @@ -54,6 +71,7 @@ export class Redis extends core.Redis { baseUrl: config.url, headers: { authorization: `Bearer ${config.token}` }, responseEncoding: config.responseEncoding, + signal: config.signal, }); super(client, { diff --git a/platforms/nodejs.ts b/platforms/nodejs.ts index 91d52e99..bd712c5b 100644 --- a/platforms/nodejs.ts +++ b/platforms/nodejs.ts @@ -50,6 +50,11 @@ export type RedisConfigNodejs = { * } * ``` */ + /** + * The signal will allow aborting requests on the fly. + * For more check: https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal + */ + signal?: AbortSignal; agent?: any; } & core.RedisOptions & RequesterConfig; @@ -99,14 +104,18 @@ export class Redis extends core.Redis { configOrRequester.url.endsWith(" ") || /\r|\n/.test(configOrRequester.url) ) { - console.warn("The redis url contains whitespace or newline, which can cause errors!"); + console.warn( + "The redis url contains whitespace or newline, which can cause errors!" + ); } if ( configOrRequester.token.startsWith(" ") || configOrRequester.token.endsWith(" ") || /\r|\n/.test(configOrRequester.token) ) { - console.warn("The redis token contains whitespace or newline, which can cause errors!"); + console.warn( + "The redis token contains whitespace or newline, which can cause errors!" + ); } const client = new HttpClient({ @@ -116,6 +125,7 @@ export class Redis extends core.Redis { agent: configOrRequester.agent, responseEncoding: configOrRequester.responseEncoding, cache: configOrRequester.cache || "no-store", + signal: configOrRequester.signal, }); super(client, { @@ -124,9 +134,16 @@ export class Redis extends core.Redis { }); this.addTelemetry({ - // @ts-ignore - runtime: typeof EdgeRuntime === "string" ? "edge-light" : `node@${process.version}`, - platform: process.env.VERCEL ? "vercel" : process.env.AWS_REGION ? "aws" : "unknown", + runtime: + // @ts-ignore + typeof EdgeRuntime === "string" + ? "edge-light" + : `node@${process.version}`, + platform: process.env.VERCEL + ? "vercel" + : process.env.AWS_REGION + ? "aws" + : "unknown", sdk: `@upstash/redis@${VERSION}`, }); } @@ -150,12 +167,16 @@ export class Redis extends core.Redis { // @ts-ignore process will be defined in node const url = process?.env.UPSTASH_REDIS_REST_URL; if (!url) { - throw new Error("Unable to find environment variable: `UPSTASH_REDIS_REST_URL`"); + throw new Error( + "Unable to find environment variable: `UPSTASH_REDIS_REST_URL`" + ); } // @ts-ignore process will be defined in node const token = process?.env.UPSTASH_REDIS_REST_TOKEN; if (!token) { - throw new Error("Unable to find environment variable: `UPSTASH_REDIS_REST_TOKEN`"); + throw new Error( + "Unable to find environment variable: `UPSTASH_REDIS_REST_TOKEN`" + ); } return new Redis({ ...config, url, token }); }