Skip to content

Propagate cancellation in DataTiersUsageTransportAction #100253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/100253.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 100253
summary: Propagate cancellation in `DataTiersUsageTransportAction`
area: Data streams
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.rest.action;

import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty4.Netty4Plugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.action.TransportXPackUsageAction;
import org.elasticsearch.xpack.core.action.XPackUsageAction;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
import org.elasticsearch.xpack.core.action.XPackUsageResponse;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;

import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class DataTiersUsageRestCancellationIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(getTestTransportPlugin(), DataTiersUsageOnlyXPackPlugin.class, MockTransportService.TestPlugin.class);
}

@Override
protected Settings nodeSettings(int ordinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(ordinal, otherSettings))
.put(NetworkModule.HTTP_DEFAULT_TYPE_SETTING.getKey(), Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
.build();
}

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

public void testCancellation() throws Exception {
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();

final CountDownLatch tasksBlockedLatch = new CountDownLatch(1);
final SubscribableListener<Void> nodeStatsRequestsReleaseListener = new SubscribableListener<>();
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
((MockTransportService) transportService).addRequestHandlingBehavior(
NodesStatsAction.NAME + "[n]",
(handler, request, channel, task) -> {
tasksBlockedLatch.countDown();
nodeStatsRequestsReleaseListener.addListener(
ActionListener.wrap(ignored -> handler.messageReceived(request, channel, task), e -> {
throw new AssertionError("unexpected", e);
})
);
}
);
}

final Request request = new Request(HttpGet.METHOD_NAME, "/_xpack/usage");
final PlainActionFuture<Response> future = new PlainActionFuture<>();
final Cancellable cancellable = getRestClient().performRequestAsync(request, wrapAsRestResponseListener(future));

assertFalse(future.isDone());
safeAwait(tasksBlockedLatch); // must wait for the node-level tasks to start to avoid cancelling being handled earlier
cancellable.cancel();

// NB this test works by blocking node-level stats requests; when #100230 is addressed this will need to target a different action.
assertAllCancellableTasksAreCancelled(NodesStatsAction.NAME);
assertAllCancellableTasksAreCancelled(XPackUsageAction.NAME);

nodeStatsRequestsReleaseListener.onResponse(null);
expectThrows(CancellationException.class, future::actionGet);

assertAllTasksHaveFinished(NodesStatsAction.NAME);
assertAllTasksHaveFinished(XPackUsageAction.NAME);
}

public static class DataTiersUsageOnlyXPackPlugin extends LocalStateCompositeXPackPlugin {
public DataTiersUsageOnlyXPackPlugin(Settings settings, Path configPath) {
super(settings, configPath);
}

@Override
protected Class<? extends TransportAction<XPackUsageRequest, XPackUsageResponse>> getUsageAction() {
return DataTiersOnlyTransportXPackUsageAction.class;
}
}

public static class DataTiersOnlyTransportXPackUsageAction extends TransportXPackUsageAction {
@Inject
public DataTiersOnlyTransportXPackUsageAction(
ThreadPool threadPool,
TransportService transportService,
ClusterService clusterService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
NodeClient client
) {
super(threadPool, transportService, clusterService, actionFilters, indexNameExpressionResolver, client);
}

@Override
protected List<XPackUsageFeatureAction> usageActions() {
return List.of(XPackUsageFeatureAction.DATA_TIERS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -74,7 +75,7 @@ protected void masterOperation(
ClusterState state,
ActionListener<XPackUsageFeatureResponse> listener
) {
client.admin()
new ParentTaskAssigningClient(client, clusterService.localNode(), task).admin()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL about ParentTaskAssigningClient

.cluster()
.prepareNodesStats()
.all()
Expand Down