Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use chunked encoding for indices stats response #91760

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,25 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.stats.IndexStats.IndexStatsBuilder;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.index.Index;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -33,7 +38,7 @@

import static java.util.Collections.unmodifiableMap;

public class IndicesStatsResponse extends BroadcastResponse {
public class IndicesStatsResponse extends BaseBroadcastResponse implements ChunkedToXContent {

private final Map<String, ClusterHealthStatus> indexHealthMap;

Expand Down Expand Up @@ -171,30 +176,19 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException {
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
final String level = params.param("level", "indices");
final boolean isLevelValid = "cluster".equalsIgnoreCase(level)
|| "indices".equalsIgnoreCase(level)
|| "shards".equalsIgnoreCase(level);
if (isLevelValid == false) {
throw new IllegalArgumentException("level parameter must be one of [cluster] or [indices] or [shards] but was [" + level + "]");
}

builder.startObject("_all");

builder.startObject("primaries");
getPrimaries().toXContent(builder, params);
builder.endObject();

builder.startObject("total");
getTotal().toXContent(builder, params);
builder.endObject();

builder.endObject();

if ("indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level)) {
builder.startObject(Fields.INDICES);
for (IndexStats indexStats : getIndices().values()) {
return Iterators.concat(Iterators.single(((builder, p) -> {
headerAndCommonStats(builder, p);
return builder.startObject(Fields.INDICES);
})), getIndices().values().stream().<ToXContent>map(indexStats -> (builder, p) -> {
builder.startObject(indexStats.getIndex());
builder.field("uuid", indexStats.getUuid());
if (indexStats.getHealth() != null) {
Expand All @@ -204,11 +198,11 @@ protected void addCustomXContentFields(XContentBuilder builder, Params params) t
builder.field("status", indexStats.getState().toString().toLowerCase(Locale.ROOT));
}
builder.startObject("primaries");
indexStats.getPrimaries().toXContent(builder, params);
indexStats.getPrimaries().toXContent(builder, p);
builder.endObject();

builder.startObject("total");
indexStats.getTotal().toXContent(builder, params);
indexStats.getTotal().toXContent(builder, p);
builder.endObject();

if ("shards".equalsIgnoreCase(level)) {
Expand All @@ -217,17 +211,36 @@ protected void addCustomXContentFields(XContentBuilder builder, Params params) t
builder.startArray(Integer.toString(indexShardStats.getShardId().id()));
for (ShardStats shardStats : indexShardStats) {
builder.startObject();
shardStats.toXContent(builder, params);
shardStats.toXContent(builder, p);
builder.endObject();
}
builder.endArray();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
return builder.endObject();
}).iterator(), Iterators.single((b, p) -> b.endObject().endObject()));
}
return Iterators.single((b, p) -> {
headerAndCommonStats(b, p);
return b.endObject();
});
}

private void headerAndCommonStats(XContentBuilder builder, ToXContent.Params p) throws IOException {
builder.startObject();
RestActions.buildBroadcastShardsHeader(builder, p, this);
builder.startObject("_all");

builder.startObject("primaries");
getPrimaries().toXContent(builder, p);
builder.endObject();

builder.startObject("total");
getTotal().toXContent(builder, p);
builder.endObject();

builder.endObject();
}

static final class Fields {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.rest.action.document.RestMultiTermVectorsAction;

import java.io.IOException;
Expand Down Expand Up @@ -140,7 +140,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
.indices()
.stats(indicesStatsRequest, new RestToXContentListener<>(channel));
.stats(indicesStatsRequest, new RestChunkedToXContentListener<>(channel));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -41,7 +45,7 @@ public void testInvalidLevel() {
final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level));
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> response.toXContent(JsonXContent.contentBuilder(), params)
() -> response.toXContentChunked(params).next().toXContent(JsonXContent.contentBuilder(), params)
);
assertThat(
e,
Expand All @@ -64,7 +68,7 @@ public void testGetIndices() {
ShardId shId = new ShardId(index, shardId);
Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardId));
ShardPath shardPath = new ShardPath(false, path, path, shId);
ShardRouting routing = createShardRouting(index, shId, (shardId == 0));
ShardRouting routing = createShardRouting(shId, (shardId == 0));
shards.add(new ShardStats(routing, shardPath, null, null, null, null));
AtomicLong primaryShardsCounter = expectedIndexToPrimaryShardsCount.computeIfAbsent(
index.getName(),
Expand Down Expand Up @@ -105,7 +109,45 @@ public void testGetIndices() {
}
}

private ShardRouting createShardRouting(Index index, ShardId shardId, boolean isPrimary) {
public void testChunkedEncodingPerIndex() throws IOException {
final int shards = randomIntBetween(1, 10);
final List<ShardStats> stats = new ArrayList<>(shards);
for (int i = 0; i < shards; i++) {
ShardId shId = new ShardId(createIndex("index-" + i), randomIntBetween(0, 1));
Path path = createTempDir().resolve("indices").resolve(shId.getIndex().getUUID()).resolve(String.valueOf(shId.id()));
ShardPath shardPath = new ShardPath(false, path, path, shId);
ShardRouting routing = createShardRouting(shId, (shId.id() == 0));
stats.add(new ShardStats(routing, shardPath, new CommonStats(), null, null, null));
}
final IndicesStatsResponse indicesStatsResponse = new IndicesStatsResponse(
stats.toArray(new ShardStats[0]),
shards,
shards,
0,
null,
ClusterState.EMPTY_STATE
);
final ToXContent.Params paramsClusterLevel = new ToXContent.MapParams(Map.of("level", "cluster"));
final var iteratorClusterLevel = indicesStatsResponse.toXContentChunked(paramsClusterLevel);
int chunksSeenClusterLevel = 0;
final XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), Streams.NULL_OUTPUT_STREAM);
while (iteratorClusterLevel.hasNext()) {
iteratorClusterLevel.next().toXContent(builder, paramsClusterLevel);
chunksSeenClusterLevel++;
}
assertEquals(1, chunksSeenClusterLevel);

final ToXContent.Params paramsIndexLevel = new ToXContent.MapParams(Map.of("level", "indices"));
final var iteratorIndexLevel = indicesStatsResponse.toXContentChunked(paramsIndexLevel);
int chunksSeenIndexLevel = 0;
while (iteratorIndexLevel.hasNext()) {
iteratorIndexLevel.next().toXContent(builder, paramsIndexLevel);
chunksSeenIndexLevel++;
}
assertEquals(2 + shards, chunksSeenIndexLevel);
}

private ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) {
return TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(4), isPrimary, ShardRoutingState.STARTED);
}

Expand Down