Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.hamcrest.Matchers;

import java.util.EnumSet;
import java.util.concurrent.CancellationException;
Expand All @@ -35,11 +35,10 @@
import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefixOnMaster;
import static org.hamcrest.core.IsEqual.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
@TestLogging(value = "org.elasticsearch.tasks.TaskManager:TRACE,org.elasticsearch.test.TaskAssertions:TRACE", reason = "debugging")
public class RestClusterInfoActionCancellationIT extends HttpSmokeTestCase {

public void testGetMappingsCancellation() throws Exception {
Expand Down Expand Up @@ -77,8 +76,18 @@ private void runTest(String actionName, String endpoint) throws Exception {
final Cancellable cancellable = getRestClient().performRequestAsync(request, wrapAsRestResponseListener(future));

assertThat(future.isDone(), equalTo(false));
awaitTaskWithPrefix(actionName);

awaitTaskWithPrefixOnMaster(actionName);
// To ensure that the task is executing on master, we wait until the first blocked execution of the task registers its cluster state
// observer for further retries. This ensures that a task is not cancelled before we have started its execution, which could result
// in the task being unregistered and the test not being able to find any cancelled tasks.
assertBusy(
() -> assertThat(
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
.getClusterApplierService()
.getTimeoutClusterStateListenersSize(),
Matchers.greaterThan(0)
)
);
Comment on lines +79 to +90
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll look into where else we might need this on master or non-master node receiving the task, and move/merge these asserts if necessary in a followup.

cancellable.cancel();
assertAllCancellableTasksAreCancelled(actionName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,4 +654,9 @@ protected boolean applicationMayFail() {
public ClusterApplierRecordingService.Stats getStats() {
return recordingService.getStats();
}

// Exposed only for testing
public int getTimeoutClusterStateListenersSize() {
return timeoutClusterStateListeners.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,18 @@ public class TaskAssertions {
private TaskAssertions() {}

public static void awaitTaskWithPrefix(String actionPrefix) throws Exception {
awaitTaskWithPrefix(actionPrefix, internalCluster().getInstances(TransportService.class));
}

public static void awaitTaskWithPrefixOnMaster(String actionPrefix) throws Exception {
awaitTaskWithPrefix(actionPrefix, List.of(internalCluster().getCurrentMasterNodeInstance(TransportService.class)));
}

private static void awaitTaskWithPrefix(String actionPrefix, Iterable<TransportService> transportServiceInstances) throws Exception {
logger.info("--> waiting for task with prefix [{}] to start", actionPrefix);

assertBusy(() -> {
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
for (TransportService transportService : transportServiceInstances) {
List<Task> matchingTasks = transportService.getTaskManager()
.getTasks()
.values()
Expand All @@ -61,12 +69,13 @@ public static void assertAllCancellableTasksAreCancelled(String actionPrefix) th
assertTrue(taskManager.assertCancellableTaskConsistency());
for (CancellableTask cancellableTask : taskManager.getCancellableTasks().values()) {
if (cancellableTask.getAction().startsWith(actionPrefix)) {
logger.trace("--> found task with prefix [{}] marked as cancelled: [{}]", actionPrefix, cancellableTask);
logger.trace("--> found task with prefix [{}]: [{}]", actionPrefix, cancellableTask);
foundTask = true;
assertTrue(
"task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled",
cancellableTask.isCancelled()
);
logger.trace("--> Task with prefix [{}] is marked as cancelled: [{}]", actionPrefix, cancellableTask);
}
}
}
Expand Down