
Conversation

@pmuellr
Member

@pmuellr pmuellr commented Oct 18, 2020

resolves #55634
resolves #65746

Buffers event docs being written for a fixed interval / buffer size,
and indexes those docs via a bulk ES call.
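
For a sense of the shape of this, a minimal sketch (illustrative only; the Doc shape and bulkIndex helper are stand-ins, while EVENT_BUFFER_TIME / EVENT_BUFFER_LENGTH mirror the constants used in the PR):

import { Subject } from 'rxjs';
import { bufferTime, filter, switchMap } from 'rxjs/operators';

const EVENT_BUFFER_TIME = 1000; // flush at least once per second
const EVENT_BUFFER_LENGTH = 100; // or as soon as 100 docs are buffered

interface Doc {
  index: string;
  body: Record<string, unknown>;
}

// stand-in for the real bulk call made via the ES client
async function bulkIndex(docs: Doc[]): Promise<void> {
  // esClient.bulk({ ... }) in the real code
}

const docBuffer$ = new Subject<Doc>();

// buffer by time *or* count, drop empty buffers, and bulk-index each batch
const docsBuffered$ = docBuffer$.pipe(
  bufferTime(EVENT_BUFFER_TIME, null, EVENT_BUFFER_LENGTH),
  filter((docs) => docs.length > 0),
  switchMap(async (docs) => await bulkIndex(docs))
);
docsBuffered$.subscribe();

// event writers just push docs onto the subject
docBuffer$.next({ index: 'some-event-log-index', body: { message: 'hello' } });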

@pmuellr
Member Author

pmuellr commented Oct 20, 2020

I'm considering this a research spike, but it seems to be working the way we want. I might not be doing my Observables quite right :-).

One of the issues that came up with this is trying to get all the buffered docs written to ES in the case of an "orderly" shutdown of Kibana. I've made the plugin's stop() method async and have it wait for the observables to complete.

The current code (before this PR) was likely dropping the existing "queued" documents at shutdown anyway, as they were "queued up" as an unbounded number of setImmediate()s, each writing a single doc to ES, so we didn't really have any control over them.
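
For contrast, a sketch of the old per-document path described above (not the actual pre-PR code; indexSingleDocument is a hypothetical stand-in):

interface Doc {
  index: string;
  body: Record<string, unknown>;
}

// hypothetical stand-in for the old single-document write
declare function indexSingleDocument(doc: Doc): Promise<void>;

// each logged event forked an independent, un-awaited index call via setImmediate(),
// so at shutdown an unbounded number of these could still be pending
function logEventOld(doc: Doc): void {
  setImmediate(async () => {
    try {
      await indexSingleDocument(doc);
    } catch (err) {
      // errors could only be logged; nothing waited on these at shutdown
    }
  });
}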

@mikecote
Contributor

I think this would be a good optimization to get in, especially as we're talking about scaling alerting and looking for efficiencies / improvements.

I've made the plugin's stop() method async and have it wait for the observables to complete.

One note here: I'm not sure if the platform will wait for the stop promise. If it does, they may be removing support for that soon (#74395).

@pmuellr
Member Author

pmuellr commented Oct 28, 2020

One note here: I'm not sure if the platform will wait for the stop promise. If it does, they may be removing support for that soon (#74395).

I had a Slack discussion with @pgayvallet about this a week ago, as I had the same concern. The platform is currently doing an await on the invocation, but there are some signature differences between the plugin lifecycle interfaces and the implementation, so it's not obvious when you start looking at it. They will need to introduce some kind of timeout, like they do for setup/start today (there's a reference to some shutdown changes here). Other than that, I got a 👍 on the approach in this PR.
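
For what it's worth, the kind of timeout guard I'd expect is roughly this (illustrative only, not the platform's actual code; stopWithTimeout is a hypothetical helper):

// race a plugin's stop() promise against a timeout so a slow shutdown
// can't hang the process indefinitely
async function stopWithTimeout(stop: () => Promise<void>, timeoutMs = 30_000): Promise<void> {
  let timer: NodeJS.Timeout | undefined;
  const timeout = new Promise<void>((resolve) => {
    timer = setTimeout(resolve, timeoutMs);
  });
  try {
    await Promise.race([stop(), timeout]);
  } finally {
    if (timer) clearTimeout(timer);
  }
}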

@mikecote
Contributor

Awesome! Let me know when it's ready for review, I'll be happy to go over the changes 👍

@pmuellr pmuellr force-pushed the event-log/rx-buffer branch 2 times, most recently from 93536e0 to 4ede838 Compare November 16, 2020 14:14
resolves elastic#55634

Buffers event docs being written for a fixed interval / buffer size,
and indexes those docs via a bulk ES call.
@pmuellr pmuellr force-pushed the event-log/rx-buffer branch from 4ede838 to 1a7134d Compare November 16, 2020 14:20
@pmuellr pmuellr added Feature:Alerting release_note:skip Skip the PR/issue when compiling release notes Team:ResponseOps Platform ResponseOps team (formerly the Cases and Alerting teams) t// v7.11.0 v8.0.0 labels Nov 17, 2020
@pmuellr pmuellr marked this pull request as ready for review November 17, 2020 03:41
@pmuellr pmuellr requested a review from a team as a code owner November 17, 2020 03:41
@elasticmachine
Contributor

Pinging @elastic/kibana-alerting-services (Team:Alerting Services)

@gmmorris gmmorris self-requested a review November 17, 2020 11:59
Contributor

@gmmorris gmmorris left a comment

I'd recommend cleaning up some of the RxJS lifecycle handling (notes added), but other than that it seems to work as expected.

I don't think there's a way to actually test this locally other than checking "by eye" that it uses bulk.. so LGTM 🤷

async indexDocuments(docs: Doc[]): Promise<void> {
  // if es initialization failed, don't try to index
  if (!(await this.context.waitTillReady())) {
    return;
Contributor

Should this throw an error instead of just returning silently? 🤔
Or at least log it?

Member Author

Originally, it did log. However, I realized that it would log anytime the messages got flushed, and the entire pipeline will still run even if the initialization failed (indicated by waitTillReady() resolving to false). The entire pipeline still runs because the code path for logging an event from a client is now completely synchronous, replacing the old setImmediate() forking of the log writing with the new next() on the observable - this is fabulous! But it also means we can't check waitTillReady() until we get to this point. And we don't want to be spamming an error message here :-)

So, instead, I put one of these waitTillReady() calls with a single log message in the plugin code itself:

// log an error if initialization didn't succeed
this.esContext.waitTillReady().then((success) => {
  if (!success) {
    this.systemLogger.error(`initialization failed, events will not be indexed`);
  }
});

I'm not convinced this is the best approach; perhaps waitTillReady() should do the logging itself, in its error paths, but then that's really too specific - I'm kinda treating waitTillReady() as a boolean getter at this point, and it felt safer to not have it log specific outcomes itself as a side effect.

But it clearly needs a comment in both locations; it's certainly not completely obvious.
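
To make the "completely synchronous" client path above concrete, a sketch (names are illustrative, not the exact plugin code):

import { Subject } from 'rxjs';

interface Doc {
  index: string;
  body: Record<string, unknown>;
}

const docBuffer$ = new Subject<Doc>();

// logging an event is now just a synchronous push onto the subject; the
// buffering, the waitTillReady() check, and the bulk indexing all happen
// downstream in the buffered pipeline
function logEvent(doc: Doc): void {
  docBuffer$.next(doc);
}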

Comment on lines 80 to 81
await this.doneWriting.wait();
this.docsBufferedSubscription.unsubscribe();
Contributor

Line 81 is actually redundant - once docBuffer$ completes, it will reactively unsubscribe docsBufferedSubscription, so there's no need to call this explicitly.
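
A tiny standalone example of that behavior - RxJS closes the subscription on its own once the source completes:

import { Subject } from 'rxjs';

const source$ = new Subject<number>();
const subscription = source$.subscribe((n) => console.log('got', n));

source$.next(1);
console.log(subscription.closed); // false - still subscribed

source$.complete();
console.log(subscription.closed); // true - closed reactively, no explicit unsubscribe() needed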

Comment on lines 59 to 63
const docsBuffered$ = this.docBuffer$.pipe(
  bufferTime(EVENT_BUFFER_TIME, null, EVENT_BUFFER_LENGTH),
  filter((docs) => docs.length > 0),
  switchMap(async (docs) => await this.indexDocuments(docs))
);
Contributor

You can actually use this instead of the ReadySignal.

All Observables expose a toPromise method, which returns a promise that resolves when they complete.
As docsBuffered$ completes when docBuffer$ completes, you can do something like:

  this.docBufferedFlushed = docsBuffered$.toPromise();

and then in shutdown() you can do:

  await this.docBufferedFlushed;
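
A fuller sketch of that suggestion wired together (illustrative only; the class and method names are stand-ins, not the actual cluster_client_adapter code):

import { Subject } from 'rxjs';
import { bufferTime, filter, switchMap } from 'rxjs/operators';

class AdapterSketch {
  private docBuffer$ = new Subject<object>();
  private docBufferedFlushed: Promise<unknown>;

  constructor() {
    const docsBuffered$ = this.docBuffer$.pipe(
      bufferTime(1000, null, 100),
      filter((docs) => docs.length > 0),
      switchMap(async (docs) => await this.indexDocuments(docs))
    );
    // resolves once docsBuffered$ completes, i.e. after the final buffer is flushed
    this.docBufferedFlushed = docsBuffered$.toPromise();
  }

  public indexDocument(doc: object): void {
    this.docBuffer$.next(doc);
  }

  public async shutdown(): Promise<void> {
    this.docBuffer$.complete(); // stop accepting docs, flush the rest
    await this.docBufferedFlushed; // wait for the final bulk write
  }

  private async indexDocuments(docs: object[]): Promise<void> {
    // bulk index the batch (omitted)
  }
}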

@pmuellr
Member Author

pmuellr commented Nov 17, 2020

I don't think there's a way to actually test this locally other than checking "by eye" that it uses bulk.. so LGTM 🤷

There are some tests that verify the bulk writes are working, for both time and count, at the cluster_client_adapter level anyway - that seemed good enough to me:

describe('buffering documents', () => {
  test('should write buffered docs after timeout', async () => {
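
For anyone curious, a minimal standalone sketch of that kind of test (not the actual Kibana test; it assumes a Jest environment and uses a tiny buffer time plus a plain delay instead of the plugin's constants):

import { Subject } from 'rxjs';
import { bufferTime, filter, switchMap } from 'rxjs/operators';

const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

test('should write buffered docs after timeout', async () => {
  const written: string[][] = [];
  const docBuffer$ = new Subject<string>();

  docBuffer$
    .pipe(
      bufferTime(50, null, 100), // tiny buffer time so the test runs fast
      filter((docs) => docs.length > 0),
      switchMap(async (docs) => {
        written.push(docs); // stand-in for the bulk index call
      })
    )
    .subscribe();

  docBuffer$.next('doc-1');
  docBuffer$.next('doc-2');

  // nothing is written until the buffer time elapses ...
  expect(written).toEqual([]);

  await delay(100);

  // ... then both docs arrive in a single "bulk" write
  expect(written).toEqual([['doc-1', 'doc-2']]);
});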

What turned out to be impossible to test is the new plugin shutdown code that flushes the remaining events. We do test that at the cluster_client_adapter level, but the plugin stop test is a bit lame - it's the best I could figure out ATM, though. I think I searched and no one else was really testing plugin stop, but that likely makes sense, because who else is doing significant work in plugin stop? That reminds me, I guess I should have someone in platform take a look at this bit ... :-)

I have been testing the stop flushing "by eye". Kill Kibana, and you'll now see the plugin's own "stopping" event gets logged, which never happened before! (check by curling the event log directly via es)

Contributor

@ymao1 ymao1 left a comment

LGTM! Verified that it works as expected

Comment on lines 158 to 160
this.systemLogger.info('shutdown: waiting to finish');
await this.esContext?.shutdown();
this.systemLogger.info('shutdown: finished');
Contributor

@pgayvallet pgayvallet Nov 17, 2020

What is esContext?.shutdown doing exactly?

Just want to point out that plugin.stop will only be called in case of graceful shutdowns. Process termination will obviously not call this, or the process may even be killed during this invocation. So this cleanup needs to be resilient to such scenarios.

Member Author

esContext.shutdown() will "stop" the RxJS pipeline batching event log documents, and wait until the final batch is written out (bulk indexed into the event log index). Assuming the general RxJS "complete" processing is very quick, the only latency will be from the bulk index call, which would be at most 100 docs (max ~1K in size).
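
Roughly, as a sketch (approximate names, not the exact adapter code), the shutdown flow is: complete the subject so no more docs are accepted, then wait on a signal that's resolved once the pipeline has finished its final bulk write:

import { Subject } from 'rxjs';
import { bufferTime, filter, switchMap } from 'rxjs/operators';

// minimal externally-resolvable promise, roughly what the PR's doneWriting signal provides
function createSignal() {
  let resolveSignal!: () => void;
  const promise = new Promise<void>((resolve) => (resolveSignal = resolve));
  return { wait: () => promise, signal: () => resolveSignal() };
}

const doneWriting = createSignal();
const docBuffer$ = new Subject<object>();

docBuffer$
  .pipe(
    bufferTime(1000, null, 100),
    filter((docs) => docs.length > 0),
    switchMap(async (docs) => {
      // bulk index the batch (omitted)
    })
  )
  .subscribe({
    // fires after the final buffered batch has been handled
    complete: () => doneWriting.signal(),
  });

async function shutdown(): Promise<void> {
  docBuffer$.complete(); // stop accepting docs; the remaining buffer gets flushed
  await doneWriting.wait(); // wait until that final bulk write has gone out
}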

Totally understand that it only gets called under "nice" circumstances. We don't really have a better story though, and given the current buffering (100 docs or 1 sec elapsed), it shouldn't lose too much even in an OOM kind of non-clean shutdown.

The event log is historic data, and we've always treated it as not critical data - it's not a source of truth.

And I think the story with this is better than what it's replacing, which was doing setImmediate()s to force the indexing of individual documents off the main loop, which could have gotten really chaotic (but we never saw that).

We're also now getting the "event log stopped" message (which we've been logging since the very beginning) actually indexed; before this it was never indexed, as presumably the Kibana process never waited for unfinished setImmediate() processing to finish (and rightly so!).

Still, it does scare me a bit!

Contributor

I agree that awaiting this in the case of a "normal" shutdown is way better than a setImmediate implementation (which was likely never finishing, as the delay between plugin shutdown and termination is rather short).

Contributor

Created #83612 FYI

@mikecote
Contributor

@elasticmachine merge upstream

@kibanamachine
Contributor

💚 Build Succeeded

Metrics [docs]

Distributable file count

id        before   after   diff
default   42887    42886   -1

History

To update your PR or re-run it, just comment with:
@elasticmachine merge upstream

@pmuellr pmuellr merged commit 5bfe665 into elastic:master Nov 20, 2020
pmuellr added a commit to pmuellr/kibana that referenced this pull request Nov 20, 2020
…#80941)

resolves elastic#55634
resolves elastic#65746

Buffers event docs being written for a fixed interval / buffer size,
and indexes those docs via a bulk ES call.

Also now flushing those buffers at plugin stop() time, which
we couldn't do before with the single index calls, which were
run via `setImmediate()`.
spalger added a commit that referenced this pull request Nov 20, 2020
@spalger
Contributor

spalger commented Nov 20, 2020

Sorry @pmuellr, but once this was merged it started to cause type and Jest failures on master, and the same failures popped up in your backport, so I reverted the PR and ask that you resubmit it. I think it would make sense to close the backport and backport the second PR too.

Failures: https://kibana-ci.elastic.co/job/elastic+kibana+pipeline-pull-request/89145/execution/node/385/log/
Master build: https://kibana-ci.elastic.co/job/elastic+kibana+master/9788/

pmuellr added a commit to pmuellr/kibana that referenced this pull request Nov 20, 2020
resolves elastic#55634
resolves elastic#65746

Buffers event docs being written for a fixed interval / buffer size,
and indexes those docs via a bulk ES call.

Also now flushing those buffers at plugin stop() time, which
we couldn't do before with the single index calls, which were
run via `setImmediate()`.

This is a redo of PR elastic#80941 which
had to be reverted.
@pmuellr
Member Author

pmuellr commented Nov 20, 2020

replacement PR here: #83927

It had to be reverted because I didn't merge upstream after PR #81891 got merged, which changed the spaces plugin setup/start, and the reverted PR contained a new test module for the plugin. I had to change where the spaces plugin is passed around in setup/start.

@pmuellr pmuellr added the backport:skip This PR does not require backporting label Nov 20, 2020
pmuellr added a commit that referenced this pull request Nov 20, 2020
…83927)

resolves #55634
resolves #65746

Buffers event docs being written for a fixed interval / buffer size,
and indexes those docs via a bulk ES call.

Also now flushing those buffers at plugin stop() time, which
we couldn't do before with the single index calls, which were
run via `setImmediate()`.

This is a redo of PR #80941 which
had to be reverted.
pmuellr added a commit to pmuellr/kibana that referenced this pull request Nov 20, 2020
…lastic#83927)

resolves elastic#55634
resolves elastic#65746

Buffers event docs being written for a fixed interval / buffer size,
and indexes those docs via a bulk ES call.

Also now flushing those buffers at plugin stop() time, which
we couldn't do before with the single index calls, which were
run via `setImmediate()`.

This is a redo of PR elastic#80941 which
had to be reverted.
pmuellr added a commit that referenced this pull request Nov 20, 2020
…83927) (#83962)

resolves #55634
resolves #65746

Buffers event docs being written for a fixed interval / buffer size,
and indexes those docs via a bulk ES call.

Also now flushing those buffers at plugin stop() time, which
we couldn't do before with the single index calls, which were
run via `setImmediate()`.

This is a redo of PR #80941 which
had to be reverted.

Labels

backport:skip This PR does not require backporting Feature:Alerting release_note:skip Skip the PR/issue when compiling release notes reverted Team:ResponseOps Platform ResponseOps team (formerly the Cases and Alerting teams) t// v7.11.0 v8.0.0

Development

Successfully merging this pull request may close these issues.

Investigate event log write performance / stress testing
[alerting event log] buffer events being written instead of writing when logged

8 participants