Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Do not load ScalingService in regular mode (no-changelog) #10333

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { ExternalHooks } from '@/ExternalHooks';
import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import type { Job, JobData, JobResult } from '@/scaling/types';
import { ScalingService } from '@/scaling/scaling.service';
import type { ScalingService } from '@/scaling/scaling.service';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
Expand All @@ -40,7 +40,7 @@ import { EventService } from './events/event.service';

@Service()
export class WorkflowRunner {
private readonly scalingService: ScalingService;
private scalingService: ScalingService;

private executionsMode = config.getEnv('executions.mode');

Expand All @@ -53,11 +53,7 @@ export class WorkflowRunner {
private readonly nodeTypes: NodeTypes,
private readonly permissionChecker: PermissionChecker,
private readonly eventService: EventService,
) {
if (this.executionsMode === 'queue') {
this.scalingService = Container.get(ScalingService);
}
}
) {}

/** The process did error */
async processError(
Expand Down Expand Up @@ -360,6 +356,11 @@ export class WorkflowRunner {
loadStaticData: !!loadStaticData,
};

if (!this.scalingService) {
const { ScalingService } = await import('@/scaling/scaling.service');
this.scalingService = Container.get(ScalingService);
}

let priority = 100;
if (realtime === true) {
// Jobs which require a direct response get a higher priority
Expand Down Expand Up @@ -404,7 +405,7 @@ export class WorkflowRunner {
async (resolve, reject, onCancel) => {
onCancel.shouldReject = false;
onCancel(async () => {
await Container.get(ScalingService).stopJob(job);
await this.scalingService.stopJob(job);

// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
// "workflowExecuteAfter" which we require.
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { ApplicationError } from 'n8n-workflow';

import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ScalingService } from '@/scaling/scaling.service';
import { WebhookServer } from '@/webhooks/WebhookServer';
import { BaseCommand } from './BaseCommand';

Expand Down Expand Up @@ -96,6 +95,7 @@ export class Webhook extends BaseCommand {
);
}

const { ScalingService } = await import('@/scaling/scaling.service');
await Container.get(ScalingService).setupQueue();
await this.server.start();
this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`);
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { sleep, ApplicationError } from 'n8n-workflow';
import * as Db from '@/Db';
import * as ResponseHelper from '@/ResponseHelper';
import config from '@/config';
import { ScalingService } from '@/scaling/scaling.service';
import type { ScalingService } from '@/scaling/scaling.service';
import { N8N_VERSION, inTest } from '@/constants';
import type { ICredentialsOverwrite } from '@/Interfaces';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
Expand Down Expand Up @@ -171,6 +171,7 @@ export class Worker extends BaseCommand {
}

async initScalingService() {
const { ScalingService } = await import('@/scaling/scaling.service');
this.scalingService = Container.get(ScalingService);

await this.scalingService.setupQueue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.err
import { MissingExecutionStopError } from '@/errors/missing-execution-stop.error';
import type { ActiveExecutions } from '@/ActiveExecutions';
import type { IExecutionResponse } from '@/Interfaces';
import type { ScalingService } from '@/scaling/scaling.service';
import { ScalingService } from '@/scaling/scaling.service';
import type { WaitTracker } from '@/WaitTracker';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { ExecutionRequest } from '@/executions/execution.types';
import type { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
import type { Job } from '@/scaling/types';
import { mockInstance } from '@test/mocking';

describe('ExecutionService', () => {
const scalingService = mock<ScalingService>();
const scalingService = mockInstance(ScalingService);
const activeExecutions = mock<ActiveExecutions>();
const executionRepository = mock<ExecutionRepository>();
const waitTracker = mock<WaitTracker>();
Expand All @@ -23,7 +24,6 @@ describe('ExecutionService', () => {
const executionService = new ExecutionService(
mock(),
mock(),
scalingService,
activeExecutions,
executionRepository,
mock(),
Expand Down
10 changes: 5 additions & 5 deletions packages/cli/src/executions/execution.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Service } from 'typedi';
import { Container, Service } from 'typedi';
import { GlobalConfig } from '@n8n/config';
import { validate as jsonSchemaValidate } from 'jsonschema';
import type {
Expand All @@ -24,7 +24,6 @@ import type {
IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import { ScalingService } from '@/scaling/scaling.service';
import type { ExecutionRequest, ExecutionSummaries, StopResult } from './execution.types';
import { WorkflowRunner } from '@/WorkflowRunner';
import type { IGetExecutionsQueryFilter } from '@db/repositories/execution.repository';
Expand Down Expand Up @@ -85,7 +84,6 @@ export class ExecutionService {
constructor(
private readonly globalConfig: GlobalConfig,
private readonly logger: Logger,
private readonly scalingService: ScalingService,
private readonly activeExecutions: ActiveExecutions,
private readonly executionRepository: ExecutionRepository,
private readonly workflowRepository: WorkflowRepository,
Expand Down Expand Up @@ -471,12 +469,14 @@ export class ExecutionService {
this.waitTracker.stopExecution(execution.id);
}

const jobs = await this.scalingService.findJobsByStatus(['active', 'waiting']);
const { ScalingService } = await import('@/scaling/scaling.service');
const scalingService = Container.get(ScalingService);
const jobs = await scalingService.findJobsByStatus(['active', 'waiting']);

const job = jobs.find(({ data }) => data.executionId === execution.id);

if (job) {
await this.scalingService.stopJob(job);
await scalingService.stopJob(job);
} else {
this.logger.debug('Job to stop not in queue', { executionId: execution.id });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ describe('ExecutionService', () => {
mock(),
mock(),
mock(),
mock(),
executionRepository,
Container.get(WorkflowRepository),
mock(),
Expand Down
Loading