Skip to content

Commit

Permalink
final cleanup, removing unschedule
Browse files Browse the repository at this point in the history
Signed-off-by: Fredrik Adelöw <freben@gmail.com>
  • Loading branch information
freben committed Nov 11, 2021
1 parent 9bee0b9 commit 26cf7f6
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 71 deletions.
2 changes: 1 addition & 1 deletion packages/backend-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { TaskManager } from '@backstage/backend-tasks';

const manager = TaskManager.fromConfig(rootConfig).forPlugin('my-plugin');

const { unschedule } = await manager.scheduleTask({
await manager.scheduleTask({
id: 'refresh-things',
frequency: Duration.fromObject({ minutes: 10 }),
fn: async () => {
Expand Down
5 changes: 2 additions & 3 deletions packages/backend-tasks/api-report.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import { Logger as Logger_2 } from 'winston';

// @public
export interface PluginTaskManager {
scheduleTask(task: TaskDefinition): Promise<{
unschedule: () => Promise<void>;
}>;
scheduleTask(task: TaskDefinition): Promise<void>;
}

// @public
Expand All @@ -22,6 +20,7 @@ export interface TaskDefinition {
frequency: Duration;
id: string;
initialDelay?: Duration;
signal?: AbortSignal_2;
timeout: Duration;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/backend-tasks/migrations/20210928160613_init.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ exports.up = async function up(knex) {
.comment('JSON serialized object with properties for this task');
table
.dateTime('next_run_start_at')
.nullable()
.notNullable()
.comment('The next time that the task should be started');
table
.text('current_run_ticket')
Expand Down
2 changes: 1 addition & 1 deletion packages/backend-tasks/src/database/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export const DB_TASKS_TABLE = 'backstage_backend_tasks__tasks';
export type DbTasksRow = {
id: string;
settings_json: string;
next_run_start_at?: Date | string;
next_run_start_at: Date;
current_run_ticket?: string;
current_run_started_at?: Date | string;
current_run_expires_at?: Date | string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ describe('PluginTaskManagerImpl', () => {
const { manager } = await init(databaseId);

const fn = jest.fn();
const { unschedule } = await manager.scheduleTask({
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromMillis(5000),
Expand All @@ -55,8 +55,6 @@ describe('PluginTaskManagerImpl', () => {
await waitForExpect(() => {
expect(fn).toBeCalled();
});

await unschedule();
},
);
});
Expand Down
25 changes: 11 additions & 14 deletions packages/backend-tasks/src/tasks/PluginTaskManagerImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,22 @@ export class PluginTaskManagerImpl implements PluginTaskManager {
private readonly logger: Logger,
) {}

async scheduleTask(
task: TaskDefinition,
): Promise<{ unschedule: () => Promise<void> }> {
async scheduleTask(task: TaskDefinition): Promise<void> {
validateId(task.id);

const knex = await this.databaseFactory();

const worker = new TaskWorker(task.id, task.fn, knex, this.logger);
await worker.start({
version: 1,
initialDelayDuration: task.initialDelay?.toISO(),
recurringAtMostEveryDuration: task.frequency.toISO(),
timeoutAfterDuration: task.timeout.toISO(),
});

return {
async unschedule() {
await worker.stop();
await worker.start(
{
version: 1,
initialDelayDuration: task.initialDelay?.toISO(),
recurringAtMostEveryDuration: task.frequency.toISO(),
timeoutAfterDuration: task.timeout.toISO(),
},
{
signal: task.signal,
},
};
);
}
}
24 changes: 11 additions & 13 deletions packages/backend-tasks/src/tasks/PluginTaskManagerJanitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import { Knex } from 'knex';
import { Duration } from 'luxon';
import { AbortController, AbortSignal } from 'node-abort-controller';
import { AbortSignal } from 'node-abort-controller';
import { Logger } from 'winston';
import { DbTasksRow, DB_TASKS_TABLE } from '../database/tables';
import { sleep } from './util';
Expand All @@ -29,8 +29,6 @@ export class PluginTaskManagerJanitor {
private readonly knex: Knex;
private readonly waitBetweenRuns: Duration;
private readonly logger: Logger;
private readonly abortController: AbortController;
private readonly abortSignal: AbortSignal;

constructor(options: {
knex: Knex;
Expand All @@ -40,26 +38,20 @@ export class PluginTaskManagerJanitor {
this.knex = options.knex;
this.waitBetweenRuns = options.waitBetweenRuns;
this.logger = options.logger;
this.abortController = new AbortController();
this.abortSignal = this.abortController.signal;
}

async start() {
while (!this.abortSignal.aborted) {
async start(abortSignal?: AbortSignal) {
while (!abortSignal?.aborted) {
try {
await this.runOnce();
} catch (e) {
this.logger.warn(`Error while performing janitorial tasks, ${e}`);
}

await sleep(this.waitBetweenRuns, this.abortSignal);
await sleep(this.waitBetweenRuns, abortSignal);
}
}

async stop() {
this.abortController.abort();
}

private async runOnce() {
// SQLite currently (Oct 1 2021) returns a number for returning()
// statements, effectively ignoring them and instead returning the outcome
Expand All @@ -68,9 +60,15 @@ export class PluginTaskManagerJanitor {
// https://github.com/knex/knex/issues/4370
// https://github.com/mapbox/node-sqlite3/issues/1453

const dbNull = this.knex.raw('null');

const tasksQuery = this.knex<DbTasksRow>(DB_TASKS_TABLE)
.where('current_run_expires_at', '<', this.knex.fn.now())
.delete();
.update({
current_run_ticket: dbNull,
current_run_started_at: dbNull,
current_run_expires_at: dbNull,
});

if (this.knex.client.config.client === 'sqlite3') {
const tasks = await tasksQuery;
Expand Down
11 changes: 8 additions & 3 deletions packages/backend-tasks/src/tasks/TaskManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { DatabaseManager, getVoidLogger } from '@backstage/backend-common';
import { TestDatabaseId, TestDatabases } from '@backstage/backend-test-utils';
import { Duration } from 'luxon';
import { TaskManager } from './TaskManager';
import waitForExpect from 'wait-for-expect';

describe('TaskManager', () => {
const logger = getVoidLogger();
Expand All @@ -42,14 +43,18 @@ describe('TaskManager', () => {
async databaseId => {
const database = await createDatabase(databaseId);
const manager = new TaskManager(database, logger).forPlugin('test');
const fn = jest.fn();

const task = await manager.scheduleTask({
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromMillis(5000),
fn: () => {},
fn,
});

await waitForExpect(() => {
expect(fn).toBeCalled();
});
expect(task.unschedule).toBeDefined();
},
);
});
24 changes: 9 additions & 15 deletions packages/backend-tasks/src/tasks/TaskWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import { Knex } from 'knex';
import { Duration } from 'luxon';
import { AbortController } from 'node-abort-controller';
import { AbortSignal } from 'node-abort-controller';
import { v4 as uuid } from 'uuid';
import { Logger } from 'winston';
import { DbTasksRow, DB_TASKS_TABLE } from '../database/tables';
Expand All @@ -35,17 +35,15 @@ export class TaskWorker {
private readonly fn: TaskFunction;
private readonly knex: Knex;
private readonly logger: Logger;
private readonly abortController: AbortController;

constructor(taskId: string, fn: TaskFunction, knex: Knex, logger: Logger) {
this.taskId = taskId;
this.fn = fn;
this.knex = knex;
this.logger = logger;
this.abortController = new AbortController();
}

async start(settings: TaskSettingsV1) {
async start(settings: TaskSettingsV1, options?: { signal?: AbortSignal }) {
try {
await this.persistTask(settings);
} catch (e) {
Expand All @@ -58,13 +56,13 @@ export class TaskWorker {

(async () => {
try {
while (!this.abortController.signal.aborted) {
const runResult = await this.runOnce();
while (!options?.signal?.aborted) {
const runResult = await this.runOnce(options?.signal);
if (runResult.result === 'abort') {
break;
}

await sleep(WORK_CHECK_FREQUENCY, this.abortController.signal);
await sleep(WORK_CHECK_FREQUENCY, options?.signal);
}
this.logger.info(`Task worker finished: ${this.taskId}`);
} catch (e) {
Expand All @@ -73,16 +71,14 @@ export class TaskWorker {
})();
}

stop() {
this.abortController.abort();
}

/**
* Makes a single attempt at running the task to completion, if ready.
*
* @returns The outcome of the attempt
*/
async runOnce(): Promise<
async runOnce(
signal?: AbortSignal,
): Promise<
| { result: 'not-ready-yet' }
| { result: 'abort' }
| { result: 'failed' }
Expand All @@ -106,9 +102,7 @@ export class TaskWorker {

// Abort the task execution either if the worker is stopped, or if the
// task timeout is hit
const taskAbortController = delegateAbortController(
this.abortController.signal,
);
const taskAbortController = delegateAbortController(signal);
const timeoutHandle = setTimeout(() => {
taskAbortController.abort();
}, Duration.fromISO(taskSettings.timeoutAfterDuration).as('milliseconds'));
Expand Down
13 changes: 7 additions & 6 deletions packages/backend-tasks/src/tasks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ export interface TaskDefinition {
*/
fn: TaskFunction;

/**
* An abort signal that, when triggered, will stop the recurring execution of
* the task.
*/
signal?: AbortSignal;

/**
* The maximum amount of time that a single task invocation can take, before
* it's considered timed out and gets "released" such that a new invocation
Expand Down Expand Up @@ -93,13 +99,8 @@ export interface PluginTaskManager {
* continue from there.
*
* @param definition - The task definition
* @returns An `unschedule` function that can be used to stop the task
* invocations later on. This removes the task entirely from storage
* and stops its invocations across all workers.
*/
scheduleTask(
task: TaskDefinition,
): Promise<{ unschedule: () => Promise<void> }>;
scheduleTask(task: TaskDefinition): Promise<void>;
}

function isValidOptionalDurationString(d: string | undefined): boolean {
Expand Down
24 changes: 13 additions & 11 deletions packages/backend-tasks/src/tasks/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,24 @@ export async function sleep(
*
* @param parent - The "parent" signal that can trigger the delegate
*/
export function delegateAbortController(parent: AbortSignal): AbortController {
export function delegateAbortController(parent?: AbortSignal): AbortController {
const delegate = new AbortController();

if (parent.aborted) {
delegate.abort();
} else {
const onParentAborted = () => {
if (parent) {
if (parent.aborted) {
delegate.abort();
};
} else {
const onParentAborted = () => {
delegate.abort();
};

const onChildAborted = () => {
parent.removeEventListener('abort', onParentAborted);
};
const onChildAborted = () => {
parent.removeEventListener('abort', onParentAborted);
};

parent.addEventListener('abort', onParentAborted, { once: true });
delegate.signal.addEventListener('abort', onChildAborted, { once: true });
parent.addEventListener('abort', onParentAborted, { once: true });
delegate.signal.addEventListener('abort', onChildAborted, { once: true });
}
}

return delegate;
Expand Down

0 comments on commit 26cf7f6

Please sign in to comment.