Skip to content

Commit

Permalink
Get most of the task worker code into place
Browse files Browse the repository at this point in the history
Signed-off-by: Fredrik Adelöw <freben@gmail.com>
  • Loading branch information
freben committed Nov 11, 2021
1 parent d088c7f commit d4f412f
Show file tree
Hide file tree
Showing 15 changed files with 997 additions and 180 deletions.
47 changes: 47 additions & 0 deletions packages/backend-common/api-report.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { BitbucketIntegration } from '@backstage/integration';
import { Config } from '@backstage/config';
import cors from 'cors';
import Docker from 'dockerode';
import { Duration } from 'luxon';
import { ErrorRequestHandler } from 'express';
import express from 'express';
import { GithubCredentialsProvider } from '@backstage/integration';
Expand Down Expand Up @@ -399,6 +400,37 @@ export type PluginEndpointDiscovery = {
getExternalBaseUrl(pluginId: string): Promise<string>;
};

// @public
export interface PluginTaskManager {
// (undocumented)
acquireLock(
id: string,
options: {
timeout: Duration;
},
): Promise<
| {
acquired: false;
}
| {
acquired: true;
release: () => void | Promise<void>;
}
>;
// (undocumented)
scheduleTask(
id: string,
options: {
timeout: Duration;
frequency: Duration;
initialDelay?: Duration;
},
fn: () => Promise<void>,
): Promise<{
unschedule: () => Promise<void>;
}>;
}

