Skip to content

[Zen2] Add PeerFinder#onFoundPeersUpdated #32939

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 47 additions & 16 deletions server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,26 @@ public void activate(final DiscoveryNodes lastAcceptedNodes) {
logger.trace("activating with {}", lastAcceptedNodes);

synchronized (mutex) {
assert active == false;
assert assertInactiveWithNoKnownPeers();
active = true;
this.lastAcceptedNodes = lastAcceptedNodes;
leader = Optional.empty();
handleWakeUp();
handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected
}
}

public void deactivate(DiscoveryNode leader) {
final boolean peersRemoved;
synchronized (mutex) {
logger.trace("deactivating and setting leader to {}", leader);
active = false;
handleWakeUp();
peersRemoved = handleWakeUp();
this.leader = Optional.of(leader);
assert assertInactiveWithNoKnownPeers();
}
if (peersRemoved) {
onFoundPeersUpdated();
}
}

// exposed to subclasses for testing
Expand All @@ -114,7 +118,7 @@ protected final boolean holdsLock() {

boolean assertInactiveWithNoKnownPeers() {
assert active == false;
assert peersByAddress.isEmpty();
assert peersByAddress.isEmpty() : peersByAddress.keySet();
return true;
}

Expand Down Expand Up @@ -142,10 +146,20 @@ private DiscoveryNode getLocalNode() {
}

/**
* Called on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join.
* Invoked on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join.
* Note that invocations of this method are not synchronised. By the time it is called we may have been deactivated.
*/
protected abstract void onActiveMasterFound(DiscoveryNode masterNode, long term);

/**
* Invoked when the set of found peers changes. Note that invocations of this method are not fully synchronised, so we only guarantee
* that the change to the set of found peers happens before this method is invoked. If there are multiple concurrent changes then there
* will be multiple concurrent invocations of this method, with no guarantee as to their order. For this reason we do not pass the
* updated set of peers as an argument to this method, leaving it to the implementation to call getFoundPeers() with appropriate
* synchronisation to avoid lost updates. Also, by the time this method is invoked we may have been deactivated.
*/
protected abstract void onFoundPeersUpdated();

public interface TransportAddressConnector {
/**
* Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it.
Expand All @@ -170,7 +184,6 @@ public Iterable<DiscoveryNode> getFoundPeers() {
}

private List<DiscoveryNode> getFoundPeersUnderLock() {
assert active;
assert holdsLock() : "PeerFinder mutex not held";
return peersByAddress.values().stream().map(Peer::getDiscoveryNode).filter(Objects::nonNull).collect(Collectors.toList());
}
Expand All @@ -181,16 +194,21 @@ private Peer createConnectingPeer(TransportAddress transportAddress) {
return peer;
}

private void handleWakeUp() {
/**
* @return whether any peers were removed due to disconnection
*/
private boolean handleWakeUp() {
assert holdsLock() : "PeerFinder mutex not held";

boolean peersRemoved = false;

for (final Peer peer : peersByAddress.values()) {
peer.handleWakeUp();
peersRemoved = peer.handleWakeUp() || peersRemoved; // care: avoid short-circuiting, each peer needs waking up
}

if (active == false) {
logger.trace("not active");
return;
return peersRemoved;
}

logger.trace("probing master nodes from cluster state: {}", lastAcceptedNodes);
Expand Down Expand Up @@ -220,15 +238,20 @@ public void onFailure(Exception e) {
@Override
protected void doRun() {
synchronized (mutex) {
handleWakeUp();
if (handleWakeUp() == false) {
return;
}
}
onFoundPeersUpdated();
}

@Override
public String toString() {
return "PeerFinder::handleWakeUp";
}
});

return peersRemoved;
}

private void startProbe(TransportAddress transportAddress) {
Expand Down Expand Up @@ -260,12 +283,12 @@ DiscoveryNode getDiscoveryNode() {
return discoveryNode.get();
}

void handleWakeUp() {
boolean handleWakeUp() {
assert holdsLock() : "PeerFinder mutex not held";

if (active == false) {
removePeer();
return;
return true;
}

final DiscoveryNode discoveryNode = getDiscoveryNode();
Expand All @@ -279,8 +302,11 @@ void handleWakeUp() {
} else {
logger.trace("{} no longer connected", this);
removePeer();
return true;
}
}

return false;
}

void establishConnection() {
Expand All @@ -295,12 +321,17 @@ public void onResponse(DiscoveryNode remoteNode) {
assert remoteNode.isMasterNode() : remoteNode + " is not master-eligible";
assert remoteNode.equals(getLocalNode()) == false : remoteNode + " is the local node";
synchronized (mutex) {
if (active) {
assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get();
discoveryNode.set(remoteNode);
requestPeers();
if (active == false) {
return;
}

assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get();
discoveryNode.set(remoteNode);
requestPeers();
}

assert holdsLock() == false : "PeerFinder mutex is held in error";
onFoundPeersUpdated();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class PeerFinderTests extends ESTestCase {
private Set<DiscoveryNode> connectedNodes = new HashSet<>();
private DiscoveryNodes lastAcceptedNodes;
private TransportService transportService;
private Iterable<DiscoveryNode> foundPeersFromNotification;

private static long CONNECTION_TIMEOUT_MILLIS = 30000;

Expand Down Expand Up @@ -156,6 +157,13 @@ protected void onActiveMasterFound(DiscoveryNode masterNode, long term) {
discoveredMasterNode = masterNode;
discoveredMasterTerm = OptionalLong.of(term);
}

@Override
protected void onFoundPeersUpdated() {
assert holdsLock() == false : "PeerFinder lock held in error";
foundPeersFromNotification = getFoundPeers();
logger.trace("onFoundPeersUpdated({})", foundPeersFromNotification);
}
}

private void resolveConfiguredHosts(Consumer<List<TransportAddress>> onResult) {
Expand Down Expand Up @@ -214,13 +222,13 @@ public void setup() {
lastAcceptedNodes = DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode).build();

peerFinder = new TestPeerFinder(settings, transportService, transportAddressConnector);
foundPeersFromNotification = emptyList();
}

@After
public void deactivateAndRunRemainingTasks() {
peerFinder.deactivate(localNode);
deterministicTaskQueue.runAllTasks(); // termination ensures that everything is properly cleaned up
peerFinder.assertInactiveWithNoKnownPeers(); // should eventually have no nodes when deactivated
deterministicTaskQueue.runAllRunnableTasks(random());
}

public void testAddsReachableNodesFromUnicastHostsList() {
Expand Down Expand Up @@ -693,14 +701,33 @@ private void assertFoundPeers(DiscoveryNode... expectedNodesArray) {
final Stream<DiscoveryNode> expectedNodes = Arrays.stream(expectedNodesArray);
final Stream<DiscoveryNode> actualNodes = StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false);
assertThat(actualNodes.collect(Collectors.toSet()), equalTo(expectedNodes.collect(Collectors.toSet())));
assertNotifiedOfAllUpdates();
}

private void assertNotifiedOfAllUpdates() {
final Stream<DiscoveryNode> actualNodes = StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false);
final Stream<DiscoveryNode> notifiedNodes = StreamSupport.stream(foundPeersFromNotification.spliterator(), false);
assertThat(notifiedNodes.collect(Collectors.toSet()), equalTo(actualNodes.collect(Collectors.toSet())));
}

private DiscoveryNode newDiscoveryNode(String nodeId) {
return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), Version.CURRENT);
}

private void runAllRunnableTasks() {
deterministicTaskQueue.scheduleNow(new Runnable() {
@Override
public void run() {
PeerFinderTests.this.assertNotifiedOfAllUpdates();
}

@Override
public String toString() {
return "assertNotifiedOfAllUpdates";
}
});
deterministicTaskQueue.runAllRunnableTasks(random());
assertNotifiedOfAllUpdates();
}
}