Skip to content

Await all pending activity in testConnectAndDisconnect #40037

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,45 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
runnables.forEach(Runnable::run);
}

void ensureConnections(Runnable onCompletion) {
// Called by tests after some disruption has concluded. It is possible that one or more targets are currently CONNECTING and have
// been since the disruption was active, and that the connection attempt was thwarted by a concurrent disruption to the connection.
// If so, we cannot simply add our listener to the queue because it will be notified when this CONNECTING activity completes even
// though it was disrupted. We must therefore wait for all the current activity to finish and then go through and reconnect to
// any missing nodes.
awaitPendingActivity(() -> connectDisconnectedTargets(onCompletion));
}

private void awaitPendingActivity(Runnable onCompletion) {
final List<Runnable> runnables = new ArrayList<>();
synchronized (mutex) {
final Collection<ConnectionTarget> connectionTargets = targetsByNode.values();
if (connectionTargets.isEmpty()) {
runnables.add(onCompletion);
} else {
final GroupedActionListener<Void> listener = new GroupedActionListener<>(
ActionListener.wrap(onCompletion), connectionTargets.size());
for (final ConnectionTarget connectionTarget : connectionTargets) {
runnables.add(connectionTarget.awaitCurrentActivity(listener));
}
}
}
runnables.forEach(Runnable::run);
}

/**
* Makes a single attempt to reconnect to any nodes which are disconnected but should be connected. Does not attempt to reconnect any
* nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection
* attempts have completed.
*/
void ensureConnections(Runnable onCompletion) {
private void connectDisconnectedTargets(Runnable onCompletion) {
final List<Runnable> runnables = new ArrayList<>();
synchronized (mutex) {
final Collection<ConnectionTarget> connectionTargets = targetsByNode.values();
if (connectionTargets.isEmpty()) {
runnables.add(onCompletion);
} else {
logger.trace("ensuring connections to {}", targetsByNode);
logger.trace("connectDisconnectedTargets: {}", targetsByNode);
final GroupedActionListener<Void> listener = new GroupedActionListener<>(
ActionListener.wrap(onCompletion), connectionTargets.size());
for (final ConnectionTarget connectionTarget : connectionTargets) {
Expand All @@ -182,7 +208,7 @@ void ensureConnections(Runnable onCompletion) {
class ConnectionChecker extends AbstractRunnable {
protected void doRun() {
if (connectionChecker == this) {
ensureConnections(this::scheduleNextCheck);
connectDisconnectedTargets(this::scheduleNextCheck);
}
}

Expand Down Expand Up @@ -352,6 +378,18 @@ Runnable ensureConnected(@Nullable ActionListener<Void> listener) {
}
}

Runnable awaitCurrentActivity(ActionListener<Void> listener) {
assert Thread.holdsLock(mutex) : "mutex not held";

if (activityType == ActivityType.IDLE) {
return () -> listener.onResponse(null);
} else {
addListener(listener);
return () -> {
};
}
}

private void addListener(@Nullable ActionListener<Void> listener) {
assert Thread.holdsLock(mutex) : "mutex not held";
assert activityType != ActivityType.IDLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ private DiscoveryNodes discoveryNodesFromList(List<DiscoveryNode> discoveryNodes
return builder.build();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40030")
public void testConnectAndDisconnect() throws Exception {
final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);

Expand All @@ -106,46 +105,56 @@ public void testConnectAndDisconnect() throws Exception {
service.ensureConnections(() -> future.onResponse(null));
future.actionGet();
}
});
}, "reconnection thread");
reconnectionThread.start();

final List<DiscoveryNode> allNodes = generateNodes();
for (int iteration = 0; iteration < 3; iteration++) {
try {

final List<DiscoveryNode> allNodes = generateNodes();
for (int iteration = 0; iteration < 3; iteration++) {

final boolean isDisrupting = randomBoolean();
final AtomicBoolean stopDisrupting = new AtomicBoolean();
final Thread disruptionThread = new Thread(() -> {
while (isDisrupting && stopDisrupting.get() == false) {
transportService.disconnectFromNode(randomFrom(allNodes));
final boolean isDisrupting = randomBoolean();
if (isDisrupting == false) {
// if the previous iteration was a disrupting one then there could still be some pending disconnections which would
// prevent us from asserting that all nodes are connected in this iteration without this call.
ensureConnections(service);
}
});
disruptionThread.start();

final DiscoveryNodes nodes = discoveryNodesFromList(randomSubsetOf(allNodes));
final PlainActionFuture<Void> future = new PlainActionFuture<>();
service.connectToNodes(nodes, () -> future.onResponse(null));
future.actionGet();
if (isDisrupting == false) {
assertConnected(nodes);
}
service.disconnectFromNodesExcept(nodes);
final AtomicBoolean stopDisrupting = new AtomicBoolean();
final Thread disruptionThread = new Thread(() -> {
while (isDisrupting && stopDisrupting.get() == false) {
transportService.disconnectFromNode(randomFrom(allNodes));
}
}, "disruption thread " + iteration);
disruptionThread.start();

final DiscoveryNodes nodes = discoveryNodesFromList(randomSubsetOf(allNodes));
final PlainActionFuture<Void> future = new PlainActionFuture<>();
service.connectToNodes(nodes, () -> future.onResponse(null));
future.actionGet();
if (isDisrupting == false) {
assertConnected(nodes);
}
service.disconnectFromNodesExcept(nodes);

assertTrue(stopDisrupting.compareAndSet(false, true));
disruptionThread.join();
assertTrue(stopDisrupting.compareAndSet(false, true));
disruptionThread.join();

if (randomBoolean()) {
// sometimes do not wait for the disconnections to complete before starting the next connections
if (usually()) {
ensureConnections(service);
assertConnectedExactlyToNodes(nodes);
} else {
assertBusy(() -> assertConnectedExactlyToNodes(nodes));
if (randomBoolean()) {
// sometimes do not wait for the disconnections to complete before starting the next connections
if (usually()) {
ensureConnections(service);
assertConnectedExactlyToNodes(nodes);
} else {
assertBusy(() -> assertConnectedExactlyToNodes(nodes));
}
}
}
} finally {
assertTrue(stopReconnecting.compareAndSet(false, true));
reconnectionThread.join();
}

assertTrue(stopReconnecting.compareAndSet(false, true));
reconnectionThread.join();
ensureConnections(service);
}

public void testPeriodicReconnection() {
Expand Down Expand Up @@ -206,7 +215,6 @@ public String toString() {
assertConnectedExactlyToNodes(targetNodes);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40030")
public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);

Expand Down