Skip to content

Commit

Permalink
make timeout and frequency mandatory
Browse files Browse the repository at this point in the history
Signed-off-by: Fredrik Adelöw <freben@gmail.com>
  • Loading branch information
freben committed Nov 11, 2021
1 parent aeaa2fe commit cc3b6c2
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 53 deletions.
10 changes: 9 additions & 1 deletion packages/backend-tasks/src/tasks/PluginTaskManagerImpl.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import { getVoidLogger } from '@backstage/backend-common';
import { TestDatabaseId, TestDatabases } from '@backstage/backend-test-utils';
import { Duration } from 'luxon';
import waitForExpect from 'wait-for-expect';
import { migrateBackendTasks } from '../database/migrateBackendTasks';
import { PluginTaskManagerImpl } from './PluginTaskManagerImpl';
Expand Down Expand Up @@ -44,7 +45,14 @@ describe('PluginTaskManagerImpl', () => {
const { manager } = await init(databaseId);

const fn = jest.fn();
const { unschedule } = await manager.scheduleTask('task1', {}, fn);
const { unschedule } = await manager.scheduleTask(
'task1',
{
timeout: Duration.fromMillis(5000),
frequency: Duration.fromMillis(5000),
},
fn,
);

await waitForExpect(() => {
expect(fn).toBeCalled();
Expand Down
4 changes: 2 additions & 2 deletions packages/backend-tasks/src/tasks/PluginTaskManagerImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ export class PluginTaskManagerImpl implements PluginTaskManager {
await task.start({
version: 1,
initialDelayDuration: options.initialDelay?.toISO(),
recurringAtMostEveryDuration: options.frequency?.toISO(),
timeoutAfterDuration: options.timeout?.toISO(),
recurringAtMostEveryDuration: options.frequency.toISO(),
timeoutAfterDuration: options.timeout.toISO(),
});

return {
Expand Down
1 change: 1 addition & 0 deletions packages/backend-tasks/src/tasks/TaskManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ describe('TaskManager', () => {
'task1',
{
timeout: Duration.fromMillis(5000),
frequency: Duration.fromMillis(5000),
},
() => {},
);
Expand Down
52 changes: 20 additions & 32 deletions packages/backend-tasks/src/tasks/TaskWorker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe('TaskWorker', () => {
});

it.each(databases.eachSupportedId())(
'can run a single task to completion, %p',
'goes through the expected states, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
Expand All @@ -44,32 +44,9 @@ describe('TaskWorker', () => {
);
const settings: TaskSettingsV1 = {
version: 1,
};

const worker = new TaskWorker('task1', fn, knex, logger);
await worker.start(settings);

waitForExpect(() => {
expect(fn).toBeCalledTimes(1);
});
},
60_000,
);

it.each(databases.eachSupportedId())(
'goes through the expected states for a single run, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);

const fn = jest.fn(
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
);
const settings: TaskSettingsV1 = {
version: 1,
initialDelayDuration: Duration.fromObject({ seconds: 1 }).toISO(),
recurringAtMostEveryDuration: undefined,
timeoutAfterDuration: undefined,
initialDelayDuration: Duration.fromMillis(1000).toISO(),
recurringAtMostEveryDuration: Duration.fromMillis(2000).toISO(),
timeoutAfterDuration: Duration.fromMillis(60000).toISO(),
};

const worker = new TaskWorker('task1', fn, knex, logger);
Expand All @@ -87,6 +64,8 @@ describe('TaskWorker', () => {
expect(JSON.parse(row.settings_json)).toEqual({
version: 1,
initialDelayDuration: 'PT1S',
recurringAtMostEveryDuration: 'PT2S',
timeoutAfterDuration: 'PT60S',
});

await expect(worker.findReadyTask()).resolves.toEqual({
Expand Down Expand Up @@ -117,7 +96,7 @@ describe('TaskWorker', () => {
id: 'task1',
current_run_ticket: 'ticket',
current_run_started_at: expect.anything(),
current_run_expires_at: null,
current_run_expires_at: expect.anything(),
}),
);

Expand All @@ -126,7 +105,14 @@ describe('TaskWorker', () => {
);

row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row).toBeUndefined();
expect(row).toEqual(
expect.objectContaining({
id: 'task1',
current_run_ticket: null,
current_run_started_at: null,
current_run_expires_at: null,
}),
);
},
60_000,
);
Expand All @@ -142,7 +128,7 @@ describe('TaskWorker', () => {
version: 1,
initialDelayDuration: undefined,
recurringAtMostEveryDuration: Duration.fromMillis(0).toISO(),
timeoutAfterDuration: undefined,
timeoutAfterDuration: Duration.fromMillis(60000).toISO(),
};

const worker = new TaskWorker('task1', fn, knex, logger);
Expand All @@ -167,6 +153,7 @@ describe('TaskWorker', () => {
const settings: TaskSettingsV1 = {
version: 1,
recurringAtMostEveryDuration: Duration.fromMillis(0).toISO(),
timeoutAfterDuration: Duration.fromMillis(60000).toISO(),
};

const worker = new TaskWorker('task1', fn, knex, logger);
Expand All @@ -183,7 +170,7 @@ describe('TaskWorker', () => {
id: 'task1',
current_run_ticket: 'ticket',
current_run_started_at: expect.anything(),
current_run_expires_at: null,
current_run_expires_at: expect.anything(),
}),
);

Expand All @@ -201,7 +188,7 @@ describe('TaskWorker', () => {
id: 'task1',
current_run_ticket: 'stolen',
current_run_started_at: expect.anything(),
current_run_expires_at: null,
current_run_expires_at: expect.anything(),
}),
);
},
Expand All @@ -218,6 +205,7 @@ describe('TaskWorker', () => {
const settings: TaskSettingsV1 = {
version: 1,
recurringAtMostEveryDuration: Duration.fromMillis(0).toISO(),
timeoutAfterDuration: Duration.fromMillis(60000).toISO(),
};

const worker1 = new TaskWorker('task1', fn, knex, logger);
Expand Down
14 changes: 0 additions & 14 deletions packages/backend-tasks/src/tasks/TaskWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ export class TaskWorker {
if (runResult.result === 'abort') {
break;
}
if (!settings.recurringAtMostEveryDuration) {
break;
}

await this.sleep(WORK_CHECK_FREQUENCY);
}
Expand Down Expand Up @@ -240,17 +237,6 @@ export class TaskWorker {
): Promise<boolean> {
const { recurringAtMostEveryDuration } = settings;

// If this is not a recurring task, and we still have the current run
// ticket, delete it from the table
if (recurringAtMostEveryDuration === undefined) {
const rows = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
.where('id', '=', this.taskId)
.where('current_run_ticket', '=', ticket)
.delete();

return rows === 1;
}

// We make an effort to keep the datetime calculations in the database
// layer, making sure to not have to perform conversions back and forth and
// leaning on the database as a central clock source
Expand Down
6 changes: 2 additions & 4 deletions packages/backend-tasks/src/tasks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export interface TaskOptions {
* If no value is given for this field then there is no timeout. This is
* potentially dangerous.
*/
timeout?: Duration;
timeout: Duration;

/**
* The amount of time that should pass between task invocation starts.
Expand All @@ -48,7 +48,7 @@ export interface TaskOptions {
* If no value is given for this field then the task will only be invoked
* once (on any worker) and then unscheduled automatically.
*/
frequency?: Duration;
frequency: Duration;

/**
* The amount of time that should pass before the first invocation happens.
Expand Down Expand Up @@ -107,11 +107,9 @@ export const taskSettingsV1Schema = z.object({
.refine(isValidOptionalDurationString, { message: 'Invalid duration' }),
recurringAtMostEveryDuration: z
.string()
.optional()
.refine(isValidOptionalDurationString, { message: 'Invalid duration' }),
timeoutAfterDuration: z
.string()
.optional()
.refine(isValidOptionalDurationString, { message: 'Invalid duration' }),
});

Expand Down

0 comments on commit cc3b6c2

Please sign in to comment.