Skip to content

Commit

Permalink
make it possible to abort tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Fredrik Adelöw <freben@gmail.com>
  • Loading branch information
freben committed Nov 11, 2021
1 parent d8c28d9 commit 9bee0b9
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 20 deletions.
10 changes: 9 additions & 1 deletion packages/backend-tasks/api-report.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
> Do not edit this file. It is a report generated by [API Extractor](https://api-extractor.com/).
```ts
import { AbortSignal as AbortSignal_2 } from 'node-abort-controller';
import { Config } from '@backstage/config';
import { DatabaseManager } from '@backstage/backend-common';
import { Duration } from 'luxon';
Expand All @@ -17,13 +18,20 @@ export interface PluginTaskManager {

// @public
export interface TaskDefinition {
fn: () => void | Promise<void>;
fn: TaskFunction;
frequency: Duration;
id: string;
initialDelay?: Duration;
timeout: Duration;
}

// Warning: (ae-missing-release-tag) "TaskFunction" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
export type TaskFunction =
| ((abortSignal: AbortSignal_2) => void | Promise<void>)
| (() => void | Promise<void>);

// @public
export class TaskManager {
constructor(databaseManager: DatabaseManager, logger: Logger_2);
Expand Down
36 changes: 20 additions & 16 deletions packages/backend-tasks/src/tasks/TaskWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

import { Knex } from 'knex';
import { Duration } from 'luxon';
import { AbortController, AbortSignal } from 'node-abort-controller';
import { AbortController } from 'node-abort-controller';
import { v4 as uuid } from 'uuid';
import { Logger } from 'winston';
import { DbTasksRow, DB_TASKS_TABLE } from '../database/tables';
import { TaskSettingsV1, taskSettingsV1Schema } from './types';
import { nowPlus, sleep } from './util';
import { TaskFunction, TaskSettingsV1, taskSettingsV1Schema } from './types';
import { delegateAbortController, nowPlus, sleep } from './util';

const WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 });

Expand All @@ -32,24 +32,17 @@ const WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 });
*/
export class TaskWorker {
private readonly taskId: string;
private readonly fn: () => void | Promise<void>;
private readonly fn: TaskFunction;
private readonly knex: Knex;
private readonly logger: Logger;
private readonly abortController: AbortController;
private readonly abortSignal: AbortSignal;

constructor(
taskId: string,
fn: () => void | Promise<void>,
knex: Knex,
logger: Logger,
) {

constructor(taskId: string, fn: TaskFunction, knex: Knex, logger: Logger) {
this.taskId = taskId;
this.fn = fn;
this.knex = knex;
this.logger = logger;
this.abortController = new AbortController();
this.abortSignal = this.abortController.signal;
}

async start(settings: TaskSettingsV1) {
Expand All @@ -65,13 +58,13 @@ export class TaskWorker {

(async () => {
try {
while (!this.abortSignal.aborted) {
while (!this.abortController.signal.aborted) {
const runResult = await this.runOnce();
if (runResult.result === 'abort') {
break;
}

await sleep(WORK_CHECK_FREQUENCY, this.abortSignal);
await sleep(WORK_CHECK_FREQUENCY, this.abortController.signal);
}
this.logger.info(`Task worker finished: ${this.taskId}`);
} catch (e) {
Expand Down Expand Up @@ -111,11 +104,22 @@ export class TaskWorker {
return { result: 'not-ready-yet' };
}

// Abort the task execution either if the worker is stopped, or if the
// task timeout is hit
const taskAbortController = delegateAbortController(
this.abortController.signal,
);
const timeoutHandle = setTimeout(() => {
taskAbortController.abort();
}, Duration.fromISO(taskSettings.timeoutAfterDuration).as('milliseconds'));

try {
await this.fn();
await this.fn(taskAbortController.signal);
} catch (e) {
await this.tryReleaseTask(ticket, taskSettings);
return { result: 'failed' };
} finally {
clearTimeout(timeoutHandle);
}

await this.tryReleaseTask(ticket, taskSettings);
Expand Down
2 changes: 1 addition & 1 deletion packages/backend-tasks/src/tasks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
*/

export { TaskManager } from './TaskManager';
export type { PluginTaskManager, TaskDefinition } from './types';
export type { PluginTaskManager, TaskDefinition, TaskFunction } from './types';
7 changes: 6 additions & 1 deletion packages/backend-tasks/src/tasks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
*/

import { Duration } from 'luxon';
import { AbortSignal } from 'node-abort-controller';
import { z } from 'zod';

export type TaskFunction =
| ((abortSignal: AbortSignal) => void | Promise<void>)
| (() => void | Promise<void>);

/**
* Options that apply to the invocation of a given task.
*
Expand All @@ -31,7 +36,7 @@ export interface TaskDefinition {
/**
* The actual task function to be invoked regularly.
*/
fn: () => void | Promise<void>;
fn: TaskFunction;

/**
* The maximum amount of time that a single task invocation can take, before
Expand Down
42 changes: 42 additions & 0 deletions packages/backend-tasks/src/tasks/util.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { AbortController } from 'node-abort-controller';
import { delegateAbortController } from './util';

describe('util', () => {
describe('delegateAbortController', () => {
it('inherits parent abort state', () => {
const parent = new AbortController();
const child = delegateAbortController(parent.signal);
expect(parent.signal.aborted).toBe(false);
expect(child.signal.aborted).toBe(false);
parent.abort();
expect(parent.signal.aborted).toBe(true);
expect(child.signal.aborted).toBe(true);
});

it('does not inherit from child to parent', () => {
const parent = new AbortController();
const child = delegateAbortController(parent.signal);
expect(parent.signal.aborted).toBe(false);
expect(child.signal.aborted).toBe(false);
child.abort();
expect(parent.signal.aborted).toBe(false);
expect(child.signal.aborted).toBe(true);
});
});
});
30 changes: 29 additions & 1 deletion packages/backend-tasks/src/tasks/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import { InputError } from '@backstage/errors';
import { Knex } from 'knex';
import { DateTime, Duration } from 'luxon';
import { AbortSignal } from 'node-abort-controller';
import { AbortController, AbortSignal } from 'node-abort-controller';

// Keep the IDs compatible with e.g. Prometheus
export function validateId(id: string) {
Expand Down Expand Up @@ -75,3 +75,31 @@ export async function sleep(
abortSignal?.addEventListener('abort', done);
});
}

/**
* Creates a new AbortController that, in addition to working as a regular
* standalone controller, also gets aborted if the given parent signal
* reaches aborted state.
*
* @param parent - The "parent" signal that can trigger the delegate
*/
export function delegateAbortController(parent: AbortSignal): AbortController {
const delegate = new AbortController();

if (parent.aborted) {
delegate.abort();
} else {
const onParentAborted = () => {
delegate.abort();
};

const onChildAborted = () => {
parent.removeEventListener('abort', onParentAborted);
};

parent.addEventListener('abort', onParentAborted, { once: true });
delegate.signal.addEventListener('abort', onChildAborted, { once: true });
}

return delegate;
}

0 comments on commit 9bee0b9

Please sign in to comment.