Skip to content

Commit c99896e

Browse files
authored
Propagate cancellation in DataTiersUsageTransportAction (#100253)
This action invokes a subsidiary action but does not set up the proper parent/child relationship, so cancellations of the parent task do not propagate to the child. Relates #100230
1 parent c75ba2c commit c99896e

File tree

3 files changed

+146
-1
lines changed

3 files changed

+146
-1
lines changed

docs/changelog/100253.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 100253
2+
summary: Propagate cancellation in `DataTiersUsageTransportAction`
3+
area: Data streams
4+
type: bug
5+
issues: []
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.rest.action;
9+
10+
import org.apache.http.client.methods.HttpGet;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction;
13+
import org.elasticsearch.action.support.ActionFilters;
14+
import org.elasticsearch.action.support.PlainActionFuture;
15+
import org.elasticsearch.action.support.SubscribableListener;
16+
import org.elasticsearch.action.support.TransportAction;
17+
import org.elasticsearch.client.Cancellable;
18+
import org.elasticsearch.client.Request;
19+
import org.elasticsearch.client.Response;
20+
import org.elasticsearch.client.internal.node.NodeClient;
21+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
22+
import org.elasticsearch.cluster.service.ClusterService;
23+
import org.elasticsearch.common.inject.Inject;
24+
import org.elasticsearch.common.network.NetworkModule;
25+
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.plugins.Plugin;
27+
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
28+
import org.elasticsearch.test.ESIntegTestCase;
29+
import org.elasticsearch.test.transport.MockTransportService;
30+
import org.elasticsearch.threadpool.ThreadPool;
31+
import org.elasticsearch.transport.TransportService;
32+
import org.elasticsearch.transport.netty4.Netty4Plugin;
33+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
34+
import org.elasticsearch.xpack.core.action.TransportXPackUsageAction;
35+
import org.elasticsearch.xpack.core.action.XPackUsageAction;
36+
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
37+
import org.elasticsearch.xpack.core.action.XPackUsageResponse;
38+
39+
import java.nio.file.Path;
40+
import java.util.Arrays;
41+
import java.util.Collection;
42+
import java.util.List;
43+
import java.util.concurrent.CancellationException;
44+
import java.util.concurrent.CountDownLatch;
45+
46+
import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
47+
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
48+
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
49+
50+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
51+
public class DataTiersUsageRestCancellationIT extends ESIntegTestCase {
52+
53+
@Override
54+
protected Collection<Class<? extends Plugin>> nodePlugins() {
55+
return Arrays.asList(getTestTransportPlugin(), DataTiersUsageOnlyXPackPlugin.class, MockTransportService.TestPlugin.class);
56+
}
57+
58+
@Override
59+
protected Settings nodeSettings(int ordinal, Settings otherSettings) {
60+
return Settings.builder()
61+
.put(super.nodeSettings(ordinal, otherSettings))
62+
.put(NetworkModule.HTTP_DEFAULT_TYPE_SETTING.getKey(), Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
63+
.build();
64+
}
65+
66+
@Override
67+
protected boolean addMockHttpTransport() {
68+
return false; // enable http
69+
}
70+
71+
public void testCancellation() throws Exception {
72+
internalCluster().startMasterOnlyNode();
73+
internalCluster().startDataOnlyNode();
74+
75+
final CountDownLatch tasksBlockedLatch = new CountDownLatch(1);
76+
final SubscribableListener<Void> nodeStatsRequestsReleaseListener = new SubscribableListener<>();
77+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
78+
((MockTransportService) transportService).addRequestHandlingBehavior(
79+
NodesStatsAction.NAME + "[n]",
80+
(handler, request, channel, task) -> {
81+
tasksBlockedLatch.countDown();
82+
nodeStatsRequestsReleaseListener.addListener(
83+
ActionListener.wrap(ignored -> handler.messageReceived(request, channel, task), e -> {
84+
throw new AssertionError("unexpected", e);
85+
})
86+
);
87+
}
88+
);
89+
}
90+
91+
final Request request = new Request(HttpGet.METHOD_NAME, "/_xpack/usage");
92+
final PlainActionFuture<Response> future = new PlainActionFuture<>();
93+
final Cancellable cancellable = getRestClient().performRequestAsync(request, wrapAsRestResponseListener(future));
94+
95+
assertFalse(future.isDone());
96+
safeAwait(tasksBlockedLatch); // must wait for the node-level tasks to start to avoid cancelling being handled earlier
97+
cancellable.cancel();
98+
99+
// NB this test works by blocking node-level stats requests; when #100230 is addressed this will need to target a different action.
100+
assertAllCancellableTasksAreCancelled(NodesStatsAction.NAME);
101+
assertAllCancellableTasksAreCancelled(XPackUsageAction.NAME);
102+
103+
nodeStatsRequestsReleaseListener.onResponse(null);
104+
expectThrows(CancellationException.class, future::actionGet);
105+
106+
assertAllTasksHaveFinished(NodesStatsAction.NAME);
107+
assertAllTasksHaveFinished(XPackUsageAction.NAME);
108+
}
109+
110+
public static class DataTiersUsageOnlyXPackPlugin extends LocalStateCompositeXPackPlugin {
111+
public DataTiersUsageOnlyXPackPlugin(Settings settings, Path configPath) {
112+
super(settings, configPath);
113+
}
114+
115+
@Override
116+
protected Class<? extends TransportAction<XPackUsageRequest, XPackUsageResponse>> getUsageAction() {
117+
return DataTiersOnlyTransportXPackUsageAction.class;
118+
}
119+
}
120+
121+
public static class DataTiersOnlyTransportXPackUsageAction extends TransportXPackUsageAction {
122+
@Inject
123+
public DataTiersOnlyTransportXPackUsageAction(
124+
ThreadPool threadPool,
125+
TransportService transportService,
126+
ClusterService clusterService,
127+
ActionFilters actionFilters,
128+
IndexNameExpressionResolver indexNameExpressionResolver,
129+
NodeClient client
130+
) {
131+
super(threadPool, transportService, clusterService, actionFilters, indexNameExpressionResolver, client);
132+
}
133+
134+
@Override
135+
protected List<XPackUsageFeatureAction> usageActions() {
136+
return List.of(XPackUsageFeatureAction.DATA_TIERS);
137+
}
138+
}
139+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTiersUsageTransportAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
1414
import org.elasticsearch.action.support.ActionFilters;
1515
import org.elasticsearch.client.internal.Client;
16+
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
1617
import org.elasticsearch.cluster.ClusterState;
1718
import org.elasticsearch.cluster.metadata.IndexMetadata;
1819
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -74,7 +75,7 @@ protected void masterOperation(
7475
ClusterState state,
7576
ActionListener<XPackUsageFeatureResponse> listener
7677
) {
77-
client.admin()
78+
new ParentTaskAssigningClient(client, clusterService.localNode(), task).admin()
7879
.cluster()
7980
.prepareNodesStats()
8081
.all()

0 commit comments

Comments
 (0)