Skip to content

Chunked encoding for cluster reroute API #92615

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,32 @@

package org.elasticsearch.action.admin.cluster.reroute;

import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.IsAcknowledgedSupplier;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;

import static org.elasticsearch.action.support.master.AcknowledgedResponse.ACKNOWLEDGED_KEY;

/**
* Response returned after a cluster reroute request
*/
public class ClusterRerouteResponse extends AcknowledgedResponse implements ToXContentObject {
public class ClusterRerouteResponse extends ActionResponse implements IsAcknowledgedSupplier, ChunkedToXContentObject {

private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestSearchAction.class);
public static final String STATE_FIELD_DEPRECATION_MESSAGE = "The [state] field in the response to the reroute API is deprecated "
Expand All @@ -38,15 +44,17 @@ public class ClusterRerouteResponse extends AcknowledgedResponse implements ToXC
*/
private final ClusterState state;
private final RoutingExplanations explanations;
private final boolean acknowledged;

ClusterRerouteResponse(StreamInput in) throws IOException {
super(in);
acknowledged = in.readBoolean();
state = ClusterState.readFrom(in, null);
explanations = RoutingExplanations.readFrom(in);
}

ClusterRerouteResponse(boolean acknowledged, ClusterState state, RoutingExplanations explanations) {
super(acknowledged);
this.acknowledged = acknowledged;
this.state = state;
this.explanations = explanations;
}
Expand All @@ -62,27 +70,45 @@ public RoutingExplanations getExplanations() {
return this.explanations;
}

@Override
public final boolean isAcknowledged() {
return acknowledged;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
state.writeTo(out);
RoutingExplanations.writeTo(explanations, out);
}

private boolean emitState(ToXContent.Params params) {
return Objects.equals(params.param("metric"), "none") == false;
}

@Override
protected void addCustomFields(XContentBuilder builder, Params params) throws IOException {
if (Objects.equals(params.param("metric"), "none") == false) {
if (builder.getRestApiVersion() != RestApiVersion.V_7) {
deprecationLogger.critical(DeprecationCategory.API, "reroute_cluster_state", STATE_FIELD_DEPRECATION_MESSAGE);
}
builder.startObject("state");
// TODO this should be chunked, see #89838
ChunkedToXContent.wrapAsToXContent(state).toXContent(builder, params);
builder.endObject();
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
if (emitState(outerParams)) {
deprecationLogger.critical(DeprecationCategory.API, "reroute_cluster_state", STATE_FIELD_DEPRECATION_MESSAGE);
}
return toXContentChunkedV7(outerParams);
}

if (params.paramAsBoolean("explain", false)) {
explanations.toXContent(builder, params);
}
@Override
public Iterator<? extends ToXContent> toXContentChunkedV7(ToXContent.Params outerParams) {
return Iterators.concat(
Iterators.single((builder, params) -> builder.startObject().field(ACKNOWLEDGED_KEY, isAcknowledged())),
emitState(outerParams)
? ChunkedToXContentHelper.wrapWithObject("state", state.toXContentChunked(outerParams))
: Collections.emptyIterator(),
Iterators.single((builder, params) -> {
if (params.paramAsBoolean("explain", false)) {
explanations.toXContent(builder, params);
}

builder.endObject();
return builder;
})
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.elasticsearch.action.support.master;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.client.internal.ElasticsearchClient;
import org.elasticsearch.core.TimeValue;
Expand All @@ -16,7 +17,7 @@
*/
public abstract class AcknowledgedRequestBuilder<
Request extends AcknowledgedRequest<Request>,
Response extends AcknowledgedResponse,
Response extends ActionResponse & IsAcknowledgedSupplier,
RequestBuilder extends AcknowledgedRequestBuilder<Request, Response, RequestBuilder>> extends MasterNodeOperationRequestBuilder<
Request,
Response,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@
/**
* A response that indicates that a request has been acknowledged
*/
public class AcknowledgedResponse extends ActionResponse implements ToXContentObject {
public class AcknowledgedResponse extends ActionResponse implements IsAcknowledgedSupplier, ToXContentObject {

public static final AcknowledgedResponse TRUE = new AcknowledgedResponse(true);

public static final AcknowledgedResponse FALSE = new AcknowledgedResponse(false);

private static final ParseField ACKNOWLEDGED = new ParseField("acknowledged");
public static final String ACKNOWLEDGED_KEY = "acknowledged";
private static final ParseField ACKNOWLEDGED = new ParseField(ACKNOWLEDGED_KEY);

protected static <T extends AcknowledgedResponse> void declareAcknowledgedField(ConstructingObjectParser<T, Void> objectParser) {
objectParser.declareField(
Expand Down Expand Up @@ -65,6 +66,7 @@ protected AcknowledgedResponse(boolean acknowledged) {
* Returns whether the response is acknowledged or not
* @return true if the response is acknowledged, false otherwise
*/
@Override
public final boolean isAcknowledged() {
return acknowledged;
}
Expand All @@ -77,7 +79,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ACKNOWLEDGED.getPreferredName(), isAcknowledged());
builder.field(ACKNOWLEDGED_KEY, isAcknowledged());
addCustomFields(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.support.master;

public interface IsAcknowledgedSupplier {
boolean isAcknowledged();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.common.xcontent;

import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -16,21 +17,46 @@
import java.util.Iterator;

/**
* An extension of {@link ToXContent} that can be serialized in chunks by creating an {@link Iterator<ToXContent>}.
* This is used by the REST layer to implement flow control that does not rely on blocking the serializing thread when writing the
* serialized bytes to a non-blocking channel.
* An alternative to {@link ToXContent} allowing for progressive serialization by creating an {@link Iterator} of {@link ToXContent} chunks.
* <p>
* The REST layer only serializes enough chunks at once to keep an outbound buffer full, rather than consuming all the time and memory
* needed to serialize the entire response as must be done with the regular {@link ToXContent} responses.
*/
public interface ChunkedToXContent {

/**
* Create an iterator of {@link ToXContent} chunks, that must be serialized individually with the same {@link XContentBuilder} and
* {@link ToXContent.Params} for each call until it is fully drained.
* Create an iterator of {@link ToXContent} chunks for a REST response. Each chunk is serialized with the same {@link XContentBuilder}
* and {@link ToXContent.Params}, which is also the same as the {@link ToXContent.Params} passed as the {@code params} argument. For
* best results, all chunks should be {@code O(1)} size. See also {@link ChunkedToXContentHelper} for some handy utilities.
* <p>
* Note that chunked response bodies cannot send deprecation warning headers once transmission has started, so implementations must
* check for deprecated feature use before returning.
*
* @return iterator over chunks of {@link ToXContent}
*/
Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params);

/**
* Create an iterator of {@link ToXContent} chunks for a response to the {@link RestApiVersion#V_7} API. Each chunk is serialized with
* the same {@link XContentBuilder} and {@link ToXContent.Params}, which is also the same as the {@link ToXContent.Params} passed as the
* {@code params} argument. For best results, all chunks should be {@code O(1)} size. See also {@link ChunkedToXContentHelper} for some
* handy utilities.
* <p>
* Similar to {@link #toXContentChunked} but for the {@link RestApiVersion#V_7} API. By default this method delegates to {@link
* #toXContentChunked}.
* <p>
* Note that chunked response bodies cannot send deprecation warning headers once transmission has started, so implementations must
* check for deprecated feature use before returning.
*
* @return iterator over chunks of {@link ToXContent}
*/
default Iterator<? extends ToXContent> toXContentChunkedV7(ToXContent.Params params) {
return toXContentChunked(params);
}

/**
* Wraps the given instance in a {@link ToXContent} that will fully serialize the instance when serialized.
*
* @param chunkedToXContent instance to wrap
* @return x-content instance
*/
Expand All @@ -53,7 +79,7 @@ public boolean isFragment() {
}

/**
* @return true if this instances serializes as an x-content fragment. See {@link ToXContentObject} for additional details.
* @return true iff this instance serializes as a fragment. See {@link ToXContentObject} for additional details.
*/
default boolean isFragment() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.Streams;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
Expand Down Expand Up @@ -82,7 +83,9 @@ public void write(byte[] b, int off, int len) throws IOException {
Streams.noCloseStream(out)
);

private final Iterator<? extends ToXContent> serialization = chunkedToXContent.toXContentChunked(params);
private final Iterator<? extends ToXContent> serialization = builder.getRestApiVersion() == RestApiVersion.V_7
? chunkedToXContent.toXContentChunkedV7(params)
: chunkedToXContent.toXContentChunked(params);

private BytesStream target;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ObjectParser.ValueType;
import org.elasticsearch.xcontent.ParseField;
Expand Down Expand Up @@ -82,7 +82,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
if (metric == null) {
request.params().put("metric", DEFAULT_METRICS);
}
return channel -> client.admin().cluster().reroute(clusterRerouteRequest, new RestToXContentListener<>(channel));
return channel -> client.admin().cluster().reroute(clusterRerouteRequest, new RestChunkedToXContentListener<>(channel));
}

@Override
Expand Down
Loading