diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java index 6fed1ff423f..c3a1d43960e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.manager; import static com.google.common.base.Preconditions.checkArgument; +import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.Difficulty; @@ -54,6 +55,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import javax.annotation.Nonnull; import com.google.common.annotations.VisibleForTesting; import org.apache.tuweni.bytes.Bytes; @@ -61,7 +63,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class EthPeer { +public class EthPeer implements Comparable { private static final Logger LOG = LoggerFactory.getLogger(EthPeer.class); private static final int MAX_OUTSTANDING_REQUESTS = 5; @@ -395,7 +397,14 @@ public Map timeoutCounts() { return reputation.timeoutCounts(); } + public PeerReputation getReputation() { + return reputation; + } + void handleDisconnect() { + traceLambda( + LOG, "handleDisconnect - peer... {}, {}", this::getShortNodeId, this::getReputation); + requestManagers.forEach( (protocolName, map) -> map.forEach((code, requestManager) -> requestManager.close())); } @@ -522,7 +531,28 @@ public boolean hasSupportForMessage(final int messageCode) { @Override public String toString() { - return String.format("Peer %s...", nodeId().toString().substring(0, 20)); + return String.format( + "Peer %s... %s, validated? %s, disconnected? %s", + getShortNodeId(), reputation, isFullyValidated(), isDisconnected()); + } + + @Nonnull + public String getShortNodeId() { + return nodeId().toString().substring(0, 20); + } + + @Override + public int compareTo(final @Nonnull EthPeer ethPeer) { + int repCompare = this.reputation.compareTo(ethPeer.reputation); + if (repCompare != 0) return repCompare; + + int headStateCompare = + Long.compare( + this.chainHeadState.getBestBlock().getNumber(), + ethPeer.chainHeadState.getBestBlock().getNumber()); + if (headStateCompare != 0) return headStateCompare; + + return getConnection().getPeerInfo().compareTo(ethPeer.getConnection().getPeerInfo()); } @FunctionalInterface diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index 9268def8ded..b35c2aab6b8 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -204,9 +204,15 @@ public interface ConnectCallback { @Override public String toString() { + if (connections.isEmpty()) { + return "0 EthPeers {}"; + } final String connectionsList = - connections.values().stream().map(EthPeer::toString).collect(Collectors.joining(",")); - return "EthPeers{connections=" + connectionsList + '}'; + connections.values().stream() + .sorted() + .map(EthPeer::toString) + .collect(Collectors.joining(", \n")); + return connections.size() + " EthPeers {\n" + connectionsList + '}'; } private void invokeConnectionCallbacks(final EthPeer peer) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index d90503a14eb..5cd38791543 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -331,6 +331,7 @@ public void handleNewConnection(final PeerConnection connection) { } catch (final PeerNotConnected peerNotConnected) { // Nothing to do. } + LOG.trace("{}", ethPeers); } @Override @@ -345,6 +346,7 @@ public void handleDisconnect( reason, connection.getPeerInfo(), ethPeers.peerCount()); + LOG.trace("{}", ethPeers); } private void handleStatusMessage(final EthPeer peer, final MessageData data) { @@ -374,7 +376,7 @@ private void handleStatusMessage(final EthPeer peer, final MessageData data) { } } catch (final RLPException e) { LOG.debug("Unable to parse status message.", e); - // Parsing errors can happen when clients broadcast network ids outside of the int range, + // Parsing errors can happen when clients broadcast network ids outside the int range, // So just disconnect with "subprotocol" error rather than "breach of protocol". peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerReputation.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerReputation.java index 2955c713eda..8293f0dfa09 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerReputation.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerReputation.java @@ -24,11 +24,12 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PeerReputation { +public class PeerReputation implements Comparable { private static final Logger LOG = LoggerFactory.getLogger(PeerReputation.class); private static final int TIMEOUT_THRESHOLD = 3; private static final int USELESS_RESPONSE_THRESHOLD = 5; @@ -39,12 +40,20 @@ public class PeerReputation { new ConcurrentHashMap<>(); private final Queue uselessResponseTimes = new ConcurrentLinkedQueue<>(); + private static final int DEFAULT_SCORE = 100; + private static final int SMALL_ADJUSTMENT = 1; + private static final int LARGE_ADJUSTMENT = 10; + + private int score = DEFAULT_SCORE; + public Optional recordRequestTimeout(final int requestCode) { final int newTimeoutCount = getOrCreateTimeoutCount(requestCode).incrementAndGet(); if (newTimeoutCount >= TIMEOUT_THRESHOLD) { LOG.debug("Disconnection triggered by repeated timeouts"); + score -= LARGE_ADJUSTMENT; return Optional.of(DisconnectReason.TIMEOUT); } else { + score -= SMALL_ADJUSTMENT; return Optional.empty(); } } @@ -67,9 +76,11 @@ public Optional recordUselessResponse(final long timestamp) { uselessResponseTimes.poll(); } if (uselessResponseTimes.size() >= USELESS_RESPONSE_THRESHOLD) { + score -= LARGE_ADJUSTMENT; LOG.debug("Disconnection triggered by exceeding useless response threshold"); return Optional.of(DisconnectReason.USELESS_PEER); } else { + score -= SMALL_ADJUSTMENT; return Optional.empty(); } } @@ -77,4 +88,14 @@ public Optional recordUselessResponse(final long timestamp) { private boolean shouldRemove(final Long timestamp, final long currentTimestamp) { return timestamp != null && timestamp + USELESS_RESPONSE_WINDOW_IN_MILLIS < currentTimestamp; } + + @Override + public String toString() { + return String.format("PeerReputation " + score); + } + + @Override + public int compareTo(final @Nonnull PeerReputation otherReputation) { + return Integer.compare(this.score, otherReputation.score); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessage.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessage.java index 565ed706a0b..d3b7d076445 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessage.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessage.java @@ -215,5 +215,28 @@ public static EthStatus readFrom(final RLPInput in) { return new EthStatus( protocolVersion, networkId, totalDifficulty, bestHash, genesisHash, forkId); } + + @Override + public String toString() { + return "EthStatus{" + + "protocolVersion=" + + protocolVersion + + ", networkId=" + + networkId + + ", totalDifficulty=" + + totalDifficulty + + ", bestHash=" + + bestHash + + ", genesisHash=" + + genesisHash + + ", forkId=" + + forkId + + '}'; + } + } + + @Override + public String toString() { + return status().toString(); } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java index 8ed9c715bc4..9ac7948febc 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java @@ -35,6 +35,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerInfo; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.PingMessage; import org.hyperledger.besu.plugin.services.permissioning.NodeMessagePermissioningProvider; import org.hyperledger.besu.testutil.TestClock; @@ -47,11 +48,13 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.tuweni.bytes.Bytes; import org.junit.Test; public class EthPeerTest { private static final BlockDataGenerator gen = new BlockDataGenerator(); private final TestClock clock = new TestClock(); + private static final Bytes NODE_ID = Bytes.random(32); @Test public void getHeadersStream() throws PeerNotConnected { @@ -342,6 +345,22 @@ public void message_permissioning_any_false_permission_preventsMessageFromSendin verify(peer.getConnection(), times(0)).sendForProtocol(any(), eq(PingMessage.get())); } + @Test + public void compareTo_withSameNodeId() { + final EthPeer peer1 = createPeerWithPeerInfo(NODE_ID); + final EthPeer peer2 = createPeerWithPeerInfo(NODE_ID); + assertThat(peer1.compareTo(peer2)).isEqualTo(0); + assertThat(peer2.compareTo(peer1)).isEqualTo(0); + } + + @Test + public void compareTo_withDifferentNodeId() { + final EthPeer peer1 = createPeerWithPeerInfo(NODE_ID); + final EthPeer peer2 = createPeerWithPeerInfo(Bytes.fromHexString("0x00")); + assertThat(peer1.compareTo(peer2)).isEqualTo(1); + assertThat(peer2.compareTo(peer1)).isEqualTo(-1); + } + private void messageStream( final ResponseStreamSupplier getStream, final MessageData targetMessage, @@ -431,6 +450,22 @@ private EthPeer createPeer(final List peerValidators) { return createPeer(peerValidators, Collections.emptyList()); } + private EthPeer createPeerWithPeerInfo(final Bytes nodeId) { + final PeerConnection peerConnection = mock(PeerConnection.class); + final Consumer onPeerReady = (peer) -> {}; + // Use a non-eth protocol name to ensure that EthPeer with sub-protocols such as Istanbul + // that extend the sub-protocol work correctly + PeerInfo peerInfo = new PeerInfo(1, "clientId", Collections.emptyList(), 30303, nodeId); + when(peerConnection.getPeerInfo()).thenReturn(peerInfo); + return new EthPeer( + peerConnection, + "foo", + onPeerReady, + Collections.emptyList(), + clock, + Collections.emptyList()); + } + private EthPeer createPeer( final List peerValidators, final List permissioningProviders) { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java index ddf1edb13de..e696f78d8cb 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java @@ -32,6 +32,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; +import java.util.Collections; import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.CancellationException; @@ -264,6 +265,18 @@ public void shouldFailRequestWithBusyDisconnectedAssignedPeer() throws Exception assertRequestFailure(pendingRequest, CancellationException.class); } + @Test + public void toString_hasExpectedInfo() { + assertThat(ethPeers.toString()).isEqualTo("0 EthPeers {}"); + + final EthPeer peerA = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(50), 20) + .getEthPeer(); + ethPeers.registerConnection(peerA.getConnection(), Collections.emptyList()); + assertThat(ethPeers.toString()).contains("1 EthPeers {"); + assertThat(ethPeers.toString()).contains(peerA.getShortNodeId()); + } + private void freeUpCapacity(final EthPeer ethPeer) { ethPeers.dispatchMessage(ethPeer, new EthMessage(ethPeer, NodeDataMessage.create(emptyList()))); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessageTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessageTest.java index 44fd1e42ce4..e84c795f4b7 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessageTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessageTest.java @@ -90,6 +90,25 @@ public void serializeDeserializeWithForkId() { assertThat(copy.forkId()).isEqualTo(forkId); } + @Test + public void toStringHasExpectedInfo() { + final int version = EthProtocol.EthVersion.V64; + final BigInteger networkId = BigInteger.ONE; + final Difficulty td = Difficulty.of(1000L); + final Hash bestHash = randHash(1L); + final Hash genesisHash = randHash(2L); + final ForkId forkId = new ForkId(Bytes.fromHexString("0xa00bc334"), 0L); + + final MessageData msg = + StatusMessage.create(version, networkId, td, bestHash, genesisHash, forkId); + + final StatusMessage copy = new StatusMessage(msg.getData()); + final String copyToString = copy.toString(); + + assertThat(copyToString).contains("bestHash=" + bestHash); + assertThat(copyToString).contains("genesisHash=" + genesisHash); + } + private Hash randHash(final long seed) { final Random random = new Random(seed); final byte[] bytes = new byte[32]; diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java index 1e16e1d692f..5691c63bdba 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java @@ -17,6 +17,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.isNull; +import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; import org.hyperledger.besu.crypto.NodeKey; import org.hyperledger.besu.crypto.SECPPublicKey; @@ -180,6 +181,14 @@ public void connect(final Stream peerStream) { .forEach(this::connect); } + private String logConnectionsByIdToString() { + final String connectionsList = + connectionsById.values().stream() + .map(RlpxConnection::toString) + .collect(Collectors.joining(",\n")); + return connectionsById.size() + " ConnectionsById {\n" + connectionsList + "}"; + } + public void disconnect(final Bytes peerId, final DisconnectReason reason) { final RlpxConnection connection = connectionsById.remove(peerId); if (connection != null) { @@ -227,7 +236,7 @@ public CompletableFuture connect(final Peer peer) { // Check max peers if (!peerPrivileges.canExceedConnectionLimits(peer) && getConnectionCount() >= maxConnections) { final String errorMsg = - "Max peer peer connections established (" + "Max peer connections established (" + maxConnections + "). Cannot connect to peer: " + peer; @@ -263,6 +272,8 @@ public CompletableFuture connect(final Peer peer) { } }); + traceLambda(LOG, "{}", this::logConnectionsByIdToString); + return connectionFuture.get(); } @@ -276,6 +287,7 @@ private void handleDisconnect( final PeerConnection peerConnection, final DisconnectReason disconnectReason, final boolean initiatedByPeer) { + traceLambda(LOG, "{}", this::logConnectionsByIdToString); cleanUpPeerConnection(peerConnection.getPeer().getId()); } @@ -416,6 +428,7 @@ && getConnectionCount() >= maxConnections) { // Check remote connections again to control for race conditions enforceRemoteConnectionLimits(); enforceConnectionLimits(); + traceLambda(LOG, "{}", this::logConnectionsByIdToString); } private boolean shouldLimitRemoteConnections() { @@ -518,6 +531,7 @@ private int compareDuplicateConnections(final RlpxConnection a, final RlpxConnec final Bytes peerId = a.getPeer().getId(); final Bytes localId = localNode.getPeer().getId(); + // at this point a.Id == b.Id if (a.initiatedRemotely() != b.initiatedRemotely()) { // If we have connections initiated in different directions, keep the connection initiated // by the node with the lower id @@ -528,6 +542,7 @@ private int compareDuplicateConnections(final RlpxConnection a, final RlpxConnec } } // Otherwise, keep older connection + LOG.info("comparing timestamps " + a.getInitiatedAt() + " with " + b.getInitiatedAt()); return Math.toIntExact(a.getInitiatedAt() - b.getInitiatedAt()); } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/RlpxConnection.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/RlpxConnection.java index a56e9b8fef6..ae0ba7f4b4a 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/RlpxConnection.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/RlpxConnection.java @@ -143,6 +143,14 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(peerConnection); } + + @Override + public String toString() { + return "RemotelyInitiatedRlpxConnection initiatedAt:" + + getInitiatedAt() + + " to " + + peerConnection.getPeer().getId(); + } } private static class LocallyInitiatedRlpxConnection extends RlpxConnection { @@ -213,6 +221,16 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(peer, future); } + + @Override + public String toString() { + return "LocallyInitiatedRlpxConnection initiatedAt:" + + getInitiatedAt() + + " to " + + getPeer().getId() + + " disconnected? " + + isFailedOrDisconnected(); + } } public static class ConnectionNotEstablishedException extends IllegalStateException { diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/PeerInfo.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/PeerInfo.java index dc9f3268843..df25746837e 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/PeerInfo.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/PeerInfo.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import javax.annotation.Nonnull; import org.apache.tuweni.bytes.Bytes; @@ -36,7 +37,7 @@ * *

The peer info is shared between peers during the HELLO wire protocol handshake. */ -public class PeerInfo { +public class PeerInfo implements Comparable { private final int version; private final String clientId; private final List capabilities; @@ -143,4 +144,9 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(version, clientId, capabilities, port, nodeId); } + + @Override + public int compareTo(final @Nonnull PeerInfo peerInfo) { + return this.nodeId.compareTo(peerInfo.nodeId); + } } diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java index 9872ce5fc60..8adc5cc3505 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java @@ -294,7 +294,7 @@ public void connect_failsWhenMaxPeersConnected() { assertThatThrownBy(connection::get) .hasCauseInstanceOf(IllegalStateException.class) - .hasMessageContaining("Max peer peer connections established (1). Cannot connect to peer"); + .hasMessageContaining("Max peer connections established (1). Cannot connect to peer"); assertPeerConnectionNotTracked(peer); assertThat(agent.getConnectionCount()).isEqualTo(1); }