Skip to content

Commit 1a7134d

Browse files
committed
[event_log] index event docs in bulk instead of individually
resolves #55634 Buffers event docs being written for a fixed interval / buffer size, and indexes those docs via a bulk ES call.
1 parent c8b8a0a commit 1a7134d

File tree

15 files changed

+280
-373
lines changed

15 files changed

+280
-373
lines changed

x-pack/plugins/event_log/server/es/cluster_client_adapter.mock.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ import { IClusterClientAdapter } from './cluster_client_adapter';
99
const createClusterClientMock = () => {
1010
const mock: jest.Mocked<IClusterClientAdapter> = {
1111
indexDocument: jest.fn(),
12+
indexDocuments: jest.fn(),
1213
doesIlmPolicyExist: jest.fn(),
1314
createIlmPolicy: jest.fn(),
1415
doesIndexTemplateExist: jest.fn(),
1516
createIndexTemplate: jest.fn(),
1617
doesAliasExist: jest.fn(),
1718
createIndex: jest.fn(),
1819
queryEventsBySavedObject: jest.fn(),
20+
shutdown: jest.fn(),
1921
};
2022
return mock;
2123
};

x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts

Lines changed: 120 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,21 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7-
import { LegacyClusterClient, Logger } from 'src/core/server';
7+
import { LegacyClusterClient } from 'src/core/server';
88
import { elasticsearchServiceMock, loggingSystemMock } from 'src/core/server/mocks';
9-
import { ClusterClientAdapter, IClusterClientAdapter } from './cluster_client_adapter';
9+
import {
10+
ClusterClientAdapter,
11+
IClusterClientAdapter,
12+
EVENT_BUFFER_LENGTH,
13+
} from './cluster_client_adapter';
14+
import { contextMock } from './context.mock';
1015
import { findOptionsSchema } from '../event_log_client';
16+
import { delay } from '../lib/delay';
1117

1218
type EsClusterClient = Pick<jest.Mocked<LegacyClusterClient>, 'callAsInternalUser' | 'asScoped'>;
19+
type MockedLogger = ReturnType<typeof loggingSystemMock['createLogger']>;
1320

14-
let logger: Logger;
21+
let logger: MockedLogger;
1522
let clusterClient: EsClusterClient;
1623
let clusterClientAdapter: IClusterClientAdapter;
1724

@@ -21,22 +28,97 @@ beforeEach(() => {
2128
clusterClientAdapter = new ClusterClientAdapter({
2229
logger,
2330
clusterClientPromise: Promise.resolve(clusterClient),
31+
context: contextMock.create(),
2432
});
2533
});
2634

