Skip to content

Commit

Permalink
feat: exponential backoff for retries and permit cancelling bot.init (
Browse files Browse the repository at this point in the history
  • Loading branch information
KnorpelSenf authored Mar 9, 2023
1 parent 59c5379 commit 23fa95d
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 15 deletions.
2 changes: 1 addition & 1 deletion deno.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"lock": false,
"tasks": {
"check": "deno cache --check=all src/mod.ts",
"backport": "deno run --no-prompt --allow-read=. --allow-write=. https://deno.land/x/deno2node@v1.7.1/src/cli.ts tsconfig.json",
"backport": "deno run --no-prompt --allow-read=. --allow-write=. https://deno.land/x/deno2node@v1.7.2/src/cli.ts tsconfig.json",
"test": "deno test --seed=123456 --parallel ./test/",
"dev": "deno fmt && deno lint && deno task test && deno task check",
"coverage": "deno task test --coverage=./test/cov_profile && deno coverage --lcov --output=./coverage.lcov ./test/cov_profile",
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
"@grammyjs/types": "^2.12.1",
"abort-controller": "^3.0.0",
"debug": "^4.3.4",
"node-fetch": "^2.6.7"
"node-fetch": "^2.6.9"
},
"devDependencies": {
"@types/debug": "^4.1.7",
"@types/node": "^12.20.55",
"@types/node-fetch": "^2.6.2",
"all-contributors-cli": "^6.24.0",
"deno2node": "^1.7.1"
"deno2node": "^1.7.2"
},
"files": [
"out/"
Expand Down
82 changes: 70 additions & 12 deletions src/bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,16 @@ export class Bot<
* Initializes the bot, i.e. fetches information about the bot itself. This
* method is called automatically, you usually don't have to call it
* manually.
*
* @param signal Optional `AbortSignal` to cancel the initialization
*/
async init() {
async init(signal?: AbortSignal) {
if (!this.isInited()) {
debug("Initializing bot");
this.mePromise ??= withRetries(() => this.api.getMe());
this.mePromise ??= withRetries(
() => this.api.getMe(signal),
signal,
);
let me: UserFromGetMe;
try {
me = await this.mePromise;
Expand Down Expand Up @@ -353,20 +358,27 @@ a known bot info object.",
*/
async start(options?: PollingOptions) {
// Perform setup
if (!this.isInited()) await this.init();
if (!this.isInited()) {
await this.init(this.pollingAbortController?.signal);
}
if (this.pollingRunning) {
debug("Simple long polling already running!");
return;
}
await withRetries(() =>
this.api.deleteWebhook({
drop_pending_updates: options?.drop_pending_updates,
})
await withRetries(
() =>
this.api.deleteWebhook({
drop_pending_updates: options?.drop_pending_updates,
}, this.pollingAbortController?.signal),
this.pollingAbortController?.signal,
);

// All async ops of setup complete, run callback
await options?.onStart?.(this.botInfo);

// Bot was stopped during `onStart`
if (!this.pollingRunning) return;

// Prevent common misuse that causes memory leak
this.use = () => {
throw new Error(`It looks like you are registering more listeners \
Expand Down Expand Up @@ -525,33 +537,79 @@ you can circumvent this protection against memory leaks.`);
/**
* Performs a network call task, retrying upon known errors until success.
*
* If the task errors and a retry_after value can be used, a subsequent retry
* will be delayed by the specified period of time.
*
* Otherwise, if the first attempt at running the task fails, the task is
* retried immediately. If second attempt fails, too, waits for 100 ms, and then
* doubles this delay for every subsequent attemt. Never waits longer than 1
* hour before retrying.
*
* @param task Async task to perform
* @param signal Optional `AbortSignal` to prevent further retries
*/
async function withRetries<T>(task: () => Promise<T>): Promise<T> {
async function withRetries<T>(
task: () => Promise<T>,
signal?: AbortSignal,
): Promise<T> {
let result: { ok: false } | { ok: true; value: T } = { ok: false };
const INITIAL_DELAY = 100; // ms
let delay = INITIAL_DELAY;
while (!result.ok) {
let mustDelay = true;
try {
result = { ok: true, value: await task() };
mustDelay = false;
} catch (error) {
debugErr(error);
if (error instanceof HttpError) continue;
if (error instanceof GrammyError) {
if (error.error_code >= 500) continue;
if (error.error_code === 429) {
const retryAfter = error.parameters.retry_after;
if (retryAfter !== undefined) await sleep(retryAfter);
if (retryAfter !== undefined) {
await sleep(retryAfter, signal);
mustDelay = false;
}
continue;
}
}
throw error;
} finally {
if (mustDelay) {
if (delay !== INITIAL_DELAY) {
await sleep(delay, signal);
}
// double the next delay but cap it at 1 hour
delay = Math.min(1 * 60 * 60 * 1000, delay + delay);
}
}
}
return result.value;
}

/**
* Returns a new promise that resolves after the specified number of seconds.
* Returns a new promise that resolves after the specified number of seconds, or
* rejects as soon as the given signal is aborted.
*/
function sleep(seconds: number) {
return new Promise((r) => setTimeout(r, 1000 * seconds));
async function sleep(seconds: number, signal?: AbortSignal) {
let handle: number | undefined;
let reject: ((err: Error) => void) | undefined;
function abort() {
reject?.(new Error("Aborted delay"));
if (handle !== undefined) clearTimeout(handle);
}
try {
await new Promise<void>((res, rej) => {
reject = rej;
if (signal?.aborted) {
abort();
return;
}
signal?.addEventListener("abort", abort);
handle = setTimeout(res, 1000 * seconds);
});
} finally {
signal?.removeEventListener("abort", abort);
}
}

0 comments on commit 23fa95d

Please sign in to comment.