Skip to content

[Zen2] Fix some rarely-failing tests #35198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,8 @@ public void onFailure(String source, Exception e) {
}

// for tests
boolean hasJoinVoteFrom(DiscoveryNode localNode) {
return coordinationState.get().containsJoinVoteFor(localNode);
boolean hasJoinVoteFrom(DiscoveryNode node) {
return coordinationState.get().containsJoinVoteFor(node);
}

private void handleJoin(Join join) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ public String toString() {
followersChecker.setCurrentNodes(discoveryNodes);
nodeFailed.set(false);
assertThat(followersChecker.getFaultyNodes(), empty());
deterministicTaskQueue.runAllTasks();
deterministicTaskQueue.runAllTasksInTimeOrder();
assertTrue(nodeFailed.get());
assertThat(followersChecker.getFaultyNodes(), contains(otherNode));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -468,50 +470,56 @@ public void testConcurrentJoining() {
possiblyFailingJoinRequests.addAll(randomSubsetOf(possiblyFailingJoinRequests));

CyclicBarrier barrier = new CyclicBarrier(correctJoinRequests.size() + possiblyFailingJoinRequests.size() + 1);
List<Thread> threads = new ArrayList<>();
threads.add(new Thread(() -> {

final AtomicBoolean stopAsserting = new AtomicBoolean();
final Thread assertionThread = new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
for (int i = 0; i < 30; i++) {
while (stopAsserting.get() == false) {
coordinator.invariant();
}
}));
threads.addAll(correctJoinRequests.stream().map(joinRequest -> new Thread(
() -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
joinNode(joinRequest);
})).collect(Collectors.toList()));
threads.addAll(possiblyFailingJoinRequests.stream().map(joinRequest -> new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
try {
joinNode(joinRequest);
} catch (CoordinationStateRejectedException ignore) {
// ignore
}
})).collect(Collectors.toList()));

threads.forEach(Thread::start);
threads.forEach(t -> {
}, "assert invariants");

final List<Thread> joinThreads = Stream.concat(correctJoinRequests.stream(), possiblyFailingJoinRequests.stream())
.map(joinRequest ->
new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
try {
joinNode(joinRequest);
} catch (CoordinationStateRejectedException ignore) {
// ignore: even the "correct" requests may fail as a duplicate because a concurrent election may cause a node to
// spontaneously join.
}
}, "process " + joinRequest)).collect(Collectors.toList());

assertionThread.start();
joinThreads.forEach(Thread::start);
joinThreads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
stopAsserting.set(true);
try {
assertionThread.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

assertTrue(MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster());
successfulNodes.forEach(node -> assertTrue(clusterStateHasNode(node)));
for (DiscoveryNode successfulNode : successfulNodes) {
assertTrue(successfulNode.toString(), clusterStateHasNode(successfulNode));
assertTrue(successfulNode.toString(), coordinator.hasJoinVoteFrom(successfulNode));
}
}

private boolean isLocalNodeElectedMaster() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.util.Arrays;
import java.util.HashSet;
Expand All @@ -43,6 +44,11 @@

public class ReconfiguratorTests extends ESTestCase {

@Before
public void resetPortCounterBeforeTest() {
resetPortCounter();
}

public void testReconfigurationExamples() {

check(nodes("a"), conf("a"), 0, conf("a"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ public void runAllTasks() {
}
}

public void runAllTasksInTimeOrder() {
while (hasDeferredTasks() || hasRunnableTasks()) {
if (hasRunnableTasks()) {
runRandomTask();
} else {
advanceTime();
}
}
}

/**
* @return whether there are any runnable tasks.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,20 @@ public void testExecutesTasksInTimeOrder() {
assertFalse(taskQueue.hasDeferredTasks());
}

public void testRunInTimeOrder() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final List<String> strings = new ArrayList<>(2);

final long executionTimeMillis1 = randomLongBetween(1, 100);
final long executionTimeMillis2 = randomLongBetween(executionTimeMillis1 + 1, 200);

taskQueue.scheduleAt(executionTimeMillis1, () -> strings.add("foo"));
taskQueue.scheduleAt(executionTimeMillis2, () -> strings.add("bar"));

taskQueue.runAllTasksInTimeOrder();
assertThat(strings, contains("foo", "bar"));
}

public void testExecutorServiceEnqueuesTasks() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final List<String> strings = new ArrayList<>(2);
Expand Down