Skip to content

Commit

Permalink
delete useless loop in TransportBulkAction$BulkOperation.doRun()
Browse files Browse the repository at this point in the history
Signed-off-by: kkewwei <kewei.11@bytedance.com>
Signed-off-by: kkewwei <kkewwei@163.com>
  • Loading branch information
kkewwei committed Jan 5, 2025
1 parent c0f7806 commit 9f29f7c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391)
- Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707))
- Allow extended plugins to be optional ([#16909](https://github.com/opensearch-project/OpenSearch/pull/16909))
- Delete useless loop in `TransportBulkAction$BulkOperation.doRun`

### Deprecated
- Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,8 @@ protected void doRun() {
}
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
Metadata metadata = clusterState.metadata();
// go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
// the request can only be null because we set it to null in the previous step, so it gets ignored
Expand Down Expand Up @@ -587,6 +589,12 @@ protected void doRun() {
default:
throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
}

ShardId shardId = clusterService.operationRouting()
.indexShards(clusterState, concreteIndex.getName(), docWriteRequest.id(), docWriteRequest.routing())
.shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, docWriteRequest));
} catch (OpenSearchParseException | IllegalArgumentException | RoutingMissingException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
Expand All @@ -596,21 +604,6 @@ protected void doRun() {
}
}

// first, go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> request = bulkRequest.requests.get(i);
if (request == null) {
continue;
}
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
ShardId shardId = clusterService.operationRouting()
.indexShards(clusterState, concreteIndex, request.id(), request.routing())
.shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
}

if (requestsByShard.isEmpty()) {
BulkItemResponse[] response = responses.toArray(new BulkItemResponse[responses.length()]);
long tookMillis = buildTookInMillis(startTimeNanos);
Expand Down

0 comments on commit 9f29f7c

Please sign in to comment.