Skip to content

Commit

Permalink
remove the lock functionality
Browse files Browse the repository at this point in the history
Signed-off-by: Fredrik Adelöw <freben@gmail.com>
  • Loading branch information
freben committed Nov 11, 2021
1 parent 67456f7 commit 3f12371
Show file tree
Hide file tree
Showing 9 changed files with 12 additions and 253 deletions.
17 changes: 0 additions & 17 deletions packages/backend-tasks/api-report.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,8 @@ import { DatabaseManager } from '@backstage/backend-common';
import { Duration } from 'luxon';
import { Logger as Logger_2 } from 'winston';

// @public
export interface LockOptions {
timeout: Duration;
}

// @public
export interface PluginTaskManager {
acquireLock(
id: string,
options: LockOptions,
): Promise<
| {
acquired: false;
}
| {
acquired: true;
release(): Promise<void>;
}
>;
scheduleTask(
id: string,
options: TaskOptions,
Expand Down
31 changes: 0 additions & 31 deletions packages/backend-tasks/migrations/20210928160613_init.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,6 @@
* @param {import('knex').Knex} knex
*/
exports.up = async function up(knex) {
//
// mutexes
//
await knex.schema.createTable('backstage_backend_tasks__mutexes', table => {
table.comment('Locks used for mutual exclusion among multiple workers');
table
.text('id')
.primary()
.notNullable()
.comment('The unique ID of this particular mutex');
table
.text('current_lock_ticket')
.notNullable()
.comment('A unique ticket for the current mutex lock');
table
.dateTime('current_lock_acquired_at')
.nullable()
.comment('The time when the mutex was locked');
table
.dateTime('current_lock_expires_at')
.nullable()
.comment('The time when a locked mutex will time out and auto-release');
table.index(['id'], 'backstage_backend_tasks__mutexes__id_idx');
});
//
// tasks
//
Expand Down Expand Up @@ -89,11 +65,4 @@ exports.down = async function down(knex) {
table.dropIndex([], 'backstage_backend_tasks__tasks__id_idx');
});
await knex.schema.dropTable('backstage_backend_tasks__tasks');
//
// locks
//
await knex.schema.alterTable('backstage_backend_tasks__task_locks', table => {
table.dropIndex([], 'backstage_backend_tasks__task_locks__id_idx');
});
await knex.schema.dropTable('backstage_backend_tasks__task_locks');
};
8 changes: 0 additions & 8 deletions packages/backend-tasks/src/database/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,8 @@
*/

export const DB_MIGRATIONS_TABLE = 'backstage_backend_tasks__knex_migrations';
export const DB_MUTEXES_TABLE = 'backstage_backend_tasks__mutexes';
export const DB_TASKS_TABLE = 'backstage_backend_tasks__tasks';

export type DbMutexesRow = {
id: string;
current_lock_ticket: string;
current_lock_acquired_at?: Date | string;
current_lock_expires_at?: Date | string;
};

export type DbTasksRow = {
id: string;
settings_json: string;
Expand Down
81 changes: 0 additions & 81 deletions packages/backend-tasks/src/tasks/PluginTaskManagerImpl.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import { getVoidLogger } from '@backstage/backend-common';
import { TestDatabaseId, TestDatabases } from '@backstage/backend-test-utils';
import { Duration } from 'luxon';
import waitForExpect from 'wait-for-expect';
import { migrateBackendTasks } from '../database/migrateBackendTasks';
import { PluginTaskManagerImpl } from './PluginTaskManagerImpl';
Expand All @@ -36,86 +35,6 @@ describe('PluginTaskManagerImpl', () => {
return { knex, manager };
}

describe('acquireLock', () => {
it.each(databases.eachSupportedId())(
'can run the happy path, %p',
async databaseId => {
const { manager } = await init(databaseId);

const lock1 = await manager.acquireLock('lock1', {
timeout: Duration.fromMillis(5000),
});
const lock2 = await manager.acquireLock('lock2', {
timeout: Duration.fromMillis(5000),
});

expect(lock1.acquired).toBe(true);
expect(lock2.acquired).toBe(true);

await expect(
manager.acquireLock('lock1', {
timeout: Duration.fromMillis(5000),
}),
).resolves.toEqual({ acquired: false });

await (lock1 as any).release();
await (lock2 as any).release();

const lock1Again = await manager.acquireLock('lock1', {
timeout: Duration.fromMillis(5000),
});
expect(lock1Again.acquired).toBe(true);
await (lock1Again as any).release();
},
);

it.each(databases.eachSupportedId())(
'rejects double lock attempts, %p',
async databaseId => {
const { manager } = await init(databaseId);

const lock1 = await manager.acquireLock('lock1', {
timeout: Duration.fromMillis(5000),
});
const lock2 = await manager.acquireLock('lock1', {
timeout: Duration.fromMillis(5000),
});

expect(lock1.acquired).toBe(true);
expect(lock2.acquired).toBe(false);

await (lock1 as any).release();

const lock1Again = await manager.acquireLock('lock1', {
timeout: Duration.fromMillis(5000),
});
expect(lock1Again.acquired).toBe(true);
await (lock1Again as any).release();
},
);

it.each(databases.eachSupportedId())(
'times out locks, %p',
async databaseId => {
const { manager } = await init(databaseId);

const lock1 = await manager.acquireLock('lock1', {
timeout: Duration.fromMillis(200),
});

expect(lock1.acquired).toBe(true);

await new Promise(resolve => setTimeout(resolve, 1000));

const lock2 = await manager.acquireLock('lock1', {
timeout: Duration.fromMillis(5000),
});
expect(lock2.acquired).toBe(true);
await (lock2 as any).release();
},
);
});

// This is just to test the wrapper code; most of the actual tests are in
// TaskWorker.test.ts
describe('scheduleTask', () => {
Expand Down
58 changes: 2 additions & 56 deletions packages/backend-tasks/src/tasks/PluginTaskManagerImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@
* limitations under the License.
*/

import { isDatabaseConflictError } from '@backstage/backend-common';
import { Knex } from 'knex';
import { v4 as uuid } from 'uuid';
import { Logger } from 'winston';
import { DbMutexesRow, DB_MUTEXES_TABLE } from '../database/tables';
import { TaskWorker } from './TaskWorker';
import { LockOptions, PluginTaskManager, TaskOptions } from './types';
import { nowPlus, validateId } from './util';
import { PluginTaskManager, TaskOptions } from './types';
import { validateId } from './util';

/**
* Implements the actual task management.
Expand All @@ -32,57 +29,6 @@ export class PluginTaskManagerImpl implements PluginTaskManager {
private readonly logger: Logger,
) {}

async acquireLock(
id: string,
options: LockOptions,
): Promise<
{ acquired: false } | { acquired: true; release(): Promise<void> }
> {
validateId(id);

const knex = await this.databaseFactory();
const ticket = uuid();

const release = async () => {
try {
await knex<DbMutexesRow>(DB_MUTEXES_TABLE)
.where('id', '=', id)
.where('current_lock_ticket', '=', ticket)
.delete();
} catch (e) {
this.logger.warn(`Failed to release lock, ${e}`);
}
};

const record: Knex.DbRecord<DbMutexesRow> = {
current_lock_ticket: ticket,
current_lock_acquired_at: knex.fn.now(),
current_lock_expires_at: options.timeout
? nowPlus(options.timeout, knex)
: knex.raw('null'),
};

// First try to overwrite an existing lock, that has timed out
const stolen = await knex<DbMutexesRow>(DB_MUTEXES_TABLE)
.where('id', '=', id)
.where('current_lock_expires_at', '<', knex.fn.now())
.update(record);

if (stolen) {
return { acquired: true, release };
}

try {
await knex<DbMutexesRow>(DB_MUTEXES_TABLE).insert({ id, ...record });
return { acquired: true, release };
} catch (e) {
if (!isDatabaseConflictError(e)) {
this.logger.warn(`Failed to acquire lock, ${e}`);
}
return { acquired: false };
}
}

async scheduleTask(
id: string,
options: TaskOptions,
Expand Down
23 changes: 1 addition & 22 deletions packages/backend-tasks/src/tasks/PluginTaskManagerJanitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@
import { Knex } from 'knex';
import { Duration } from 'luxon';
import { Logger } from 'winston';
import {
DbMutexesRow,
DbTasksRow,
DB_MUTEXES_TABLE,
DB_TASKS_TABLE,
} from '../database/tables';
import { DbTasksRow, DB_TASKS_TABLE } from '../database/tables';
import { CancelToken } from './CancelToken';

/**
Expand Down Expand Up @@ -70,22 +65,6 @@ export class PluginTaskManagerJanitor {
// https://github.com/knex/knex/issues/4370
// https://github.com/mapbox/node-sqlite3/issues/1453

const mutexesQuery = this.knex<DbMutexesRow>(DB_MUTEXES_TABLE)
.where('current_lock_expires_at', '<', this.knex.fn.now())
.delete();

if (this.knex.client.config.client === 'sqlite3') {
const mutexes = await mutexesQuery;
if (mutexes > 0) {
this.logger.warn(`${mutexes} mutex locks timed out and were lost`);
}
} else {
const mutexes = await mutexesQuery.returning(['id']);
for (const { id } of mutexes) {
this.logger.warn(`Mutex lock timed out and was lost: ${id}`);
}
}

const tasksQuery = this.knex<DbTasksRow>(DB_TASKS_TABLE)
.where('current_run_expires_at', '<', this.knex.fn.now())
.delete();
Expand Down
12 changes: 8 additions & 4 deletions packages/backend-tasks/src/tasks/TaskManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ describe('TaskManager', () => {
const database = await createDatabase(databaseId);
const manager = new TaskManager(database, logger).forPlugin('test');

const lock = await manager.acquireLock('lock1', {
timeout: Duration.fromMillis(5000),
});
expect(lock.acquired).toBe(true);
const task = await manager.scheduleTask(
'task1',
{
timeout: Duration.fromMillis(5000),
},
() => {},
);
expect(task.unschedule).toBeDefined();
},
);
});
2 changes: 1 addition & 1 deletion packages/backend-tasks/src/tasks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
*/

export { TaskManager } from './TaskManager';
export type { LockOptions, PluginTaskManager, TaskOptions } from './types';
export type { PluginTaskManager, TaskOptions } from './types';
33 changes: 0 additions & 33 deletions packages/backend-tasks/src/tasks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,6 @@
import { Duration } from 'luxon';
import { z } from 'zod';

/**
* Options that apply to the acquiral of a given lock.
*
* @public
*/
export interface LockOptions {
/**
* The maximum amount of time that the lock can be held, before it's
* considered timed out and gets auto-released by the framework.
*/
timeout: Duration;
}

/**
* Options that apply to the invocation of a given task.
*
Expand Down Expand Up @@ -82,26 +69,6 @@ export interface TaskOptions {
* @public
*/
export interface PluginTaskManager {
/**
* Attempts to acquire an exclusive lock.
*
* A lock can only be held by one party at a time. Any subsequent attempts to
* acquire the lock will fail, unless the timeout period has been exceeded or
* the lock was released by the previous holder.
*
* @param id - A unique ID (within the scope of the plugin) for a lock
* @param options - Options for the lock
* @returns The result of the lock attempt. If it was successfully acquired,
* you should remember to call its `release` method as soon as you
* are done with the lock.
*/
acquireLock(
id: string,
options: LockOptions,
): Promise<
{ acquired: false } | { acquired: true; release(): Promise<void> }
>;

/**
* Schedules a task function for coordinated exclusive invocation across
* workers.
Expand Down

0 comments on commit 3f12371

Please sign in to comment.