2735
describe('indexDocument', () => {
28-
test('should call cluster client with given doc', async () => {
29-
await clusterClientAdapter.indexDocument({ args: true });
30-
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('index', {
31-
args: true,
36+
test('should call cluster client bulk with given doc', async () => {
37+
clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' });
38+
39+
await retryUntil('cluster client bulk called', () => {
40+
return clusterClient.callAsInternalUser.mock.calls.length !== 0;
41+
});
42+
43+
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('bulk', {
44+
body: [{ create: { _index: 'event-log' } }, { message: 'foo' }],
3245
});
3346
});
3447

35-
test('should throw error when cluster client throws an error', async () => {
36-
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
37-
await expect(
38-
clusterClientAdapter.indexDocument({ args: true })
39-
).rejects.toThrowErrorMatchingInlineSnapshot(`"Fail"`);
48+
test('should log an error when cluster client throws an error', async () => {
49+
clusterClient.callAsInternalUser.mockRejectedValue(new Error('expected failure'));
50+
clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' });
51+
await retryUntil('cluster client bulk called', () => {
52+
return logger.error.mock.calls.length !== 0;
53+
});
54+
55+
const expectedMessage = `error writing bulk events: "expected failure"; docs: [{"create":{"_index":"event-log"}},{"message":"foo"}]`;
56+
expect(logger.error).toHaveBeenCalledWith(expectedMessage);
57+
});
58+
});
59+
60+
describe('shutdown()', () => {
61+
test('should work if no docs have been written', async () => {
62+
const result = await clusterClientAdapter.shutdown();
63+
expect(result).toBeFalsy();
64+
});
65+
66+
test('should work if some docs have been written', async () => {
67+
clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' });
68+
const resultPromise = clusterClientAdapter.shutdown();
69+
70+
await retryUntil('cluster client bulk called', () => {
71+
return clusterClient.callAsInternalUser.mock.calls.length !== 0;
72+
});
73+
74+
const result = await resultPromise;
75+
expect(result).toBeFalsy();
76+
});
77+
});
78+
79+
describe('buffering documents', () => {
80+
test('should write buffered docs after timeout', async () => {
81+
// write EVENT_BUFFER_LENGTH - 1 docs
82+
for (let i = 0; i < EVENT_BUFFER_LENGTH - 1; i++) {
83+
clusterClientAdapter.indexDocument({ body: { message: `foo ${i}` }, index: 'event-log' });
84+
}
85+
86+
await retryUntil('cluster client bulk called', () => {
87+
return clusterClient.callAsInternalUser.mock.calls.length !== 0;
88+
});
89+
90+
const expectedBody = [];
91+
for (let i = 0; i < EVENT_BUFFER_LENGTH - 1; i++) {
92+
expectedBody.push({ create: { _index: 'event-log' } }, { message: `foo ${i}` });
93+
}
94+
95+
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('bulk', {
96+
body: expectedBody,
97+
});
98+
});
99+
100+
test('should write buffered docs after buffer exceeded', async () => {
101+
// write EVENT_BUFFER_LENGTH + 1 docs
102+
for (let i = 0; i < EVENT_BUFFER_LENGTH + 1; i++) {
103+
clusterClientAdapter.indexDocument({ body: { message: `foo ${i}` }, index: 'event-log' });
104+
}
105+
106+
await retryUntil('cluster client bulk called', () => {
107+
return clusterClient.callAsInternalUser.mock.calls.length >= 2;
108+
});
109+
110+
const expectedBody = [];
111+
for (let i = 0; i < EVENT_BUFFER_LENGTH; i++) {
112+
expectedBody.push({ create: { _index: 'event-log' } }, { message: `foo ${i}` });
113+
}
114+
115+
expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(1, 'bulk', {
116+
body: expectedBody,
117+
});
118+
119+
expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(2, 'bulk', {
120+
body: [{ create: { _index: 'event-log' } }, { message: `foo 100` }],
121+
});
40122
});
41123
});
42124

@@ -575,3 +657,29 @@ describe('queryEventsBySavedObject', () => {
575657
`);
576658
});
577659
});
660+
661+
type RetryableFunction = () => boolean;
662+
663+
const RETRY_UNTIL_DEFAULT_COUNT = 20;
664+
const RETRY_UNTIL_DEFAULT_WAIT = 1000; // milliseconds
665+
666+
async function retryUntil(
667+
label: string,
668+
fn: RetryableFunction,
669+
count: number = RETRY_UNTIL_DEFAULT_COUNT,
670+
wait: number = RETRY_UNTIL_DEFAULT_WAIT
671+
): Promise<boolean> {
672+
while (count > 0) {
673+
count--;
674+
675+
if (fn()) return true;
676+
677+
// eslint-disable-next-line no-console
678+
console.log(`attempt failed waiting for "${label}", attempts left: ${count}`);
679+
680+
if (count === 0) return false;
681+
await delay(wait);
682+
}
683+
684+
return false;
685+
}

x-pack/plugins/event_log/server/es/cluster_client_adapter.ts

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,32 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7+
import { Subject, Subscription } from 'rxjs';
8+
import { bufferTime, filter, switchMap } from 'rxjs/operators';
79
import { reject, isUndefined } from 'lodash';
810
import { SearchResponse, Client } from 'elasticsearch';
911
import type { PublicMethodsOf } from '@kbn/utility-types';
1012
import { Logger, LegacyClusterClient } from 'src/core/server';
11-
12-
import { IValidatedEvent, SAVED_OBJECT_REL_PRIMARY } from '../types';
13+
import { ReadySignal, createReadySignal } from '../lib/ready_signal';
14+
import { EsContext } from '.';
15+
import { IEvent, IValidatedEvent, SAVED_OBJECT_REL_PRIMARY } from '../types';
1316
import { FindOptionsType } from '../event_log_client';
1417

18+
export const EVENT_BUFFER_TIME = 1000; // milliseconds
19+
export const EVENT_BUFFER_LENGTH = 100;
20+
1521
export type EsClusterClient = Pick<LegacyClusterClient, 'callAsInternalUser' | 'asScoped'>;
1622
export type IClusterClientAdapter = PublicMethodsOf<ClusterClientAdapter>;
1723

24+
export interface Doc {
25+
index: string;
26+
body: IEvent;
27+
}
28+
1829
export interface ConstructorOpts {
1930
logger: Logger;
2031
clusterClientPromise: Promise<EsClusterClient>;
32+
context: EsContext;
2133
}
2234

2335
export interface QueryEventsBySavedObjectResult {
@@ -30,14 +42,71 @@ export interface QueryEventsBySavedObjectResult {
3042
export class ClusterClientAdapter {
3143
private readonly logger: Logger;
3244
private readonly clusterClientPromise: Promise<EsClusterClient>;
45+
private readonly docBuffer$: Subject<Doc>;
46+
private readonly docsBufferedSubscription: Subscription;
47+
private readonly doneWriting: ReadySignal;
48+
private readonly context: EsContext;
3349

3450
constructor(opts: ConstructorOpts) {
3551
this.logger = opts.logger;
3652
this.clusterClientPromise = opts.clusterClientPromise;
53+
this.context = opts.context;
54+
this.docBuffer$ = new Subject<Doc>();
55+
this.doneWriting = createReadySignal();
56+
57+
// buffer event log docs for time / buffer length, ignore empty
58+
// buffers, then index the buffered docs
59+
const docsBuffered$ = this.docBuffer$.pipe(
60+
bufferTime(EVENT_BUFFER_TIME, null, EVENT_BUFFER_LENGTH),
61+
filter((docs) => docs.length > 0),
62+
switchMap(async (docs) => await this.indexDocuments(docs))
63+
);
64+
65+
// kick everything off, when the stream closes, signal finished
66+
const { doneWriting } = this;
67+
this.docsBufferedSubscription = docsBuffered$.subscribe({
68+
complete() {
69+
doneWriting.signal();
70+
},
71+
});
72+
}
73+
74+
// This will be called at plugin stop() time; the assumption is any plugins
75+
// depending on the event_log will already be stopped, and so will not be
76+
// writing more event docs. So we shut down the docBuffer$ observable,
77+
// and wait for the docsBufffered$ observable to complete
78+
public async shutdown(): Promise<void> {
79+
this.docBuffer$.complete();
80+
await this.doneWriting.wait();
81+
this.docsBufferedSubscription.unsubscribe();
82+
}
83+
84+
public indexDocument(doc: Doc): void {
85+
this.docBuffer$.next(doc);
3786
}
3887

39-
public async indexDocument(doc: unknown): Promise<void> {
40-
await this.callEs<ReturnType<Client['index']>>('index', doc);
88+
async indexDocuments(docs: Doc[]): Promise<void> {
89+
// if es initialization failed, don't try to index
90+
if (!(await this.context.waitTillReady())) {
91+
return;
92+
}
93+
94+
const bulkBody: Array<Record<string, unknown>> = [];
95+
96+
for (const doc of docs) {
97+
if (doc.body === undefined) continue;
98+
99+
bulkBody.push({ create: { _index: doc.index } });
100+
bulkBody.push(doc.body);
101+
}
102+
103+
try {
104+
await this.callEs<ReturnType<Client['bulk']>>('bulk', { body: bulkBody });
105+
} catch (err) {
106+
this.logger.error(
107+
`error writing bulk events: "${err.message}"; docs: ${JSON.stringify(bulkBody)}`
108+
);
109+
}
41110
}
42111

43112
public async doesIlmPolicyExist(policyName: string): Promise<boolean> {

x-pack/plugins/event_log/server/es/context.mock.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const createContextMock = () => {
1818
logger: loggingSystemMock.createLogger(),
1919
esNames: namesMock.create(),
2020
initialize: jest.fn(),
21+
shutdown: jest.fn(),
2122
waitTillReady: jest.fn(async () => true),
2223
esAdapter: clusterClientAdapterMock.create(),
2324
initialized: true,

x-pack/plugins/event_log/server/es/context.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export interface EsContext {
1818
esNames: EsNames;
1919
esAdapter: IClusterClientAdapter;
2020
initialize(): void;
21+
shutdown(): Promise<void>;
2122
waitTillReady(): Promise<boolean>;
2223
initialized: boolean;
2324
}
@@ -52,6 +53,7 @@ class EsContextImpl implements EsContext {
5253
this.esAdapter = new ClusterClientAdapter({
5354
logger: params.logger,
5455
clusterClientPromise: params.clusterClientPromise,
56+
context: this,
5557
});
5658
}
5759

@@ -74,6 +76,10 @@ class EsContextImpl implements EsContext {
7476
});
7577
}
7678

79+
async shutdown() {
80+
await this.esAdapter.shutdown();
81+
}
82+
7783
// waits till the ES initialization is done, returns true if it was successful,
7884
// false if it was not successful
7985
async waitTillReady(): Promise<boolean> {

x-pack/plugins/event_log/server/event_logger.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ describe('EventLogger', () => {
5959
eventLogger.logEvent({});
6060
await waitForLogEvent(systemLogger);
6161
delay(WRITE_LOG_WAIT_MILLIS); // sleep a bit longer since event logging is async
62-
expect(esContext.esAdapter.indexDocument).not.toHaveBeenCalled();
62+
expect(esContext.esAdapter.indexDocument).toHaveBeenCalled();
63+
expect(esContext.esAdapter.indexDocuments).not.toHaveBeenCalled();
6364
});
6465

6566
test('method logEvent() writes expected default values', async () => {

x-pack/plugins/event_log/server/event_logger.ts

Lines changed: 3 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,10 @@ import {
2020
EventSchema,
2121
} from './types';
2222
import { SAVED_OBJECT_REL_PRIMARY } from './types';
23+
import { Doc } from './es/cluster_client_adapter';
2324

2425
type SystemLogger = Plugin['systemLogger'];
2526

26-
interface Doc {
27-
index: string;
28-
body: IEvent;
29-
}
30-
3127
interface IEventLoggerCtorParams {
3228
esContext: EsContext;
3329
eventLogService: EventLogService;
@@ -159,44 +155,9 @@ function validateEvent(eventLogService: IEventLogService, event: IEvent): IValid
159155
export const EVENT_LOGGED_PREFIX = `event logged: `;
160156

161157
function logEventDoc(logger: Logger, doc: Doc): void {
162-
setImmediate(() => {
163-
logger.info(`${EVENT_LOGGED_PREFIX}${JSON.stringify(doc.body)}`);
164-
});
158+
logger.info(`event logged: ${JSON.stringify(doc.body)}`);
165159
}
166160

167161
function indexEventDoc(esContext: EsContext, doc: Doc): void {
168-
// TODO:
169-
// the setImmediate() on an async function is a little overkill, but,
170-
// setImmediate() may be tweakable via node params, whereas async
171-
// tweaking is in the v8 params realm, which is very dicey.
172-
// Long-term, we should probably create an in-memory queue for this, so
173-
// we can explictly see/set the queue lengths.
174-
175-
// already verified this.clusterClient isn't null above
176-
setImmediate(async () => {
177-
try {
178-
await indexLogEventDoc(esContext, doc);
179-
} catch (err) {
180-
esContext.logger.warn(`error writing event doc: ${err.message}`);
181-
writeLogEventDocOnError(esContext, doc);
182-
}
183-
});
184-
}
185-
186-
// whew, the thing that actually writes the event log document!
187-
async function indexLogEventDoc(esContext: EsContext, doc: unknown) {
188-
esContext.logger.debug(`writing to event log: ${JSON.stringify(doc)}`);
189-
const success = await esContext.waitTillReady();
190-
if (!success) {
191-
esContext.logger.debug(`event log did not initialize correctly, event not written`);
192-
return;
193-
}
194-
195-
await esContext.esAdapter.indexDocument(doc);
196-
esContext.logger.debug(`writing to event log complete`);
197-
}
198-
199-
// TODO: write log entry to a bounded queue buffer
200-
function writeLogEventDocOnError(esContext: EsContext, doc: unknown) {
201-
esContext.logger.warn(`unable to write event doc: ${JSON.stringify(doc)}`);
162+
esContext.esAdapter.indexDocument(doc);
202163
}

0 commit comments

Comments
 (0)