Skip to content

Commit

Permalink
Retry mechanism for block creation (hyperledger#4407)
Browse files Browse the repository at this point in the history
* Retry mechanism for block creation

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Update CHANGELOG

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Always keep 1 thread active in the computation executors

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Surface StorageException when building a block for finer filtering

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Log successful block created at info

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Reformat block proposal logs

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Remove test code

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
  • Loading branch information
fab-10 authored Sep 20, 2022
1 parent 5319376 commit 5e15625
Show file tree
Hide file tree
Showing 11 changed files with 296 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Upgrade besu-native to 0.6.0 and use Blake2bf native implementation if available by default [#4264](https://github.com/hyperledger/besu/pull/4264)

### Bug Fixes
- Retry block creation if there is a transient error and we still have time, to mitigate empty block issue [#4407](https://github.com/hyperledger/besu/pull/4407)
- Fix StacklessClosedChannelException in Besu and resulted timeout errors in CL clients ([#4398](https://github.com/hyperledger/besu/issues/4398), [#4400](https://github.com/hyperledger/besu/issues/4400))

## 22.7.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ protected MiningCoordinator createMiningCoordinator(
return createTransitionMiningCoordinator(
protocolSchedule,
protocolContext,
ethProtocolManager.ethContext(),
transactionPool,
miningParameters,
syncState,
Expand Down Expand Up @@ -131,6 +132,7 @@ protected EthProtocolManager createEthProtocolManager(
protected MiningCoordinator createTransitionMiningCoordinator(
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final TransactionPool transactionPool,
final MiningParameters miningParameters,
final SyncState syncState,
Expand All @@ -141,6 +143,7 @@ protected MiningCoordinator createTransitionMiningCoordinator(
return new MergeCoordinator(
protocolContext,
protocolSchedule,
ethContext,
transactionPool.getPendingTransactions(),
miningParameters,
backwardSyncContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ protected MiningCoordinator createMiningCoordinator(
mergeBesuControllerBuilder.createTransitionMiningCoordinator(
transitionProtocolSchedule,
protocolContext,
ethProtocolManager.ethContext(),
transactionPool,
transitionMiningParameters,
syncState,
Expand Down
3 changes: 3 additions & 0 deletions consensus/merge/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ dependencies {
implementation 'org.apache.tuweni:tuweni-units'

testImplementation project(path: ':consensus:common', configuration: 'testArtifacts')
testImplementation project(':crypto')
testImplementation project(path: ':crypto', configuration: 'testSupportArtifacts')
testImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts')
testImplementation project(':metrics:core')
testImplementation project(path: ':metrics:core', configuration: 'testSupportArtifacts')
testImplementation project(':testutil')

testImplementation 'junit:junit'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.hyperledger.besu.consensus.merge.TransitionUtils.isTerminalProofOfWorkBlock;
import static org.hyperledger.besu.consensus.merge.blockcreation.MergeMiningCoordinator.ForkchoiceResult.Status.INVALID;
import static org.hyperledger.besu.consensus.merge.blockcreation.MergeMiningCoordinator.ForkchoiceResult.Status.INVALID_PAYLOAD_ATTRIBUTES;
import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;

import org.hyperledger.besu.consensus.merge.MergeContext;
Expand All @@ -33,20 +34,25 @@
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BadChainListener;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter;
import org.hyperledger.besu.ethereum.mainnet.AbstractGasLimitSpecification;
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.exception.StorageException;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
Expand All @@ -64,17 +70,20 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
protected final AtomicReference<BlockHeader> latestDescendsFromTerminal = new AtomicReference<>();
protected final MergeContext mergeContext;
protected final ProtocolContext protocolContext;
protected final EthContext ethContext;
protected final BackwardSyncContext backwardSyncContext;
protected final ProtocolSchedule protocolSchedule;

public MergeCoordinator(
final ProtocolContext protocolContext,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final AbstractPendingTransactionsSorter pendingTransactions,
final MiningParameters miningParams,
final BackwardSyncContext backwardSyncContext) {
this.protocolContext = protocolContext;
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.mergeContext = protocolContext.getConsensusContext(MergeContext.class);
this.miningParameters = miningParams;
this.backwardSyncContext = backwardSyncContext;
Expand Down Expand Up @@ -188,28 +197,87 @@ public PayloadIdentifier preparePayload(
result.errorMessage);
}

tryToBuildBetterBlock(timestamp, random, payloadIdentifier, mergeBlockCreator);

return payloadIdentifier;
}

private void tryToBuildBetterBlock(
final Long timestamp,
final Bytes32 random,
final PayloadIdentifier payloadIdentifier,
final MergeBlockCreator mergeBlockCreator) {

long remainingTime = miningParameters.getPosBlockCreationTimeout();
final Supplier<Block> blockCreator =
() -> mergeBlockCreator.createBlock(Optional.empty(), random, timestamp);
// start working on a full block and update the payload value and candidate when it's ready
CompletableFuture.supplyAsync(
() -> mergeBlockCreator.createBlock(Optional.empty(), random, timestamp))
.orTimeout(12, TimeUnit.SECONDS)
.whenComplete(
(bestBlock, throwable) -> {
if (throwable != null) {
LOG.debug("something went wrong creating block", throwable);
final long startedAt = System.currentTimeMillis();
retryBlockCreation(payloadIdentifier, blockCreator, remainingTime)
.thenAccept(
bestBlock -> {
final var resultBest = validateBlock(bestBlock);
if (resultBest.blockProcessingOutputs.isPresent()) {
mergeContext.putPayloadById(payloadIdentifier, bestBlock);
LOG.info(
"Successfully built block {} for proposal identified by {}, with {} transactions, in {}ms",
bestBlock.toLogString(),
payloadIdentifier.toHexString(),
bestBlock.getBody().getTransactions().size(),
System.currentTimeMillis() - startedAt);
} else {
final var resultBest = validateBlock(bestBlock);
if (resultBest.blockProcessingOutputs.isPresent()) {
mergeContext.putPayloadById(payloadIdentifier, bestBlock);
} else {
LOG.warn(
"failed to execute block proposal {}, reason {}",
bestBlock.getHash(),
resultBest.errorMessage);
}
LOG.warn(
"Block {} built for proposal identified by {}, is not valid reason {}",
bestBlock.getHash(),
payloadIdentifier.toHexString(),
resultBest.errorMessage);
}
});
}

return payloadIdentifier;
private CompletableFuture<Block> retryBlockCreation(
final PayloadIdentifier payloadIdentifier,
final Supplier<Block> blockCreator,
final long remainingTime) {
final long startedAt = System.currentTimeMillis();

debugLambda(
LOG,
"Block creation started for payload id {}, remaining time is {}ms",
payloadIdentifier::toShortHexString,
() -> remainingTime);

return exceptionallyCompose(
ethContext
.getScheduler()
.scheduleComputationTask(blockCreator)
.orTimeout(remainingTime, TimeUnit.MILLISECONDS),
throwable -> {
if (canRetryBlockCreation(throwable)) {
final long newRemainingTime = remainingTime - (System.currentTimeMillis() - startedAt);
debugLambda(
LOG,
"Retrying block creation for payload id {}, remaining time is {}ms, last error {}",
payloadIdentifier::toShortHexString,
() -> newRemainingTime,
() -> logException(throwable));
return retryBlockCreation(payloadIdentifier, blockCreator, newRemainingTime);
} else {
debugLambda(
LOG,
"Something went wrong creating block for payload id {}, error {}",
payloadIdentifier::toShortHexString,
() -> logException(throwable));
}
return CompletableFuture.failedFuture(throwable);
});
}

private boolean canRetryBlockCreation(final Throwable throwable) {
if (throwable instanceof StorageException) {
return true;
}
return false;
}

@Override
Expand Down Expand Up @@ -633,4 +701,12 @@ public Optional<Hash> getLatestValidHashOfBadBlock(Hash blockHash) {
private boolean isPoSHeader(final BlockHeader header) {
return header.getDifficulty().equals(Difficulty.ZERO);
}

private String logException(final Throwable throwable) {
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw);
throwable.printStackTrace(pw);
pw.flush();
return sw.toString();
}
}
Loading

0 comments on commit 5e15625

Please sign in to comment.