Skip to content

Use RefCountingRunnable when creating indices in TransportBulkAction #103786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
Expand Down Expand Up @@ -390,18 +391,22 @@ protected void createMissingIndicesAndIndexData(
long startTime
) {
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
// Optimizing when there are no prerequisite actions
if (autoCreateIndices.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
} else {
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
return;
}
Runnable executeBulkRunnable = () -> threadPool.executor(executorName).execute(new ActionRunnable<>(listener) {
@Override
protected void doRun() {
executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
}
});
try (RefCountingRunnable refs = new RefCountingRunnable(executeBulkRunnable)) {
for (String index : autoCreateIndices) {
createIndex(index, bulkRequest.timeout(), new ActionListener<>() {
createIndex(index, bulkRequest.timeout(), ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
forkExecuteBulk(listener);
}
}
public void onResponse(CreateIndexResponse createIndexResponse) {}

@Override
public void onFailure(Exception e) {
Expand All @@ -419,23 +424,8 @@ public void onFailure(Exception e) {
}
}
}
if (counter.decrementAndGet() == 0) {
forkExecuteBulk(ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}));
Comment on lines -423 to -426
Copy link
Contributor Author

@gmarouli gmarouli Jan 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am aware that this wrapping of the listener is not happening in the new code. I will explain why:

If my understanding is correct, this wrapping of the listener only happens if the last processed create index response is a failure. For example, the following two scenarios that are semantically equivalent will call or not call this code.

Scenario 1:

  • Creating index "good" succeeds
  • Creating index "invalid" fails
  • The following forkExecuteBulk will be executed:
forkExecuteBulk(ActionListener.wrap(listener::onResponse, inner -> {
    inner.addSuppressed(e);
    listener.onFailure(inner);
}))

Scenario 2:

  • Creating index "invalid" fails
  • Creating index "good" succeeds
  • The following forkExecuteBulk will be executed:
forkExecuteBulk(listener)

For this reason I believe the simplicity of the new code is better.

}
}

private void forkExecuteBulk(ActionListener<BulkResponse> finalListener) {
threadPool.executor(executorName).execute(new ActionRunnable<>(finalListener) {
@Override
protected void doRun() {
executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
}
});
}
});
}, refs.acquire()));
}
}
}
Expand Down