Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-and-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
tags:
- 'v*'
workflow_dispatch:

jobs:
build_and_push:
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
}
},
"queue": {
"jobManagerBaseUrl": "http//localhost:8080",
"heartbeatBaseUrl": "http//localhost:8081",
"jobManagerBaseUrl": "http://localhost:8080",
"heartbeatBaseUrl": "http://localhost:8081",
"heartbeatIntervalMs": 1000,
"dequeueIntervalMs": 3000
},
Expand Down
331 changes: 274 additions & 57 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
},
"dependencies": {
"@godaddy/terminus": "^4.12.1",
"@map-colonies/config": "^3.0.0",
"@map-colonies/config": "^4.0.1",
"@map-colonies/jobnik-sdk": "^0.1.0",
"@map-colonies/js-logger": "^3.0.2",
"@map-colonies/mc-priority-queue": "^9.0.2",
Expand Down
21 changes: 11 additions & 10 deletions src/cleaner/errors/errorHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { inject, injectable } from 'tsyringe';
import { type Logger } from '@map-colonies/js-logger';
import { SERVICES } from '@common/constants';
import type { ErrorContext, ErrorDecision } from '../types';
import { RecoverableError, UnrecoverableError } from './errors';
import { toError, RecoverableError, UnrecoverableError } from './errors';

/**
* ErrorHandler centralizes error handling logic for task processing.
Expand All @@ -20,18 +20,20 @@ export class ErrorHandler {
*/
public handleError(context: ErrorContext): ErrorDecision {
const { jobId, taskId, attemptNumber, maxAttempts, error } = context;
const normalizedError = toError(error); // Normalize error to ensure it's an Error instance

this.logger.error({
msg: 'Task processing error',
jobId,
taskId,
attemptNumber,
maxAttempts,
err: error,
err: normalizedError,
});

const decision = this.evaluateError(context);
const decision = this.evaluateError(context, normalizedError);

this.logDecision(context, decision);
this.logDecision(context, normalizedError, decision);

return decision;
}
Expand All @@ -43,8 +45,8 @@ export class ErrorHandler {
* - RecoverableError: retry if attempts remain
* - Unknown errors: treated as recoverable, retry if attempts remain
*/
private evaluateError(context: ErrorContext): ErrorDecision {
const { attemptNumber, maxAttempts, error } = context;
private evaluateError(context: ErrorContext, error: Error): ErrorDecision {
const { attemptNumber, maxAttempts } = context;

if (error instanceof UnrecoverableError) {
return { shouldRetry: false, reason: `Unrecoverable error-${error.name}: ${error.message}` };
Expand All @@ -61,14 +63,13 @@ export class ErrorHandler {
return { shouldRetry, reason };
}

private logDecision(context: ErrorContext, decision: ErrorDecision): void {
const { jobId, taskId, error } = context;
private logDecision(context: ErrorContext, error: Error, decision: ErrorDecision): void {
const isKnownError = error instanceof RecoverableError || error instanceof UnrecoverableError;

const logData = {
msg: decision.shouldRetry ? 'Task will be retried' : 'Task will be rejected',
jobId,
taskId,
jobId: context.jobId,
taskId: context.taskId,
reason: decision.reason,
};

Expand Down
15 changes: 15 additions & 0 deletions src/cleaner/errors/errors.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Safely converts any thrown value to an Error instance.
* Handles the edge case where String(value) itself throws (e.g. a custom toString() that throws).
*/
export function toError(value: unknown): Error {
if (value instanceof Error) {
return value;
}
try {
return new Error(String(value));
} catch {
return new Error('non-serializable thrown value');
}
}

/**
* Base class for recoverable errors that can be retried.
* Task will be retried if attempts < maxAttempts.
Expand Down
2 changes: 1 addition & 1 deletion src/cleaner/errors/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { RecoverableError, UnrecoverableError, ConfigurationError, ValidationError, StrategyNotFoundError } from './errors';
export { toError, RecoverableError, UnrecoverableError, ConfigurationError, ValidationError, StrategyNotFoundError } from './errors';
export { ErrorHandler } from './errorHandler';
2 changes: 1 addition & 1 deletion src/cleaner/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export interface ErrorContext {
taskId: string;
attemptNumber: number;
maxAttempts: number;
error: Error;
error: unknown;
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/cleaner/utils/validationHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { ValidationError } from '../errors';
* @throws {ValidationError} If validation fails
*/
export function validateSchema<T>(schema: ZodSchema<T>, params: unknown, logger: Logger): T {
console.debug('Validating task parameters', { schema: schema.description, params });
const result = schema.safeParse(params);

if (!result.success) {
Expand Down
1 change: 1 addition & 0 deletions src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export const SERVICES = {
ERROR_HANDLER: Symbol('ErrorHandler'),
STRATEGY_FACTORY: Symbol('StrategyFactory'),
TASK_VALIDATOR: Symbol('TaskValidator'),
POLLING_PAIRS: Symbol('PollingPairs'),
// =============================================================================
// TODO: When we move to the new job-manager, we will use @map-colonies/jobnik-sdk
// The tokens below are kept for future migration.
Expand Down
35 changes: 34 additions & 1 deletion src/containerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import { instancePerContainerCachingFactory } from 'tsyringe';
import { DependencyContainer } from 'tsyringe/dist/typings/types';
import jsLogger, { Logger } from '@map-colonies/js-logger';
import { IWorker, JobnikSDK } from '@map-colonies/jobnik-sdk';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { InjectionObject, registerDependencies } from '@common/dependencyRegistration';
import { SERVICES, SERVICE_NAME } from '@common/constants';
import { getTracing } from '@common/tracing';
import type { QueueConfig } from './cleaner/types';
import { ConfigType, getConfig } from './common/config';
import { workerBuilder } from './worker';
import { StrategyFactory } from './cleaner/strategies';
import { StrategyFactory, TilesDeletionStrategy } from './cleaner/strategies';
import { ErrorHandler } from './cleaner/errors';

export interface RegisterOptions {
override?: InjectionObject<unknown>[];
Expand Down Expand Up @@ -49,6 +52,24 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise
}),
},
},
{
token: SERVICES.QUEUE_CLIENT,
provider: {
useFactory: instancePerContainerCachingFactory((container) => {
const logger = container.resolve<Logger>(SERVICES.LOGGER);
const config = container.resolve<ConfigType>(SERVICES.CONFIG);
const queueConfig = config.get('queue') as QueueConfig;

return new QueueClient(
logger,
queueConfig.jobManagerBaseUrl,
queueConfig.heartbeatBaseUrl,
queueConfig.dequeueIntervalMs,
queueConfig.heartbeatIntervalMs
);
}),
},
},
{
token: SERVICES.WORKER,
provider: {
Expand All @@ -61,6 +82,18 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise
useClass: StrategyFactory,
},
},
{
token: SERVICES.ERROR_HANDLER,
provider: {
useClass: ErrorHandler,
},
},
{
token: configInstance.get('jobDefinitions.tasks.tilesDeletion.type') as unknown as string, //TODO: when we create worker config schema we can move this to a constant and remove the cast
provider: {
useClass: TilesDeletionStrategy,
},
},
{
token: 'onSignal',
provider: {
Expand Down
15 changes: 1 addition & 14 deletions src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1 @@
import type { IWorker } from '@map-colonies/jobnik-sdk';
import type { FactoryFunction } from 'tsyringe';

/**
* Worker factory function.
* TODO: Implement worker creation once TaskPoller is implemented.
* For now, this is a stub to satisfy the containerConfig registration.
*/
export const workerBuilder: FactoryFunction<IWorker> = () => {
// TODO: Replace with actual worker implementation
// When TaskPoller is ready, this will create and configure the worker
// For the skeleton, we return a minimal stub
throw new Error('Worker not implemented - TaskPoller integration pending');
};
export { workerBuilder } from './worker/workerBuilder';
142 changes: 142 additions & 0 deletions src/worker/taskPoller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { setTimeout as sleep } from 'timers/promises';
import { inject, injectable } from 'tsyringe';
import type { Logger } from '@map-colonies/js-logger';
import type { TaskHandler as QueueClient, ITaskResponse } from '@map-colonies/mc-priority-queue';
import type { IWorker } from '@map-colonies/jobnik-sdk';
import { SERVICES } from '@common/constants';
import type { ConfigType } from '@common/config';
import type { PollingPairConfig } from '../cleaner/types';
import type { StrategyFactory } from '../cleaner/strategies';
import { UnrecoverableError, type ErrorHandler } from '../cleaner/errors';

/**
* TaskPoller - Simple bridge to implement IWorker using the old mc-priority-queue SDK
*/
@injectable()
export class TaskPoller implements IWorker {
private shouldStop = false;
private readonly dequeueIntervalMs: number;

public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
@inject(SERVICES.CONFIG) config: ConfigType,
@inject(SERVICES.QUEUE_CLIENT) private readonly queueClient: QueueClient,
@inject(SERVICES.STRATEGY_FACTORY) private readonly strategyFactory: StrategyFactory,
@inject(SERVICES.ERROR_HANDLER) private readonly errorHandler: ErrorHandler,
@inject(SERVICES.POLLING_PAIRS) private readonly pollingPairs: PollingPairConfig[]
) {
this.dequeueIntervalMs = config.get('queue.dequeueIntervalMs') as unknown as number; //TODO:when we create worker config schema we can remove the cast
}

public async start(): Promise<void> {
this.shouldStop = false;
await this.poll();
}

public async stop(): Promise<void> {
this.shouldStop = true;
await Promise.resolve();
}

// IWorker event methods - delegated to internal EventEmitter (no-op since nothing listens)
public on(): this {
return this;
}

public off(): this {
return this;
}

public once(): this {
return this;
}

public removeAllListeners(): this {
return this;
}

private async poll(): Promise<void> {
while (!this.shouldStop) {
const result = await this.tryDequeue();

if (!result) {
await sleep(this.dequeueIntervalMs);
continue;
}

await this.processTask(result);
}
}

private async tryDequeue(): Promise<
| {
task: ITaskResponse<unknown>;
pair: PollingPairConfig;
}
| undefined
> {
for (const pair of this.pollingPairs) {
if (this.shouldStop) {
return undefined;
}

try {
const task = await this.queueClient.dequeue<unknown>(pair.jobType, pair.taskType);
if (task) {
this.logger.info({ msg: 'Task dequeued', taskId: task.id, taskType: task.type, jobId: task.jobId });
return { task, pair };
}
} catch (error) {
this.logger.error({ msg: 'Dequeue error', error });
}
}

return undefined;
}

private async processTask(dequeued: { task: ITaskResponse<unknown>; pair: PollingPairConfig }): Promise<void> {
const { task, pair } = dequeued;
const startTime = Date.now();

this.logger.debug({ msg: 'Task started', taskId: task.id, jobId: task.jobId });

try {
if (task.attempts >= pair.maxAttempts) {
throw new UnrecoverableError(`Task exceeded max attempts: ${task.attempts}/${pair.maxAttempts}`);
}

const strategy = this.strategyFactory.resolveWithContext({
jobId: task.jobId,
taskId: task.id,
jobType: pair.jobType,
taskType: pair.taskType,
});

const validated = strategy.validate(task.parameters);
await strategy.execute(validated);

await this.queueClient.ack(task.jobId, task.id);

const durationMs = Date.now() - startTime;
this.logger.info({ msg: 'Task completed', taskId: task.id, durationMs });
} catch (error) {
await this.handleTaskFailure(error, task, pair);
}
}

private async handleTaskFailure(error: unknown, task: ITaskResponse<unknown>, pair: PollingPairConfig): Promise<void> {
const decision = this.errorHandler.handleError({
jobId: task.jobId,
taskId: task.id,
attemptNumber: task.attempts,
maxAttempts: pair.maxAttempts,
error,
});

try {
await this.queueClient.reject(task.jobId, task.id, decision.shouldRetry, decision.reason);
} catch (error) {
this.logger.error({ msg: 'Failed to reject task', taskId: task.id, error });
}
}
}
20 changes: 20 additions & 0 deletions src/worker/workerBuilder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import type { FactoryFunction } from 'tsyringe';
import type { IWorker } from '@map-colonies/jobnik-sdk';
import { SERVICES } from '@common/constants';
import type { ConfigType } from '@common/config';
import type { WorkerCapabilities, JobDefinitions, PollingPairConfig } from '../cleaner/types';
import { buildPollingPairs } from '../cleaner/utils';
import { TaskPoller } from './taskPoller';

export const workerBuilder: FactoryFunction<IWorker> = (container) => {
const config = container.resolve<ConfigType>(SERVICES.CONFIG);

const jobDefinitions = config.get('jobDefinitions') as JobDefinitions; //TODO:when we create worker config schema we can remove the cast
const workerCapabilities = config.get('worker.capabilities') as unknown as WorkerCapabilities; //TODO:when we create worker config schema we can remove the cast

const pollingPairs = buildPollingPairs(jobDefinitions, workerCapabilities);

container.register<PollingPairConfig[]>(SERVICES.POLLING_PAIRS, { useValue: pollingPairs });

return container.resolve(TaskPoller);
};
Loading
Loading