Skip to content

Limit request size #17133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
*/
public interface CircuitBreaker {

public static final String PARENT = "parent";
public static final String FIELDDATA = "fielddata";
public static final String REQUEST = "request";
String PARENT = "parent";
String FIELDDATA = "fielddata";
String REQUEST = "request";
String IN_FLIGHT_REQUESTS = "in_flight_requests";

public static enum Type {
enum Type {
// A regular or child MemoryCircuitBreaker
MEMORY,
// A special parent-type for the hierarchy breaker service
Expand All @@ -59,43 +60,43 @@ public static Type parseValue(String value) {
* @param fieldName name of the field responsible for tripping the breaker
* @param bytesNeeded bytes asked for but unable to be allocated
*/
public void circuitBreak(String fieldName, long bytesNeeded);
void circuitBreak(String fieldName, long bytesNeeded);

/**
* add bytes to the breaker and maybe trip
* @param bytes number of bytes to add
* @param label string label describing the bytes being added
* @return the number of "used" bytes for the circuit breaker
*/
public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException;
double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException;

/**
* Adjust the circuit breaker without tripping
*/
public long addWithoutBreaking(long bytes);
long addWithoutBreaking(long bytes);

/**
* @return the currently used bytes the breaker is tracking
*/
public long getUsed();
long getUsed();

/**
* @return maximum number of bytes the circuit breaker can track before tripping
*/
public long getLimit();
long getLimit();

/**
* @return overhead of circuit breaker
*/
public double getOverhead();
double getOverhead();

/**
* @return the number of times the circuit breaker has been tripped
*/
public long getTrippedCount();
long getTrippedCount();

/**
* @return the name of the breaker
*/
public String getName();
String getName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;

Expand Down Expand Up @@ -66,6 +67,11 @@ public long getByteLimit() {
return this.byteLimit;
}

@Override
public RestStatus status() {
return RestStatus.SERVICE_UNAVAILABLE;
}

@Override
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("bytes_wanted", bytesWanted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void circuitBreak(String fieldName, long bytesNeeded) throws CircuitBreak
final String message = "Data too large, data for field [" + fieldName + "] would be larger than limit of [" +
memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]";
logger.debug("{}", message);
throw new CircuitBreakingException(message);
throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* basically noops
*/
public class NoopCircuitBreaker implements CircuitBreaker {
public static final int LIMIT = -1;

private final String name;

Expand Down Expand Up @@ -53,7 +54,7 @@ public long getUsed() {

@Override
public long getLimit() {
return 0;
return LIMIT;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ public void apply(Settings value, Settings current, Settings previous) {
HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING,
ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
Expand Down
92 changes: 83 additions & 9 deletions core/src/main/java/org/elasticsearch/http/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,29 @@

package org.elasticsearch.http;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.rest.RestStatus.FORBIDDEN;
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
Expand All @@ -43,24 +50,22 @@
* A component to serve http requests, backed by rest handlers.
*/
public class HttpServer extends AbstractLifecycleComponent<HttpServer> implements HttpServerAdapter {

private final Environment environment;

private final HttpServerTransport transport;

private final RestController restController;

private final NodeService nodeService;

private final CircuitBreakerService circuitBreakerService;

@Inject
public HttpServer(Settings settings, Environment environment, HttpServerTransport transport,
RestController restController,
NodeService nodeService) {
public HttpServer(Settings settings, HttpServerTransport transport, RestController restController, NodeService nodeService,
CircuitBreakerService circuitBreakerService) {
super(settings);
this.environment = environment;
this.transport = transport;
this.restController = restController;
this.nodeService = nodeService;
this.circuitBreakerService = circuitBreakerService;
nodeService.setHttpServer(this);
transport.httpServerAdapter(this);
}
Expand Down Expand Up @@ -99,7 +104,15 @@ public void dispatchRequest(RestRequest request, RestChannel channel, ThreadCont
handleFavicon(request, channel);
return;
}
restController.dispatchRequest(request, channel, threadContext);
RestChannel responseChannel = channel;
try {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(request.content().length(), "<http_request>");
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService);
restController.dispatchRequest(request, responseChannel, threadContext);
} catch (Throwable t) {
restController.sendErrorResponse(request, responseChannel, t);
}
}

void handleFavicon(RestRequest request, RestChannel channel) {
Expand All @@ -118,4 +131,65 @@ void handleFavicon(RestRequest request, RestChannel channel) {
channel.sendResponse(new BytesRestResponse(FORBIDDEN));
}
}

private static final class ResourceHandlingHttpChannel implements RestChannel {
private final RestChannel delegate;
private final CircuitBreakerService circuitBreakerService;
private final AtomicBoolean closed = new AtomicBoolean();

public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService) {
this.delegate = delegate;
this.circuitBreakerService = circuitBreakerService;
}

@Override
public XContentBuilder newBuilder() throws IOException {
return delegate.newBuilder();
}

@Override
public XContentBuilder newErrorBuilder() throws IOException {
return delegate.newErrorBuilder();
}

@Override
public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException {
return delegate.newBuilder(autoDetectSource, useFiltering);
}

@Override
public BytesStreamOutput bytesOutput() {
return delegate.bytesOutput();
}

@Override
public RestRequest request() {
return delegate.request();
}

@Override
public boolean detailedErrorsEnabled() {
return delegate.detailedErrorsEnabled();
}

@Override
public void sendResponse(RestResponse response) {
close();
delegate.sendResponse(response);
}

private void close() {
// attempt to close once atomically
if (closed.compareAndSet(false, true) == false) {
throw new IllegalStateException("Channel is already closed");
}
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-request().content().length());
}

}

private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) {
// We always obtain a fresh breaker to reflect changes to the breaker configuration.
return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,8 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
// the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally
// when reading, or using a cumalation buffer
NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel());
if (oue != null) {
serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled));
} else {
serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, detailedErrorsEnabled));
}
NettyHttpChannel channel = new NettyHttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled);
serverTransport.dispatchRequest(httpRequest, channel);
super.messageReceived(ctx, e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.http.netty;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
Expand Down Expand Up @@ -58,19 +59,22 @@ public final class NettyHttpChannel extends AbstractRestChannel {
private final NettyHttpServerTransport transport;
private final Channel channel;
private final org.jboss.netty.handler.codec.http.HttpRequest nettyRequest;
private OrderedUpstreamMessageEvent orderedUpstreamMessageEvent = null;

private final OrderedUpstreamMessageEvent orderedUpstreamMessageEvent;

/**
* @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to.
* @param request The request that is handled by this channel.
* @param orderedUpstreamMessageEvent If HTTP pipelining is enabled provide the corresponding Netty upstream event. May be null if
* HTTP pipelining is disabled.
* @param detailedErrorsEnabled true iff error messages should include stack traces.
*/
public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request,
@Nullable OrderedUpstreamMessageEvent orderedUpstreamMessageEvent,
boolean detailedErrorsEnabled) {
super(request, detailedErrorsEnabled);
this.transport = transport;
this.channel = request.getChannel();
this.nettyRequest = request.request();
}

public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request,
OrderedUpstreamMessageEvent orderedUpstreamMessageEvent, boolean detailedErrorsEnabled) {
this(transport, request, detailedErrorsEnabled);
this.orderedUpstreamMessageEvent = orderedUpstreamMessageEvent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {

private static final String CHILD_LOGGER_PREFIX = "org.elasticsearch.indices.breaker.";

private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap();
private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();

public static final Setting<ByteSizeValue> TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING =
Setting.byteSizeSetting("indices.breaker.total.limit", "70%", Property.Dynamic, Property.NodeScope);
Expand All @@ -64,10 +64,16 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
public static final Setting<CircuitBreaker.Type> REQUEST_CIRCUIT_BREAKER_TYPE_SETTING =
new Setting<>("indices.breaker.request.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);


public static final Setting<ByteSizeValue> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING =
Setting.byteSizeSetting("network.breaker.inflight_requests.limit", "100%", Property.Dynamic, Property.NodeScope);
public static final Setting<Double> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING =
Setting.doubleSetting("network.breaker.inflight_requests.overhead", 1.0d, 0.0d, Property.Dynamic, Property.NodeScope);
public static final Setting<CircuitBreaker.Type> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING =
new Setting<>("network.breaker.inflight_requests.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);

private volatile BreakerSettings parentSettings;
private volatile BreakerSettings fielddataSettings;
private volatile BreakerSettings inFlightRequestsSettings;
private volatile BreakerSettings requestSettings;

// Tripped count for when redistribution was attempted but wasn't successful
Expand All @@ -82,6 +88,12 @@ public HierarchyCircuitBreakerService(Settings settings, ClusterSettings cluster
FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING.get(settings)
);

this.inFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS,
IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(),
IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING.get(settings)
);

this.requestSettings = new BreakerSettings(CircuitBreaker.REQUEST,
REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(),
REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
Expand All @@ -95,11 +107,14 @@ public HierarchyCircuitBreakerService(Settings settings, ClusterSettings cluster

registerBreaker(this.requestSettings);
registerBreaker(this.fielddataSettings);
registerBreaker(this.inFlightRequestsSettings);

clusterSettings.addSettingsUpdateConsumer(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, this::setTotalCircuitBreakerLimit, this::validateTotalCircuitBreakerLimit);
clusterSettings.addSettingsUpdateConsumer(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setFieldDataBreakerLimit);
clusterSettings.addSettingsUpdateConsumer(IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING, IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setInFlightRequestsBreakerLimit);
clusterSettings.addSettingsUpdateConsumer(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setRequestBreakerLimit);
}

private void setRequestBreakerLimit(ByteSizeValue newRequestMax, Double newRequestOverhead) {
BreakerSettings newRequestSettings = new BreakerSettings(CircuitBreaker.REQUEST, newRequestMax.bytes(), newRequestOverhead,
HierarchyCircuitBreakerService.this.requestSettings.getType());
Expand All @@ -108,6 +123,14 @@ private void setRequestBreakerLimit(ByteSizeValue newRequestMax, Double newReque
logger.info("Updated breaker settings request: {}", newRequestSettings);
}

private void setInFlightRequestsBreakerLimit(ByteSizeValue newInFlightRequestsMax, Double newInFlightRequestsOverhead) {
BreakerSettings newInFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS, newInFlightRequestsMax.bytes(),
newInFlightRequestsOverhead, HierarchyCircuitBreakerService.this.inFlightRequestsSettings.getType());
registerBreaker(newInFlightRequestsSettings);
HierarchyCircuitBreakerService.this.inFlightRequestsSettings = newInFlightRequestsSettings;
logger.info("Updated breaker settings for in-flight requests: {}", newInFlightRequestsSettings);
}

private void setFieldDataBreakerLimit(ByteSizeValue newFielddataMax, Double newFielddataOverhead) {
long newFielddataLimitBytes = newFielddataMax == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.bytes();
newFielddataOverhead = newFielddataOverhead == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead;
Expand Down
Loading