// @public
export type ReaderFactory = (options: {
config: Config;
Expand Down Expand Up @@ -576,6 +608,21 @@ export interface StatusCheckHandlerOptions {
statusCheck?: StatusCheck;
}

// @public
export class TaskManager {
constructor(databaseManager: DatabaseManager, logger: Logger_2);
// (undocumented)
forPlugin(pluginId: string): PluginTaskManager;
// (undocumented)
static fromConfig(
config: Config,
options?: {
databaseManager?: DatabaseManager;
logger?: Logger_2;
},
): TaskManager;
}

// @public
export type UrlReader = {
read(url: string): Promise<Buffer>;
Expand Down
89 changes: 60 additions & 29 deletions packages/backend-common/migrations/20210928160613_init.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,29 @@
*/
exports.up = async function up(knex) {
//
// locking
// mutexes
//
await knex.schema.createTable(
'backstage_backend_common__task_locks',
table => {
table.comment('Locks used for mutual exclusion among multiple workers');
table
.text('id')
.primary()
.notNullable()
.comment('The unique id of this particular lock');
table
.text('acquired_ticket')
.nullable()
.comment('A unique ticket for the current lock acquiral, if any');
table
.dateTime('acquired_at')
.nullable()
.comment('The time when the lock was acquired, if locked');
table
.dateTime('expires_at')
.nullable()
.comment('The time when an acquired lock will time out and expire');
table.index('id', 'task_locks_id_idx');
},
);
await knex.schema.createTable('backstage_backend_common__mutexes', table => {
table.comment('Locks used for mutual exclusion among multiple workers');
table
.text('id')
.primary()
.notNullable()
.comment('The unique ID of this particular mutex');
table
.text('current_lock_ticket')
.nullable()
.comment('A unique ticket for the current mutex lock');
table
.dateTime('current_lock_acquired_at')
.nullable()
.comment('The time when the mutex was locked');
table
.dateTime('current_lock_expires_at')
.nullable()
.comment('The time when a locked mutex will time out and auto-release');
table.index(['id'], 'backstage_backend_common__mutexes__id_idx');
});
//
// tasks
//
Expand All @@ -56,16 +53,50 @@ exports.up = async function up(knex) {
.text('id')
.primary()
.notNullable()
.comment('The unique id of this particular task');
.comment('The unique ID of this particular task');
table
.text('settings_json')
.notNullable()
.comment('JSON serialized object with properties for this task');
table
.dateTime('next_run_start_at')
.nullable()
.comment('The next time that the task should be started');
table
.text('current_run_ticket')
.nullable()
.comment('A unique ticket for the current task run');
table
.dateTime('current_run_started_at')
.nullable()
.comment('The time that the current task run started');
table
.dateTime('current_run_expires_at')
.nullable()
.comment('The time that the current task run will time out');
table.index(['id'], 'backstage_backend_common__tasks__id_idx');
});
};

/**
* @param {import('knex').Knex} knex
*/
exports.down = async function down(knex) {
await knex.schema.alterTable('task_locks', table => {
table.dropIndex([], 'task_locks_id_idx');
//
// tasks
//
await knex.schema.alterTable('backstage_backend_common__tasks', table => {
table.dropIndex([], 'backstage_backend_common__tasks__id_idx');
});
await knex.schema.dropTable('task_locks');
await knex.schema.dropTable('backstage_backend_common__tasks');
//
// locks
//
await knex.schema.alterTable(
'backstage_backend_common__task_locks',
table => {
table.dropIndex([], 'backstage_backend_common__task_locks__id_idx');
},
);
await knex.schema.dropTable('backstage_backend_common__task_locks');
};
6 changes: 4 additions & 2 deletions packages/backend-common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
"unzipper": "^0.10.11",
"uuid": "^8.0.0",
"winston": "^3.2.1",
"yn": "^4.0.0"
"yn": "^4.0.0",
"zod": "^3.9.5"
},
"peerDependencies": {
"pg-connection-string": "^2.3.0"
Expand Down Expand Up @@ -107,7 +108,8 @@
"msw": "^0.35.0",
"mysql2": "^2.2.5",
"recursive-readdir": "^2.2.2",
"supertest": "^6.1.3"
"supertest": "^6.1.3",
"wait-for-expect": "^3.0.2"
},
"files": [
"dist",
Expand Down
20 changes: 16 additions & 4 deletions packages/backend-common/src/database/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,21 @@
* limitations under the License.
*/

export type DbTaskLocksRow = {
export const DB_MUTEXES_TABLE = 'backstage_backend_common__mutexes';
export const DB_TASKS_TABLE = 'backstage_backend_common__tasks';

export type DbMutexesRow = {
id: string;
current_lock_ticket?: string;
current_lock_acquired_at?: Date | string;
current_lock_expires_at?: Date | string;
};

export type DbTasksRow = {
id: string;
acquired_ticket?: string;
acquired_at?: Date;
expires_at?: Date;
settings_json: string;
next_run_start_at?: Date | string;
current_run_ticket?: string;
current_run_started_at?: Date | string;
current_run_expires_at?: Date | string;
};
48 changes: 48 additions & 0 deletions packages/backend-common/src/tasks/CancelToken.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

export class CancelToken {
// @ts-ignore: is actually assigned by the Promise constructor
#cancel: () => void;
#isCancelled: boolean;
#cancelPromise: Promise<void>;

static create(): CancelToken {
return new CancelToken();
}

private constructor() {
this.#isCancelled = false;
this.#cancelPromise = new Promise(resolve => {
this.#cancel = () => {
this.#isCancelled = true;
resolve();
};
});
}

cancel(): void {
this.#cancel();
}

get isCancelled(): boolean {
return this.#isCancelled;
}

get promise(): Promise<void> {
return this.#cancelPromise;
}
}
67 changes: 67 additions & 0 deletions packages/backend-common/src/tasks/PluginTaskManagerImpl.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { TestDatabases } from '@backstage/backend-test-utils';
import { Duration } from 'luxon';
import { migrateBackendCommon } from '../database/migrateBackendCommon';
import { getVoidLogger } from '../logging';
import { PluginTaskManagerImpl } from './PluginTaskManagerImpl';

describe('PluginTaskManagerImpl', () => {
const databases = TestDatabases.create({
ids: ['POSTGRES_13', 'POSTGRES_9', 'SQLITE_3'],
});

describe('locking', () => {
it.each(databases.eachSupportedId())(
'can run the happy path, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendCommon(knex);

const manager = new PluginTaskManagerImpl(
async () => knex,
getVoidLogger(),
);

const lock1 = await manager.acquireLock('lock1', {
timeout: Duration.fromMillis(5000),
});
const lock2 = await manager.acquireLock('lock2', {
timeout: Duration.fromMillis(5000),
});

expect(lock1.acquired).toBe(true);
expect(lock2.acquired).toBe(true);

await expect(
manager.acquireLock('lock1', {
timeout: Duration.fromMillis(5000),
}),
).resolves.toEqual({ acquired: false });

await (lock1 as any).release();
await (lock2 as any).release();

const lock1Again = await manager.acquireLock('lock1', {
timeout: Duration.fromMillis(5000),
});
expect(lock1Again.acquired).toBe(true);
await (lock1Again as any).release();
},
);
});
});
Loading

0 comments on commit d4f412f

Please sign in to comment.