Use RefCountingRunnable when creating indices in TransportBulkAction #103786

Merged

Conversation

gmarouli
Contributor

@gmarouli gmarouli commented Jan 2, 2024

When indices can be auto-created, TransportBulkAction first tries to create all the missing indices and only then indexes the documents. It therefore issues one async create-index request per distinct index in the bulk request that can be auto-created.

In this refactoring we use RefCountingRunnable to coordinate these async requests. RefCountingRunnable already handles the individual callbacks gracefully and executes the bulk request once they have all completed.

This also makes the code easier to extend when adding more prerequisite actions, such as #103309.
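
To illustrate the ref-counting idea described above, here is a minimal, self-contained sketch. `RefCountingSketch`, `acquire`, `close`, and `demo` are hypothetical stand-ins written for this note; they are not the Elasticsearch `RefCountingRunnable` API, and the index names are made up. The sketch only assumes the behaviour stated in the description: a final action runs once every outstanding async request has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the ref-counting idea; not the Elasticsearch class. */
final class RefCountingSketch {
    // Starts at 1: the creator holds one reference until close() is called.
    private final AtomicInteger refs = new AtomicInteger(1);
    private final Runnable onAllReleased;

    RefCountingSketch(Runnable onAllReleased) {
        this.onAllReleased = onAllReleased;
    }

    /** Takes one reference; running the returned Runnable releases it at most once. */
    Runnable acquire() {
        refs.incrementAndGet();
        AtomicBoolean released = new AtomicBoolean();
        return () -> {
            if (released.compareAndSet(false, true)) {
                release();
            }
        };
    }

    /** Releases the creator's own reference. */
    void close() {
        release();
    }

    private void release() {
        if (refs.decrementAndGet() == 0) {
            onAllReleased.run();
        }
    }

    /** Simulates two create-index callbacks followed by the bulk execution. */
    static List<String> demo() {
        List<String> events = new ArrayList<>(); // single-threaded demo only
        RefCountingSketch refCount = new RefCountingSketch(() -> events.add("execute bulk"));
        List<Runnable> pendingCallbacks = new ArrayList<>();
        for (String index : List.of("logs", "metrics")) {
            Runnable ref = refCount.acquire();
            pendingCallbacks.add(() -> {
                events.add("created " + index);
                ref.run(); // release this request's reference
            });
        }
        refCount.close(); // nothing runs yet: two references are still held
        pendingCallbacks.forEach(Runnable::run);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [created logs, created metrics, execute bulk]
    }
}
```

The final action fires exactly once, after the last reference is released, regardless of the order in which the callbacks complete.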

@gmarouli gmarouli added :Data Management/Indices APIs APIs to create and manage indices and templates >refactoring labels Jan 2, 2024
@gmarouli gmarouli requested a review from dakrone January 2, 2024 12:38
@elasticsearchmachine elasticsearchmachine added Team:Data Management Meta label for data/management team v8.13.0 labels Jan 2, 2024
@elasticsearchmachine
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

Comment on lines -423 to -426
forkExecuteBulk(ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}));
Contributor Author

@gmarouli gmarouli Jan 2, 2024

I am aware that this wrapping of the listener is not happening in the new code. I will explain why:

If my understanding is correct, this wrapping of the listener only happens if the last processed create index response is a failure. For example, the following two scenarios are semantically equivalent, yet only the first one executes this wrapping code.

Scenario 1:

  • Creating index "good" succeeds
  • Creating index "invalid" fails
  • The following forkExecuteBulk will be executed:
forkExecuteBulk(ActionListener.wrap(listener::onResponse, inner -> {
    inner.addSuppressed(e);
    listener.onFailure(inner);
}))

Scenario 2:

  • Creating index "invalid" fails
  • Creating index "good" succeeds
  • The following forkExecuteBulk will be executed:
forkExecuteBulk(listener)

For this reason I believe the simpler new code is preferable.
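
The order dependence in the two scenarios can be reproduced with a simplified model of the counter-based pattern. This is a sketch, not the removed Elasticsearch code: `LastResponseWinsSketch` and `listenerPassedToBulk` are hypothetical names, and the strings "plain" and "wrapped" merely label which listener the last callback would hand to `forkExecuteBulk`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified, hypothetical model of the counter-based callbacks discussed above. */
final class LastResponseWinsSketch {

    /**
     * Replays create-index outcomes in the given order (true = success) and reports
     * which listener the last callback would pass on: "plain" or "wrapped".
     */
    static String listenerPassedToBulk(boolean... outcomes) {
        AtomicInteger counter = new AtomicInteger(outcomes.length);
        String chosen = null;
        for (boolean success : outcomes) {
            boolean isLast = counter.decrementAndGet() == 0;
            if (isLast) {
                // onResponse forwards the listener unchanged;
                // onFailure wraps it to attach the failure as a suppressed exception.
                chosen = success ? "plain" : "wrapped";
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(listenerPassedToBulk(true, false)); // scenario 1: wrapped
        System.out.println(listenerPassedToBulk(false, true)); // scenario 2: plain
    }
}
```

Both calls describe the same set of outcomes, yet the result differs only because of completion order.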

Contributor

@DaveCTurner DaveCTurner left a comment

Looks ok to me; I left a suggestion.

public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
forkExecuteBulk(listener);
createIndex(index, bulkRequest.timeout(), refs.acquireListener().delegateResponse((l, e) -> {
Contributor

Chaining delegateResponse and map in one expression is kind of suspicious: you're effectively overriding both onResponse and onFailure at the same time. Maybe just create a new ActionListener directly, with something like ActionListener.releaseAfter(new ActionListener<>{...}, refs.acquire()).
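
A rough sketch of the shape this suggestion points at, using stand-in types rather than the Elasticsearch classes: the `Listener` interface and the `releaseAfter` helper below are hypothetical and only mimic the idea of one explicit listener whose reference is released after either callback. The index name and error message are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins; not the Elasticsearch ActionListener API. */
final class ReleaseAfterSketch {

    /** Minimal listener with a success and a failure callback. */
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    /** Wraps the delegate so that release runs after whichever callback fires. */
    static <T> Listener<T> releaseAfter(Listener<T> delegate, Runnable release) {
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                try {
                    delegate.onResponse(response);
                } finally {
                    release.run();
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    delegate.onFailure(e);
                } finally {
                    release.run();
                }
            }
        };
    }

    /** Fails one simulated create-index call and records what happens. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Listener<String> listener = releaseAfter(new Listener<String>() {
            @Override
            public void onResponse(String index) {
                events.add("created " + index);
            }

            @Override
            public void onFailure(Exception e) {
                events.add("failed: " + e.getMessage());
            }
        }, () -> events.add("ref released"));
        listener.onFailure(new IllegalStateException("invalid index name"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [failed: invalid index name, ref released]
    }
}
```

Writing both callbacks in one place keeps the success and failure handling visible together, and the release step cannot be skipped on either path.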

Contributor Author

Thank you @DaveCTurner ! Good point, let me try it.

@gmarouli
Contributor Author

gmarouli commented Jan 2, 2024

I see some failures locally during manual testing. I am investigating further.

@gmarouli
Contributor Author

gmarouli commented Jan 3, 2024

@elasticmachine update branch

@gmarouli gmarouli requested a review from DaveCTurner January 3, 2024 09:07
@gmarouli
Contributor Author

gmarouli commented Jan 3, 2024

> I see some failures locally during manual testing. I am investigating further.

After extensive testing I wasn't able to reproduce it. What I saw initially was Kibana reporting that the index already exists.
I haven't been able to reproduce it since, so I suspect my local set-up was a bit off and the clean rebuild fixed it.

Contributor

@DaveCTurner DaveCTurner left a comment

LGTM

@gmarouli
Contributor Author

gmarouli commented Jan 3, 2024

@DaveCTurner thank you for the feedback and the quick reactions!

@gmarouli gmarouli merged commit 0a90316 into elastic:main Jan 3, 2024
@gmarouli gmarouli deleted the use-ref-counting-listener-bulk-indexing branch August 20, 2024 07:05
Labels
:Data Management/Indices APIs APIs to create and manage indices and templates >refactoring Team:Data Management Meta label for data/management team v8.13.0