Skip to content

Commit 744ed93

Browse files
authored
Wait for task on master in testGetMappingsCancellation (#91709) (#91916)
1 parent 1312225 commit 744ed93

File tree

3 files changed

+30
-7
lines changed

3 files changed

+30
-7
lines changed

qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestClusterInfoActionCancellationIT.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.elasticsearch.core.TimeValue;
2727
import org.elasticsearch.rest.RestStatus;
2828
import org.elasticsearch.test.ESIntegTestCase;
29-
import org.elasticsearch.test.junit.annotations.TestLogging;
29+
import org.hamcrest.Matchers;
3030

3131
import java.util.EnumSet;
3232
import java.util.concurrent.CancellationException;
@@ -35,11 +35,10 @@
3535
import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
3636
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
3737
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
38-
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
38+
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefixOnMaster;
3939
import static org.hamcrest.core.IsEqual.equalTo;
4040

4141
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
42-
@TestLogging(value = "org.elasticsearch.tasks.TaskManager:TRACE,org.elasticsearch.test.TaskAssertions:TRACE", reason = "debugging")
4342
public class RestClusterInfoActionCancellationIT extends HttpSmokeTestCase {
4443

4544
public void testGetMappingsCancellation() throws Exception {
@@ -77,8 +76,18 @@ private void runTest(String actionName, String endpoint) throws Exception {
7776
final Cancellable cancellable = getRestClient().performRequestAsync(request, wrapAsRestResponseListener(future));
7877

7978
assertThat(future.isDone(), equalTo(false));
80-
awaitTaskWithPrefix(actionName);
81-
79+
awaitTaskWithPrefixOnMaster(actionName);
80+
// To ensure that the task is executing on master, we wait until the first blocked execution of the task registers its cluster state
81+
// observer for further retries. This ensures that a task is not cancelled before we have started its execution, which could result
82+
// in the task being unregistered and the test not being able to find any cancelled tasks.
83+
assertBusy(
84+
() -> assertThat(
85+
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
86+
.getClusterApplierService()
87+
.getTimeoutClusterStateListenersSize(),
88+
Matchers.greaterThan(0)
89+
)
90+
);
8291
cancellable.cancel();
8392
assertAllCancellableTasksAreCancelled(actionName);
8493

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,4 +654,9 @@ protected boolean applicationMayFail() {
654654
public ClusterApplierRecordingService.Stats getStats() {
655655
return recordingService.getStats();
656656
}
657+
658+
// Exposed only for testing
659+
public int getTimeoutClusterStateListenersSize() {
660+
return timeoutClusterStateListeners.size();
661+
}
657662
}

test/framework/src/main/java/org/elasticsearch/test/TaskAssertions.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,18 @@ public class TaskAssertions {
3232
private TaskAssertions() {}
3333

3434
public static void awaitTaskWithPrefix(String actionPrefix) throws Exception {
35+
awaitTaskWithPrefix(actionPrefix, internalCluster().getInstances(TransportService.class));
36+
}
37+
38+
public static void awaitTaskWithPrefixOnMaster(String actionPrefix) throws Exception {
39+
awaitTaskWithPrefix(actionPrefix, List.of(internalCluster().getCurrentMasterNodeInstance(TransportService.class)));
40+
}
41+
42+
private static void awaitTaskWithPrefix(String actionPrefix, Iterable<TransportService> transportServiceInstances) throws Exception {
3543
logger.info("--> waiting for task with prefix [{}] to start", actionPrefix);
3644

3745
assertBusy(() -> {
38-
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
46+
for (TransportService transportService : transportServiceInstances) {
3947
List<Task> matchingTasks = transportService.getTaskManager()
4048
.getTasks()
4149
.values()
@@ -61,12 +69,13 @@ public static void assertAllCancellableTasksAreCancelled(String actionPrefix) th
6169
assertTrue(taskManager.assertCancellableTaskConsistency());
6270
for (CancellableTask cancellableTask : taskManager.getCancellableTasks().values()) {
6371
if (cancellableTask.getAction().startsWith(actionPrefix)) {
64-
logger.trace("--> found task with prefix [{}] marked as cancelled: [{}]", actionPrefix, cancellableTask);
72+
logger.trace("--> found task with prefix [{}]: [{}]", actionPrefix, cancellableTask);
6573
foundTask = true;
6674
assertTrue(
6775
"task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled",
6876
cancellableTask.isCancelled()
6977
);
78+
logger.trace("--> Task with prefix [{}] is marked as cancelled: [{}]", actionPrefix, cancellableTask);
7079
}
7180
}
7281
}

0 commit comments

Comments
 (0)