Skip to content

Commit cfc981a

Browse files
Chunked encoding for snapshot status API (#90801)
Simple implementation of chunked encoding for the snapshot status API. Tested with 100 snapshots of 25k shards (all in-progress) where it can produce the 1G+ response in less than 10s.
1 parent f2f9a1f commit cfc981a

File tree

5 files changed

+73
-42
lines changed

5 files changed

+73
-42
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,25 @@
1111
import org.elasticsearch.cluster.SnapshotsInProgress;
1212
import org.elasticsearch.cluster.SnapshotsInProgress.State;
1313
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.common.collect.Iterators;
1415
import org.elasticsearch.common.io.stream.StreamInput;
1516
import org.elasticsearch.common.io.stream.StreamOutput;
1617
import org.elasticsearch.common.io.stream.Writeable;
1718
import org.elasticsearch.common.util.Maps;
19+
import org.elasticsearch.common.xcontent.ChunkedToXContent;
1820
import org.elasticsearch.core.Nullable;
1921
import org.elasticsearch.snapshots.Snapshot;
2022
import org.elasticsearch.snapshots.SnapshotId;
2123
import org.elasticsearch.xcontent.ConstructingObjectParser;
2224
import org.elasticsearch.xcontent.ObjectParser;
2325
import org.elasticsearch.xcontent.ParseField;
24-
import org.elasticsearch.xcontent.ToXContentObject;
25-
import org.elasticsearch.xcontent.XContentBuilder;
26+
import org.elasticsearch.xcontent.ToXContent;
2627
import org.elasticsearch.xcontent.XContentParser;
2728

2829
import java.io.IOException;
2930
import java.util.ArrayList;
3031
import java.util.HashMap;
32+
import java.util.Iterator;
3133
import java.util.List;
3234
import java.util.Map;
3335
import java.util.Objects;
@@ -41,7 +43,7 @@
4143
/**
4244
* Status of a snapshot
4345
*/
44-
public class SnapshotStatus implements ToXContentObject, Writeable {
46+
public class SnapshotStatus implements ChunkedToXContent, Writeable {
4547

4648
private final Snapshot snapshot;
4749

@@ -188,24 +190,20 @@ public SnapshotStats getStats() {
188190
private static final String INCLUDE_GLOBAL_STATE = "include_global_state";
189191

190192
@Override
191-
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
192-
builder.startObject();
193-
builder.field(SNAPSHOT, snapshot.getSnapshotId().getName());
194-
builder.field(REPOSITORY, snapshot.getRepository());
195-
builder.field(UUID, snapshot.getSnapshotId().getUUID());
196-
builder.field(STATE, state.name());
197-
if (includeGlobalState != null) {
198-
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState);
199-
}
200-
builder.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, params);
201-
builder.field(SnapshotStats.Fields.STATS, stats, params);
202-
builder.startObject(INDICES);
203-
for (SnapshotIndexStatus indexStatus : getIndices().values()) {
204-
indexStatus.toXContent(builder, params);
205-
}
206-
builder.endObject();
207-
builder.endObject();
208-
return builder;
193+
public Iterator<? extends ToXContent> toXContentChunked() {
194+
return Iterators.concat(Iterators.single((ToXContent) (b, p) -> {
195+
b.startObject()
196+
.field(SNAPSHOT, snapshot.getSnapshotId().getName())
197+
.field(REPOSITORY, snapshot.getRepository())
198+
.field(UUID, snapshot.getSnapshotId().getUUID())
199+
.field(STATE, state.name());
200+
if (includeGlobalState != null) {
201+
b.field(INCLUDE_GLOBAL_STATE, includeGlobalState);
202+
}
203+
return b.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, p)
204+
.field(SnapshotStats.Fields.STATS, stats, p)
205+
.startObject(INDICES);
206+
}), getIndices().values().iterator(), Iterators.single((b, p) -> b.endObject().endObject()));
209207
}
210208

211209
static final ConstructingObjectParser<SnapshotStatus, Void> PARSER = new ConstructingObjectParser<>(

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,29 @@
99
package org.elasticsearch.action.admin.cluster.snapshots.status;
1010

1111
import org.elasticsearch.action.ActionResponse;
12+
import org.elasticsearch.common.collect.Iterators;
1213
import org.elasticsearch.common.io.stream.StreamInput;
1314
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.common.xcontent.ChunkedToXContent;
1416
import org.elasticsearch.xcontent.ConstructingObjectParser;
1517
import org.elasticsearch.xcontent.ParseField;
16-
import org.elasticsearch.xcontent.ToXContentObject;
17-
import org.elasticsearch.xcontent.XContentBuilder;
18+
import org.elasticsearch.xcontent.ToXContent;
1819
import org.elasticsearch.xcontent.XContentParser;
1920

2021
import java.io.IOException;
22+
import java.util.Iterator;
2123
import java.util.List;
2224
import java.util.Objects;
25+
import java.util.Spliterator;
26+
import java.util.Spliterators;
27+
import java.util.stream.StreamSupport;
2328

2429
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
2530

2631
/**
2732
* Snapshot status response
2833
*/
29-
public class SnapshotsStatusResponse extends ActionResponse implements ToXContentObject {
34+
public class SnapshotsStatusResponse extends ActionResponse implements ChunkedToXContent {
3035

3136
private final List<SnapshotStatus> snapshots;
3237

@@ -53,18 +58,6 @@ public void writeTo(StreamOutput out) throws IOException {
5358
out.writeList(snapshots);
5459
}
5560

56-
@Override
57-
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
58-
builder.startObject();
59-
builder.startArray("snapshots");
60-
for (SnapshotStatus snapshot : snapshots) {
61-
snapshot.toXContent(builder, params);
62-
}
63-
builder.endArray();
64-
builder.endObject();
65-
return builder;
66-
}
67-
6861
private static final ConstructingObjectParser<SnapshotsStatusResponse, Void> PARSER = new ConstructingObjectParser<>(
6962
"snapshots_status_response",
7063
true,
@@ -94,4 +87,15 @@ public boolean equals(Object o) {
9487
public int hashCode() {
9588
return snapshots != null ? snapshots.hashCode() : 0;
9689
}
90+
91+
@Override
92+
public Iterator<? extends ToXContent> toXContentChunked() {
93+
return Iterators.concat(
94+
Iterators.single((ToXContent) (b, p) -> b.startObject().startArray("snapshots")),
95+
snapshots.stream()
96+
.flatMap(s -> StreamSupport.stream(Spliterators.spliteratorUnknownSize(s.toXContentChunked(), Spliterator.ORDERED), false))
97+
.iterator(),
98+
Iterators.single((b, p) -> b.endArray().endObject())
99+
);
100+
}
97101
}

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.elasticsearch.rest.BaseRestHandler;
1515
import org.elasticsearch.rest.RestRequest;
1616
import org.elasticsearch.rest.action.RestCancellableNodeClient;
17-
import org.elasticsearch.rest.action.RestToXContentListener;
17+
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
1818

1919
import java.io.IOException;
2020
import java.util.List;
@@ -54,6 +54,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
5454
snapshotsStatusRequest.masterNodeTimeout(request.paramAsTime("master_timeout", snapshotsStatusRequest.masterNodeTimeout()));
5555
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
5656
.cluster()
57-
.snapshotsStatus(snapshotsStatusRequest, new RestToXContentListener<>(channel));
57+
.snapshotsStatus(snapshotsStatusRequest, new RestChunkedToXContentListener<>(channel));
5858
}
5959
}

server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,19 @@
1010

1111
import org.elasticsearch.cluster.SnapshotsInProgress;
1212
import org.elasticsearch.common.UUIDs;
13+
import org.elasticsearch.common.io.stream.Writeable;
1314
import org.elasticsearch.index.shard.ShardId;
1415
import org.elasticsearch.snapshots.Snapshot;
1516
import org.elasticsearch.snapshots.SnapshotId;
16-
import org.elasticsearch.test.AbstractXContentTestCase;
17+
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
1718
import org.elasticsearch.xcontent.XContentParser;
1819

1920
import java.io.IOException;
2021
import java.util.ArrayList;
2122
import java.util.List;
2223
import java.util.function.Predicate;
2324

24-
public class SnapshotStatusTests extends AbstractXContentTestCase<SnapshotStatus> {
25+
public class SnapshotStatusTests extends AbstractChunkedSerializingTestCase<SnapshotStatus> {
2526

2627
public void testToString() throws Exception {
2728
SnapshotsInProgress.State state = randomFrom(SnapshotsInProgress.State.values());
@@ -180,4 +181,9 @@ protected SnapshotStatus doParseInstance(XContentParser parser) throws IOExcepti
180181
protected boolean supportsUnknownFields() {
181182
return true;
182183
}
184+
185+
@Override
186+
protected Writeable.Reader<SnapshotStatus> instanceReader() {
187+
return SnapshotStatus::new;
188+
}
183189
}

server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponseTests.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@
88

99
package org.elasticsearch.action.admin.cluster.snapshots.status;
1010

11-
import org.elasticsearch.test.AbstractXContentTestCase;
11+
import org.elasticsearch.common.io.stream.Writeable;
12+
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
1213
import org.elasticsearch.xcontent.XContentParser;
1314

1415
import java.io.IOException;
1516
import java.util.ArrayList;
1617
import java.util.List;
1718
import java.util.function.Predicate;
1819

19-
public class SnapshotsStatusResponseTests extends AbstractXContentTestCase<SnapshotsStatusResponse> {
20+
public class SnapshotsStatusResponseTests extends AbstractChunkedSerializingTestCase<SnapshotsStatusResponse> {
2021

2122
@Override
2223
protected SnapshotsStatusResponse doParseInstance(XContentParser parser) throws IOException {
@@ -43,4 +44,26 @@ protected SnapshotsStatusResponse createTestInstance() {
4344
}
4445
return new SnapshotsStatusResponse(snapshotStatuses);
4546
}
47+
48+
@Override
49+
protected Writeable.Reader<SnapshotsStatusResponse> instanceReader() {
50+
return SnapshotsStatusResponse::new;
51+
}
52+
53+
public void testChunkCount() {
54+
final var instance = createTestInstance();
55+
// open and close chunk
56+
int chunksExpected = 2;
57+
for (SnapshotStatus snapshot : instance.getSnapshots()) {
58+
// open and close chunk + one chunk per index
59+
chunksExpected += 2 + snapshot.getIndices().size();
60+
}
61+
final var iterator = instance.toXContentChunked();
62+
int chunksSeen = 0;
63+
while (iterator.hasNext()) {
64+
iterator.next();
65+
chunksSeen++;
66+
}
67+
assertEquals(chunksExpected, chunksSeen);
68+
}
4669
}

0 commit comments

Comments
 (0)