Skip to content

Commit

Permalink
add register useless response to multiple tasks (hyperledger#5220)
Browse files Browse the repository at this point in the history
* add register useless response to multiple tasks

Signed-off-by: Stefan <stefan.pingel@consensys.net>
Signed-off-by: Stefan Pingel <16143240+pinges@users.noreply.github.com>
Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
  • Loading branch information
pinges and macfarla authored Mar 31, 2023
1 parent ebbc0df commit 64efb66
Show file tree
Hide file tree
Showing 20 changed files with 596 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ Optional<RequestManager> dispatch(final EthMessage ethMessage, final String prot
final int messageCode = ethMessage.getData().getCode();
reputation.resetTimeoutCount(messageCode);

Optional<RequestManager> requestManager = getRequestManager(protocolName, messageCode);
final Optional<RequestManager> requestManager = getRequestManager(protocolName, messageCode);
requestManager.ifPresentOrElse(
localRequestManager -> localRequestManager.dispatchResponse(ethMessage),
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public PendingPeerRequest executePeerRequest(

public void dispatchMessage(
final EthPeer peer, final EthMessage ethMessage, final String protocolName) {
Optional<RequestManager> maybeRequestManager = peer.dispatch(ethMessage, protocolName);
final Optional<RequestManager> maybeRequestManager = peer.dispatch(ethMessage, protocolName);
if (maybeRequestManager.isPresent() && peer.hasAvailableRequestCapacity()) {
reattemptPendingPeerRequests();
}
Expand Down Expand Up @@ -229,7 +229,7 @@ public Stream<EthPeer> streamAllPeers() {

private void removeDisconnectedPeers() {
final Collection<EthPeer> peerStream = connections.values();
for (EthPeer p : peerStream) {
for (final EthPeer p : peerStream) {
if (p.isDisconnected()) {
connections.remove(p.getConnection());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class PeerReputation implements Comparable<PeerReputation> {
private final Queue<Long> uselessResponseTimes = new ConcurrentLinkedQueue<>();

private static final int SMALL_ADJUSTMENT = 1;
private static final int USELESS_ADJUSTMENT = 2;
private static final int LARGE_ADJUSTMENT = 10;

private int score;
Expand Down Expand Up @@ -95,7 +96,7 @@ public Optional<DisconnectReason> recordUselessResponse(final long timestamp) {
LOG.debug("Disconnection triggered by exceeding useless response threshold");
return Optional.of(DisconnectReason.USELESS_PEER);
} else {
score -= SMALL_ADJUSTMENT;
score -= USELESS_ADJUSTMENT;
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
Expand Down Expand Up @@ -71,7 +72,14 @@ protected CompletableFuture<AccountRangeMessage.AccountRangeData> executePeerTas
return executeSubTask(task::run)
.thenApply(
peerResult -> {
if (isEmptyResponse(peerResult.getResult())) {
final EthPeer peer = peerResult.getPeer();
peer.recordUselessResponse("GetAccountRangeFromPeerTask");
throw new IncompleteResultsException(
"No account range returned by peer " + peer.nodeId());
}
result.complete(peerResult.getResult());
peerResult.getPeer().chainState().updateHeightEstimate(blockHeader.getNumber());
return peerResult.getResult();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;
Expand Down Expand Up @@ -65,7 +66,13 @@ protected CompletableFuture<Map<Bytes32, Bytes>> executePeerTask(
return executeSubTask(task::run)
.thenApply(
peerResult -> {
if (isEmptyResponse(peerResult.getResult())) {
final EthPeer peer = peerResult.getPeer();
peer.recordUselessResponse("GetBytecodeFromPeerTask");
throw new IncompleteResultsException("No code returned by peer " + peer.nodeId());
}
result.complete(peerResult.getResult());
peerResult.getPeer().chainState().updateHeightEstimate(blockHeader.getNumber());
return peerResult.getResult();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
Expand Down Expand Up @@ -75,7 +76,14 @@ protected CompletableFuture<StorageRangeMessage.SlotRangeData> executePeerTask(
return executeSubTask(task::run)
.thenApply(
peerResult -> {
if (isEmptyResponse(peerResult.getResult())) {
final EthPeer peer = peerResult.getPeer();
peer.recordUselessResponse("RetryingGetStorageRangeFromPeerTask");
throw new IncompleteResultsException(
"No storage returned by peer " + peer.nodeId());
}
result.complete(peerResult.getResult());
peerResult.getPeer().chainState().updateHeightEstimate(blockHeader.getNumber());
return peerResult.getResult();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;
Expand Down Expand Up @@ -64,7 +65,14 @@ protected CompletableFuture<Map<Bytes, Bytes>> executePeerTask(
return executeSubTask(task::run)
.thenApply(
peerResult -> {
if (isEmptyResponse(peerResult.getResult())) {
final EthPeer peer = peerResult.getPeer();
peer.recordUselessResponse("GetTrieNodeFromPeerTask");
throw new IncompleteResultsException(
"No trie node returned by peer " + peer.nodeId());
}
result.complete(peerResult.getResult());
peerResult.getPeer().chainState().updateHeightEstimate(blockHeader.getNumber());
return peerResult.getResult();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ protected Optional<List<BlockHeader>> processResponse(
final List<BlockHeader> headers = headersMessage.getHeaders(protocolSchedule);
if (headers.isEmpty()) {
// Message contains no data - nothing to do
// We cannot register a useless response yet, as we have to try all response streams for the
// RequestManager if request ids are not supported (see RequestManager dispatchResponse())
// Instead we are returning Optional.empty() so the next response stream can be tried.
// If we cannot find a matching response stream for the message the
// peer.recordUselessResponse() of the if(streamClosed) block above is triggered by calling
// this method with streamClosed set to true.
LOG.debug("headers.isEmpty. Peer: {}", peer);
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ protected final void executeTask() {
ethContext.getScheduler().failAfterTimeout(promise, timeout);

stream.then(
(streamClosed, message, peer1) ->
handleMessage(promise, streamClosed, message, peer1));
(streamClosed, message, ethPeer) ->
handleMessage(promise, streamClosed, message, ethPeer));
},
promise::completeExceptionally);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ protected boolean isPeerFailure(final Throwable error) {
|| error instanceof NoAvailablePeersException;
}

public boolean isEmptyResponse(final T peerResult) {
return isEmptyResponse.test(peerResult);
}

protected EthContext getEthContext() {
return ethContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
Expand Down Expand Up @@ -71,17 +72,17 @@ public static EthProtocolManager create(
final EthProtocolConfiguration ethereumWireProtocolConfiguration,
final Optional<MergePeerFilter> mergePeerFilter) {

EthPeers peers =
final EthPeers peers =
new EthPeers(
EthProtocol.NAME,
() -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader()),
TestClock.fixed(),
new NoOpMetricsSystem(),
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
EthMessages messages = new EthMessages();
EthScheduler ethScheduler = new DeterministicEthScheduler(TimeoutPolicy.NEVER_TIMEOUT);
EthContext ethContext = new EthContext(peers, messages, ethScheduler);
final EthMessages messages = new EthMessages();
final EthScheduler ethScheduler = new DeterministicEthScheduler(TimeoutPolicy.NEVER_TIMEOUT);
final EthContext ethContext = new EthContext(peers, messages, ethScheduler);

return new EthProtocolManager(
blockchain,
Expand Down Expand Up @@ -185,15 +186,15 @@ public static EthProtocolManager create(
final WorldStateArchive worldStateArchive,
final TransactionPool transactionPool,
final EthProtocolConfiguration configuration) {
EthPeers peers =
final EthPeers peers =
new EthPeers(
EthProtocol.NAME,
() -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader()),
TestClock.fixed(),
new NoOpMetricsSystem(),
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
EthMessages messages = new EthMessages();
final EthMessages messages = new EthMessages();

return create(
blockchain,
Expand All @@ -214,15 +215,15 @@ public static EthProtocolManager create(
final TransactionPool transactionPool,
final EthProtocolConfiguration configuration,
final ForkIdManager forkIdManager) {
EthPeers peers =
final EthPeers peers =
new EthPeers(
EthProtocol.NAME,
() -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader()),
TestClock.fixed(),
new NoOpMetricsSystem(),
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
EthMessages messages = new EthMessages();
final EthMessages messages = new EthMessages();

return create(
blockchain,
Expand All @@ -240,15 +241,15 @@ public static EthProtocolManager create(
final ProtocolSchedule protocolSchedule,
final Blockchain blockchain,
final EthScheduler ethScheduler) {
EthPeers peers =
final EthPeers peers =
new EthPeers(
EthProtocol.NAME,
() -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader()),
TestClock.fixed(),
new NoOpMetricsSystem(),
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
EthMessages messages = new EthMessages();
final EthMessages messages = new EthMessages();

return create(
blockchain,
Expand Down Expand Up @@ -392,6 +393,17 @@ public static RespondingEthPeer createPeer(
.build();
}

public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager,
final SnapProtocolManager snapProtocolManager,
final long estimatedHeight) {
return RespondingEthPeer.builder()
.ethProtocolManager(ethProtocolManager)
.estimatedHeight(estimatedHeight)
.snapProtocolManager(snapProtocolManager)
.build();
}

public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager,
final long estimatedHeight,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.PooledTransactionsMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.ByteCodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.TrieNodesMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
Expand All @@ -47,6 +52,7 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -62,6 +68,7 @@
import java.util.stream.Stream;

import com.google.common.collect.Lists;
import kotlin.collections.ArrayDeque;
import org.apache.tuweni.bytes.Bytes;

public class RespondingEthPeer {
Expand Down Expand Up @@ -389,6 +396,33 @@ public static Responder emptyResponder() {
};
}

public static Responder emptyResponderForSnap() {
return (cap, msg) -> {
MessageData response = null;
switch (msg.getCode()) {
case SnapV1.GET_ACCOUNT_RANGE:
response =
AccountRangeMessage.create(
Optional.of(BigInteger.ONE), Collections.emptyMap(), Collections.emptyList());
break;
case SnapV1.GET_STORAGE_RANGE:
response =
StorageRangeMessage.create(
Optional.ofNullable(BigInteger.ONE), new ArrayDeque<>(), new ArrayList<>());
break;
case SnapV1.GET_BYTECODES:
response =
ByteCodesMessage.create(Optional.ofNullable(BigInteger.ONE), Collections.emptyList());
break;
case SnapV1.GET_TRIE_NODES:
response =
TrieNodesMessage.create(Optional.ofNullable(BigInteger.ONE), Collections.emptyList());
break;
}
return Optional.ofNullable(response);
};
}

public static class Builder {
private EthProtocolManager ethProtocolManager;
private Optional<SnapProtocolManager> snapProtocolManager = Optional.empty();
Expand Down
Loading

0 comments on commit 64efb66

Please sign in to comment.