Skip to content

Commit

Permalink
use a proper AbortController instead of the home baked one
Browse files Browse the repository at this point in the history
Signed-off-by: Fredrik Adelöw <freben@gmail.com>
  • Loading branch information
freben committed Nov 11, 2021
1 parent cc3b6c2 commit b91fe07
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 86 deletions.
1 change: 1 addition & 0 deletions packages/backend-tasks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"knex": "^0.95.1",
"lodash": "^4.17.21",
"luxon": "^2.0.2",
"node-abort-controller": "^3.0.1",
"uuid": "^8.0.0",
"winston": "^3.2.1",
"zod": "^3.9.5"
Expand Down
49 changes: 0 additions & 49 deletions packages/backend-tasks/src/tasks/CancelToken.ts

This file was deleted.

26 changes: 9 additions & 17 deletions packages/backend-tasks/src/tasks/PluginTaskManagerJanitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

import { Knex } from 'knex';
import { Duration } from 'luxon';
import { AbortController, AbortSignal } from 'node-abort-controller';
import { Logger } from 'winston';
import { DbTasksRow, DB_TASKS_TABLE } from '../database/tables';
import { CancelToken } from './CancelToken';
import { sleep } from './util';

/**
* Makes sure to auto-expire and clean up things that time out or for other
Expand All @@ -28,7 +29,8 @@ export class PluginTaskManagerJanitor {
private readonly knex: Knex;
private readonly waitBetweenRuns: Duration;
private readonly logger: Logger;
private readonly cancelToken: CancelToken;
private readonly abortController: AbortController;
private readonly abortSignal: AbortSignal;

constructor(options: {
knex: Knex;
Expand All @@ -38,23 +40,24 @@ export class PluginTaskManagerJanitor {
this.knex = options.knex;
this.waitBetweenRuns = options.waitBetweenRuns;
this.logger = options.logger;
this.cancelToken = CancelToken.create();
this.abortController = new AbortController();
this.abortSignal = this.abortController.signal;
}

async start() {
while (!this.cancelToken.isCancelled) {
while (!this.abortSignal.aborted) {
try {
await this.runOnce();
} catch (e) {
this.logger.warn(`Error while performing janitorial tasks, ${e}`);
}

await this.sleep(this.waitBetweenRuns);
await sleep(this.waitBetweenRuns, this.abortSignal);
}
}

async stop() {
this.cancelToken.cancel();
this.abortController.abort();
}

private async runOnce() {
Expand All @@ -79,15 +82,4 @@ export class PluginTaskManagerJanitor {
}
}
}

/**
* Sleeps for the given duration, but aborts sooner if the cancel token
* triggers.
*/
private async sleep(duration: Duration) {
await Promise.race([
new Promise(resolve => setTimeout(resolve, duration.as('milliseconds'))),
this.cancelToken.promise,
]);
}
}
29 changes: 9 additions & 20 deletions packages/backend-tasks/src/tasks/TaskWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

import { Knex } from 'knex';
import { Duration } from 'luxon';
import { AbortController, AbortSignal } from 'node-abort-controller';
import { v4 as uuid } from 'uuid';
import { Logger } from 'winston';
import { DbTasksRow, DB_TASKS_TABLE } from '../database/tables';
import { CancelToken } from './CancelToken';
import { TaskSettingsV1, taskSettingsV1Schema } from './types';
import { nowPlus } from './util';
import { nowPlus, sleep } from './util';

const WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 });

Expand All @@ -35,7 +35,8 @@ export class TaskWorker {
private readonly fn: () => void | Promise<void>;
private readonly knex: Knex;
private readonly logger: Logger;
private readonly cancelToken: CancelToken;
private readonly abortController: AbortController;
private readonly abortSignal: AbortSignal;

constructor(
taskId: string,
Expand All @@ -47,7 +48,8 @@ export class TaskWorker {
this.fn = fn;
this.knex = knex;
this.logger = logger;
this.cancelToken = CancelToken.create();
this.abortController = new AbortController();
this.abortSignal = this.abortController.signal;
}

async start(settings: TaskSettingsV1) {
Expand All @@ -63,13 +65,13 @@ export class TaskWorker {

(async () => {
try {
while (!this.cancelToken.isCancelled) {
while (!this.abortSignal.aborted) {
const runResult = await this.runOnce();
if (runResult.result === 'abort') {
break;
}

await this.sleep(WORK_CHECK_FREQUENCY);
await sleep(WORK_CHECK_FREQUENCY, this.abortSignal);
}
this.logger.info(`Task worker finished: ${this.taskId}`);
} catch (e) {
Expand All @@ -79,7 +81,7 @@ export class TaskWorker {
}

stop() {
this.cancelToken.cancel();
this.abortController.abort();
}

/**
Expand Down Expand Up @@ -120,19 +122,6 @@ export class TaskWorker {
return { result: 'completed' };
}

/**
* Sleep for the given duration, but abort sooner if the cancel token
* triggers.
*
* @param duration - The amount of time to sleep, at most
*/
private async sleep(duration: Duration): Promise<void> {
await Promise.race([
new Promise(resolve => setTimeout(resolve, duration.as('milliseconds'))),
this.cancelToken.promise,
]);
}

/**
* Perform the initial store of the task info
*/
Expand Down
32 changes: 32 additions & 0 deletions packages/backend-tasks/src/tasks/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import { InputError } from '@backstage/errors';
import { Knex } from 'knex';
import { DateTime, Duration } from 'luxon';
import { AbortSignal } from 'node-abort-controller';

// Keep the IDs compatible with e.g. Prometheus
export function validateId(id: string) {
Expand All @@ -43,3 +44,34 @@ export function nowPlus(duration: Duration | undefined, knex: Knex) {
? knex.raw(`datetime('now', ?)`, [`${seconds} seconds`])
: knex.raw(`now() + interval '${seconds} seconds'`);
}

/**
* Sleep for the given duration, but return sooner if the abort signal
* triggers.
*
* @param duration - The amount of time to sleep, at most
* @param abortSignal - An optional abort signal that short circuits the wait
*/
export async function sleep(
duration: Duration,
abortSignal?: AbortSignal,
): Promise<void> {
if (abortSignal?.aborted) {
return;
}

await new Promise<void>(resolve => {
let timeoutHandle: NodeJS.Timeout | undefined = undefined;

const done = () => {
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
abortSignal?.removeEventListener('abort', done);
resolve();
};

timeoutHandle = setTimeout(done, duration.as('milliseconds'));
abortSignal?.addEventListener('abort', done);
});
}

0 comments on commit b91fe07

Please sign in to comment.