Skip to content

Commit 50dbe8f

Browse files
authored
[event_log] index event docs in bulk instead of individually (redo) (#83927)
resolves #55634 resolves #65746 Buffers event docs being written for a fixed interval / buffer size, and indexes those docs via a bulk ES call. Also now flushing those buffers at plugin stop() time, which we couldn't do before with the single index calls, which were run via `setImmediate()`. This is a redo of PR #80941 which had to be reverted.
1 parent a8913d3 commit 50dbe8f

File tree

15 files changed

+314
-373
lines changed

15 files changed

+314
-373
lines changed

x-pack/plugins/event_log/server/es/cluster_client_adapter.mock.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ import { IClusterClientAdapter } from './cluster_client_adapter';
99
const createClusterClientMock = () => {
1010
const mock: jest.Mocked<IClusterClientAdapter> = {
1111
indexDocument: jest.fn(),
12+
indexDocuments: jest.fn(),
1213
doesIlmPolicyExist: jest.fn(),
1314
createIlmPolicy: jest.fn(),
1415
doesIndexTemplateExist: jest.fn(),
1516
createIndexTemplate: jest.fn(),
1617
doesAliasExist: jest.fn(),
1718
createIndex: jest.fn(),
1819
queryEventsBySavedObject: jest.fn(),
20+
shutdown: jest.fn(),
1921
};
2022
return mock;
2123
};

x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts

Lines changed: 154 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,22 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7-
import { LegacyClusterClient, Logger } from 'src/core/server';
7+
import { LegacyClusterClient } from 'src/core/server';
88
import { elasticsearchServiceMock, loggingSystemMock } from 'src/core/server/mocks';
9-
import { ClusterClientAdapter, IClusterClientAdapter } from './cluster_client_adapter';
9+
import {
10+
ClusterClientAdapter,
11+
IClusterClientAdapter,
12+
EVENT_BUFFER_LENGTH,
13+
} from './cluster_client_adapter';
14+
import { contextMock } from './context.mock';
1015
import { findOptionsSchema } from '../event_log_client';
16+
import { delay } from '../lib/delay';
17+
import { times } from 'lodash';
1118

1219
type EsClusterClient = Pick<jest.Mocked<LegacyClusterClient>, 'callAsInternalUser' | 'asScoped'>;
20+
type MockedLogger = ReturnType<typeof loggingSystemMock['createLogger']>;
1321

14-
let logger: Logger;
22+
let logger: MockedLogger;
1523
let clusterClient: EsClusterClient;
1624
let clusterClientAdapter: IClusterClientAdapter;
1725

@@ -21,22 +29,130 @@ beforeEach(() => {
2129
clusterClientAdapter = new ClusterClientAdapter({
2230
logger,
2331
clusterClientPromise: Promise.resolve(clusterClient),
32+
context: contextMock.create(),
2433
});
2534
});
2635

2736
describe('indexDocument', () => {
28-
test('should call cluster client with given doc', async () => {
29-
await clusterClientAdapter.indexDocument({ args: true });
30-
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('index', {
31-
args: true,
37+
test('should call cluster client bulk with given doc', async () => {
38+
clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' });
39+
40+
await retryUntil('cluster client bulk called', () => {
41+
return clusterClient.callAsInternalUser.mock.calls.length !== 0;
42+
});
43+
44+
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('bulk', {
45+
body: [{ create: { _index: 'event-log' } }, { message: 'foo' }],
3246
});
3347
});
3448

35-
test('should throw error when cluster client throws an error', async () => {
36-
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
37-
await expect(
38-
clusterClientAdapter.indexDocument({ args: true })
39-
).rejects.toThrowErrorMatchingInlineSnapshot(`"Fail"`);
49+
test('should log an error when cluster client throws an error', async () => {
50+
clusterClient.callAsInternalUser.mockRejectedValue(new Error('expected failure'));
51+
clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' });
52+
await retryUntil('cluster client bulk called', () => {
53+
return logger.error.mock.calls.length !== 0;
54+
});
55+
56+
const expectedMessage = `error writing bulk events: "expected failure"; docs: [{"create":{"_index":"event-log"}},{"message":"foo"}]`;
57+
expect(logger.error).toHaveBeenCalledWith(expectedMessage);
58+
});
59+
});
60+
61+
describe('shutdown()', () => {
62+
test('should work if no docs have been written', async () => {
63+
const result = await clusterClientAdapter.shutdown();
64+
expect(result).toBeFalsy();
65+
});
66+
67+
test('should work if some docs have been written', async () => {
68+
clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' });
69+
const resultPromise = clusterClientAdapter.shutdown();
70+
71+
await retryUntil('cluster client bulk called', () => {
72+
return clusterClient.callAsInternalUser.mock.calls.length !== 0;
73+
});
74+
75+
const result = await resultPromise;
76+
expect(result).toBeFalsy();
77+
});
78+
});
79+
80+
describe('buffering documents', () => {
81+
test('should write buffered docs after timeout', async () => {
82+
// write EVENT_BUFFER_LENGTH - 1 docs
83+
for (let i = 0; i < EVENT_BUFFER_LENGTH - 1; i++) {
84+
clusterClientAdapter.indexDocument({ body: { message: `foo ${i}` }, index: 'event-log' });
85+
}
86+
87+
await retryUntil('cluster client bulk called', () => {
88+
return clusterClient.callAsInternalUser.mock.calls.length !== 0;
89+
});
90+
91+
const expectedBody = [];
92+
for (let i = 0; i < EVENT_BUFFER_LENGTH - 1; i++) {
93+
expectedBody.push({ create: { _index: 'event-log' } }, { message: `foo ${i}` });
94+
}
95+
96+
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('bulk', {
97+
body: expectedBody,
98+
});
99+
});
100+
101+
test('should write buffered docs after buffer exceeded', async () => {
102+
// write EVENT_BUFFER_LENGTH + 1 docs
103+
for (let i = 0; i < EVENT_BUFFER_LENGTH + 1; i++) {
104+
clusterClientAdapter.indexDocument({ body: { message: `foo ${i}` }, index: 'event-log' });
105+
}
106+
107+
await retryUntil('cluster client bulk called', () => {
108+
return clusterClient.callAsInternalUser.mock.calls.length >= 2;
109+
});
110+
111+
const expectedBody = [];
112+
for (let i = 0; i < EVENT_BUFFER_LENGTH; i++) {
113+
expectedBody.push({ create: { _index: 'event-log' } }, { message: `foo ${i}` });
114+
}
115+
116+
expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(1, 'bulk', {
117+
body: expectedBody,
118+
});
119+
120+
expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(2, 'bulk', {
121+
body: [{ create: { _index: 'event-log' } }, { message: `foo 100` }],
122+
});
123+
});
124+
125+
test('should handle lots of docs correctly with a delay in the bulk index', async () => {
126+
// @ts-ignore
127+
clusterClient.callAsInternalUser.mockImplementation = async () => await delay(100);
128+
129+
const docs = times(EVENT_BUFFER_LENGTH * 10, (i) => ({
130+
body: { message: `foo ${i}` },
131+
index: 'event-log',
132+
}));
133+
134+
// write EVENT_BUFFER_LENGTH * 10 docs
135+
for (const doc of docs) {
136+
clusterClientAdapter.indexDocument(doc);
137+
}
138+
139+
await retryUntil('cluster client bulk called', () => {
140+
return clusterClient.callAsInternalUser.mock.calls.length >= 10;
141+
});
142+
143+
for (let i = 0; i < 10; i++) {
144+
const expectedBody = [];
145+
for (let j = 0; j < EVENT_BUFFER_LENGTH; j++) {
146+
expectedBody.push(
147+
{ create: { _index: 'event-log' } },
148+
{ message: `foo ${i * EVENT_BUFFER_LENGTH + j}` }
149+
);
150+
}
151+
152+
expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(i + 1, 'bulk', {
153+
body: expectedBody,
154+
});
155+
}
40156
});
41157
});
42158

@@ -575,3 +691,29 @@ describe('queryEventsBySavedObject', () => {
575691
`);
576692
});
577693
});
694+
695+
type RetryableFunction = () => boolean;
696+
697+
const RETRY_UNTIL_DEFAULT_COUNT = 20;
698+
const RETRY_UNTIL_DEFAULT_WAIT = 1000; // milliseconds
699+
700+
async function retryUntil(
701+
label: string,
702+
fn: RetryableFunction,
703+
count: number = RETRY_UNTIL_DEFAULT_COUNT,
704+
wait: number = RETRY_UNTIL_DEFAULT_WAIT
705+
): Promise<boolean> {
706+
while (count > 0) {
707+
count--;
708+
709+
if (fn()) return true;
710+
711+
// eslint-disable-next-line no-console
712+
console.log(`attempt failed waiting for "${label}", attempts left: ${count}`);
713+
714+
if (count === 0) return false;
715+
await delay(wait);
716+
}
717+
718+
return false;
719+
}

x-pack/plugins/event_log/server/es/cluster_client_adapter.ts

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,31 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7+
import { Subject } from 'rxjs';
8+
import { bufferTime, filter, switchMap } from 'rxjs/operators';
79
import { reject, isUndefined } from 'lodash';
810
import { SearchResponse, Client } from 'elasticsearch';
911
import type { PublicMethodsOf } from '@kbn/utility-types';
1012
import { Logger, LegacyClusterClient } from 'src/core/server';
11-
12-
import { IValidatedEvent, SAVED_OBJECT_REL_PRIMARY } from '../types';
13+
import { EsContext } from '.';
14+
import { IEvent, IValidatedEvent, SAVED_OBJECT_REL_PRIMARY } from '../types';
1315
import { FindOptionsType } from '../event_log_client';
1416

17+
export const EVENT_BUFFER_TIME = 1000; // milliseconds
18+
export const EVENT_BUFFER_LENGTH = 100;
19+
1520
export type EsClusterClient = Pick<LegacyClusterClient, 'callAsInternalUser' | 'asScoped'>;
1621
export type IClusterClientAdapter = PublicMethodsOf<ClusterClientAdapter>;
1722

23+
export interface Doc {
24+
index: string;
25+
body: IEvent;
26+
}
27+
1828
export interface ConstructorOpts {
1929
logger: Logger;
2030
clusterClientPromise: Promise<EsClusterClient>;
31+
context: EsContext;
2132
}
2233

2334
export interface QueryEventsBySavedObjectResult {
@@ -30,14 +41,67 @@ export interface QueryEventsBySavedObjectResult {
3041
export class ClusterClientAdapter {
3142
private readonly logger: Logger;
3243
private readonly clusterClientPromise: Promise<EsClusterClient>;
44+
private readonly docBuffer$: Subject<Doc>;
45+
private readonly context: EsContext;
46+
private readonly docsBufferedFlushed: Promise<void>;
3347

3448
constructor(opts: ConstructorOpts) {
3549
this.logger = opts.logger;
3650
this.clusterClientPromise = opts.clusterClientPromise;
51+
this.context = opts.context;
52+
this.docBuffer$ = new Subject<Doc>();
53+
54+
// buffer event log docs for time / buffer length, ignore empty
55+
// buffers, then index the buffered docs; kick things off with a
56+
// promise on the observable, which we'll wait on in shutdown
57+
this.docsBufferedFlushed = this.docBuffer$
58+
.pipe(
59+
bufferTime(EVENT_BUFFER_TIME, null, EVENT_BUFFER_LENGTH),
60+
filter((docs) => docs.length > 0),
61+
switchMap(async (docs) => await this.indexDocuments(docs))
62+
)
63+
.toPromise();
3764
}
3865

39-
public async indexDocument(doc: unknown): Promise<void> {
40-
await this.callEs<ReturnType<Client['index']>>('index', doc);
66+
// This will be called at plugin stop() time; the assumption is any plugins
67+
// depending on the event_log will already be stopped, and so will not be
68+
// writing more event docs. We complete the docBuffer$ observable,
69+
// and wait for the docsBufffered$ observable to complete via it's promise,
70+
// and so should end up writing all events out that pass through, before
71+
// Kibana shuts down (cleanly).
72+
public async shutdown(): Promise<void> {
73+
this.docBuffer$.complete();
74+
await this.docsBufferedFlushed;
75+
}
76+
77+
public indexDocument(doc: Doc): void {
78+
this.docBuffer$.next(doc);
79+
}
80+
81+
async indexDocuments(docs: Doc[]): Promise<void> {
82+
// If es initialization failed, don't try to index.
83+
// Also, don't log here, we log the failure case in plugin startup
84+
// instead, otherwise we'd be spamming the log (if done here)
85+
if (!(await this.context.waitTillReady())) {
86+
return;
87+
}
88+
89+
const bulkBody: Array<Record<string, unknown>> = [];
90+
91+
for (const doc of docs) {
92+
if (doc.body === undefined) continue;
93+
94+
bulkBody.push({ create: { _index: doc.index } });
95+
bulkBody.push(doc.body);
96+
}
97+
98+
try {
99+
await this.callEs<ReturnType<Client['bulk']>>('bulk', { body: bulkBody });
100+
} catch (err) {
101+
this.logger.error(
102+
`error writing bulk events: "${err.message}"; docs: ${JSON.stringify(bulkBody)}`
103+
);
104+
}
41105
}
42106

43107
public async doesIlmPolicyExist(policyName: string): Promise<boolean> {

x-pack/plugins/event_log/server/es/context.mock.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const createContextMock = () => {
1818
logger: loggingSystemMock.createLogger(),
1919
esNames: namesMock.create(),
2020
initialize: jest.fn(),
21+
shutdown: jest.fn(),
2122
waitTillReady: jest.fn(async () => true),
2223
esAdapter: clusterClientAdapterMock.create(),
2324
initialized: true,

x-pack/plugins/event_log/server/es/context.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export interface EsContext {
1818
esNames: EsNames;
1919
esAdapter: IClusterClientAdapter;
2020
initialize(): void;
21+
shutdown(): Promise<void>;
2122
waitTillReady(): Promise<boolean>;
2223
initialized: boolean;
2324
}
@@ -52,6 +53,7 @@ class EsContextImpl implements EsContext {
5253
this.esAdapter = new ClusterClientAdapter({
5354
logger: params.logger,
5455
clusterClientPromise: params.clusterClientPromise,
56+
context: this,
5557
});
5658
}
5759

@@ -74,6 +76,10 @@ class EsContextImpl implements EsContext {
7476
});
7577
}
7678

79+
async shutdown() {
80+
await this.esAdapter.shutdown();
81+
}
82+
7783
// waits till the ES initialization is done, returns true if it was successful,
7884
// false if it was not successful
7985
async waitTillReady(): Promise<boolean> {

x-pack/plugins/event_log/server/event_logger.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ describe('EventLogger', () => {
5959
eventLogger.logEvent({});
6060
await waitForLogEvent(systemLogger);
6161
delay(WRITE_LOG_WAIT_MILLIS); // sleep a bit longer since event logging is async
62-
expect(esContext.esAdapter.indexDocument).not.toHaveBeenCalled();
62+
expect(esContext.esAdapter.indexDocument).toHaveBeenCalled();
63+
expect(esContext.esAdapter.indexDocuments).not.toHaveBeenCalled();
6364
});
6465

6566
test('method logEvent() writes expected default values', async () => {

0 commit comments

Comments
 (0)