Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve finding peers #7626

Merged
merged 18 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeer;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import org.hyperledger.besu.ethereum.rlp.RLPOutput;
import org.hyperledger.besu.plugin.data.EnodeURL;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.tuweni.bytes.Bytes;
import org.ethereum.beacon.discovery.schema.NodeRecord;
Expand All @@ -37,9 +37,7 @@ public class DiscoveryPeer extends DefaultPeer {
private final Endpoint endpoint;

// Timestamps.
private long firstDiscovered = 0;
private long lastContacted = 0;
private long lastSeen = 0;
private final AtomicLong firstDiscovered = new AtomicLong(0L);
private long lastAttemptedConnection = 0;

private NodeRecord nodeRecord;
Expand Down Expand Up @@ -96,20 +94,11 @@ public void setStatus(final PeerDiscoveryStatus status) {
}

public long getFirstDiscovered() {
return firstDiscovered;
return firstDiscovered.get();
}

public PeerId setFirstDiscovered(final long firstDiscovered) {
this.firstDiscovered = firstDiscovered;
return this;
}

public long getLastContacted() {
return lastContacted;
}

public void setLastContacted(final long lastContacted) {
this.lastContacted = lastContacted;
public void setFirstDiscovered(final long firstDiscovered) {
this.firstDiscovered.compareAndExchange(0L, firstDiscovered);
}

public long getLastAttemptedConnection() {
Expand All @@ -120,14 +109,6 @@ public void setLastAttemptedConnection(final long lastAttemptedConnection) {
this.lastAttemptedConnection = lastAttemptedConnection;
}

public long getLastSeen() {
return lastSeen;
}

public void setLastSeen(final long lastSeen) {
this.lastSeen = lastSeen;
}

public Endpoint getEndpoint() {
return endpoint;
}
Expand Down Expand Up @@ -163,8 +144,6 @@ public String toString() {
sb.append("status=").append(status);
sb.append(", enode=").append(this.getEnodeURL());
sb.append(", firstDiscovered=").append(firstDiscovered);
sb.append(", lastContacted=").append(lastContacted);
sb.append(", lastSeen=").append(lastSeen);
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,7 @@ protected void handleOutgoingPacket(final DiscoveryPeer peer, final Packet packe
(res, err) -> {
if (err != null) {
handleOutgoingPacketError(err, peer, packet);
return;
}
peer.setLastContacted(System.currentTimeMillis());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ public enum PeerDiscoveryStatus {
* We have successfully bonded with this {@link DiscoveryPeer}, and we are able to exchange
* messages with them.
*/
BONDED,

/** We have requested the ENR record from this {@link DiscoveryPeer} */
ENR_REQUESTED;
BONDED;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
Expand All @@ -43,6 +44,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -321,7 +323,6 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
switch (packet.getType()) {
case PING:
if (peerPermissions.allowInboundBonding(peer)) {
peer.setLastSeen(System.currentTimeMillis());
final PingPacketData ping = packet.getPacketData(PingPacketData.class).get();
if (!PeerDiscoveryStatus.BONDED.equals(peer.getStatus())
&& (bondingPeers.getIfPresent(sender.getId()) == null)) {
Expand All @@ -338,7 +339,7 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
requestENR(peer);
}
bondingPeers.invalidate(peerId);
addToPeerTable(peer);
checkBeforeAddingToPeerTable(peer);
recursivePeerRefreshState.onBondingComplete(peer);
Optional.ofNullable(cachedEnrRequests.getIfPresent(peerId))
.ifPresent(cachedEnrRequest -> processEnrRequest(peer, cachedEnrRequest));
Expand Down Expand Up @@ -405,38 +406,45 @@ private List<DiscoveryPeer> getPeersFromNeighborsPacket(final Packet packet) {
.collect(Collectors.toList());
}

private boolean addToPeerTable(final DiscoveryPeer peer) {
final PeerTable.AddResult result = peerTable.tryAdd(peer);
if (result.getOutcome() != PeerTable.AddResult.AddOutcome.INVALID) {

// Reset the last seen timestamp.
final long now = System.currentTimeMillis();
if (peer.getFirstDiscovered() == 0) {
peer.setFirstDiscovered(now);
}
peer.setLastSeen(now);
private void checkBeforeAddingToPeerTable(final DiscoveryPeer peer) {
if (peerTable.isIpAddressInvalid(peer.getEndpoint())) {
return;
}

if (peer.getStatus() != PeerDiscoveryStatus.BONDED) {
peer.setStatus(PeerDiscoveryStatus.BONDED);
connectOnRlpxLayer(peer);
}
if (peer.getFirstDiscovered() == 0L) {
connectOnRlpxLayer(peer)
.whenComplete(
(pc, th) -> {
if (th == null || !(th.getCause() instanceof TimeoutException)) {
peer.setStatus(PeerDiscoveryStatus.BONDED);
peer.setFirstDiscovered(System.currentTimeMillis());
addToPeerTable(peer);
} else {
LOG.debug("Handshake timed out with peer {}", peer.getLoggableId(), th);
peerTable.invalidateIP(peer.getEndpoint());
}
});
} else {
peer.setStatus(PeerDiscoveryStatus.BONDED);
addToPeerTable(peer);
}
}

if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) {
// Bump peer.
peerTable.tryEvict(peer);
peerTable.tryAdd(peer);
} else if (result.getOutcome() == PeerTable.AddResult.AddOutcome.BUCKET_FULL) {
peerTable.tryEvict(result.getEvictionCandidate());
peerTable.tryAdd(peer);
}
public void addToPeerTable(final DiscoveryPeer peer) {
final PeerTable.AddResult result = peerTable.tryAdd(peer);

return true;
if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) {
// Bump peer.
peerTable.tryEvict(peer);
peerTable.tryAdd(peer);
} else if (result.getOutcome() == PeerTable.AddResult.AddOutcome.BUCKET_FULL) {
peerTable.tryEvict(result.getEvictionCandidate());
peerTable.tryAdd(peer);
}
return false;
}

void connectOnRlpxLayer(final DiscoveryPeer peer) {
rlpxAgent.connect(peer);
CompletableFuture<PeerConnection> connectOnRlpxLayer(final DiscoveryPeer peer) {
return rlpxAgent.connect(peer);
}

private Optional<PeerInteractionState> matchInteraction(final Packet packet) {
Expand Down Expand Up @@ -512,7 +520,6 @@ void bond(final DiscoveryPeer peer) {
return;
}

peer.setFirstDiscovered(System.currentTimeMillis());
peer.setStatus(PeerDiscoveryStatus.BONDING);
bondingPeers.put(peer.getId(), peer);

Expand Down Expand Up @@ -719,7 +726,7 @@ public void handleBondingRequest(final DiscoveryPeer peer) {

// Load the peer first from the table, then from bonding cache or use the instance that comes in.
private DiscoveryPeer resolvePeer(final DiscoveryPeer peer) {
if (peerTable.ipAddressIsInvalid(peer.getEndpoint())) {
if (peerTable.isIpAddressInvalid(peer.getEndpoint())) {
return null;
}
final Optional<DiscoveryPeer> maybeKnownPeer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Optional<DiscoveryPeer> get(final PeerId peer) {
* @see AddOutcome
*/
public AddResult tryAdd(final DiscoveryPeer peer) {
if (ipAddressIsInvalid(peer.getEndpoint())) {
if (isIpAddressInvalid(peer.getEndpoint())) {
return AddResult.invalid();
}
final Bytes id = peer.getId();
Expand Down Expand Up @@ -212,7 +212,7 @@ public Stream<DiscoveryPeer> streamAllPeers() {
return Arrays.stream(table).flatMap(e -> e.getPeers().stream());
}

boolean ipAddressIsInvalid(final Endpoint endpoint) {
public boolean isIpAddressInvalid(final Endpoint endpoint) {
final String key = getKey(endpoint);
if (invalidIPs.contains(key)) {
return true;
Expand All @@ -223,21 +223,21 @@ boolean ipAddressIsInvalid(final Endpoint endpoint) {
for (final Bucket bucket : table) {
bucket.getPeers().stream()
.filter(p -> p.getEndpoint().getHost().equals(endpoint.getHost()))
.forEach(p -> evictAndStore(p, bucket, key));
.forEach(bucket::evict);
}
return true;
} else {
return false;
}
}

private void evictAndStore(final DiscoveryPeer peer, final Bucket bucket, final String key) {
bucket.evict(peer);
public void invalidateIP(final Endpoint endpoint) {
final String key = getKey(endpoint);
invalidIPs.add(key);
}

private static String getKey(final Endpoint endpoint) {
return endpoint.getHost() + endpoint.getFunctionalTcpPort();
return endpoint.getHost() + ":" + endpoint.getFunctionalTcpPort();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private boolean satisfiesMapAdditionCriteria(final DiscoveryPeer discoPeer) {
return !oneTrueMap.containsKey(discoPeer.getId())
&& (initialPeers.contains(discoPeer) || !peerTable.get(discoPeer).isPresent())
&& !discoPeer.getId().equals(localPeer.getId())
&& !peerTable.ipAddressIsInvalid(discoPeer.getEndpoint());
&& !peerTable.isIpAddressInvalid(discoPeer.getEndpoint());
}

void onNeighboursReceived(final DiscoveryPeer peer, final List<DiscoveryPeer> peers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
Expand Down Expand Up @@ -174,10 +173,6 @@ public void subscribeIncomingConnect(final ConnectCallback callback) {
public CompletableFuture<PeerConnection> connect(final Peer peer) {
final CompletableFuture<PeerConnection> connectionFuture = new CompletableFuture<>();

if (peer instanceof DiscoveryPeer) {
((DiscoveryPeer) peer).setLastAttemptedConnection(System.currentTimeMillis());
}

final EnodeURL enode = peer.getEnodeURL();
new Bootstrap()
.group(workers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void neighborsPacketFromUnbondedPeerIsDropped() {
}

@Test
public void neighborsPacketLimited() {
public void neighborsPacketLimited() throws InterruptedException {
// Start 20 agents with no bootstrap peers.
final List<MockPeerDiscoveryAgent> otherAgents =
helper.startDiscoveryAgents(20, Collections.emptyList());
Expand All @@ -192,8 +192,9 @@ public void neighborsPacketLimited() {
.map(Optional::get)
.collect(Collectors.toList());

// Start another peer pointing to those 20 agents.
// Start another peer
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(otherPeers);

// We used to do a hasSize match but we had issues with duplicate peers getting added to the
// list. By moving to a contains we make sure that all the peers are loaded with tolerance for
// duplicates. If we fix the duplication problem we should use containsExactlyInAnyOrder to
Expand Down Expand Up @@ -222,7 +223,7 @@ public void neighborsPacketLimited() {
final List<IncomingPacket> incomingPackets =
testAgent.getIncomingPackets().stream()
.filter(p -> p.packet.getType().equals(PacketType.NEIGHBORS))
.collect(toList());
.toList();
assertThat(incomingPackets.size()).isEqualTo(1);
final IncomingPacket neighborsPacket = incomingPackets.get(0);
assertThat(neighborsPacket.fromAgent).isEqualTo(agent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,15 @@ public void pongSentUponPing() {
final List<IncomingPacket> otherAgentIncomingPongs =
otherAgent.getIncomingPackets().stream()
.filter(p -> p.packet.getType().equals(PacketType.PONG))
.collect(Collectors.toList());
.toList();
assertThat(otherAgentIncomingPongs.size()).isEqualTo(1);

assertThat(
otherAgentIncomingPongs.get(0).packet.getPacketData(PongPacketData.class).isPresent())
otherAgentIncomingPongs
.getFirst()
.packet
.getPacketData(PongPacketData.class)
.isPresent())
.isTrue();
final PongPacketData pong =
otherAgentIncomingPongs.get(0).packet.getPacketData(PongPacketData.class).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Arrays.asList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -44,6 +45,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -301,15 +303,12 @@ public MockPeerDiscoveryAgent build() {
final ForkId forkId = new ForkId(Bytes.EMPTY, Bytes.EMPTY);
when(mockForkIdManager.getForkIdForChainHead()).thenReturn(forkId);
when(mockForkIdManager.peerCheck(forkId)).thenReturn(true);
final RlpxAgent rlpxAgent = mock(RlpxAgent.class);
when(rlpxAgent.connect(any()))
.thenReturn(CompletableFuture.failedFuture(new RuntimeException()));
final MockPeerDiscoveryAgent mockPeerDiscoveryAgent =
new MockPeerDiscoveryAgent(
nodeKey,
config,
peerPermissions,
agents,
natService,
mockForkIdManager,
mock(RlpxAgent.class));
nodeKey, config, peerPermissions, agents, natService, mockForkIdManager, rlpxAgent);
mockPeerDiscoveryAgent.getAdvertisedPeer().ifPresent(peer -> peer.setNodeRecord(nodeRecord));

return mockPeerDiscoveryAgent;
Expand Down
Loading
Loading