From 50dd40f926c3289e340e5628336ad27f03223630 Mon Sep 17 00:00:00 2001 From: Stefan Pingel <16143240+pinges@users.noreply.github.com> Date: Fri, 12 Jul 2024 15:29:19 +1000 Subject: [PATCH] Make the retrying snap tasks switching (#7307) * make snap tasks switching Signed-off-by: stefan.pingel@consensys.net Signed-off-by: Daniel Lehrner --- .../RetryingGetAccountRangeFromPeerTask.java | 15 +++++++-------- .../snap/RetryingGetBytecodeFromPeerTask.java | 15 ++++++++------- .../RetryingGetStorageRangeFromPeerTask.java | 19 ++++++++++++------- .../snap/RetryingGetTrieNodeFromPeerTask.java | 15 ++++++++------- .../pipeline/AsyncOperationProcessor.java | 6 +++++- 5 files changed, 40 insertions(+), 30 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java index 0624a90589c..ef48dcf952d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java @@ -17,18 +17,17 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage; import org.hyperledger.besu.plugin.services.MetricsSystem; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.tuweni.bytes.Bytes32; public class RetryingGetAccountRangeFromPeerTask - extends AbstractRetryingPeerTask { + extends AbstractRetryingSwitchingPeerTask { public static final int MAX_RETRIES = 4; @@ -46,9 +45,9 @@ private RetryingGetAccountRangeFromPeerTask( final MetricsSystem metricsSystem) { super( ethContext, - MAX_RETRIES, + metricsSystem, data -> data.accounts().isEmpty() && data.proofs().isEmpty(), - metricsSystem); + MAX_RETRIES); this.ethContext = ethContext; this.startKeyHash = startKeyHash; this.endKeyHash = endKeyHash; @@ -67,12 +66,12 @@ public static EthTask forAccountRange( } @Override - protected CompletableFuture executePeerTask( - final Optional assignedPeer) { + protected CompletableFuture executeTaskOnCurrentPeer( + final EthPeer peer) { final GetAccountRangeFromPeerTask task = GetAccountRangeFromPeerTask.forAccountRange( ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem); - assignedPeer.ifPresent(task::assignPeer); + task.assignPeer(peer); return executeSubTask(task::run) .thenApply( peerResult -> { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java index 3258298f2c7..e30d062597b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java @@ -17,19 +17,21 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; -public class RetryingGetBytecodeFromPeerTask extends AbstractRetryingPeerTask> { +public class RetryingGetBytecodeFromPeerTask + extends AbstractRetryingSwitchingPeerTask> { + + public static final int MAX_RETRIES = 4; private final EthContext ethContext; private final List codeHashes; @@ -41,7 +43,7 @@ private RetryingGetBytecodeFromPeerTask( final List codeHashes, final BlockHeader blockHeader, final MetricsSystem metricsSystem) { - super(ethContext, 4, Map::isEmpty, metricsSystem); + super(ethContext, metricsSystem, Map::isEmpty, MAX_RETRIES); this.ethContext = ethContext; this.codeHashes = codeHashes; this.blockHeader = blockHeader; @@ -57,11 +59,10 @@ public static EthTask> forByteCode( } @Override - protected CompletableFuture> executePeerTask( - final Optional assignedPeer) { + protected CompletableFuture> executeTaskOnCurrentPeer(final EthPeer peer) { final GetBytecodeFromPeerTask task = GetBytecodeFromPeerTask.forBytecode(ethContext, codeHashes, blockHeader, metricsSystem); - assignedPeer.ifPresent(task::assignPeer); + task.assignPeer(peer); return executeSubTask(task::run) .thenApply( peerResult -> { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java index 731b0b7623b..6e977bb4e79 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java @@ -17,19 +17,20 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.tuweni.bytes.Bytes32; public class RetryingGetStorageRangeFromPeerTask - extends AbstractRetryingPeerTask { + extends AbstractRetryingSwitchingPeerTask { + + public static final int MAX_RETRIES = 4; private final EthContext ethContext; private final List accountHashes; @@ -45,7 +46,11 @@ private RetryingGetStorageRangeFromPeerTask( final Bytes32 endKeyHash, final BlockHeader blockHeader, final MetricsSystem metricsSystem) { - super(ethContext, 4, data -> data.proofs().isEmpty() && data.slots().isEmpty(), metricsSystem); + super( + ethContext, + metricsSystem, + data -> data.proofs().isEmpty() && data.slots().isEmpty(), + MAX_RETRIES); this.ethContext = ethContext; this.accountHashes = accountHashes; this.startKeyHash = startKeyHash; @@ -66,12 +71,12 @@ public static EthTask forStorageRange( } @Override - protected CompletableFuture executePeerTask( - final Optional assignedPeer) { + protected CompletableFuture executeTaskOnCurrentPeer( + final EthPeer peer) { final GetStorageRangeFromPeerTask task = GetStorageRangeFromPeerTask.forStorageRange( ethContext, accountHashes, startKeyHash, endKeyHash, blockHeader, metricsSystem); - assignedPeer.ifPresent(task::assignPeer); + task.assignPeer(peer); return executeSubTask(task::run) .thenApply( peerResult -> { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java index 1abf0d72302..152fd91712b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java @@ -17,18 +17,20 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.tuweni.bytes.Bytes; -public class RetryingGetTrieNodeFromPeerTask extends AbstractRetryingPeerTask> { +public class RetryingGetTrieNodeFromPeerTask + extends AbstractRetryingSwitchingPeerTask> { + + public static final int MAX_RETRIES = 4; private final EthContext ethContext; private final Map> paths; @@ -40,7 +42,7 @@ private RetryingGetTrieNodeFromPeerTask( final Map> paths, final BlockHeader blockHeader, final MetricsSystem metricsSystem) { - super(ethContext, 4, Map::isEmpty, metricsSystem); + super(ethContext, metricsSystem, Map::isEmpty, MAX_RETRIES); this.ethContext = ethContext; this.paths = paths; this.blockHeader = blockHeader; @@ -56,11 +58,10 @@ public static EthTask> forTrieNodes( } @Override - protected CompletableFuture> executePeerTask( - final Optional assignedPeer) { + protected CompletableFuture> executeTaskOnCurrentPeer(final EthPeer peer) { final GetTrieNodeFromPeerTask task = GetTrieNodeFromPeerTask.forTrieNodes(ethContext, paths, blockHeader, metricsSystem); - assignedPeer.ifPresent(task::assignPeer); + task.assignPeer(peer); return executeSubTask(task::run) .thenApply( peerResult -> { diff --git a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/AsyncOperationProcessor.java b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/AsyncOperationProcessor.java index c8c7e5905db..e17296c026d 100644 --- a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/AsyncOperationProcessor.java +++ b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/AsyncOperationProcessor.java @@ -83,7 +83,11 @@ private void outputNextCompletedTask(final WritePipe outputPipe) { waitForAnyFutureToComplete(); outputCompletedTasks(outputPipe); } catch (final InterruptedException e) { - LOG.trace("Interrupted while waiting for processing to complete", e.getMessage()); + LOG.atTrace() + .setMessage("Interrupted while waiting for processing to complete: Message=({})") + .addArgument(e.getMessage()) + .setCause(e) + .log(); } catch (final ExecutionException e) { throw new AsyncOperationException("Async operation failed. " + e.getMessage(), e); } catch (final TimeoutException e) {