[event_log] index event docs in bulk instead of individually #80941
Conversation
I'm considering this a research spike, but it seems like it's working the way we want. I might not be doing my Observables quite right :-). One of the issues that came up with this is trying to get all the buffered docs written to ES in the case of an "orderly" shutdown of Kibana; I've made the plugin's `stop()` flush the remaining buffered docs. The current level of code (before this PR) was likely dropping the existing "queued" documents at shutdown anyway, as they were "queued up" as an unbounded number of `setImmediate()` calls.
I think this would be a good optimization to get in, especially when we're talking scaling alerting and looking for efficiencies / improvements.
One note here: I'm not sure if the platform will wait for the async `stop()` to complete.
I had a slack discussion with @pgayvallet on this a week ago, as I had the same concern. The platform is currently awaiting the invocation, but there are some signature differences when you look at the plugin lifecycle interfaces vs. the implementation, so it's not clear when you start looking at it. They will need to introduce some kind of timeout, like they do for setup/start today (there's a reference to some shutdown changes here). Other than that, I got a 👍 on the approach in this PR.
Awesome! Let me know when it's ready for review, I'll be happy to go over the changes 👍
resolves elastic#55634

Buffers event docs being written for a fixed interval / buffer size, and indexes those docs via a bulk ES call.
Pinging @elastic/kibana-alerting-services (Team:Alerting Services)
gmmorris
left a comment
I'd recommend cleaning up some of the RxJS lifecycle (notes added), but other than that it seems to work as expected.
I don't think there's a way to actually test this locally other than checking "by eye" that it uses bulk... so LGTM 🤷
```typescript
async indexDocuments(docs: Doc[]): Promise<void> {
  // if es initialization failed, don't try to index
  if (!(await this.context.waitTillReady())) {
    return;
```
Should this throw an error instead of just returning silently? 🤔
Or at least log it?
Originally, it did log. However, I realized that it would log anytime the messages got flushed, and the entire pipeline will still run even if the initialization failed (indicated by `waitTillReady()` resolving to false). The entire pipeline still runs because the code path for logging an event from a client is now **completely** synchronous, replacing the old `setImmediate()` forking of the log writing with the new `next()` on the observable - this is fabulous! But it also means we can't check `waitTillReady()` until we get to this point. And we don't want to be spamming an error message here :-)
So, instead, I put one of these `waitTillReady()` calls with a single log message in the plugin code itself:
kibana/x-pack/plugins/event_log/server/plugin.ts, lines 120 to 125 in 1a7134d:

```typescript
// log an error if initialiization didn't succeed
this.esContext.waitTillReady().then((success) => {
  if (!success) {
    this.systemLogger.error(`initialization failed, events will not be indexed`);
  }
});
```
I'm not convinced this is the best approach; perhaps `waitTillReady()` should do the logging itself in its error paths, but then that's really too specific - I'm kinda treating `waitTillReady()` as a boolean getter at this point, so it felt safer to not have it log specific outcomes as a side effect.
But it clearly needs a comment in both locations; it's certainly not completely obvious.
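The `waitTillReady()` being discussed behaves like a one-shot deferred boolean: initialization signals success or failure once, and every caller awaiting it gets the same answer. A minimal sketch of that pattern (names are hypothetical, not the event log's actual implementation):

```typescript
// Hypothetical one-shot ready signal: signal() resolves every current and
// future wait() call with the same value, mirroring how waitTillReady()
// is treated as a boolean getter for "did initialization succeed".
function createReadySignal<T>() {
  let resolvePromise!: (value: T) => void;
  let signaled = false;
  const promise = new Promise<T>((resolve) => {
    resolvePromise = resolve;
  });
  return {
    // callers await this; it resolves once signal() has been called
    wait: (): Promise<T> => promise,
    // called exactly once when initialization finishes (or fails)
    signal: (value: T): void => {
      signaled = true;
      resolvePromise(value);
    },
    isSignaled: (): boolean => signaled,
  };
}
```

Because the underlying promise resolves only once, repeated `wait()` calls are cheap, which is what makes it usable as a guard at the top of every flush.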
```typescript
await this.doneWriting.wait();
this.docsBufferedSubscription.unsubscribe();
```
Line 81 is actually redundant - once `docBuffer$` completes it will reactively unsubscribe `docsBufferedSubscription`, so there's no need to call this explicitly.
```typescript
const docsBuffered$ = this.docBuffer$.pipe(
  bufferTime(EVENT_BUFFER_TIME, null, EVENT_BUFFER_LENGTH),
  filter((docs) => docs.length > 0),
  switchMap(async (docs) => await this.indexDocuments(docs))
);
```
You can actually use this instead of the ReadySignal.
All Observables expose a `toPromise()` which returns a promise that resolves when they complete.
As `docsBuffered$` completes when `docBuffer$` completes, you can do something like:

```typescript
this.docBufferedFlushed = docsBuffered$.toPromise();
```

and then in `shutdown()` you can do:

```typescript
await this.docBufferedFlushed;
```
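The suggestion above boils down to: track pipeline completion as a promise, and have shutdown await it instead of a separate done-writing signal. A dependency-free sketch of that shape (a plain promise standing in for `docsBuffered$.toPromise()`; all names here are hypothetical):

```typescript
// Hypothetical writer whose shutdown awaits a completion promise,
// analogous to awaiting docsBuffered$.toPromise() after docBuffer$ completes.
class BufferedWriter<T> {
  private buffered: T[] = [];
  private resolveFlushed!: () => void;
  // resolves once complete() has flushed the final batch
  readonly flushed: Promise<void> = new Promise((resolve) => {
    this.resolveFlushed = resolve;
  });

  constructor(private readonly indexDocs: (docs: T[]) => void) {}

  write(doc: T): void {
    this.buffered.push(doc);
  }

  // analogous to docBuffer$.complete(): flush leftovers, then settle
  // the promise that shutdown() is awaiting
  complete(): void {
    if (this.buffered.length > 0) this.indexDocs(this.buffered.splice(0));
    this.resolveFlushed();
  }
}
```

The nice property is that shutdown code has exactly one thing to await (`writer.flushed`), and it resolves only after the final bulk write has been handed off.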
There are some tests that the bulk indexing is working, for time and count, at the …
What turned out to be impossible to test is the new plugin shutdown code that flushes the remaining events. We do test at the … I have been testing the …
ymao1
left a comment
LGTM! Verified that it works as expected
```typescript
this.systemLogger.info('shutdown: waiting to finish');
await this.esContext?.shutdown();
this.systemLogger.info('shutdown: finished');
```
What is `esContext?.shutdown()` doing, exactly?
Just want to point out that `plugin.stop` will only be called in the case of graceful shutdowns. Process termination will obviously not call this, or the process may even be killed during this invocation. So this cleanup needs to be resilient to such scenarios.
`esContext.shutdown()` will "stop" the RxJS pipeline batching event log documents, and wait till the final batch is written out (bulk indexed into the event log index). Assuming the general RxJS "complete" processing is very quick, the only latency will be from the bulk index call, which would be max 100 docs (max ~1K in size).
Totally understand that it only gets called under "nice" circumstances. We don't really have a better story though, and given the current buffering (100 docs or 1 sec elapsed), it shouldn't lose too much even in an OOM kind of non-clean shutdown.
The event log is historic data, and we've always treated it as not critical data - it's not a source of truth.
And I think the story with this is better than what it's replacing, which was doing setImmediate()s to force the indexing of individual documents off the main loop, which could have gotten really chaotic (but we never saw that).
We're also now getting the "event log stopped" message that we've been logging since the very beginning, actually indexed - before this, it was never indexed, as presumably the Kibana process never waited for unfinished setImmediate() processing to finish (and rightly so!).
Still, it does scare me a bit!
I agree that awaiting this in the case of a 'normal' shutdown is way better than the `setImmediate()` implementation (which was likely never finishing, as the delay between plugin shutdown and process termination is rather quick).
Created #83612 FYI
@elasticmachine merge upstream
💚 Build Succeeded
…#80941) resolves elastic#55634 resolves elastic#65746 Buffers event docs being written for a fixed interval / buffer size, and indexes those docs via a bulk ES call. Also now flushing those buffers at plugin stop() time, which we couldn't do before with the single index calls, which were run via `setImmediate()`.
Sorry @pmuellr, but once this was merged it started to cause type and jest failures on master, and the same failures popped up in your backport, so I reverted the PR and request that you resubmit it. I think it would make sense to close the backport and backport the second PR too. Failures: https://kibana-ci.elastic.co/job/elastic+kibana+pipeline-pull-request/89145/execution/node/385/log/
resolves elastic#55634 resolves elastic#65746 Buffers event docs being written for a fixed interval / buffer size, and indexes those docs via a bulk ES call. Also now flushing those buffers at plugin stop() time, which we couldn't do before with the single index calls, which were run via `setImmediate()`. This is a redo of PR elastic#80941 which had to be reverted.
resolves #55634
resolves #65746
Buffers event docs being written for a fixed interval / buffer size,
and indexes those docs via a bulk ES call.