-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Use RefCountingRunnable when creating indices in TransportBulkAction #103786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use RefCountingRunnable when creating indices in TransportBulkAction #103786
Conversation
Pinging @elastic/es-data-management (Team:Data Management) |
forkExecuteBulk(ActionListener.wrap(listener::onResponse, inner -> { | ||
inner.addSuppressed(e); | ||
listener.onFailure(inner); | ||
})); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am aware that this wrapping of the listener is not happening in the new code. I will explain why:
If my understanding is correct, this wrapping of the listener only happens if the last processed create index response is a failure. For example, the following two scenarios that are semantically equivalent will call or not call this code.
Scenario 1:
- Creating index "good" succeeds
- Creating index "invalid" fails
- The following
forkExecuteBulk
will be executed:
forkExecuteBulk(ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}))
Scenario 2:
- Creating index "invalid" fails
- Creating index "good" succeeds
- The following
forkExecuteBulk
will be executed:
forkExecuteBulk(listener)
For this reason I believe the simplicity of the new code is better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks ok to me; I left a suggestion.
public void onResponse(CreateIndexResponse result) { | ||
if (counter.decrementAndGet() == 0) { | ||
forkExecuteBulk(listener); | ||
createIndex(index, bulkRequest.timeout(), refs.acquireListener().delegateResponse((l, e) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Chaining delegateResponse
and map
in one expression is kind of suspicious, you're effectively overriding both onResponse
and onFailure
at the same time. Maybe just create a new ActionListener
directly, with something like ActionListener.releaseAfter(new ActionListener<>{...}, refs.acquire())
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @DaveCTurner ! Good point, let me try it.
I see some failures locally during manual testing. I am investigating further. |
@elasticmachine update branch |
After extensive testing I wasn't able to reproduce it. What I saw initially, was Kibana reporting that the index already exists. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@DaveCTurner thank you for the feedback and the quick reactions! |
When an index can be auto-created, the
TransportBulkAction
will first try to create all the missing indices and then it's going to index the documents. This means that it's going to perform a number of async create index requests. This number is equal to the number of distinct indices that can be auto-created in the request.In this refactoring we use
RefCountingRunnable
to facilitate these async request. TheRefCountingRunnable
is already handling gracefully the different calls and can execute the bulk request when they all complete.This will also makes the code easier to extend when adding more pre-requisite actions, such as #103309.