Skip to content

Commit

Permalink
Merge from main and unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
  • Loading branch information
fab-10 committed Jul 14, 2022
1 parent c91bf8c commit 1a18de7
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 199 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Additions and Improvements
- Add a block to the bad blocks if it did not descend from the terminal block [#4080](https://github.com/hyperledger/besu/pull/4080)
- Backward sync exception improvements [#4092](https://github.com/hyperledger/besu/pull/4092)
- Remove block header checks during backward sync, since they will be always performed during block import phase [#4098](https://github.com/hyperledger/besu/pull/4098)
- Optimize the backward sync retry strategy [#4095](https://github.com/hyperledger/besu/pull/4095)

### Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ public Optional<BlockHeader> getOrSyncHeaderByHash(final Hash blockHash) {
debugLambda(LOG, "BlockHeader {} is already present", () -> optHeader.get().toLogString());
} else {
debugLambda(LOG, "appending block hash {} to backward sync", blockHash::toHexString);
backwardSyncContext.syncBackwardsUntil(blockHash);
backwardSyncContext
.syncBackwardsUntil(blockHash)
.exceptionally(e -> logSyncException(blockHash, e));
}
return optHeader;
}
Expand All @@ -233,11 +235,18 @@ public Optional<BlockHeader> getOrSyncHeaderByHash(
} else {
debugLambda(LOG, "appending block hash {} to backward sync", blockHash::toHexString);
backwardSyncContext.updateHeads(blockHash, finalizedBlockHash);
backwardSyncContext.syncBackwardsUntil(blockHash);
backwardSyncContext
.syncBackwardsUntil(blockHash)
.exceptionally(e -> logSyncException(blockHash, e));
}
return optHeader;
}

private Void logSyncException(final Hash blockHash, final Throwable exception) {
LOG.warn("Sync to block hash " + blockHash.toHexString() + " failed", exception);
return null;
}

@Override
public Result validateBlock(final Block block) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -50,6 +49,7 @@
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.tuweni.bytes.Bytes32;
Expand Down Expand Up @@ -280,10 +280,12 @@ public void assertGetOrSyncForBlockAlreadyPresent() {
public void assertGetOrSyncForBlockNotPresent() {
BlockHeader mockHeader =
headerGenerator.parentHash(Hash.fromHexStringLenient("0xbeef")).buildHeader();
when(backwardSyncContext.syncBackwardsUntil(mockHeader.getBlockHash()))
.thenReturn(CompletableFuture.completedFuture(null));

var res = coordinator.getOrSyncHeaderByHash(mockHeader.getHash());

assertThat(res).isNotPresent();
verify(backwardSyncContext, times(1)).syncBackwardsUntil(mockHeader.getHash());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,13 @@ public JsonRpcResponse syncResponse(final JsonRpcRequestContext requestContext)
new Block(newBlockHeader, new BlockBody(transactions, Collections.emptyList()));

if (mergeContext.isSyncing() || parentHeader.isEmpty()) {
mergeCoordinator.appendNewPayloadToSync(block);
mergeCoordinator
.appendNewPayloadToSync(block)
.exceptionally(
exception -> {
LOG.warn("Sync to block " + block.toLogString() + " failed", exception);
return null;
});
return respondWith(reqId, blockParam, null, SYNCING);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import io.vertx.core.Vertx;
import org.apache.tuweni.bytes.Bytes32;
Expand Down Expand Up @@ -263,7 +264,8 @@ public void shouldRespondWithSyncingDuringForwardSync() {
BlockHeader mockHeader = new BlockHeaderTestFixture().baseFeePerGas(Wei.ONE).buildHeader();
when(blockchain.getBlockByHash(any())).thenReturn(Optional.empty());
when(mergeContext.isSyncing()).thenReturn(Boolean.TRUE);

when(mergeCoordinator.appendNewPayloadToSync(any()))
.thenReturn(CompletableFuture.completedFuture(null));
var resp = resp(mockPayload(mockHeader, Collections.emptyList()));

EnginePayloadStatusResult res = fromSuccessResp(resp);
Expand All @@ -275,7 +277,8 @@ public void shouldRespondWithSyncingDuringForwardSync() {
@Test
public void shouldRespondWithSyncingDuringBackwardsSync() {
BlockHeader mockHeader = new BlockHeaderTestFixture().baseFeePerGas(Wei.ONE).buildHeader();

when(mergeCoordinator.appendNewPayloadToSync(any()))
.thenReturn(CompletableFuture.completedFuture(null));
var resp = resp(mockPayload(mockHeader, Collections.emptyList()));

EnginePayloadStatusResult res = fromSuccessResp(resp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public synchronized void prependAncestorsHeader(final BlockHeader blockHeader) {
debugLambda(
LOG,
"Added header {} on height {} to backward chain led by pivot {} on height {}",
() -> blockHeader.toLogString(),
blockHeader::toLogString,
blockHeader::getNumber,
() -> lastStoredPivot.orElseThrow().toLogString(),
firstHeader::getNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;

import org.hyperledger.besu.datatypes.Hash;
Expand All @@ -25,7 +24,6 @@
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode;
Expand All @@ -45,7 +43,7 @@
public class BackwardSyncContext {
private static final Logger LOG = LoggerFactory.getLogger(BackwardSyncContext.class);
public static final int BATCH_SIZE = 200;
private static final int DEFAULT_MAX_RETRIES = 100;
private static final int DEFAULT_MAX_RETRIES = 20;

private static final long DEFAULT_MILLIS_BETWEEN_RETRIES = 5000;

Expand All @@ -64,7 +62,7 @@ public class BackwardSyncContext {

private final int maxRetries;

private final long millisBetweenRetries;
private final long millisBetweenRetries = DEFAULT_MILLIS_BETWEEN_RETRIES;

public BackwardSyncContext(
final ProtocolContext protocolContext,
Expand All @@ -80,8 +78,7 @@ public BackwardSyncContext(
ethContext,
syncState,
backwardChain,
DEFAULT_MAX_RETRIES,
DEFAULT_MILLIS_BETWEEN_RETRIES);
DEFAULT_MAX_RETRIES);
}

public BackwardSyncContext(
Expand All @@ -91,8 +88,7 @@ public BackwardSyncContext(
final EthContext ethContext,
final SyncState syncState,
final BackwardChain backwardChain,
final int maxRetries,
final long millisBetweenRetries) {
final int maxRetries) {

this.protocolContext = protocolContext;
this.protocolSchedule = protocolSchedule;
Expand All @@ -101,7 +97,6 @@ public BackwardSyncContext(
this.syncState = syncState;
this.backwardChain = backwardChain;
this.maxRetries = maxRetries;
this.millisBetweenRetries = millisBetweenRetries;
}

public synchronized boolean isSyncing() {
Expand All @@ -124,27 +119,33 @@ public synchronized void updateHeads(final Hash head, final Hash finalizedBlockH
}

public synchronized CompletableFuture<Void> syncBackwardsUntil(final Hash newBlockHash) {
final CompletableFuture<Void> future = this.currentBackwardSyncFuture.get();
if (isTrusted(newBlockHash)) return future;
backwardChain.addNewHash(newBlockHash);
if (future != null) {
return future;
Optional<CompletableFuture<Void>> maybeFuture =
Optional.ofNullable(this.currentBackwardSyncFuture.get());
if (isTrusted(newBlockHash)) {
return maybeFuture.orElseGet(() -> CompletableFuture.completedFuture(null));
}
infoLambda(LOG, "Starting new backward sync towards a pivot {}", newBlockHash::toHexString);
this.currentBackwardSyncFuture.set(prepareBackwardSyncFutureWithRetry());
return this.currentBackwardSyncFuture.get();
backwardChain.addNewHash(newBlockHash);
return maybeFuture.orElseGet(
() -> {
CompletableFuture<Void> future = prepareBackwardSyncFutureWithRetry();
this.currentBackwardSyncFuture.set(future);
return future;
});
}

public synchronized CompletableFuture<Void> syncBackwardsUntil(final Block newPivot) {
final CompletableFuture<Void> future = this.currentBackwardSyncFuture.get();
if (isTrusted(newPivot.getHash())) return future;
backwardChain.appendTrustedBlock(newPivot);
if (future != null) {
return future;
Optional<CompletableFuture<Void>> maybeFuture =
Optional.ofNullable(this.currentBackwardSyncFuture.get());
if (isTrusted(newPivot.getHash())) {
return maybeFuture.orElseGet(() -> CompletableFuture.completedFuture(null));
}
infoLambda(LOG, "Starting new backward sync towards a pivot {}", newPivot::toLogString);
this.currentBackwardSyncFuture.set(prepareBackwardSyncFutureWithRetry());
return this.currentBackwardSyncFuture.get();
backwardChain.appendTrustedBlock(newPivot);
return maybeFuture.orElseGet(
() -> {
CompletableFuture<Void> future = prepareBackwardSyncFutureWithRetry();
this.currentBackwardSyncFuture.set(future);
return future;
});
}

private boolean isTrusted(final Hash hash) {
Expand All @@ -164,7 +165,8 @@ private CompletableFuture<Void> prepareBackwardSyncFutureWithRetry() {
(unused, throwable) -> {
this.currentBackwardSyncFuture.set(null);
if (throwable != null) {
throw new BackwardSyncException(throwable);
throw extractBackwardSyncException(throwable)
.orElse(new BackwardSyncException(throwable));
}
return null;
});
Expand All @@ -173,7 +175,7 @@ private CompletableFuture<Void> prepareBackwardSyncFutureWithRetry() {
private CompletableFuture<Void> prepareBackwardSyncFutureWithRetry(final int retries) {
if (retries == 0) {
return CompletableFuture.failedFuture(
new BackwardSyncException("Max number of retries " + DEFAULT_MAX_RETRIES + " reached"));
new BackwardSyncException("Max number of retries " + maxRetries + " reached"));
}

return exceptionallyCompose(
Expand All @@ -190,29 +192,45 @@ private CompletableFuture<Void> prepareBackwardSyncFutureWithRetry(final int ret

@VisibleForTesting
protected void processException(final Throwable throwable) {
extractBackwardSyncException(throwable)
.ifPresentOrElse(
backwardSyncException -> {
if (backwardSyncException.shouldRestart()) {
LOG.info(
"Backward sync failed ({}). Current Peers: {}. Retrying in "
+ millisBetweenRetries
+ " milliseconds...",
backwardSyncException.getMessage(),
ethContext.getEthPeers().peerCount());
return;
} else {
debugLambda(
LOG, "Not recoverable backward sync exception {}", throwable::getMessage);
throw backwardSyncException;
}
},
() ->
LOG.warn(
"There was an uncaught exception during Backwards Sync. Retrying in "
+ millisBetweenRetries
+ " milliseconds...",
throwable));
}

private Optional<BackwardSyncException> extractBackwardSyncException(final Throwable throwable) {
Throwable currentCause = throwable;

while (currentCause != null) {
if (currentCause instanceof BackwardSyncException) {
if (((BackwardSyncException) currentCause).shouldRestart()) {
LOG.info(
"Backward sync failed ({}). Current Peers: {}. Retrying in few seconds... ",
currentCause.getMessage(),
ethContext.getEthPeers().peerCount());
return;
} else {
debugLambda(LOG, "Not recoverable backward sync exception {}", throwable::getMessage);
throw new BackwardSyncException(throwable);
}
return Optional.of((BackwardSyncException) currentCause);
}
currentCause = currentCause.getCause();
}
LOG.warn(
"There was an uncaught exception during Backwards Sync... Retrying in few seconds...",
throwable);
return Optional.empty();
}

private CompletableFuture<Void> prepareBackwardSyncFuture() {
@VisibleForTesting
CompletableFuture<Void> prepareBackwardSyncFuture() {
final MutableBlockchain blockchain = getProtocolContext().getBlockchain();
return new BackwardsSyncAlgorithm(
this,
Expand Down Expand Up @@ -271,7 +289,6 @@ public void resetBatchSize() {

protected Void saveBlock(final Block block) {
traceLambda(LOG, "Going to validate block {}", block::toLogString);
checkFinalizedSuccessionRuleBeforeSave(block);
var optResult =
this.getBlockValidatorForBlock(block)
.validateAndProcessBlock(
Expand Down Expand Up @@ -303,63 +320,6 @@ protected Void saveBlock(final Block block) {
return null;
}

@VisibleForTesting
protected synchronized void checkFinalizedSuccessionRuleBeforeSave(final Block block) {
final Optional<Hash> finalized = findMaybeFinalized();
if (finalized.isPresent()) {
final Optional<BlockHeader> maybeFinalizedHeader =
protocolContext
.getBlockchain()
.getBlockByHash(finalized.get())
.map(Block::getHeader)
.or(() -> backwardChain.getHeader(finalized.get()));
if (maybeFinalizedHeader.isEmpty()) {
throw new BackwardSyncException(
"We know a block "
+ finalized.get().toHexString()
+ " was finalized, but we don't have it downloaded yet, cannot save new block",
true);
}
final BlockHeader finalizedHeader = maybeFinalizedHeader.get();
if (finalizedHeader.getHash().equals(block.getHash())) {
debugLambda(LOG, "Saving new finalized block {}", block::toLogString);
return;
}

if (finalizedHeader.getNumber() == block.getHeader().getNumber()) {
throw new BackwardSyncException(
"This block is not the target finalized block. Is "
+ block.toLogString()
+ " but was expecting "
+ finalizedHeader.toLogString());
}
if (!getProtocolContext().getBlockchain().contains(finalizedHeader.getHash())) {
debugLambda(
LOG,
"Saving block {} before finalized {} reached",
block::toLogString,
finalizedHeader::toLogString); // todo: some check here??
return;
}
final Hash canonicalHash =
getProtocolContext()
.getBlockchain()
.getBlockByNumber(finalizedHeader.getNumber())
.orElseThrow()
.getHash();
if (finalizedHeader.getNumber() < block.getHeader().getNumber()
&& !canonicalHash.equals(finalizedHeader.getHash())) {
throw new BackwardSyncException(
"Finalized block "
+ finalizedHeader.toLogString()
+ " is not on canonical chain. Canonical is"
+ canonicalHash.toHexString()
+ ". We need to reorg before saving this block.");
}
}
LOG.debug("Finalized block not known yet...");
}

@VisibleForTesting
protected void possiblyMoveHead(final Block lastSavedBlock) {
final MutableBlockchain blockchain = getProtocolContext().getBlockchain();
Expand Down
Loading

0 comments on commit 1a18de7

Please sign in to comment.