Skip to content

Commit a1bfedf

Browse files
remove immediate functions from esqueue worker cycles (elastic#65375) (elastic#65753)
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com> Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
1 parent ceab635 commit a1bfedf

File tree

2 files changed

+14
-32
lines changed

2 files changed

+14
-32
lines changed

x-pack/legacy/plugins/reporting/server/lib/create_worker.ts

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,13 @@ import {
1111
ESQueueInstance,
1212
ESQueueWorkerExecuteFn,
1313
ExportTypeDefinition,
14-
ImmediateExecuteFn,
15-
JobDocPayload,
1614
JobSource,
1715
Logger,
18-
RequestFacade,
1916
} from '../../types';
2017
// @ts-ignore untyped dependency
2118
import { events as esqueueEvents } from './esqueue';
2219

2320
export function createWorkerFactory<JobParamsType>(reporting: ReportingCore, logger: Logger) {
24-
type JobDocPayloadType = JobDocPayload<JobParamsType>;
25-
2621
const config = reporting.getConfig();
2722
const queueConfig = config.get('queue');
2823
const kibanaName = config.kbnConfig.get('server', 'name');
@@ -31,48 +26,36 @@ export function createWorkerFactory<JobParamsType>(reporting: ReportingCore, log
3126
// Once more document types are added, this will need to be passed in
3227
return async function createWorker(queue: ESQueueInstance) {
3328
// export type / execute job map
34-
const jobExecutors: Map<
35-
string,
36-
ImmediateExecuteFn<JobParamsType> | ESQueueWorkerExecuteFn<JobDocPayloadType>
37-
> = new Map();
29+
const jobExecutors: Map<string, ESQueueWorkerExecuteFn<unknown>> = new Map();
3830

3931
for (const exportType of reporting.getExportTypesRegistry().getAll() as Array<
40-
ExportTypeDefinition<
41-
JobParamsType,
42-
unknown,
43-
unknown,
44-
ImmediateExecuteFn<JobParamsType> | ESQueueWorkerExecuteFn<JobDocPayloadType>
45-
>
32+
ExportTypeDefinition<JobParamsType, unknown, unknown, ESQueueWorkerExecuteFn<unknown>>
4633
>) {
4734
const jobExecutor = await exportType.executeJobFactory(reporting, logger); // FIXME: does not "need" to be async
4835
jobExecutors.set(exportType.jobType, jobExecutor);
4936
}
5037

51-
const workerFn = (jobSource: JobSource<JobParamsType>, ...workerRestArgs: any[]) => {
38+
const workerFn = <ScheduledTaskParamsType>(
39+
jobSource: JobSource<ScheduledTaskParamsType>,
40+
jobParams: ScheduledTaskParamsType,
41+
cancellationToken: CancellationToken
42+
) => {
5243
const {
5344
_id: jobId,
5445
_source: { jobtype: jobType },
5546
} = jobSource;
5647

48+
if (!jobId) {
49+
throw new Error(`Claimed job is missing an ID!: ${JSON.stringify(jobSource)}`);
50+
}
51+
5752
const jobTypeExecutor = jobExecutors.get(jobType);
58-
// pass the work to the jobExecutor
5953
if (!jobTypeExecutor) {
6054
throw new Error(`Unable to find a job executor for the claimed job: [${jobId}]`);
6155
}
6256

63-
if (jobId) {
64-
const jobExecutorWorker = jobTypeExecutor as ESQueueWorkerExecuteFn<JobDocPayloadType>;
65-
return jobExecutorWorker(
66-
jobId,
67-
...(workerRestArgs as [JobDocPayloadType, CancellationToken])
68-
);
69-
} else {
70-
const jobExecutorImmediate = jobExecutors.get(jobType) as ImmediateExecuteFn<JobParamsType>;
71-
return jobExecutorImmediate(
72-
null,
73-
...(workerRestArgs as [JobDocPayload<JobParamsType>, RequestFacade])
74-
);
75-
}
57+
// pass the work to the jobExecutor
58+
return jobTypeExecutor(jobId, jobParams, cancellationToken);
7659
};
7760

7861
const workerOptions = {

x-pack/legacy/plugins/reporting/server/lib/enqueue_job.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import {
99
ConditionalHeaders,
1010
EnqueueJobFn,
1111
ESQueueCreateJobFn,
12-
ImmediateCreateJobFn,
1312
Job,
1413
Logger,
1514
RequestFacade,
@@ -40,7 +39,7 @@ export function enqueueJobFactory(reporting: ReportingCore, parentLogger: Logger
4039
headers: ConditionalHeaders['headers'],
4140
request: RequestFacade
4241
): Promise<Job> {
43-
type CreateJobFn = ESQueueCreateJobFn<JobParamsType> | ImmediateCreateJobFn<JobParamsType>;
42+
type CreateJobFn = ESQueueCreateJobFn<JobParamsType>;
4443

4544
const esqueue = await reporting.getEsqueue();
4645
const exportType = reporting.getExportTypesRegistry().getById(exportTypeId);

0 commit comments

Comments
 (0)