Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunked encoding for snapshot status API #90801

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,25 @@
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -41,7 +43,7 @@
/**
* Status of a snapshot
*/
public class SnapshotStatus implements ToXContentObject, Writeable {
public class SnapshotStatus implements ChunkedToXContent, Writeable {

private final Snapshot snapshot;

Expand Down Expand Up @@ -188,24 +190,20 @@ public SnapshotStats getStats() {
private static final String INCLUDE_GLOBAL_STATE = "include_global_state";

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SNAPSHOT, snapshot.getSnapshotId().getName());
builder.field(REPOSITORY, snapshot.getRepository());
builder.field(UUID, snapshot.getSnapshotId().getUUID());
builder.field(STATE, state.name());
if (includeGlobalState != null) {
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState);
}
builder.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, params);
builder.field(SnapshotStats.Fields.STATS, stats, params);
builder.startObject(INDICES);
for (SnapshotIndexStatus indexStatus : getIndices().values()) {
indexStatus.toXContent(builder, params);
}
builder.endObject();
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked() {
return Iterators.concat(Iterators.single((ToXContent) (b, p) -> {
var builder = b.startObject()
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
.field(SNAPSHOT, snapshot.getSnapshotId().getName())
.field(REPOSITORY, snapshot.getRepository())
.field(UUID, snapshot.getSnapshotId().getUUID())
.field(STATE, state.name());
if (includeGlobalState != null) {
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState);
}
return builder.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, p)
.field(SnapshotStats.Fields.STATS, stats, p)
.startObject(INDICES);
}), getIndices().values().iterator(), Iterators.single((b, p) -> b.endObject().endObject()));
}

static final ConstructingObjectParser<SnapshotStatus, Void> PARSER = new ConstructingObjectParser<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,29 @@
package org.elasticsearch.action.admin.cluster.snapshots.status;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;

/**
* Snapshot status response
*/
public class SnapshotsStatusResponse extends ActionResponse implements ToXContentObject {
public class SnapshotsStatusResponse extends ActionResponse implements ChunkedToXContent {

private final List<SnapshotStatus> snapshots;

Expand All @@ -53,18 +58,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeList(snapshots);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("snapshots");
for (SnapshotStatus snapshot : snapshots) {
snapshot.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}

private static final ConstructingObjectParser<SnapshotsStatusResponse, Void> PARSER = new ConstructingObjectParser<>(
"snapshots_status_response",
true,
Expand Down Expand Up @@ -94,4 +87,15 @@ public boolean equals(Object o) {
public int hashCode() {
return snapshots != null ? snapshots.hashCode() : 0;
}

@Override
public Iterator<? extends ToXContent> toXContentChunked() {
return Iterators.concat(
Iterators.single((ToXContent) (b, p) -> b.startObject().startArray("snapshots")),
snapshots.stream()
.flatMap(s -> StreamSupport.stream(Spliterators.spliteratorUnknownSize(s.toXContentChunked(), Spliterator.ORDERED), false))
.iterator(),
Comment on lines +95 to +97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: this seems a pretty useful pattern, maybe we should extract a utility for it now. Kinda surprised this is the first time it appears tbh.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some variations of this here and there. I have a plan for this already in the making :) PR incoming.

Iterators.single((b, p) -> b.endArray().endObject())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -54,6 +54,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
snapshotsStatusRequest.masterNodeTimeout(request.paramAsTime("master_timeout", snapshotsStatusRequest.masterNodeTimeout()));
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
.cluster()
.snapshotsStatus(snapshotsStatusRequest, new RestToXContentListener<>(channel));
.snapshotsStatus(snapshotsStatusRequest, new RestChunkedToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@

import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class SnapshotStatusTests extends AbstractXContentTestCase<SnapshotStatus> {
public class SnapshotStatusTests extends AbstractChunkedSerializingTestCase<SnapshotStatus> {

public void testToString() throws Exception {
SnapshotsInProgress.State state = randomFrom(SnapshotsInProgress.State.values());
Expand Down Expand Up @@ -180,4 +181,9 @@ protected SnapshotStatus doParseInstance(XContentParser parser) throws IOExcepti
protected boolean supportsUnknownFields() {
return true;
}

@Override
protected Writeable.Reader<SnapshotStatus> instanceReader() {
return SnapshotStatus::new;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@

package org.elasticsearch.action.admin.cluster.snapshots.status;

import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class SnapshotsStatusResponseTests extends AbstractXContentTestCase<SnapshotsStatusResponse> {
public class SnapshotsStatusResponseTests extends AbstractChunkedSerializingTestCase<SnapshotsStatusResponse> {

@Override
protected SnapshotsStatusResponse doParseInstance(XContentParser parser) throws IOException {
Expand All @@ -43,4 +44,26 @@ protected SnapshotsStatusResponse createTestInstance() {
}
return new SnapshotsStatusResponse(snapshotStatuses);
}

@Override
protected Writeable.Reader<SnapshotsStatusResponse> instanceReader() {
return SnapshotsStatusResponse::new;
}

public void testChunkCount() {
final var instance = createTestInstance();
// open and close chunk
int chunksExpected = 2;
for (SnapshotStatus snapshot : instance.getSnapshots()) {
// open and close chunk + one chunk per index
chunksExpected += 2 + snapshot.getIndices().size();
}
final var iterator = instance.toXContentChunked();
int chunksSeen = 0;
while (iterator.hasNext()) {
iterator.next();
chunksSeen++;
}
assertEquals(chunksExpected, chunksSeen);
}
}