Skip to content

Commit 53e3e7f

Browse files
authored
Avoid transport_worker thread in TransportBroadcastAction (#98001)
`TransportBroadcastAction` derivatives do work which scales as the number of shards in the cluster, both during coordination and when processing responses. We must therefore not do this work on `transport_worker` threads. Relates #97920
1 parent 3a2a1c2 commit 53e3e7f

File tree

3 files changed

+49
-57
lines changed

3 files changed

+49
-57
lines changed

docs/changelog/98001.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 98001
2+
summary: Avoid `transport_worker` thread in `TransportBroadcastAction`
3+
area: Distributed
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.ActionRunnable;
1414
import org.elasticsearch.action.NoShardAvailableActionException;
1515
import org.elasticsearch.action.support.ActionFilters;
16+
import org.elasticsearch.action.support.ChannelActionListener;
1617
import org.elasticsearch.action.support.HandledTransportAction;
1718
import org.elasticsearch.action.support.TransportActions;
1819
import org.elasticsearch.cluster.ClusterState;
@@ -26,14 +27,15 @@
2627
import org.elasticsearch.cluster.service.ClusterService;
2728
import org.elasticsearch.common.io.stream.StreamInput;
2829
import org.elasticsearch.common.io.stream.Writeable;
30+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2931
import org.elasticsearch.core.Nullable;
3032
import org.elasticsearch.tasks.Task;
3133
import org.elasticsearch.threadpool.ThreadPool;
32-
import org.elasticsearch.transport.TransportChannel;
33-
import org.elasticsearch.transport.TransportRequestHandler;
3434
import org.elasticsearch.transport.TransportService;
35+
import org.elasticsearch.transport.Transports;
3536

3637
import java.io.IOException;
38+
import java.util.concurrent.Executor;
3739
import java.util.concurrent.atomic.AtomicInteger;
3840
import java.util.concurrent.atomic.AtomicReferenceArray;
3941

@@ -49,31 +51,47 @@ public abstract class TransportBroadcastAction<
4951
protected final TransportService transportService;
5052
protected final IndexNameExpressionResolver indexNameExpressionResolver;
5153

52-
final String transportShardAction;
53-
private final String shardExecutor;
54+
private final String transportShardAction;
55+
private final Executor executor;
5456

5557
protected TransportBroadcastAction(
5658
String actionName,
5759
ClusterService clusterService,
5860
TransportService transportService,
5961
ActionFilters actionFilters,
6062
IndexNameExpressionResolver indexNameExpressionResolver,
61-
Writeable.Reader<Request> request,
62-
Writeable.Reader<ShardRequest> shardRequest,
63-
String shardExecutor
63+
Writeable.Reader<Request> requestReader,
64+
Writeable.Reader<ShardRequest> shardRequestReader,
65+
String executor
6466
) {
65-
super(actionName, transportService, actionFilters, request);
67+
// TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
68+
super(actionName, transportService, actionFilters, requestReader, ThreadPool.Names.SAME);
6669
this.clusterService = clusterService;
6770
this.transportService = transportService;
6871
this.indexNameExpressionResolver = indexNameExpressionResolver;
6972
this.transportShardAction = actionName + "[s]";
70-
this.shardExecutor = shardExecutor;
71-
72-
transportService.registerRequestHandler(transportShardAction, ThreadPool.Names.SAME, shardRequest, new ShardTransportHandler());
73+
this.executor = transportService.getThreadPool().executor(executor);
74+
assert this.executor != EsExecutors.DIRECT_EXECUTOR_SERVICE : "O(#shards) work must always fork to an appropriate executor";
75+
76+
transportService.registerRequestHandler(
77+
transportShardAction,
78+
executor,
79+
shardRequestReader,
80+
(request, channel, task) -> ActionListener.completeWith(
81+
new ChannelActionListener<>(channel),
82+
() -> shardOperation(request, task)
83+
)
84+
);
7385
}
7486

7587
@Override
7688
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
89+
// workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
90+
executor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, listener)));
91+
}
92+
93+
protected void doExecuteForked(Task task, Request request, ActionListener<Response> listener) {
94+
assert Transports.assertNotTransportThread("O(#shards) work must always fork to an appropriate executor");
7795
new AsyncBroadcastAction(task, request, listener).start();
7896
}
7997

