Skip to content

Commit

Permalink
feat(core): Add Job Summary to Worker response (#7360)
Browse files Browse the repository at this point in the history
  • Loading branch information
flipswitchingmonkey authored Oct 6, 2023
1 parent c8c14ca commit b8608ce
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 14 deletions.
17 changes: 17 additions & 0 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessage
import { IConfig } from '@oclif/config';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import type { WorkerJobStatusSummary } from '../services/orchestration/worker/types';

export class Worker extends BaseCommand {
static description = '\nStarts a n8n worker';
Expand All @@ -56,6 +57,10 @@ export class Worker extends BaseCommand {
[key: string]: PCancelable<IRun>;
} = {};

static runningJobsSummary: {
[jobId: string]: WorkerJobStatusSummary;
} = {};

static jobQueue: JobQueue;

redisSubscriber: RedisServicePubSubSubscriber;
Expand Down Expand Up @@ -232,11 +237,22 @@ export class Worker extends BaseCommand {
}

Worker.runningJobs[job.id] = workflowRun;
Worker.runningJobsSummary[job.id] = {
jobId: job.id.toString(),
executionId,
workflowId: fullExecutionData.workflowId ?? '',
workflowName: fullExecutionData.workflowData.name,
mode: fullExecutionData.mode,
startedAt: fullExecutionData.startedAt,
retryOf: fullExecutionData.retryOf ?? '',
status: fullExecutionData.status,
};

// Wait till the execution is finished
await workflowRun;

delete Worker.runningJobs[job.id];
delete Worker.runningJobsSummary[job.id];

// do NOT call workflowExecuteAfter hook here, since it is being called from processSuccessExecution()
// already!
Expand Down Expand Up @@ -305,6 +321,7 @@ export class Worker extends BaseCommand {
instanceId: this.instanceId,
redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher,
getRunningJobIds: () => Object.keys(Worker.runningJobs),
getRunningJobsSummary: () => Object.values(Worker.runningJobsSummary),
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Container from 'typedi';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/handleCommandMessageWorker';
import { RedisService } from './redis.service';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types';

export abstract class OrchestrationHandlerService {
protected initialized = false;
Expand Down
7 changes: 7 additions & 0 deletions packages/cli/src/services/orchestration/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { LoggerProxy, jsonParse } from 'n8n-workflow';
import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper';
import * as os from 'os';

export interface RedisServiceCommandLastReceived {
[date: string]: Date;
Expand Down Expand Up @@ -31,3 +32,9 @@ export function debounceMessageReceiver(message: RedisServiceCommandObject, time
lastReceived[message.command] = now;
return true;
}

export function getOsCpuString(): string {
const cpus = os.cpus();
if (cpus.length === 0) return 'no CPU info';
return `${cpus.length}x ${cpus[0].model} - speed: ${cpus[0].speed}`;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ export async function handleWorkerResponseMessageMain(messageString: string) {
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
if (workerResponse) {
// TODO: Handle worker response
LoggerProxy.debug('Received worker response', workerResponse);
LoggerProxy.debug(
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
);
}
return workerResponse;
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
import { jsonParse, LoggerProxy } from 'n8n-workflow';
import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import * as os from 'os';
import Container from 'typedi';
import { License } from '@/License';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { debounceMessageReceiver } from '../helpers';

export interface WorkerCommandReceivedHandlerOptions {
queueModeId: string;
instanceId: string;
redisPublisher: RedisServicePubSubPublisher;
getRunningJobIds: () => string[];
}
import { debounceMessageReceiver, getOsCpuString } from '../helpers';
import type { WorkerCommandReceivedHandlerOptions } from './types';

export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) {
return async (channel: string, messageString: string) => {
Expand Down Expand Up @@ -45,11 +38,12 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
payload: {
workerId: options.queueModeId,
runningJobs: options.getRunningJobIds(),
runningJobsSummary: options.getRunningJobsSummary(),
freeMem: os.freemem(),
totalMem: os.totalmem(),
uptime: process.uptime(),
loadAvg: os.loadavg(),
cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`),
cpus: getOsCpuString(),
arch: os.arch(),
platform: os.platform(),
hostname: os.hostname(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Service } from 'typedi';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import type { WorkerCommandReceivedHandlerOptions } from './handleCommandMessageWorker';
import { getWorkerCommandReceivedHandler } from './handleCommandMessageWorker';
import type { WorkerCommandReceivedHandlerOptions } from './types';

@Service()
export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService {
Expand Down
21 changes: 21 additions & 0 deletions packages/cli/src/services/orchestration/worker/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
import type { RedisServicePubSubPublisher } from '../../redis/RedisServicePubSubPublisher';

export interface WorkerCommandReceivedHandlerOptions {
queueModeId: string;
instanceId: string;
redisPublisher: RedisServicePubSubPublisher;
getRunningJobIds: () => string[];
getRunningJobsSummary: () => WorkerJobStatusSummary[];
}

export interface WorkerJobStatusSummary {
jobId: string;
executionId: string;
retryOf?: string;
startedAt: Date;
mode: WorkflowExecuteMode;
workflowName: string;
workflowId: string;
status: ExecutionStatus;
}
5 changes: 4 additions & 1 deletion packages/cli/src/services/redis/RedisServiceCommands.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { WorkerJobStatusSummary } from '../orchestration/worker/types';

export type RedisServiceCommand =
| 'getStatus'
| 'getId'
Expand Down Expand Up @@ -29,11 +31,12 @@ export type RedisServiceWorkerResponseObject = {
payload: {
workerId: string;
runningJobs: string[];
runningJobsSummary: WorkerJobStatusSummary[];
freeMem: number;
totalMem: number;
uptime: number;
loadAvg: number[];
cpus: string[];
cpus: string;
arch: string;
platform: NodeJS.Platform;
hostname: string;
Expand Down

0 comments on commit b8608ce

Please sign in to comment.