Skip to content

Exclude specific transport actions from request size limit check #17951

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ClusterName clusterName, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, GatewayAllocator gatewayAllocator) {
super(settings, ClusterHealthAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, ClusterHealthRequest::new);
super(settings, ClusterHealthAction.NAME, false, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, ClusterHealthRequest::new);
this.clusterName = clusterName;
this.gatewayAllocator = gatewayAllocator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
@Inject
public TransportClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ClusterName clusterName, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ClusterStateAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, ClusterStateRequest::new);
super(settings, ClusterStateAction.NAME, false, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, ClusterStateRequest::new);
this.clusterName = clusterName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,18 @@
*/
public abstract class HandledTransportAction<Request extends ActionRequest<Request>, Response extends ActionResponse>
extends TransportAction<Request, Response> {
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> request) {
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
}

protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, false, canTripCircuitBreaker,
new TransportHandler());
}

class TransportHandler implements TransportRequestHandler<Request> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,15 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
protected TransportMasterNodeAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
this(settings, actionName, true, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, request);
}

protected TransportMasterNodeAction(Settings settings, String actionName, boolean canTripCircuitBreaker,
TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> request) {
super(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters, indexNameExpressionResolver,
request);
this.transportService = transportService;
this.clusterService = clusterService;
this.executor = executor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ public abstract class TransportMasterNodeReadAction<Request extends MasterNodeRe
protected TransportMasterNodeReadAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,request);
this(settings, actionName, true, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,request);
}

protected TransportMasterNodeReadAction(Settings settings, String actionName, boolean checkSizeLimit, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, checkSizeLimit, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver,request);
this.forceLocal = FORCE_LOCAL_SETTING.get(settings);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ protected TransportNodesAction(Settings settings, String actionName, ClusterName

this.transportNodeAction = actionName + "[n]";

transportService.registerRequestHandler(transportNodeAction, nodeRequest, nodeExecutor, new NodeTransportHandler());
transportService.registerRequestHandler(
transportNodeAction, nodeRequest, nodeExecutor, new NodeTransportHandler());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
final private WriteConsistencyLevel defaultWriteConsistencyLevel;
final private TransportRequestOptions transportOptions;

final private String transportReplicaAction;
final private String transportPrimaryAction;
// package private for testing
final String transportReplicaAction;
final String transportPrimaryAction;
final private ReplicasProxy replicasProxy;

protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
Expand All @@ -113,7 +114,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, request, executor, new PrimaryOperationTransportHandler());
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction, replicaRequest, executor, true,
transportService.registerRequestHandler(transportReplicaAction, replicaRequest, executor, true, true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danielmitterdorfer One thing that we do with TransportReplicationAction is that we ensure that if a replication action is successful on a primary, it will not be subjected to queue length limits on replicas (you don't want request on replicas to be subjected to EsRejectedExecutionException). That's what the first true parameter corresponding to forceExecution here is doing. But it seems like we are subjecting the replication requests to the size limits and I think that that might be wrong, or at least inconsistent with how we manage the queues for replication requests. Can you clarify?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jasontedor This change just ensured that I don't change the previous behavior (i.e. leave limiting enabled). With your explanation it makes sense to exclude this action from request size limiting. I'll change that.

new ReplicaOperationTransportHandler());

this.transportOptions = transportOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa

this.joinThreadControl = new JoinThreadControl(threadPool);

transportService.registerRequestHandler(DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
transportService.registerRequestHandler(
DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportS

logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);

transportService.registerRequestHandler(MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, new MasterPingRequestHandler());
transportService.registerRequestHandler(
MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, false, false, new MasterPingRequestHandler());
}

public DiscoveryNode masterNode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportSe

logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);

transportService.registerRequestHandler(PING_ACTION_NAME, PingRequest::new, ThreadPool.Names.SAME, new PingRequestHandler());
transportService.registerRequestHandler(
PING_ACTION_NAME, PingRequest::new, ThreadPool.Names.SAME, false, false, new PingRequestHandler());
}

public void setLocalNode(DiscoveryNode localNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,20 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
private final String action;
private final TransportRequestHandler<Request> handler;
private final boolean forceExecution;
private final boolean canTripCircuitBreaker;
private final String executor;
private final Supplier<Request> requestFactory;
private final TaskManager taskManager;

public RequestHandlerRegistry(String action, Supplier<Request> requestFactory, TaskManager taskManager,
TransportRequestHandler<Request> handler, String executor, boolean forceExecution) {
TransportRequestHandler<Request> handler, String executor, boolean forceExecution,
boolean canTripCircuitBreaker) {
this.action = action;
this.requestFactory = requestFactory;
assert newRequest() != null;
this.handler = handler;
this.forceExecution = forceExecution;
this.canTripCircuitBreaker = canTripCircuitBreaker;
this.executor = executor;
this.taskManager = taskManager;
}
Expand Down Expand Up @@ -77,6 +80,10 @@ public boolean isForceExecution() {
return forceExecution;
}

public boolean canTripCircuitBreaker() {
return canTripCircuitBreaker;
}

public String getExecutor() {
return executor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,23 +580,27 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory, String executor,
TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, requestFactory, taskManager, handler, executor, false);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, requestFactory, taskManager, handler, executor, false, true);
registerRequestHandler(reg);
}

/**
* Registers a new request handler
*
* @param action The action the request handler is associated with
* @param request The request class that will be used to constrcut new instances for streaming
* @param executor The executor the request handling will be executed on
* @param forceExecution Force execution on the executor queue and never reject it
* @param handler The handler itself that implements the request handling
* @param action The action the request handler is associated with
* @param request The request class that will be used to constrcut new instances for streaming
* @param executor The executor the request handling will be executed on
* @param forceExecution Force execution on the executor queue and never reject it
* @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached.
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request,
String executor, boolean forceExecution,
boolean canTripCircuitBreaker,
TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, request, taskManager, handler, executor, forceExecution);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
registerRequestHandler(reg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,16 @@ private void handleRequest(StreamInput stream, long requestId, int messageLength
Version version) throws Exception {
stream = new NamedWriteableAwareStreamInput(stream, namedWriteableRegistry);
final String action = stream.readString();
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
transportServiceAdapter.onRequestReceived(requestId, action);
inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
if (reg != null && reg.canTripCircuitBreaker()) {
inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
} else {
inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes);
}
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action,
requestId, version, messageLengthBytes);
try {
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,17 @@ protected String handleRequest(Channel channel, Marker marker, StreamInput buffe
transportServiceAdapter.onRequestReceived(requestId, action);
NettyTransportChannel transportChannel = null;
try {
transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
requestId, version, profileName, messageLengthBytes);

final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException(action);
}
if (reg.canTripCircuitBreaker()) {
transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
} else {
transport.inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes);
}
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
requestId, version, profileName, messageLengthBytes);
final TransportRequest request = reg.newRequest();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
Expand Down
Loading