@@ -184,7 +202,7 @@ protected void sendShardRequest(DiscoveryNode node, ShardRequest shardRequest, A
184202
node,
185203
transportShardAction,
186204
shardRequest,
187-
new ActionListenerResponseHandler<>(listener, TransportBroadcastAction.this::readShardResponse)
205+
new ActionListenerResponseHandler<>(listener, TransportBroadcastAction.this::readShardResponse, executor)
188206
);
189207
}
190208

@@ -238,11 +256,8 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int
238256
}
239257
}
240258

241-
protected AtomicReferenceArray<Object> shardsResponses() {
242-
return shardsResponses;
243-
}
244-
245259
protected void finishHim() {
260+
assert Transports.assertNotTransportThread("O(#shards) work must always fork to an appropriate executor");
246261
ActionListener.completeWith(listener, () -> newResponse(request, shardsResponses, clusterState));
247262
}
248263

@@ -269,24 +284,4 @@ void setFailure(ShardIterator shardIt, int shardIndex, Exception e) {
269284
}
270285
}
271286
}
272-
273-
class ShardTransportHandler implements TransportRequestHandler<ShardRequest> {
274-
275-
@Override
276-
public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
277-
asyncShardOperation(request, task, ActionListener.wrap(channel::sendResponse, e -> {
278-
try {
279-
channel.sendResponse(e);
280-
} catch (Exception e1) {
281-
logger.warn(() -> format("Failed to send error response for action [%s] and request [%s]", actionName, request), e1);
282-
}
283-
}));
284-
}
285-
}
286-
287-
private void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
288-
transportService.getThreadPool()
289-
.executor(shardExecutor)
290-
.execute(ActionRunnable.supply(listener, () -> shardOperation(request, task)));
291-
}
292287
}

server/src/test/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryActionTests.java

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
package org.elasticsearch.action.admin.indices.validate.query;
1010

1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.support.PlainActionFuture;
13+
import org.elasticsearch.index.IndexNotFoundException;
1214
import org.elasticsearch.test.ESSingleNodeTestCase;
1315

14-
import java.util.concurrent.atomic.AtomicBoolean;
15-
16-
import static org.hamcrest.Matchers.equalTo;
16+
import java.util.concurrent.TimeUnit;
1717

1818
public class TransportValidateQueryActionTests extends ESSingleNodeTestCase {
1919

@@ -24,24 +24,16 @@ public class TransportValidateQueryActionTests extends ESSingleNodeTestCase {
2424
* them garbled together, or trying to write one after the channel had closed, etc.
2525
*/
2626
public void testListenerOnlyInvokedOnceWhenIndexDoesNotExist() {
27-
final AtomicBoolean invoked = new AtomicBoolean();
28-
final ActionListener<ValidateQueryResponse> listener = new ActionListener<>() {
29-
30-
@Override
31-
public void onResponse(final ValidateQueryResponse validateQueryResponse) {
32-
fail("onResponse should not be invoked in this failure case");
33-
}
34-
35-
@Override
36-
public void onFailure(final Exception e) {
37-
if (invoked.compareAndSet(false, true) == false) {
38-
fail("onFailure invoked more than once");
39-
}
40-
}
41-
42-
};
43-
client().admin().indices().validateQuery(new ValidateQueryRequest("non-existent-index"), listener);
44-
assertThat(invoked.get(), equalTo(true)); // ensure that onFailure was invoked
27+
expectThrows(
28+
IndexNotFoundException.class,
29+
() -> PlainActionFuture.<ValidateQueryResponse, RuntimeException>get(
30+
future -> client().admin()
31+
.indices()
32+
.validateQuery(new ValidateQueryRequest("non-existent-index"), ActionListener.assertOnce(future)),
33+
10,
34+
TimeUnit.SECONDS
35+
)
36+
);
4537
}
4638

4739
}

0 commit comments

Comments
 (0)