Skip to content

Commit

Permalink
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Browse files Browse the repository at this point in the history
…-add-GetReceiptsFromPeerTask

# Conflicts:
#	ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  • Loading branch information
Matilda-Clerke committed Oct 9, 2024
2 parents 86a1f0b + 545fd5c commit 7bd048b
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,10 @@ public void setTrailingPeerRequirementsSupplier(
// Part of the PeerSelector interface, to be split apart later
@Override
public Optional<EthPeer> getPeer(final Predicate<EthPeer> filter) {
return streamBestPeers()
return streamAvailablePeers()
.filter(filter)
.filter(EthPeer::hasAvailableRequestCapacity)
.findFirst();
.min(LEAST_TO_MOST_BUSY);
}

// Part of the PeerSelector interface, to be split apart later
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;

import java.util.Collection;
import java.util.function.Predicate;

/**
Expand Down Expand Up @@ -51,11 +50,22 @@ public interface PeerTask<T> {
T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException;

/**
* Gets the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor
* Gets the number of times this task may be attempted against other peers
*
* @return the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor
* @return the number of times this task may be attempted against other peers
*/
Collection<PeerTaskRetryBehavior> getPeerTaskRetryBehaviors();
default int getRetriesWithOtherPeer() {
return 5;
}

/**
* Gets the number of times this task may be attempted against the same peer
*
* @return the number of times this task may be attempted against the same peer
*/
default int getRetriesWithSamePeer() {
return 5;
}

/**
* Gets a Predicate that checks if an EthPeer is suitable for this PeerTask
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package org.hyperledger.besu.ethereum.eth.manager.peertask;

import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
Expand All @@ -31,9 +31,6 @@
/** Manages the execution of PeerTasks, respecting their PeerTaskRetryBehavior */
public class PeerTaskExecutor {

public static final int RETRIES_WITH_SAME_PEER = 2;
public static final int RETRIES_WITH_OTHER_PEER = 2;
public static final int NO_RETRIES = 0;
private final PeerSelector peerSelector;
private final PeerTaskRequestSender requestSender;

Expand All @@ -55,10 +52,7 @@ public PeerTaskExecutor(

public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) {
PeerTaskExecutorResult<T> executorResult;
int retriesRemaining =
peerTask.getPeerTaskRetryBehaviors().contains(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS)
? RETRIES_WITH_OTHER_PEER
: NO_RETRIES;
int retriesRemaining = peerTask.getRetriesWithOtherPeer();
final Collection<EthPeer> usedEthPeers = new HashSet<>();
do {
Optional<EthPeer> peer =
Expand All @@ -84,10 +78,7 @@ public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
final PeerTask<T> peerTask, final EthPeer peer) {
MessageData requestMessageData = peerTask.getRequestMessage();
PeerTaskExecutorResult<T> executorResult;
int retriesRemaining =
peerTask.getPeerTaskRetryBehaviors().contains(PeerTaskRetryBehavior.RETRY_WITH_SAME_PEER)
? RETRIES_WITH_SAME_PEER
: NO_RETRIES;
int retriesRemaining = peerTask.getRetriesWithSamePeer();
do {
try {
T result;
Expand All @@ -103,7 +94,7 @@ public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS);

} catch (PeerConnection.PeerNotConnected e) {
} catch (PeerNotConnected e) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -69,7 +67,7 @@ public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndSuccessfulFlow()
Object responseObject = new Object();

Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
Expand Down Expand Up @@ -97,8 +95,7 @@ public void testExecuteAgainstPeerWithRetryBehaviorsAndSuccessfulFlowAfterFirstF
int requestMessageDataCode = 123;

Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskRetryBehaviors())
.thenReturn(List.of(PeerTaskRetryBehavior.RETRY_WITH_SAME_PEER));
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(2);

Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Expand Down Expand Up @@ -127,7 +124,7 @@ public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndPeerNotConnected()
TimeoutException {

Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
Expand All @@ -149,7 +146,7 @@ public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndTimeoutException()
int requestMessageDataCode = 123;

Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
Expand All @@ -174,7 +171,7 @@ public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndInvalidResponseMessa
InvalidPeerTaskResponseException {

Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
Expand Down Expand Up @@ -205,7 +202,8 @@ public void testExecuteWithNoPeerTaskBehaviorsAndSuccessFlow()
.thenReturn(Optional.of(ethPeer));

Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getRetriesWithOtherPeer()).thenReturn(0);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
Expand Down Expand Up @@ -239,8 +237,8 @@ public void testExecuteWithPeerSwitchingAndSuccessFlow()
.thenReturn(Optional.of(peer2));

Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskRetryBehaviors())
.thenReturn(List.of(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS));
Mockito.when(peerTask.getRetriesWithOtherPeer()).thenReturn(2);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException());
Expand Down

0 comments on commit 7bd048b

Please sign in to comment.