From 24e417d45657692673e32a9f5c8638436dd265ec Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Tue, 19 Apr 2022 11:00:46 +1000 Subject: [PATCH 1/8] vertx 4.2.7 (#3705) * vertx 4.2.7 Signed-off-by: Sally MacFarlane --- gradle/versions.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/versions.gradle b/gradle/versions.gradle index ca753057383..6bb70ca0c55 100644 --- a/gradle/versions.gradle +++ b/gradle/versions.gradle @@ -93,7 +93,7 @@ dependencyManagement { dependency 'io.reactivex.rxjava2:rxjava:2.2.21' - dependencySet(group: 'io.vertx', version: '4.2.3') { + dependencySet(group: 'io.vertx', version: '4.2.7') { entry 'vertx-auth-jwt' entry 'vertx-codegen' entry 'vertx-core' From 90e29e816988ab6ee3c3ac6faa9a87fba1fb8212 Mon Sep 17 00:00:00 2001 From: Frank Li Date: Wed, 20 Apr 2022 10:25:27 +1000 Subject: [PATCH 2/8] Enforce trailing peer limits when select pivot block fails (#3699) * add trailing peer limiter Signed-off-by: Frank Li * spotless Signed-off-by: Frank Li * add conservative estimation of next pivot Signed-off-by: Frank Li * create new limiter for every pivot Signed-off-by: Frank Li * typo Signed-off-by: Frank Li --- .../eth/sync/fastsync/FastSyncActions.java | 24 ++++++++++++++++--- .../ethereum/eth/sync/state/SyncState.java | 4 ++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java index 82869f04477..ff1139d2225 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java @@ -24,6 +24,8 @@ import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerLimiter; +import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; @@ -177,10 +179,26 @@ private CompletableFuture retrySelectPivotBlockAfterDelay() { return ethContext .getScheduler() .scheduleFutureTask( + this::limitTrailingPeersAndRetrySelectPivotBlock, Duration.ofSeconds(5)); + } + + private long conservativelyEstimatedPivotBlock() { + long estimatedNextPivot = + syncState.getLocalChainHeight() + syncConfig.getFastSyncPivotDistance(); + return Math.min(syncState.bestChainHeight(), estimatedNextPivot); + } + + private CompletableFuture limitTrailingPeersAndRetrySelectPivotBlock() { + final TrailingPeerLimiter trailingPeerLimiter = + new TrailingPeerLimiter( + ethContext.getEthPeers(), () -> - waitForPeers(syncConfig.getFastSyncMinimumPeerCount()) - .thenCompose(ignore -> selectPivotBlockFromPeers()), - Duration.ofSeconds(5)); + new TrailingPeerRequirements( + conservativelyEstimatedPivotBlock(), syncConfig.getMaxTrailingPeers())); + trailingPeerLimiter.enforceTrailingPeerLimit(); + + return waitForPeers(syncConfig.getFastSyncMinimumPeerCount()) + .thenCompose(ignore -> selectPivotBlockFromPeers()); } public CompletableFuture downloadPivotBlockHeader( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java index 69e754a33ac..b9a1b65d581 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java @@ -242,6 +242,10 @@ private void addEstimatedHeightListener(final SyncTarget target) { target.addPeerChainEstimatedHeightListener(estimatedHeight -> checkInSync()); } + public long getLocalChainHeight() { + return blockchain.getChainHeadBlockNumber(); + } + public long bestChainHeight() { final long localChainHeight = blockchain.getChainHeadBlockNumber(); return bestChainHeight(localChainHeight); From 12a06d546d0ef5ef4a54de3da0bac06bf5227bea Mon Sep 17 00:00:00 2001 From: garyschulte Date: Thu, 21 Apr 2022 22:28:35 +0200 Subject: [PATCH 3/8] use paris evm config in merge protocolschedule (#3748) * use paris evm config in merge protocolschedule catch exception on block persist so that it can be added to badBlocks Signed-off-by: garyschulte --- .../consensus/merge/MergeProtocolSchedule.java | 14 ++++++++++++-- .../consensus/merge/MergeProtocolScheduleTest.java | 8 ++++++++ .../ethereum/mainnet/AbstractBlockProcessor.java | 7 ++++++- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeProtocolSchedule.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeProtocolSchedule.java index 56218baf70b..01133d841db 100644 --- a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeProtocolSchedule.java +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeProtocolSchedule.java @@ -23,9 +23,11 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSpecAdapters; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpecBuilder; import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket; +import org.hyperledger.besu.evm.MainnetEVMs; import org.hyperledger.besu.evm.internal.EvmConfiguration; import java.math.BigInteger; +import java.util.Optional; public class MergeProtocolSchedule { @@ -44,7 +46,11 @@ public static ProtocolSchedule create( return new ProtocolScheduleBuilder( config, DEFAULT_CHAIN_ID, - ProtocolSpecAdapters.create(0, MergeProtocolSchedule::applyMergeSpecificModifications), + ProtocolSpecAdapters.create( + 0, + (specBuilder) -> + MergeProtocolSchedule.applyMergeSpecificModifications( + specBuilder, config.getChainId())), privacyParameters, isRevertReasonEnabled, config.isQuorum(), @@ -53,9 +59,13 @@ public static ProtocolSchedule create( } private static ProtocolSpecBuilder applyMergeSpecificModifications( - final ProtocolSpecBuilder specBuilder) { + final ProtocolSpecBuilder specBuilder, final Optional chainId) { return specBuilder + .evmBuilder( + (gasCalculator, jdCacheConfig) -> + MainnetEVMs.paris( + gasCalculator, chainId.orElse(BigInteger.ZERO), EvmConfiguration.DEFAULT)) .blockProcessorBuilder(MergeBlockProcessor::new) .blockHeaderValidatorBuilder(MergeProtocolSchedule::getBlockHeaderValidator) .blockReward(Wei.ZERO) diff --git a/consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/MergeProtocolScheduleTest.java b/consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/MergeProtocolScheduleTest.java index b5c1a8b04a6..0779b4b93ff 100644 --- a/consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/MergeProtocolScheduleTest.java +++ b/consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/MergeProtocolScheduleTest.java @@ -18,10 +18,14 @@ import org.hyperledger.besu.config.GenesisConfigFile; import org.hyperledger.besu.config.GenesisConfigOptions; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.datatypes.Wei; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; +import org.hyperledger.besu.evm.Code; +import org.hyperledger.besu.evm.operation.PrevRanDaoOperation; +import org.apache.tuweni.bytes.Bytes; import org.junit.Test; public class MergeProtocolScheduleTest { @@ -55,5 +59,9 @@ public void parametersAlignWithMainnetWithAdjustments() { assertThat(london.getName()).isEqualTo("Frontier"); assertThat(london.getBlockReward()).isEqualTo(Wei.ZERO); assertThat(london.isSkipZeroBlockRewards()).isEqualTo(true); + + Bytes diffOp = Bytes.fromHexString("0x44"); + var op = london.getEvm().operationAtOffset(Code.createLegacyCode(diffOp, Hash.hash(diffOp)), 0); + assertThat(op).isInstanceOf(PrevRanDaoOperation.class); } } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java index f4b02fcee59..8ed852e5428 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java @@ -199,7 +199,12 @@ public AbstractBlockProcessor.Result processBlock( return AbstractBlockProcessor.Result.failed(); } - worldState.persist(blockHeader); + try { + worldState.persist(blockHeader); + } catch (Exception e) { + LOG.error("failed persisting block", e); + return AbstractBlockProcessor.Result.failed(); + } return AbstractBlockProcessor.Result.successful(receipts); } From 1693db984962ac1a0fb3d94b75574d2e7bb82c71 Mon Sep 17 00:00:00 2001 From: Danno Ferrin Date: Thu, 21 Apr 2022 19:57:39 -0600 Subject: [PATCH 4/8] Rename SHA3 -> Keccak256 (#3749) Rename SHA3 -> Keccak256 Ethereum doesn't do "official" sha3, we do keccak256, so rename as needed to reflect that reality. Signed-off-by: Danno Ferrin --- CHANGELOG.md | 3 +++ .../java/org/hyperledger/besu/evm/MainnetEVMs.java | 4 ++-- .../gascalculator/ConstantinopleGasCalculator.java | 2 +- .../evm/gascalculator/FrontierGasCalculator.java | 13 +++++++++---- .../besu/evm/gascalculator/GasCalculator.java | 6 +++--- .../hyperledger/besu/evm/log/LogsBloomFilter.java | 6 +++--- .../{Sha3Operation.java => Keccak256Operation.java} | 8 ++++---- 7 files changed, 25 insertions(+), 17 deletions(-) rename evm/src/main/java/org/hyperledger/besu/evm/operation/{Sha3Operation.java => Keccak256Operation.java} (86%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e899add4b7..7739171c1e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ ## 22.4.0-RC2 +### Breaking Changes +- In the Besu EVM Library all references to SHA3 have been renamed to the more accurate name Kecack256, including class names and comment. [#3749](https://github.com/hyperledger/besu/pull/3749) + ### Additions and Improvements - Onchain node permissioning - log the enodeURL that was previously only throwing an IllegalStateException during the isPermitted check [#3697](https://github.com/hyperledger/besu/pull/3697) - \[EXPERIMENTAL\] Add snapsync `--sync-mode="X_SNAP"` (only as client) [#3710](https://github.com/hyperledger/besu/pull/3710) diff --git a/evm/src/main/java/org/hyperledger/besu/evm/MainnetEVMs.java b/evm/src/main/java/org/hyperledger/besu/evm/MainnetEVMs.java index 12eca302dae..adbe751da55 100644 --- a/evm/src/main/java/org/hyperledger/besu/evm/MainnetEVMs.java +++ b/evm/src/main/java/org/hyperledger/besu/evm/MainnetEVMs.java @@ -64,6 +64,7 @@ import org.hyperledger.besu.evm.operation.JumpDestOperation; import org.hyperledger.besu.evm.operation.JumpOperation; import org.hyperledger.besu.evm.operation.JumpiOperation; +import org.hyperledger.besu.evm.operation.Keccak256Operation; import org.hyperledger.besu.evm.operation.LogOperation; import org.hyperledger.besu.evm.operation.LtOperation; import org.hyperledger.besu.evm.operation.MLoadOperation; @@ -95,7 +96,6 @@ import org.hyperledger.besu.evm.operation.SarOperation; import org.hyperledger.besu.evm.operation.SelfBalanceOperation; import org.hyperledger.besu.evm.operation.SelfDestructOperation; -import org.hyperledger.besu.evm.operation.Sha3Operation; import org.hyperledger.besu.evm.operation.ShlOperation; import org.hyperledger.besu.evm.operation.ShrOperation; import org.hyperledger.besu.evm.operation.SignExtendOperation; @@ -155,7 +155,7 @@ public static void registerFrontierOperations( registry.put(new XorOperation(gasCalculator)); registry.put(new NotOperation(gasCalculator)); registry.put(new ByteOperation(gasCalculator)); - registry.put(new Sha3Operation(gasCalculator)); + registry.put(new Keccak256Operation(gasCalculator)); registry.put(new AddressOperation(gasCalculator)); registry.put(new BalanceOperation(gasCalculator)); registry.put(new OriginOperation(gasCalculator)); diff --git a/evm/src/main/java/org/hyperledger/besu/evm/gascalculator/ConstantinopleGasCalculator.java b/evm/src/main/java/org/hyperledger/besu/evm/gascalculator/ConstantinopleGasCalculator.java index 2bcaa34ee1a..c71feab43f9 100644 --- a/evm/src/main/java/org/hyperledger/besu/evm/gascalculator/ConstantinopleGasCalculator.java +++ b/evm/src/main/java/org/hyperledger/besu/evm/gascalculator/ConstantinopleGasCalculator.java @@ -41,7 +41,7 @@ public class ConstantinopleGasCalculator extends ByzantiumGasCalculator { public long create2OperationGasCost(final MessageFrame frame) { final long initCodeLength = clampedToLong(frame.getStackItem(2)); final long numWords = clampedAdd(initCodeLength, 31) / Bytes32.SIZE; - final long initCodeHashCost = clampedMultiply(SHA3_OPERATION_WORD_GAS_COST, numWords); + final long initCodeHashCost = clampedMultiply(KECCAK256_OPERATION_WORD_GAS_COST, numWords); return clampedAdd(createOperationGasCost(frame), initCodeHashCost); } diff --git a/evm/src/main/java/org/hyperledger/besu/evm/gascalculator/FrontierGasCalculator.java b/evm/src/main/java/org/hyperledger/besu/evm/gascalculator/FrontierGasCalculator.java index 54f28ca1f92..9e18da60793 100644 --- a/evm/src/main/java/org/hyperledger/besu/evm/gascalculator/FrontierGasCalculator.java +++ b/evm/src/main/java/org/hyperledger/besu/evm/gascalculator/FrontierGasCalculator.java @@ -98,9 +98,9 @@ public class FrontierGasCalculator implements GasCalculator { private static final long SELFDESTRUCT_OPERATION_GAS_COST = 0L; - private static final long SHA3_OPERATION_BASE_GAS_COST = 30L; + private static final long KECCAK256_OPERATION_BASE_GAS_COST = 30L; - static final long SHA3_OPERATION_WORD_GAS_COST = 6L; + static final long KECCAK256_OPERATION_WORD_GAS_COST = 6L; private static final long SLOAD_OPERATION_GAS_COST = 50L; @@ -394,9 +394,14 @@ public long selfDestructOperationGasCost(final Account recipient, final Wei inhe } @Override - public long sha3OperationGasCost(final MessageFrame frame, final long offset, final long length) { + public long keccak256OperationGasCost( + final MessageFrame frame, final long offset, final long length) { return copyWordsToMemoryGasCost( - frame, SHA3_OPERATION_BASE_GAS_COST, SHA3_OPERATION_WORD_GAS_COST, offset, length); + frame, + KECCAK256_OPERATION_BASE_GAS_COST, + KECCAK256_OPERATION_WORD_GAS_COST, + offset, + length); } @Override diff --git a/evm/src/main/java/org/hyperledger/besu/evm/gascalculator/GasCalculator.java b/evm/src/main/java/org/hyperledger/besu/evm/gascalculator/GasCalculator.java index 94ed115c743..0d882248454 100644 --- a/evm/src/main/java/org/hyperledger/besu/evm/gascalculator/GasCalculator.java +++ b/evm/src/main/java/org/hyperledger/besu/evm/gascalculator/GasCalculator.java @@ -26,13 +26,13 @@ import org.hyperledger.besu.evm.operation.ExtCodeHashOperation; import org.hyperledger.besu.evm.operation.ExtCodeSizeOperation; import org.hyperledger.besu.evm.operation.JumpDestOperation; +import org.hyperledger.besu.evm.operation.Keccak256Operation; import org.hyperledger.besu.evm.operation.LogOperation; import org.hyperledger.besu.evm.operation.MLoadOperation; import org.hyperledger.besu.evm.operation.MStore8Operation; import org.hyperledger.besu.evm.operation.MStoreOperation; import org.hyperledger.besu.evm.operation.SLoadOperation; import org.hyperledger.besu.evm.operation.SelfDestructOperation; -import org.hyperledger.besu.evm.operation.Sha3Operation; import org.hyperledger.besu.evm.precompile.ECRECPrecompiledContract; import org.hyperledger.besu.evm.precompile.IDPrecompiledContract; import org.hyperledger.besu.evm.precompile.RIPEMD160PrecompiledContract; @@ -329,14 +329,14 @@ long callOperationGasCost( long selfDestructOperationGasCost(Account recipient, Wei inheritance); /** - * Returns the cost for executing a {@link Sha3Operation}. + * Returns the cost for executing a {@link Keccak256Operation}. * * @param frame The current frame * @param offset The offset in memory where the data to be hashed exists * @param length The hashed data length * @return the cost for executing the memory byte store operation */ - long sha3OperationGasCost(MessageFrame frame, long offset, long length); + long keccak256OperationGasCost(MessageFrame frame, long offset, long length); /** * Returns the cost for executing a {@link SLoadOperation}. diff --git a/evm/src/main/java/org/hyperledger/besu/evm/log/LogsBloomFilter.java b/evm/src/main/java/org/hyperledger/besu/evm/log/LogsBloomFilter.java index ada4bdf84d7..787ae778510 100644 --- a/evm/src/main/java/org/hyperledger/besu/evm/log/LogsBloomFilter.java +++ b/evm/src/main/java/org/hyperledger/besu/evm/log/LogsBloomFilter.java @@ -29,7 +29,7 @@ /* * Bloom filter implementation for storing persistent logs, describes a 2048-bit representation of * all log entries of a transaction, except data. Sets the bits of the 2048 byte array, where - * indices are given by: The lower order 11-bits, of the first three double-bytes, of the SHA3, of + * indices are given by: The lower order 11-bits, of the first three double-bytes, of the KECCAK256, of * each value. For instance the address "0x0F572E5295C57F15886F9B263E2F6D2D6C7B5EC6" results in the * KECCAK256 hash "bd2b01afcd27800b54d2179edc49e2bffde5078bb6d0b204694169b1643fb108", of which the * corresponding double-bytes are: bd2b, 01af, cd27, corresponding to the following bits in the @@ -145,8 +145,8 @@ public LogsBloomFilter build() { } /** - * Discover the low order 11-bits, of the first three double-bytes, of the SHA3 hash, of each - * value and update the bloom filter accordingly. + * Discover the low order 11-bits, of the first three double-bytes, of the KECCAK256 hash, of + * each value and update the bloom filter accordingly. * * @param hashValue The hash of the log item. */ diff --git a/evm/src/main/java/org/hyperledger/besu/evm/operation/Sha3Operation.java b/evm/src/main/java/org/hyperledger/besu/evm/operation/Keccak256Operation.java similarity index 86% rename from evm/src/main/java/org/hyperledger/besu/evm/operation/Sha3Operation.java rename to evm/src/main/java/org/hyperledger/besu/evm/operation/Keccak256Operation.java index c49cf2c5e70..50b44506613 100644 --- a/evm/src/main/java/org/hyperledger/besu/evm/operation/Sha3Operation.java +++ b/evm/src/main/java/org/hyperledger/besu/evm/operation/Keccak256Operation.java @@ -28,10 +28,10 @@ import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.units.bigints.UInt256; -public class Sha3Operation extends AbstractOperation { +public class Keccak256Operation extends AbstractOperation { - public Sha3Operation(final GasCalculator gasCalculator) { - super(0x20, "SHA3", 2, 1, 1, gasCalculator); + public Keccak256Operation(final GasCalculator gasCalculator) { + super(0x20, "KECCAK256", 2, 1, 1, gasCalculator); } @Override @@ -39,7 +39,7 @@ public OperationResult execute(final MessageFrame frame, final EVM evm) { final long from = clampedToLong(frame.popStackItem()); final long length = clampedToLong(frame.popStackItem()); - final long cost = gasCalculator().sha3OperationGasCost(frame, from, length); + final long cost = gasCalculator().keccak256OperationGasCost(frame, from, length); if (frame.getRemainingGas() < cost) { return new OperationResult( OptionalLong.of(cost), Optional.of(ExceptionalHaltReason.INSUFFICIENT_GAS)); From 23f5e2e93382b6c59e0959b6854283c4a3b96a59 Mon Sep 17 00:00:00 2001 From: garyschulte Date: Fri, 22 Apr 2022 17:42:59 +0200 Subject: [PATCH 5/8] wrap rocksdb segmenthandles in atomicreference to ensure we do not reference closed handles (#3734) Signed-off-by: garyschulte --- .../RocksDBColumnarKeyValueStorage.java | 72 ++++++++++--------- .../RocksDBColumnarKeyValueStorageTest.java | 32 +++++---- .../kvstore/SegmentedKeyValueStorage.java | 5 +- .../SegmentedKeyValueStorageAdapter.java | 21 +++--- 4 files changed, 70 insertions(+), 60 deletions(-) diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java index 181eebef0e1..0a37084da64 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java @@ -31,16 +31,17 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; +import com.google.common.collect.ImmutableMap; import org.apache.tuweni.bytes.Bytes; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.ColumnFamilyDescriptor; @@ -62,19 +63,19 @@ public class RocksDBColumnarKeyValueStorage implements SegmentedKeyValueStorage { - static { - RocksDbUtil.loadNativeLibrary(); - } - private static final Logger LOG = LoggerFactory.getLogger(RocksDBColumnarKeyValueStorage.class); private static final String DEFAULT_COLUMN = "default"; private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device"; + static { + RocksDbUtil.loadNativeLibrary(); + } + private final DBOptions options; private final TransactionDBOptions txOptions; private final TransactionDB db; private final AtomicBoolean closed = new AtomicBoolean(false); - private final Map columnHandlesByName; + private final Map> columnHandlesByName; private final RocksDBMetrics metrics; private final WriteOptions tryDeleteOptions = new WriteOptions().setNoSlowdown(true); @@ -127,14 +128,17 @@ public RocksDBColumnarKeyValueStorage( Collectors.toMap( segment -> Bytes.wrap(segment.getId()), SegmentIdentifier::getName)); - columnHandlesByName = new HashMap<>(); + final ImmutableMap.Builder> builder = + ImmutableMap.builder(); for (ColumnFamilyHandle columnHandle : columnHandles) { final String segmentName = requireNonNullElse( segmentsById.get(Bytes.wrap(columnHandle.getName())), DEFAULT_COLUMN); - columnHandlesByName.put(segmentName, columnHandle); + builder.put(segmentName, new AtomicReference<>(columnHandle)); } + columnHandlesByName = builder.build(); + } catch (final RocksDBException e) { throw new StorageException(e); } @@ -146,7 +150,8 @@ private BlockBasedTableConfig createBlockBasedTableConfig(final RocksDBConfigura } @Override - public ColumnFamilyHandle getSegmentIdentifierByName(final SegmentIdentifier segment) { + public AtomicReference getSegmentIdentifierByName( + final SegmentIdentifier segment) { return columnHandlesByName.get(segment.getName()); } @@ -198,30 +203,27 @@ public Set getAllKeysThat( } @Override - public ColumnFamilyHandle clear(final ColumnFamilyHandle segmentHandle) { - try { - - var entry = - columnHandlesByName.entrySet().stream() - .filter(e -> e.getValue().equals(segmentHandle)) - .findAny(); - - if (entry.isPresent()) { - String segmentName = entry.get().getKey(); - ColumnFamilyDescriptor descriptor = - new ColumnFamilyDescriptor( - segmentHandle.getName(), segmentHandle.getDescriptor().getOptions()); - db.dropColumnFamily(segmentHandle); - segmentHandle.close(); - ColumnFamilyHandle newHandle = db.createColumnFamily(descriptor); - columnHandlesByName.put(segmentName, newHandle); - return newHandle; - } - - return segmentHandle; - - } catch (final RocksDBException e) { - throw new StorageException(e); + public void clear(final ColumnFamilyHandle segmentHandle) { + + var entry = + columnHandlesByName.values().stream().filter(e -> e.get().equals(segmentHandle)).findAny(); + + if (entry.isPresent()) { + AtomicReference segmentHandleRef = entry.get(); + segmentHandleRef.getAndUpdate( + oldHandle -> { + try { + ColumnFamilyDescriptor descriptor = + new ColumnFamilyDescriptor( + segmentHandle.getName(), segmentHandle.getDescriptor().getOptions()); + db.dropColumnFamily(oldHandle); + ColumnFamilyHandle newHandle = db.createColumnFamily(descriptor); + segmentHandle.close(); + return newHandle; + } catch (final RocksDBException e) { + throw new StorageException(e); + } + }); } } @@ -231,7 +233,9 @@ public void close() { txOptions.close(); options.close(); tryDeleteOptions.close(); - columnHandlesByName.values().forEach(ColumnFamilyHandle::close); + columnHandlesByName.values().stream() + .map(AtomicReference::get) + .forEach(ColumnFamilyHandle::close); db.close(); } } diff --git a/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java b/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java index 6e27cb8f4d1..95da9eda660 100644 --- a/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java +++ b/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java @@ -31,8 +31,8 @@ import java.util.Arrays; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.Supplier; import org.junit.Rule; import org.junit.Test; @@ -49,7 +49,10 @@ public void assertClear() throws Exception { final byte[] val1 = bytesFromHexString("0FFF"); final byte[] val2 = bytesFromHexString("1337"); final SegmentedKeyValueStorage store = createSegmentedStore(); - Supplier segment = () -> store.getSegmentIdentifierByName(TestSegment.FOO); + AtomicReference segment = store.getSegmentIdentifierByName(TestSegment.FOO); + KeyValueStorage duplicateSegmentRef = + new SegmentedKeyValueStorageAdapter<>(TestSegment.FOO, store); + final Consumer insert = value -> { final Transaction tx = store.startTransaction(); @@ -59,18 +62,18 @@ public void assertClear() throws Exception { // insert val: insert.accept(val1); - final Optional result = store.get(segment.get(), key); - assertThat(result.orElse(null)).isEqualTo(val1); + assertThat(store.get(segment.get(), key).orElse(null)).isEqualTo(val1); + assertThat(duplicateSegmentRef.get(key).orElse(null)).isEqualTo(val1); // clear and assert empty: store.clear(segment.get()); - final Optional truncResult = store.get(segment.get(), key); - assertThat(truncResult).isEmpty(); + assertThat(store.get(segment.get(), key)).isEmpty(); + assertThat(duplicateSegmentRef.get(key)).isEmpty(); // insert into empty: insert.accept(val2); - final Optional nextResult = store.get(segment.get(), key); - assertThat(nextResult.orElse(null)).isEqualTo(val2); + assertThat(store.get(segment.get(), key).orElse(null)).isEqualTo(val2); + assertThat(duplicateSegmentRef.get(key).orElse(null)).isEqualTo(val2); store.close(); } @@ -81,13 +84,14 @@ public void twoSegmentsAreIndependent() throws Exception { final Transaction tx = store.startTransaction(); tx.put( - store.getSegmentIdentifierByName(TestSegment.BAR), + store.getSegmentIdentifierByName(TestSegment.BAR).get(), bytesFromHexString("0001"), bytesFromHexString("0FFF")); tx.commit(); final Optional result = - store.get(store.getSegmentIdentifierByName(TestSegment.FOO), bytesFromHexString("0001")); + store.get( + store.getSegmentIdentifierByName(TestSegment.FOO).get(), bytesFromHexString("0001")); assertThat(result).isEmpty(); @@ -100,8 +104,8 @@ public void canRemoveThroughSegmentIteration() throws Exception { // properly for (int i = 0; i < 50; i++) { final SegmentedKeyValueStorage store = createSegmentedStore(); - final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO); - final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR); + final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO).get(); + final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR).get(); final Transaction tx = store.startTransaction(); tx.put(fooSegment, bytesOf(1), bytesOf(1)); @@ -144,8 +148,8 @@ public void canRemoveThroughSegmentIteration() throws Exception { @Test public void canGetThroughSegmentIteration() throws Exception { final SegmentedKeyValueStorage store = createSegmentedStore(); - final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO); - final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR); + final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO).get(); + final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR).get(); final Transaction tx = store.startTransaction(); tx.put(fooSegment, bytesOf(1), bytesOf(1)); diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java index aa7f0224747..a09b1a8989e 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Stream; @@ -30,7 +31,7 @@ */ public interface SegmentedKeyValueStorage extends Closeable { - S getSegmentIdentifierByName(SegmentIdentifier segment); + AtomicReference getSegmentIdentifierByName(SegmentIdentifier segment); /** * Get the value from the associated segment and key. @@ -74,7 +75,7 @@ default boolean containsKey(final S segment, final byte[] key) throws StorageExc Set getAllKeysThat(S segmentHandle, Predicate returnCondition); - S clear(S segmentHandle); + void clear(S segmentHandle); /** * Represents a set of changes to be committed atomically. A single transaction is not diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java index f2d7c2d0721..1fc65627808 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java @@ -22,47 +22,48 @@ import java.io.IOException; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Stream; public class SegmentedKeyValueStorageAdapter implements KeyValueStorage { - private S segmentHandle; + private final AtomicReference segmentHandle; private final SegmentedKeyValueStorage storage; public SegmentedKeyValueStorageAdapter( final SegmentIdentifier segment, final SegmentedKeyValueStorage storage) { - this.segmentHandle = storage.getSegmentIdentifierByName(segment); + segmentHandle = storage.getSegmentIdentifierByName(segment); this.storage = storage; } @Override public void clear() { - segmentHandle = storage.clear(segmentHandle); + storage.clear(segmentHandle.get()); } @Override public boolean containsKey(final byte[] key) throws StorageException { - return storage.containsKey(segmentHandle, key); + return storage.containsKey(segmentHandle.get(), key); } @Override public Optional get(final byte[] key) throws StorageException { - return storage.get(segmentHandle, key); + return storage.get(segmentHandle.get(), key); } @Override public Set getAllKeysThat(final Predicate returnCondition) { - return storage.getAllKeysThat(segmentHandle, returnCondition); + return storage.getAllKeysThat(segmentHandle.get(), returnCondition); } @Override public Stream streamKeys() { - return storage.streamKeys(segmentHandle); + return storage.streamKeys(segmentHandle.get()); } @Override public boolean tryDelete(final byte[] key) { - return storage.tryDelete(segmentHandle, key); + return storage.tryDelete(segmentHandle.get(), key); } @Override @@ -77,12 +78,12 @@ public KeyValueStorageTransaction startTransaction() throws StorageException { @Override public void put(final byte[] key, final byte[] value) { - transaction.put(segmentHandle, key, value); + transaction.put(segmentHandle.get(), key, value); } @Override public void remove(final byte[] key) { - transaction.remove(segmentHandle, key); + transaction.remove(segmentHandle.get(), key); } @Override From 714c5383a501bd2a205e31890a8c5cd2c176a496 Mon Sep 17 00:00:00 2001 From: garyschulte Date: Sat, 23 Apr 2022 18:37:46 +0200 Subject: [PATCH 6/8] Feature/bonsai fast sync clear flat db (#3707) * for snap and fastsync. we need to clean the flat storage between two pivot block * return empty optional when storage and account location are empty or the data does not match the hash. Signed-off-by: garyschulte Co-authored-by: Karim TAAM --- .../eth/sync/fastsync/FastSyncDownloader.java | 3 ++- .../worldstate/AccountTrieNodeDataRequest.java | 7 ++++++- .../fastsync/worldstate/FastDownloaderFactory.java | 2 +- .../worldstate/StorageTrieNodeDataRequest.java | 11 +++++++++-- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java index eccc5f52a13..d4aa13caaaf 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java @@ -82,7 +82,8 @@ public CompletableFuture start() { protected CompletableFuture start(final FastSyncState fastSyncState) { LOG.info("Starting fast sync."); if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) { - worldStateStorage.clear(); + LOG.info("Clearing bonsai flat account db"); + worldStateStorage.clearFlatDatabase(); } return findPivotBlock(fastSyncState, fss -> downloadChainAndWorldState(fastSyncActions, fss)); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/AccountTrieNodeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/AccountTrieNodeDataRequest.java index 6ee9e4dbf87..5903b5e1a66 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/AccountTrieNodeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/AccountTrieNodeDataRequest.java @@ -43,7 +43,12 @@ protected void doPersist(final Updater updater) { @Override public Optional getExistingData(final WorldStateStorage worldStateStorage) { - return worldStateStorage.getAccountTrieNodeData(getLocation().orElse(Bytes.EMPTY), getHash()); + return getLocation() + .flatMap( + location -> + worldStateStorage + .getAccountStateTrieNode(location, getHash()) + .filter(data -> Hash.hash(data).equals(getHash()))); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java index 2996bed0422..2c2177fa1fa 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java @@ -86,7 +86,7 @@ public static Optional> create( return Optional.empty(); } if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) { - worldStateStorage.clear(); + worldStateStorage.clearFlatDatabase(); } else { final Path queueDataDir = fastSyncDataDirectory.resolve("statequeue"); if (queueDataDir.toFile().exists()) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/StorageTrieNodeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/StorageTrieNodeDataRequest.java index d3cbf51bcba..5aebaa987a1 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/StorageTrieNodeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/StorageTrieNodeDataRequest.java @@ -46,8 +46,15 @@ protected void doPersist(final Updater updater) { @Override public Optional getExistingData(final WorldStateStorage worldStateStorage) { - return worldStateStorage.getAccountStorageTrieNode( - getAccountHash().orElse(Hash.EMPTY), getLocation().orElse(Hash.EMPTY), getHash()); + return getAccountHash() + .flatMap( + accountHash -> + getLocation() + .flatMap( + location -> + worldStateStorage + .getAccountStorageTrieNode(accountHash, location, getHash()) + .filter(data -> Hash.hash(data).equals(getHash())))); } @Override From e01f63bbfab4ba057498096d2dd24277980e7337 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Mon, 25 Apr 2022 13:00:37 +0200 Subject: [PATCH 7/8] Adapt Fast sync, and Snap sync, to use finalized block as pivot after the Merge (#3655) Signed-off-by: Fabio Di Fabio --- CHANGELOG.md | 1 + .../controller/BesuControllerBuilder.java | 55 +++++++- .../merge/FinalizedBlockHashSupplier.java | 45 +++++++ .../besu/consensus/merge/MergeContext.java | 18 +++ .../consensus/merge/PostMergeContext.java | 23 ++++ .../consensus/merge/TransitionContext.java | 21 +++ .../engine/EngineForkchoiceUpdated.java | 15 ++- .../engine/EngineForkchoiceUpdatedTest.java | 5 + .../eth/sync/DefaultSynchronizer.java | 85 ++++++++---- .../eth/sync/PipelineChainDownloader.java | 4 +- .../ethereum/eth/sync/PivotBlockSelector.java | 33 +++++ .../backwardsync/BackwardSyncContext.java | 17 +-- .../sync/fastsync/FastImportBlocksStep.java | 3 + .../eth/sync/fastsync/FastSyncActions.java | 122 ++++++++++++------ .../eth/sync/fastsync/FastSyncState.java | 32 ++++- .../PivotSelectorFromFinalizedBlock.java | 69 ++++++++++ .../sync/fastsync/PivotSelectorFromPeers.java | 53 ++++++++ .../fastsync/TransitionPivotSelector.java | 95 ++++++++++++++ .../fastsync/worldstate/CompleteTaskStep.java | 24 ++-- .../worldstate/FastDownloaderFactory.java | 3 + .../eth/sync/fullsync/FullSyncDownloader.java | 12 +- .../sync/snapsync/SnapDownloaderFactory.java | 3 + .../eth/sync/snapsync/SnapSyncState.java | 5 +- .../ethereum/eth/sync/state/SyncState.java | 24 +++- .../RetryingGetHeaderFromPeerByHashTask.java | 90 +++++++++++++ .../backwardsync/BackwardSyncContextTest.java | 9 +- .../fastsync/FastDownloaderFactoryTest.java | 11 +- .../sync/fastsync/FastSyncActionsTest.java | 53 ++++++-- .../besu/metrics/RunnableTimedCounter.java | 69 ++++++++++ .../metrics/RunnableTimedCounterTest.java | 61 +++++++++ 30 files changed, 919 insertions(+), 141 deletions(-) create mode 100644 consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/FinalizedBlockHashSupplier.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PivotBlockSelector.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromFinalizedBlock.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromPeers.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/TransitionPivotSelector.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByHashTask.java create mode 100644 metrics/core/src/main/java/org/hyperledger/besu/metrics/RunnableTimedCounter.java create mode 100644 metrics/core/src/test/java/org/hyperledger/besu/metrics/RunnableTimedCounterTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 7739171c1e7..67e1ede826f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ ### Additions and Improvements - Onchain node permissioning - log the enodeURL that was previously only throwing an IllegalStateException during the isPermitted check [#3697](https://github.com/hyperledger/besu/pull/3697) - \[EXPERIMENTAL\] Add snapsync `--sync-mode="X_SNAP"` (only as client) [#3710](https://github.com/hyperledger/besu/pull/3710) +- Adapt Fast sync, and Snap sync, to use finalized block, from consensus layer, as pivot after the Merge [#3506](https://github.com/hyperledger/besu/issues/3506) ### Bug Fixes diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index fccd623e786..774935f20d6 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -17,6 +17,9 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.hyperledger.besu.config.GenesisConfigFile; +import org.hyperledger.besu.config.GenesisConfigOptions; +import org.hyperledger.besu.consensus.merge.FinalizedBlockHashSupplier; +import org.hyperledger.besu.consensus.merge.MergeContext; import org.hyperledger.besu.consensus.qbft.pki.PkiBlockCreationConfiguration; import org.hyperledger.besu.crypto.NodeKey; import org.hyperledger.besu.datatypes.Hash; @@ -49,8 +52,12 @@ import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator; import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer; +import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromFinalizedBlock; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromPeers; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.TransitionPivotSelector; import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; @@ -79,6 +86,7 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -329,8 +337,9 @@ public BesuController build() { syncConfig.getComputationParallelism(), metricsSystem); final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler); - final SyncState syncState = new SyncState(blockchain, ethPeers); - final boolean fastSyncEnabled = SyncMode.FAST.equals(syncConfig.getSyncMode()); + final boolean fastSyncEnabled = + EnumSet.of(SyncMode.FAST, SyncMode.X_SNAP).contains(syncConfig.getSyncMode()); + final SyncState syncState = new SyncState(blockchain, ethPeers, fastSyncEnabled); final TransactionPool transactionPool = TransactionPoolFactory.createTransactionPool( @@ -360,6 +369,8 @@ public BesuController build() { final Optional maybeSnapProtocolManager = createSnapProtocolManager(peerValidators, ethPeers, snapMessages, worldStateArchive); + final PivotBlockSelector pivotBlockSelector = createPivotSelector(protocolContext); + final Synchronizer synchronizer = new DefaultSynchronizer( syncConfig, @@ -368,12 +379,13 @@ public BesuController build() { worldStateStorage, ethProtocolManager.getBlockBroadcaster(), maybePruner, - ethProtocolManager.ethContext(), + ethContext, syncState, dataDirectory, clock, metricsSystem, - getFullSyncTerminationCondition(protocolContext.getBlockchain())); + getFullSyncTerminationCondition(protocolContext.getBlockchain()), + pivotBlockSelector); final MiningCoordinator miningCoordinator = createMiningCoordinator( @@ -418,6 +430,41 @@ public BesuController build() { additionalPluginServices); } + private PivotBlockSelector createPivotSelector(final ProtocolContext protocolContext) { + + final PivotSelectorFromPeers pivotSelectorFromPeers = new PivotSelectorFromPeers(syncConfig); + final GenesisConfigOptions genesisConfigOptions = genesisConfig.getConfigOptions(); + + if (genesisConfigOptions.getTerminalTotalDifficulty().isPresent()) { + LOG.info( + "TTD difficulty is present, creating initial sync phase with transition to PoS support"); + + final MergeContext mergeContext = protocolContext.getConsensusContext(MergeContext.class); + final FinalizedBlockHashSupplier finalizedBlockHashSupplier = + new FinalizedBlockHashSupplier(); + final long subscriptionId = + mergeContext.addNewForkchoiceMessageListener(finalizedBlockHashSupplier); + + final Runnable unsubscribeFinalizedBlockHashListener = + () -> { + mergeContext.removeNewForkchoiceMessageListener(subscriptionId); + LOG.info("Initial sync done, unsubscribe finalized block hash supplier"); + }; + + return new TransitionPivotSelector( + genesisConfigOptions, + finalizedBlockHashSupplier, + pivotSelectorFromPeers, + new PivotSelectorFromFinalizedBlock( + genesisConfigOptions, + finalizedBlockHashSupplier, + unsubscribeFinalizedBlockHashListener)); + } else { + LOG.info("TTD difficulty is not present, creating initial sync phase for PoW"); + return pivotSelectorFromPeers; + } + } + protected SyncTerminationCondition getFullSyncTerminationCondition(final Blockchain blockchain) { return genesisConfig .getConfigOptions() diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/FinalizedBlockHashSupplier.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/FinalizedBlockHashSupplier.java new file mode 100644 index 00000000000..46d9172cabc --- /dev/null +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/FinalizedBlockHashSupplier.java @@ -0,0 +1,45 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.consensus.merge; + +import org.hyperledger.besu.consensus.merge.MergeContext.NewForkchoiceMessageListener; +import org.hyperledger.besu.datatypes.Hash; + +import java.util.Optional; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FinalizedBlockHashSupplier + implements Supplier>, NewForkchoiceMessageListener { + private static final Logger LOG = LoggerFactory.getLogger(FinalizedBlockHashSupplier.class); + + private volatile Optional lastAnnouncedFinalizedBlockHash = Optional.empty(); + + @Override + public void onNewForkchoiceMessage( + final Hash headBlockHash, + final Optional maybeFinalizedBlockHash, + final Hash safeBlockHash) { + lastAnnouncedFinalizedBlockHash = maybeFinalizedBlockHash; + LOG.debug("New finalized block hash announced {}", lastAnnouncedFinalizedBlockHash); + } + + @Override + public Optional get() { + return lastAnnouncedFinalizedBlockHash; + } +} diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java index 43b97a70d0d..c53a6481f77 100644 --- a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.consensus.merge; import org.hyperledger.besu.consensus.merge.blockcreation.PayloadIdentifier; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.ConsensusContext; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -37,6 +38,11 @@ public interface MergeContext extends ConsensusContext { void observeNewIsPostMergeState(final NewMergeStateCallback newMergeStateCallback); + long addNewForkchoiceMessageListener( + final NewForkchoiceMessageListener newForkchoiceMessageListener); + + void removeNewForkchoiceMessageListener(final long subscriberId); + Difficulty getTerminalTotalDifficulty(); void setFinalized(final BlockHeader blockHeader); @@ -53,7 +59,19 @@ public interface MergeContext extends ConsensusContext { Optional retrieveBlockById(final PayloadIdentifier payloadId); + void fireNewForkchoiceMessageEvent( + final Hash headBlockHash, + final Optional maybeFinalizedBlockHash, + final Hash safeBlockHash); + interface NewMergeStateCallback { void onNewIsPostMergeState(final boolean newIsPostMergeState); } + + interface NewForkchoiceMessageListener { + void onNewForkchoiceMessage( + final Hash headBlockHash, + final Optional maybeFinalizedBlockHash, + final Hash safeBlockHash); + } } diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java index 6dc8a652113..a80d93c3b0b 100644 --- a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.consensus.merge; import org.hyperledger.besu.consensus.merge.blockcreation.PayloadIdentifier; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.ConsensusContext; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -42,6 +43,8 @@ public class PostMergeContext implements MergeContext { new AtomicReference<>(Optional.empty()); private final Subscribers newMergeStateCallbackSubscribers = Subscribers.create(); + private final Subscribers newForkchoiceMessageCallbackSubscribers = + Subscribers.create(); private final EvictingQueue blocksInProgress = EvictingQueue.create(MAX_BLOCKS_IN_PROGRESS); @@ -123,6 +126,26 @@ public void observeNewIsPostMergeState(final NewMergeStateCallback newMergeState newMergeStateCallbackSubscribers.subscribe(newMergeStateCallback); } + @Override + public long addNewForkchoiceMessageListener( + final NewForkchoiceMessageListener newForkchoiceMessageListener) { + return newForkchoiceMessageCallbackSubscribers.subscribe(newForkchoiceMessageListener); + } + + @Override + public void removeNewForkchoiceMessageListener(final long subscriberId) { + newForkchoiceMessageCallbackSubscribers.unsubscribe(subscriberId); + } + + @Override + public void fireNewForkchoiceMessageEvent( + final Hash headBlockHash, + final Optional maybeFinalizedBlockHash, + final Hash safeBlockHash) { + newForkchoiceMessageCallbackSubscribers.forEach( + cb -> cb.onNewForkchoiceMessage(headBlockHash, maybeFinalizedBlockHash, safeBlockHash)); + } + @Override public Difficulty getTerminalTotalDifficulty() { return terminalTotalDifficulty.get(); diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/TransitionContext.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/TransitionContext.java index 86bb0b51893..802cddb9e23 100644 --- a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/TransitionContext.java +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/TransitionContext.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.consensus.merge; import org.hyperledger.besu.consensus.merge.blockcreation.PayloadIdentifier; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.ConsensusContext; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -71,6 +72,26 @@ public void observeNewIsPostMergeState(final NewMergeStateCallback newMergeState postMergeContext.observeNewIsPostMergeState(newMergeStateCallback); } + @Override + public long addNewForkchoiceMessageListener( + final NewForkchoiceMessageListener newForkchoiceMessageListener) { + return postMergeContext.addNewForkchoiceMessageListener(newForkchoiceMessageListener); + } + + @Override + public void removeNewForkchoiceMessageListener(final long subscriberId) { + postMergeContext.removeNewForkchoiceMessageListener(subscriberId); + } + + @Override + public void fireNewForkchoiceMessageEvent( + final Hash headBlockHash, + final Optional maybeFinalizedBlockHash, + final Hash safeBlockHash) { + postMergeContext.fireNewForkchoiceMessageEvent( + headBlockHash, maybeFinalizedBlockHash, safeBlockHash); + } + @Override public Difficulty getTerminalTotalDifficulty() { return postMergeContext.getTerminalTotalDifficulty(); diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdated.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdated.java index da2b9e5ad05..e55a0a7ed2a 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdated.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdated.java @@ -66,6 +66,13 @@ public JsonRpcResponse syncResponse(final JsonRpcRequestContext requestContext) final Optional optionalPayloadAttributes = requestContext.getOptionalParameter(1, EnginePayloadAttributesParameter.class); + Optional maybeFinalizedHash = + Optional.ofNullable(forkChoice.getFinalizedBlockHash()) + .filter(finalized -> !Hash.ZERO.equals(finalized)); + + mergeContext.fireNewForkchoiceMessageEvent( + forkChoice.getHeadBlockHash(), maybeFinalizedHash, forkChoice.getSafeBlockHash()); + if (mergeContext.isSyncing()) { return new JsonRpcSuccessResponse( requestContext.getRequest().getId(), @@ -80,14 +87,10 @@ public JsonRpcResponse syncResponse(final JsonRpcRequestContext requestContext) Optional newHead = protocolContext.getBlockchain().getBlockHeader(forkChoice.getHeadBlockHash()); - Optional finalizedHash = - Optional.ofNullable(forkChoice.getFinalizedBlockHash()) - .filter(finalized -> !Hash.ZERO.equals(finalized)); - Optional finalizedHead = - finalizedHash.flatMap(protocolContext.getBlockchain()::getBlockHeader); + maybeFinalizedHash.flatMap(protocolContext.getBlockchain()::getBlockHeader); - if (newHead.isPresent() && (finalizedHash.isEmpty() || finalizedHead.isPresent())) { + if (newHead.isPresent() && (maybeFinalizedHash.isEmpty() || finalizedHead.isPresent())) { // TODO: post-merge cleanup, this should be unnecessary after merge if (!mergeCoordinator.latestValidAncestorDescendsFromTerminal(newHead.get())) { diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdatedTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdatedTest.java index f5722402f1b..f266889666a 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdatedTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdatedTest.java @@ -21,6 +21,7 @@ import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.ExecutionEngineJsonRpcMethod.EngineStatus.VALID; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.hyperledger.besu.consensus.merge.MergeContext; @@ -228,6 +229,10 @@ private EngineUpdateForkchoiceResult assertSuccessWithPayloadForForkchoiceResult assertThat(res.getPayloadStatus().getLatestValidHash()).isEmpty(); assertThat(res.getPayloadId()).isNull(); } + + // assert that listeners are always notified + verify(mergeContext).fireNewForkchoiceMessageEvent(mockHash, Optional.of(mockHash), mockHash); + return res; } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index bb8e7c2e287..aa83ebefe73 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -51,10 +51,12 @@ public class DefaultSynchronizer implements Synchronizer { private final Optional maybePruner; private final SyncState syncState; private final AtomicBoolean running = new AtomicBoolean(false); - private final BlockPropagationManager blockPropagationManager; + private final Optional blockPropagationManager; private final Optional> fastSyncDownloader; - private final FullSyncDownloader fullSyncDownloader; + private final Optional fullSyncDownloader; private final ProtocolContext protocolContext; + private final PivotBlockSelector pivotBlockSelector; + private final SyncTerminationCondition terminationCondition; public DefaultSynchronizer( final SynchronizerConfiguration syncConfig, @@ -68,11 +70,14 @@ public DefaultSynchronizer( final Path dataDirectory, final Clock clock, final MetricsSystem metricsSystem, - final SyncTerminationCondition terminationCondition) { + final SyncTerminationCondition terminationCondition, + final PivotBlockSelector pivotBlockSelector) { this.maybePruner = maybePruner; this.syncState = syncState; - + this.pivotBlockSelector = pivotBlockSelector; this.protocolContext = protocolContext; + this.terminationCondition = terminationCondition; + ChainHeadTracker.trackChainHeadForPeers( ethContext, protocolSchedule, @@ -81,29 +86,36 @@ public DefaultSynchronizer( metricsSystem); this.blockPropagationManager = - new BlockPropagationManager( - syncConfig, - protocolSchedule, - protocolContext, - ethContext, - syncState, - new PendingBlocksManager(syncConfig), - metricsSystem, - blockBroadcaster); + terminationCondition.shouldStopDownload() + ? Optional.empty() + : Optional.of( + new BlockPropagationManager( + syncConfig, + protocolSchedule, + protocolContext, + ethContext, + syncState, + new PendingBlocksManager(syncConfig), + metricsSystem, + blockBroadcaster)); this.fullSyncDownloader = - new FullSyncDownloader( - syncConfig, - protocolSchedule, - protocolContext, - ethContext, - syncState, - metricsSystem, - terminationCondition); + terminationCondition.shouldStopDownload() + ? Optional.empty() + : Optional.of( + new FullSyncDownloader( + syncConfig, + protocolSchedule, + protocolContext, + ethContext, + syncState, + metricsSystem, + terminationCondition)); if (SyncMode.X_SNAP.equals(syncConfig.getSyncMode())) { this.fastSyncDownloader = SnapDownloaderFactory.createSnapDownloader( + pivotBlockSelector, syncConfig, dataDirectory, protocolSchedule, @@ -116,6 +128,7 @@ public DefaultSynchronizer( } else { this.fastSyncDownloader = FastDownloaderFactory.create( + pivotBlockSelector, syncConfig, dataDirectory, protocolSchedule, @@ -142,19 +155,23 @@ public DefaultSynchronizer( private TrailingPeerRequirements calculateTrailingPeerRequirements() { return fastSyncDownloader .flatMap(FastSyncDownloader::calculateTrailingPeerRequirements) - .orElseGet(fullSyncDownloader::calculateTrailingPeerRequirements); + .orElse( + fullSyncDownloader + .map(FullSyncDownloader::calculateTrailingPeerRequirements) + .orElse(TrailingPeerRequirements.UNRESTRICTED)); } @Override public CompletableFuture start() { if (running.compareAndSet(false, true)) { LOG.info("Starting synchronizer."); - blockPropagationManager.start(); + blockPropagationManager.ifPresent(BlockPropagationManager::start); CompletableFuture future; if (fastSyncDownloader.isPresent()) { future = fastSyncDownloader.get().start().thenCompose(this::handleFastSyncResult); } else { + syncState.markInitialSyncPhaseAsDone(); future = startFullSync(); } future = future.thenApply(this::finalizeSync); @@ -169,9 +186,9 @@ public void stop() { if (running.compareAndSet(true, false)) { LOG.info("Stopping synchronizer"); fastSyncDownloader.ifPresent(FastSyncDownloader::stop); - fullSyncDownloader.stop(); + fullSyncDownloader.ifPresent(FullSyncDownloader::stop); maybePruner.ifPresent(Pruner::stop); - blockPropagationManager.stop(); + blockPropagationManager.ifPresent(BlockPropagationManager::stop); } } @@ -196,12 +213,24 @@ private CompletableFuture handleFastSyncResult(final FastSyncState result) LOG.info( "Fast sync completed successfully with pivot block {}", result.getPivotBlockNumber().getAsLong()); - return startFullSync(); + pivotBlockSelector.close(); + syncState.markInitialSyncPhaseAsDone(); + return terminationCondition.shouldContinueDownload() + ? startFullSync() + : CompletableFuture.completedFuture(null); } private CompletableFuture startFullSync() { maybePruner.ifPresent(Pruner::start); - return fullSyncDownloader.start(); + return fullSyncDownloader + .map(FullSyncDownloader::start) + .orElse(CompletableFuture.completedFuture(null)) + .thenRun( + () -> { + if (terminationCondition.shouldStopDownload()) { + syncState.setReachedTerminalDifficulty(true); + } + }); } @Override @@ -240,7 +269,7 @@ public boolean unsubscribeInSync(final long listenerId) { private Void finalizeSync(final Void unused) { LOG.info("Stopping block propagation."); - blockPropagationManager.stop(); + blockPropagationManager.ifPresent(BlockPropagationManager::stop); LOG.info("Stopping the pruner."); maybePruner.ifPresent(Pruner::stop); running.set(false); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloader.java index f418efed7cd..fa4a0d5ef34 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloader.java @@ -107,7 +107,7 @@ private CompletionStage repeatUnlessDownloadComplete( @SuppressWarnings("unused") final Void result) { syncState.clearSyncTarget(); if (syncTargetManager.shouldContinueDownloading() - && !syncState.hasReachedTerminalDifficulty().orElse(false)) { + && !syncState.hasReachedTerminalDifficulty().orElse(Boolean.FALSE)) { return performDownload(); } else { LOG.info("PipelineChain download complete"); @@ -149,7 +149,7 @@ private void logDownloadFailure(final String message, final Throwable error) { } private synchronized CompletionStage startDownloadForSyncTarget(final SyncTarget target) { - if (cancelled.get() || syncState.hasReachedTerminalDifficulty().orElse(false)) { + if (cancelled.get() || syncState.hasReachedTerminalDifficulty().orElse(Boolean.FALSE)) { return CompletableFuture.failedFuture( new CancellationException("Chain download was cancelled")); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PivotBlockSelector.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PivotBlockSelector.java new file mode 100644 index 00000000000..03e7c8dee59 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PivotBlockSelector.java @@ -0,0 +1,33 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; + +import java.util.Optional; + +public interface PivotBlockSelector { + + Optional selectNewPivotBlock(EthPeer peer); + + default void close() { + // do nothing by default + } + + default long getMinRequiredBlockNumber() { + return 0L; + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContext.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContext.java index f72556a76b7..29bfa7b2b71 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContext.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContext.java @@ -178,8 +178,9 @@ public BlockValidator getBlockValidatorForBlock(final Block block) { return getBlockValidator(block.getHeader().getNumber()); } - public boolean isOnTTD() { - return syncState.hasReachedTerminalDifficulty().orElse(false); + public boolean isReady() { + return syncState.hasReachedTerminalDifficulty().orElse(Boolean.FALSE) + && syncState.isInitialSyncPhaseDone(); } public CompletableFuture stop() { @@ -191,7 +192,7 @@ public CompletableFuture executeNextStep(final Void unused) { if (firstHash.isPresent()) { return executeSyncStep(firstHash.get()); } - if (!isOnTTD()) { + if (!isReady()) { return waitForTTD().thenCompose(this::executeNextStep); } final Optional firstAncestorHeader = backwardChain.getFirstAncestorHeader(); @@ -234,23 +235,23 @@ protected CompletableFuture waitForTTD() { final long id = syncState.subscribeTTDReached( reached -> { - if (reached) { + if (reached && syncState.isInitialSyncPhaseDone()) { latch.countDown(); } }); return CompletableFuture.runAsync( () -> { try { - if (!isOnTTD()) { - LOG.info("Waiting for TTD..."); + if (!isReady()) { + LOG.info("Waiting for preconditions..."); final boolean await = latch.await(2, TimeUnit.MINUTES); if (await) { - LOG.info("TTD reached..."); + LOG.info("Preconditions meet..."); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new BackwardSyncException("Wait for TTD was interrupted"); + throw new BackwardSyncException("Wait for TTD preconditions interrupted"); } finally { syncState.unsubscribeTTDReached(id); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStep.java index 5b33856a2c0..af02562c6dc 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStep.java @@ -14,6 +14,8 @@ */ package org.hyperledger.besu.ethereum.eth.sync.fastsync; +import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; + import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.BlockImporter; import org.hyperledger.besu.ethereum.core.BlockWithReceipts; @@ -65,6 +67,7 @@ public void accept(final List blocksWithReceipts) { blockWithReceipts.getHeader().getNumber(), blockWithReceipts.getHash()); } + traceLambda(LOG, "Imported block {}", blockWithReceipts.getBlock()::toLogString); } if (logStartBlock.isEmpty()) { logStartBlock = OptionalLong.of(blocksWithReceipts.get(0).getNumber()); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java index ff1139d2225..2926fa9070b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java @@ -17,16 +17,18 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.ProtocolContext; -import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; +import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerLimiter; import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; +import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByHashTask; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.metrics.BesuMetricCategory; @@ -35,6 +37,7 @@ import org.hyperledger.besu.util.ExceptionUtils; import java.time.Duration; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -52,6 +55,7 @@ public class FastSyncActions { private final ProtocolContext protocolContext; private final EthContext ethContext; private final SyncState syncState; + private final PivotBlockSelector pivotBlockSelector; private final MetricsSystem metricsSystem; private final Counter pivotBlockSelectionCounter; private final AtomicLong pivotBlockGauge = new AtomicLong(0); @@ -63,6 +67,7 @@ public FastSyncActions( final ProtocolContext protocolContext, final EthContext ethContext, final SyncState syncState, + final PivotBlockSelector pivotBlockSelector, final MetricsSystem metricsSystem) { this.syncConfig = syncConfig; this.worldStateStorage = worldStateStorage; @@ -70,6 +75,7 @@ public FastSyncActions( this.protocolContext = protocolContext; this.ethContext = ethContext; this.syncState = syncState; + this.pivotBlockSelector = pivotBlockSelector; this.metricsSystem = metricsSystem; pivotBlockSelectionCounter = @@ -125,42 +131,40 @@ private CompletableFuture waitForPeers(final int count) { public CompletableFuture selectPivotBlock(final FastSyncState fastSyncState) { return fastSyncState.hasPivotBlockHeader() ? completedFuture(fastSyncState) - : selectPivotBlockFromPeers(); + : selectNewPivotBlock(); } - private CompletableFuture selectPivotBlockFromPeers() { + private CompletableFuture selectNewPivotBlock() { + + return selectBestPeer() + .map( + bestPeer -> + pivotBlockSelector + .selectNewPivotBlock(bestPeer) + .map(CompletableFuture::completedFuture) + .orElse(null)) + .orElseGet(this::retrySelectPivotBlockAfterDelay); + } + + private Optional selectBestPeer() { return ethContext .getEthPeers() .bestPeerMatchingCriteria(this::canPeerDeterminePivotBlock) // Only select a pivot block number when we have a minimum number of height estimates - .filter( - peer -> { - final long peerCount = countPeersThatCanDeterminePivotBlock(); - final int minPeerCount = syncConfig.getFastSyncMinimumPeerCount(); - if (peerCount < minPeerCount) { - LOG.info( - "Waiting for valid peers with chain height information. {} / {} required peers currently available.", - peerCount, - minPeerCount); - return false; - } - return true; - }) - .map( - peer -> { - final long pivotBlockNumber = - peer.chainState().getEstimatedHeight() - syncConfig.getFastSyncPivotDistance(); - if (pivotBlockNumber <= BlockHeader.GENESIS_BLOCK_NUMBER) { - // Peer's chain isn't long enough, return an empty value so we can try again. - LOG.info("Waiting for peers with sufficient chain height"); - return null; - } - LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber); - pivotBlockSelectionCounter.inc(); - pivotBlockGauge.set(pivotBlockNumber); - return completedFuture(new FastSyncState(pivotBlockNumber)); - }) - .orElseGet(this::retrySelectPivotBlockAfterDelay); + .filter(unused -> enoughFastSyncPeersArePresent()); + } + + private boolean enoughFastSyncPeersArePresent() { + final long peerCount = countPeersThatCanDeterminePivotBlock(); + final int minPeerCount = syncConfig.getFastSyncMinimumPeerCount(); + if (peerCount < minPeerCount) { + LOG.info( + "Waiting for valid peers with chain height information. {} / {} required peers currently available.", + peerCount, + minPeerCount); + return false; + } + return true; } private long countPeersThatCanDeterminePivotBlock() { @@ -198,22 +202,41 @@ private CompletableFuture limitTrailingPeersAndRetrySelectPivotBl trailingPeerLimiter.enforceTrailingPeerLimit(); return waitForPeers(syncConfig.getFastSyncMinimumPeerCount()) - .thenCompose(ignore -> selectPivotBlockFromPeers()); + .thenCompose(ignore -> selectNewPivotBlock()); } public CompletableFuture downloadPivotBlockHeader( final FastSyncState currentState) { - if (currentState.getPivotBlockHeader().isPresent()) { + return internalDownloadPivotBlockHeader(currentState).thenApply(this::updateStats); + } + + private CompletableFuture internalDownloadPivotBlockHeader( + final FastSyncState currentState) { + if (currentState.hasPivotBlockHeader()) { return completedFuture(currentState); } - return new PivotBlockRetriever( - protocolSchedule, - ethContext, - metricsSystem, - currentState.getPivotBlockNumber().getAsLong(), - syncConfig.getFastSyncMinimumPeerCount(), - syncConfig.getFastSyncPivotDistance()) - .downloadPivotBlockHeader(); + + return currentState + .getPivotBlockHash() + .map(this::downloadPivotBlockHeader) + .orElseGet( + () -> + new PivotBlockRetriever( + protocolSchedule, + ethContext, + metricsSystem, + currentState.getPivotBlockNumber().getAsLong(), + syncConfig.getFastSyncMinimumPeerCount(), + syncConfig.getFastSyncPivotDistance()) + .downloadPivotBlockHeader()); + } + + private FastSyncState updateStats(final FastSyncState fastSyncState) { + pivotBlockSelectionCounter.inc(); + fastSyncState + .getPivotBlockHeader() + .ifPresent(blockHeader -> pivotBlockGauge.set(blockHeader.getNumber())); + return fastSyncState; } public ChainDownloader createChainDownloader(final FastSyncState currentState) { @@ -227,4 +250,21 @@ public ChainDownloader createChainDownloader(final FastSyncState currentState) { metricsSystem, currentState); } + + private CompletableFuture downloadPivotBlockHeader(final Hash hash) { + return RetryingGetHeaderFromPeerByHashTask.byHash( + protocolSchedule, + ethContext, + hash, + pivotBlockSelector.getMinRequiredBlockNumber(), + metricsSystem) + .getHeader() + .thenApply( + blockHeader -> { + LOG.info( + "Successfully downloaded pivot block header by hash: {}", + blockHeader.toLogString()); + return new FastSyncState(blockHeader); + }); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncState.java index 688c6fb7386..048861d7e6c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncState.java @@ -14,6 +14,7 @@ */ package org.hyperledger.besu.ethereum.eth.sync.fastsync; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockHeader; import java.util.Objects; @@ -22,28 +23,39 @@ public class FastSyncState { - public static FastSyncState EMPTY_SYNC_STATE = - new FastSyncState(OptionalLong.empty(), Optional.empty()); + public static final FastSyncState EMPTY_SYNC_STATE = new FastSyncState(); private OptionalLong pivotBlockNumber; + private Optional pivotBlockHash; private Optional pivotBlockHeader; public FastSyncState() { pivotBlockNumber = OptionalLong.empty(); + pivotBlockHash = Optional.empty(); pivotBlockHeader = Optional.empty(); } public FastSyncState(final long pivotBlockNumber) { - this(OptionalLong.of(pivotBlockNumber), Optional.empty()); + this(OptionalLong.of(pivotBlockNumber), Optional.empty(), Optional.empty()); + } + + public FastSyncState(final Hash pivotBlockHash) { + this(OptionalLong.empty(), Optional.of(pivotBlockHash), Optional.empty()); } public FastSyncState(final BlockHeader pivotBlockHeader) { - this(OptionalLong.of(pivotBlockHeader.getNumber()), Optional.of(pivotBlockHeader)); + this( + OptionalLong.of(pivotBlockHeader.getNumber()), + Optional.of(pivotBlockHeader.getHash()), + Optional.of(pivotBlockHeader)); } protected FastSyncState( - final OptionalLong pivotBlockNumber, final Optional pivotBlockHeader) { + final OptionalLong pivotBlockNumber, + final Optional pivotBlockHash, + final Optional pivotBlockHeader) { this.pivotBlockNumber = pivotBlockNumber; + this.pivotBlockHash = pivotBlockHash; this.pivotBlockHeader = pivotBlockHeader; } @@ -51,6 +63,10 @@ public OptionalLong getPivotBlockNumber() { return pivotBlockNumber; } + public Optional getPivotBlockHash() { + return pivotBlockHash; + } + public Optional getPivotBlockHeader() { return pivotBlockHeader; } @@ -61,6 +77,7 @@ public boolean hasPivotBlockHeader() { public void setCurrentHeader(final BlockHeader header) { pivotBlockNumber = OptionalLong.of(header.getNumber()); + pivotBlockHash = Optional.of(header.getHash()); pivotBlockHeader = Optional.of(header); } @@ -70,12 +87,13 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) return false; FastSyncState that = (FastSyncState) o; return Objects.equals(pivotBlockNumber, that.pivotBlockNumber) + && Objects.equals(pivotBlockHash, that.pivotBlockHash) && Objects.equals(pivotBlockHeader, that.pivotBlockHeader); } @Override public int hashCode() { - return Objects.hash(pivotBlockNumber, pivotBlockHeader); + return Objects.hash(pivotBlockNumber, pivotBlockHash, pivotBlockHeader); } @Override @@ -83,6 +101,8 @@ public String toString() { return "FastSyncState{" + "pivotBlockNumber=" + pivotBlockNumber + + "pivotBlockHash=" + + pivotBlockHash + ", pivotBlockHeader=" + pivotBlockHeader + '}'; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromFinalizedBlock.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromFinalizedBlock.java new file mode 100644 index 00000000000..a64e5e53cf5 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromFinalizedBlock.java @@ -0,0 +1,69 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.fastsync; + +import org.hyperledger.besu.config.GenesisConfigOptions; +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; + +import java.util.Optional; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PivotSelectorFromFinalizedBlock implements PivotBlockSelector { + + private static final Logger LOG = LoggerFactory.getLogger(PivotSelectorFromFinalizedBlock.class); + + private final GenesisConfigOptions genesisConfig; + private final Supplier> finalizedBlockHashSupplier; + private final Runnable cleanupAction; + + public PivotSelectorFromFinalizedBlock( + final GenesisConfigOptions genesisConfig, + final Supplier> finalizedBlockHashSupplier, + final Runnable cleanupAction) { + this.genesisConfig = genesisConfig; + this.finalizedBlockHashSupplier = finalizedBlockHashSupplier; + this.cleanupAction = cleanupAction; + } + + @Override + public Optional selectNewPivotBlock(final EthPeer peer) { + final Optional maybeHash = finalizedBlockHashSupplier.get(); + if (maybeHash.isPresent()) { + return Optional.of(selectLastFinalizedBlockAsPivot(maybeHash.get())); + } + LOG.info("No finalized block hash announced yet"); + return Optional.empty(); + } + + private FastSyncState selectLastFinalizedBlockAsPivot(final Hash finalizedHash) { + LOG.info("Returning finalized block hash as pivot: {}", finalizedHash); + return new FastSyncState(finalizedHash); + } + + @Override + public void close() { + cleanupAction.run(); + } + + @Override + public long getMinRequiredBlockNumber() { + return genesisConfig.getTerminalBlockNumber().orElse(0L); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromPeers.java new file mode 100644 index 00000000000..313d4609be0 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromPeers.java @@ -0,0 +1,53 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.fastsync; + +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; +import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; + +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PivotSelectorFromPeers implements PivotBlockSelector { + + private static final Logger LOG = LoggerFactory.getLogger(PivotSelectorFromPeers.class); + + private final SynchronizerConfiguration syncConfig; + + public PivotSelectorFromPeers(final SynchronizerConfiguration syncConfig) { + this.syncConfig = syncConfig; + } + + @Override + public Optional selectNewPivotBlock(final EthPeer peer) { + return fromBestPeer(peer); + } + + private Optional fromBestPeer(final EthPeer peer) { + final long pivotBlockNumber = + peer.chainState().getEstimatedHeight() - syncConfig.getFastSyncPivotDistance(); + if (pivotBlockNumber <= BlockHeader.GENESIS_BLOCK_NUMBER) { + // Peer's chain isn't long enough, return an empty value so we can try again. + LOG.info("Waiting for peers with sufficient chain height"); + return Optional.empty(); + } + LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber); + return Optional.of(new FastSyncState(pivotBlockNumber)); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/TransitionPivotSelector.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/TransitionPivotSelector.java new file mode 100644 index 00000000000..fed8a64f8a5 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/TransitionPivotSelector.java @@ -0,0 +1,95 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.fastsync; + +import org.hyperledger.besu.config.GenesisConfigOptions; +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.Difficulty; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; + +import java.util.Optional; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TransitionPivotSelector implements PivotBlockSelector { + + private static final Logger LOG = LoggerFactory.getLogger(TransitionPivotSelector.class); + + private final Difficulty totalTerminalDifficulty; + private final Supplier> finalizedBlockHashSupplier; + private final PivotBlockSelector pivotSelectorFromPeers; + private final PivotBlockSelector pivotSelectorFromFinalizedBlock; + + public TransitionPivotSelector( + final GenesisConfigOptions genesisConfig, + final Supplier> finalizedBlockHashSupplier, + final PivotBlockSelector pivotSelectorFromPeers, + final PivotBlockSelector pivotSelectorFromFinalizedBlock) { + this.totalTerminalDifficulty = + genesisConfig + .getTerminalTotalDifficulty() + .map(Difficulty::of) + .orElseThrow( + () -> + new IllegalArgumentException( + "This class can only be used when TTD is present")); + this.finalizedBlockHashSupplier = finalizedBlockHashSupplier; + this.pivotSelectorFromPeers = pivotSelectorFromPeers; + this.pivotSelectorFromFinalizedBlock = pivotSelectorFromFinalizedBlock; + } + + @Override + public Optional selectNewPivotBlock(final EthPeer peer) { + return routeDependingOnTotalTerminalDifficulty(peer); + } + + private Optional routeDependingOnTotalTerminalDifficulty(final EthPeer peer) { + + Difficulty bestPeerEstDifficulty = peer.chainState().getEstimatedTotalDifficulty(); + + if (finalizedBlockHashSupplier.get().isPresent()) { + LOG.info("A finalized block is present, use it as pivot"); + return pivotSelectorFromFinalizedBlock.selectNewPivotBlock(peer); + } + + if (bestPeerEstDifficulty.greaterOrEqualThan(totalTerminalDifficulty)) { + LOG.info( + "Chain has reached TTD, best peer has estimated difficulty {}," + + " select pivot from finalized block", + bestPeerEstDifficulty); + return pivotSelectorFromFinalizedBlock.selectNewPivotBlock(peer); + } + + LOG.info( + "Chain has not yet reached TTD, best peer has estimated difficulty {}," + + " select pivot from peers", + bestPeerEstDifficulty); + return pivotSelectorFromPeers.selectNewPivotBlock(peer); + } + + @Override + public void close() { + pivotSelectorFromFinalizedBlock.close(); + pivotSelectorFromPeers.close(); + } + + @Override + public long getMinRequiredBlockNumber() { + return pivotSelectorFromFinalizedBlock.getMinRequiredBlockNumber(); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStep.java index fa78932bbe8..5536a50c8ef 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStep.java @@ -17,11 +17,12 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState; import org.hyperledger.besu.metrics.BesuMetricCategory; -import org.hyperledger.besu.metrics.RunnableCounter; +import org.hyperledger.besu.metrics.RunnableTimedCounter; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.metrics.Counter; import org.hyperledger.besu.services.tasks.Task; +import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; import org.slf4j.Logger; @@ -29,24 +30,23 @@ public class CompleteTaskStep { private static final Logger LOG = LoggerFactory.getLogger(CompleteTaskStep.class); - private static final int DISPLAY_PROGRESS_STEP = 100000; - private final RunnableCounter completedRequestsCounter; + private final RunnableTimedCounter completedRequestsCounter; private final Counter retriedRequestsCounter; private final LongSupplier worldStatePendingRequestsCurrentSupplier; - private long lastLogAt = System.currentTimeMillis(); public CompleteTaskStep( final MetricsSystem metricsSystem, final LongSupplier worldStatePendingRequestsCurrentSupplier) { this.worldStatePendingRequestsCurrentSupplier = worldStatePendingRequestsCurrentSupplier; completedRequestsCounter = - new RunnableCounter( + new RunnableTimedCounter( metricsSystem.createCounter( BesuMetricCategory.SYNCHRONIZER, "world_state_completed_requests_total", "Total number of node data requests completed as part of fast sync world state download"), this::displayWorldStateSyncProgress, - DISPLAY_PROGRESS_STEP); + 1, + TimeUnit.MINUTES); retriedRequestsCounter = metricsSystem.createCounter( BesuMetricCategory.SYNCHRONIZER, @@ -72,14 +72,10 @@ public void markAsCompleteOrFailed( } private void displayWorldStateSyncProgress() { - final long now = System.currentTimeMillis(); - if (now - lastLogAt > 10 * 1000L) { - LOG.info( - "Downloaded {} world state nodes. At least {} nodes remaining.", - getCompletedRequests(), - worldStatePendingRequestsCurrentSupplier.getAsLong()); - lastLogAt = now; - } + LOG.info( + "Downloaded {} world state nodes. At least {} nodes remaining.", + getCompletedRequests(), + worldStatePendingRequestsCurrentSupplier.getAsLong()); } long getCompletedRequests() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java index 2c2177fa1fa..38a0c868eb4 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java @@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions; @@ -51,6 +52,7 @@ public class FastDownloaderFactory { private static final Logger LOG = LoggerFactory.getLogger(FastDownloaderFactory.class); public static Optional> create( + final PivotBlockSelector pivotBlockSelector, final SynchronizerConfiguration syncConfig, final Path dataDirectory, final ProtocolSchedule protocolSchedule, @@ -118,6 +120,7 @@ public static Optional> create( protocolContext, ethContext, syncState, + pivotBlockSelector, metricsSystem), worldStateStorage, worldStateDownloader, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java index 15e55bef774..4484c194460 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java @@ -35,7 +35,6 @@ public class FullSyncDownloader { private final SynchronizerConfiguration syncConfig; private final ProtocolContext protocolContext; private final SyncState syncState; - private final SyncTerminationCondition terminationCondition; public FullSyncDownloader( final SynchronizerConfiguration syncConfig, @@ -48,7 +47,6 @@ public FullSyncDownloader( this.syncConfig = syncConfig; this.protocolContext = protocolContext; this.syncState = syncState; - this.terminationCondition = terminationCondition; this.chainDownloader = FullSyncChainDownloader.create( @@ -63,15 +61,7 @@ public FullSyncDownloader( public CompletableFuture start() { LOG.info("Starting full sync."); - return chainDownloader - .start() - .thenApply( - unused -> { - if (terminationCondition.shouldStopDownload()) { - syncState.setReachedTerminalDifficulty(true); - } - return null; - }); + return chainDownloader.start(); } public void stop() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java index 03c09c2b2af..86b594bf2fd 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java @@ -17,6 +17,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions; @@ -45,6 +46,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory { private static final Logger LOG = LoggerFactory.getLogger(SnapDownloaderFactory.class); public static Optional> createSnapDownloader( + final PivotBlockSelector pivotBlockSelector, final SynchronizerConfiguration syncConfig, final Path dataDirectory, final ProtocolSchedule protocolSchedule, @@ -108,6 +110,7 @@ public static Optional> createSnapDownloader( protocolContext, ethContext, syncState, + pivotBlockSelector, metricsSystem), worldStateStorage, snapWorldStateDownloader, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapSyncState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapSyncState.java index c150544e555..4e90c6fdd57 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapSyncState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapSyncState.java @@ -23,7 +23,10 @@ public class SnapSyncState extends FastSyncState { private boolean isHealInProgress; public SnapSyncState(final FastSyncState fastSyncState) { - super(fastSyncState.getPivotBlockNumber(), fastSyncState.getPivotBlockHeader()); + super( + fastSyncState.getPivotBlockNumber(), + fastSyncState.getPivotBlockHash(), + fastSyncState.getPivotBlockHeader()); } public boolean isHealInProgress() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java index b9a1b65d581..dadd7e76e63 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java @@ -49,10 +49,18 @@ public class SyncState { private Optional worldStateDownloadStatus = Optional.empty(); private Optional newPeerListenerId; private Optional reachedTerminalDifficulty = Optional.empty(); + private volatile boolean isInitialSyncPhaseDone; public SyncState(final Blockchain blockchain, final EthPeers ethPeers) { + this(blockchain, ethPeers, false); + } + + public SyncState( + final Blockchain blockchain, final EthPeers ethPeers, final boolean hasInitialSyncPhase) { this.blockchain = blockchain; this.ethPeers = ethPeers; + isInitialSyncPhaseDone = !hasInitialSyncPhase; + blockchain.observeBlockAdded( event -> { if (event.isNewCanonicalHead()) { @@ -157,7 +165,10 @@ public void setReachedTerminalDifficulty(final boolean stoppedAtTerminalDifficul } public Optional hasReachedTerminalDifficulty() { - return reachedTerminalDifficulty; + if (isInitialSyncPhaseDone) { + return reachedTerminalDifficulty; + } + return Optional.of(Boolean.FALSE); } private boolean isInSync( @@ -165,7 +176,8 @@ private boolean isInSync( final Optional syncTargetChain, final Optional bestPeerChain, final long syncTolerance) { - return reachedTerminalDifficulty.orElse(true) + return isInitialSyncPhaseDone + && reachedTerminalDifficulty.orElse(true) // Sync target may be temporarily empty while we switch sync targets during a sync, so // check both the sync target and our best peer to determine if we're in sync or not && isInSync(localChain, syncTargetChain, syncTolerance) @@ -277,4 +289,12 @@ private synchronized void checkInSync() { .forEach( (syncTracker) -> syncTracker.checkState(localChain, syncTargetChain, bestPeerChain)); } + + public void markInitialSyncPhaseAsDone() { + isInitialSyncPhaseDone = true; + } + + public boolean isInitialSyncPhaseDone() { + return isInitialSyncPhaseDone; + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByHashTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByHashTask.java new file mode 100644 index 00000000000..3a88827539a --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByHashTask.java @@ -0,0 +1,90 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.tasks; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractGetHeadersFromPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.plugin.services.MetricsSystem; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import com.google.common.annotations.VisibleForTesting; + +public class RetryingGetHeaderFromPeerByHashTask + extends AbstractRetryingPeerTask> { + + private final Hash referenceHash; + private final ProtocolSchedule protocolSchedule; + private final long minimumRequiredBlockNumber; + + @VisibleForTesting + RetryingGetHeaderFromPeerByHashTask( + final ProtocolSchedule protocolSchedule, + final EthContext ethContext, + final Hash referenceHash, + final long minimumRequiredBlockNumber, + final MetricsSystem metricsSystem) { + super(ethContext, 3, List::isEmpty, metricsSystem); + this.protocolSchedule = protocolSchedule; + this.minimumRequiredBlockNumber = minimumRequiredBlockNumber; + checkNotNull(referenceHash); + this.referenceHash = referenceHash; + } + + public static RetryingGetHeaderFromPeerByHashTask byHash( + final ProtocolSchedule protocolSchedule, + final EthContext ethContext, + final Hash referenceHash, + final long minimumRequiredBlockNumber, + final MetricsSystem metricsSystem) { + return new RetryingGetHeaderFromPeerByHashTask( + protocolSchedule, ethContext, referenceHash, minimumRequiredBlockNumber, metricsSystem); + } + + @Override + protected CompletableFuture> executePeerTask( + final Optional assignedPeer) { + final AbstractGetHeadersFromPeerTask task = + GetHeadersFromPeerByHashTask.forSingleHash( + protocolSchedule, + getEthContext(), + referenceHash, + minimumRequiredBlockNumber, + getMetricsSystem()); + assignedPeer.ifPresent(task::assignPeer); + return executeSubTask(task::run) + .thenApply( + peerResult -> { + if (!peerResult.getResult().isEmpty()) { + result.complete(peerResult.getResult()); + } + return peerResult.getResult(); + }); + } + + public CompletableFuture getHeader() { + return run().thenApply(singletonList -> singletonList.get(0)); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContextTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContextTest.java index f0b5abd8eed..af005937d2b 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContextTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContextTest.java @@ -144,7 +144,7 @@ public void setup() { ethContext, syncState, backwardChain)); - doReturn(true).when(context).isOnTTD(); + doReturn(true).when(context).isReady(); doReturn(2).when(context).getBatchSize(); } @@ -222,7 +222,8 @@ private Block getBlockByNumber(final int number) { @Test public void shouldWaitWhenTTDNotReached() throws ExecutionException, InterruptedException, TimeoutException { - doReturn(false).when(context).isOnTTD(); + doReturn(false).when(context).isReady(); + when(syncState.isInitialSyncPhaseDone()).thenReturn(Boolean.TRUE); when(syncState.subscribeTTDReached(any())).thenReturn(88L); final CompletableFuture voidCompletableFuture = context.waitForTTD(); @@ -241,7 +242,7 @@ public void shouldWaitWhenTTDNotReached() @Test public void shouldNotWaitWhenTTDReached() throws ExecutionException, InterruptedException, TimeoutException { - doReturn(true).when(context).isOnTTD(); + doReturn(true).when(context).isReady(); when(syncState.subscribeTTDReached(any())).thenReturn(88L); final CompletableFuture voidCompletableFuture = context.waitForTTD(); voidCompletableFuture.get(1, TimeUnit.SECONDS); @@ -261,7 +262,7 @@ public void shouldStartForwardStepWhenOnLocalHeight() { } @Test - public void shouldFinishWhenWorkIsDonw() { + public void shouldFinishWhenWorkIsDone() { final CompletableFuture completableFuture = context.executeNextStep(null); assertThat(completableFuture.isDone()).isTrue(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java index 76092c361f5..a3bc035f9b1 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java @@ -24,6 +24,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastDownloaderFactory; @@ -58,14 +59,16 @@ public class FastDownloaderFactoryTest { @Mock private SyncState syncState; @Mock private Clock clock; @Mock private Path dataDirectory; + @Mock private PivotBlockSelector pivotBlockSelector; @SuppressWarnings("unchecked") @Test(expected = IllegalStateException.class) - public void shouldThrowIfSyncModeChangedWhileFastSyncIncomplete() throws NoSuchFieldException { + public void shouldThrowIfSyncModeChangedWhileFastSyncIncomplete() { initDataDirectory(true); when(syncConfig.getSyncMode()).thenReturn(SyncMode.FULL); FastDownloaderFactory.create( + pivotBlockSelector, syncConfig, dataDirectory, protocolSchedule, @@ -79,12 +82,13 @@ public void shouldThrowIfSyncModeChangedWhileFastSyncIncomplete() throws NoSuchF @SuppressWarnings({"unchecked", "rawtypes"}) @Test - public void shouldNotThrowIfSyncModeChangedWhileFastSyncComplete() throws NoSuchFieldException { + public void shouldNotThrowIfSyncModeChangedWhileFastSyncComplete() { initDataDirectory(false); when(syncConfig.getSyncMode()).thenReturn(SyncMode.FULL); final Optional result = FastDownloaderFactory.create( + pivotBlockSelector, syncConfig, dataDirectory, protocolSchedule, @@ -108,6 +112,7 @@ public void shouldNotThrowWhenFastSyncModeRequested() throws NoSuchFieldExceptio when(syncConfig.getSyncMode()).thenReturn(SyncMode.FAST); FastDownloaderFactory.create( + pivotBlockSelector, syncConfig, dataDirectory, protocolSchedule, @@ -137,6 +142,7 @@ public void shouldClearWorldStateDuringFastSyncWhenStateQueDirectoryExists() thr assertThat(Files.exists(stateQueueDir)).isTrue(); FastDownloaderFactory.create( + pivotBlockSelector, syncConfig, dataDirectory, protocolSchedule, @@ -168,6 +174,7 @@ public void shouldCrashWhenStateQueueIsNotDirectory() throws IOException { Assertions.assertThatThrownBy( () -> FastDownloaderFactory.create( + pivotBlockSelector, syncConfig, dataDirectory, protocolSchedule, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java index 99ddee822e0..0937e19e1a8 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java @@ -19,6 +19,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import org.hyperledger.besu.config.GenesisConfigOptions; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -33,6 +35,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; +import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; @@ -45,6 +48,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; @@ -92,8 +96,7 @@ public void setUp() { blockchainSetupUtil.getWorldArchive(), blockchainSetupUtil.getTransactionPool(), EthProtocolConfiguration.defaultConfig()); - fastSyncActions = createFastSyncActions(syncConfig); - when(worldStateStorage.isWorldStateAvailable(any(), any())).thenReturn(true); + fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig)); } @Test @@ -136,7 +139,7 @@ public void selectPivotBlockShouldSelectBlockPivotDistanceFromBestPeer() { final int minPeers = 1; syncConfigBuilder.fastSyncMinimumPeerCount(minPeers); syncConfig = syncConfigBuilder.build(); - fastSyncActions = createFastSyncActions(syncConfig); + fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig)); EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000); @@ -151,7 +154,7 @@ public void selectPivotBlockShouldConsiderTotalDifficultyWhenSelectingBestPeer() final int minPeers = 1; syncConfigBuilder.fastSyncMinimumPeerCount(minPeers); syncConfig = syncConfigBuilder.build(); - fastSyncActions = createFastSyncActions(syncConfig); + fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig)); EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(1000), 5500); EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(2000), 4000); @@ -168,7 +171,7 @@ public void selectPivotBlockShouldWaitAndRetryUntilMinHeightEstimatesAreAvailabl final int minPeers = 2; syncConfigBuilder.fastSyncMinimumPeerCount(minPeers); syncConfig = syncConfigBuilder.build(); - fastSyncActions = createFastSyncActions(syncConfig); + fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig)); final CompletableFuture result = fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE); @@ -193,7 +196,7 @@ public void selectPivotBlockShouldWaitAndRetryIfSufficientChainHeightEstimatesAr final int minPeers = 3; syncConfigBuilder.fastSyncMinimumPeerCount(minPeers); syncConfig = syncConfigBuilder.build(); - fastSyncActions = createFastSyncActions(syncConfig); + fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig)); final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L; EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); @@ -238,7 +241,7 @@ public void selectPivotBlockShouldWaitAndRetryIfSufficientValidatedPeersUnavaila final PeerValidator validator = mock(PeerValidator.class); syncConfigBuilder.fastSyncMinimumPeerCount(minPeers); syncConfig = syncConfigBuilder.build(); - fastSyncActions = createFastSyncActions(syncConfig); + fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig)); final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L; EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); @@ -299,7 +302,7 @@ private void selectPivotBlockUsesBestPeerMatchingRequiredCriteria( final int peerCount = minPeers + 1; syncConfigBuilder.fastSyncMinimumPeerCount(minPeers); syncConfig = syncConfigBuilder.build(); - fastSyncActions = createFastSyncActions(syncConfig); + fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig)); final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L; EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); @@ -345,7 +348,7 @@ public void selectPivotBlockShouldWaitAndRetryIfBestPeerChainIsShorterThanPivotD final int minPeers = 1; syncConfigBuilder.fastSyncMinimumPeerCount(minPeers); syncConfig = syncConfigBuilder.build(); - fastSyncActions = createFastSyncActions(syncConfig); + fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig)); final long pivotDistance = syncConfig.getFastSyncPivotDistance(); EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); @@ -396,7 +399,7 @@ public void downloadPivotBlockHeaderShouldUseExistingPivotBlockHeaderIfPresent() @Test public void downloadPivotBlockHeaderShouldRetrievePivotBlockHeader() { syncConfig = SynchronizerConfiguration.builder().fastSyncMinimumPeerCount(1).build(); - fastSyncActions = createFastSyncActions(syncConfig); + fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig)); final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1001); final CompletableFuture result = @@ -409,7 +412,32 @@ public void downloadPivotBlockHeaderShouldRetrievePivotBlockHeader() { assertThat(result).isCompletedWithValue(new FastSyncState(blockchain.getBlockHeader(1).get())); } - private FastSyncActions createFastSyncActions(final SynchronizerConfiguration syncConfig) { + @Test + public void downloadPivotBlockHeaderShouldRetrievePivotBlockHash() { + syncConfig = SynchronizerConfiguration.builder().fastSyncMinimumPeerCount(1).build(); + GenesisConfigOptions genesisConfig = mock(GenesisConfigOptions.class); + when(genesisConfig.getTerminalBlockNumber()).thenReturn(OptionalLong.of(10L)); + + final Optional finalizedHash = blockchain.getBlockHashByNumber(2L); + + fastSyncActions = + createFastSyncActions( + syncConfig, + new PivotSelectorFromFinalizedBlock(genesisConfig, () -> finalizedHash, () -> {})); + + final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1001); + final CompletableFuture result = + fastSyncActions.downloadPivotBlockHeader(new FastSyncState(finalizedHash.get())); + assertThat(result).isNotCompleted(); + + final RespondingEthPeer.Responder responder = RespondingEthPeer.blockchainResponder(blockchain); + peer.respond(responder); + + assertThat(result).isCompletedWithValue(new FastSyncState(blockchain.getBlockHeader(2).get())); + } + + private FastSyncActions createFastSyncActions( + final SynchronizerConfiguration syncConfig, final PivotBlockSelector pivotBlockSelector) { final ProtocolSchedule protocolSchedule = blockchainSetupUtil.getProtocolSchedule(); final ProtocolContext protocolContext = blockchainSetupUtil.getProtocolContext(); final EthContext ethContext = ethProtocolManager.ethContext(); @@ -419,7 +447,8 @@ private FastSyncActions createFastSyncActions(final SynchronizerConfiguration sy protocolSchedule, protocolContext, ethContext, - new SyncState(blockchain, ethContext.getEthPeers()), + new SyncState(blockchain, ethContext.getEthPeers(), true), + pivotBlockSelector, new NoOpMetricsSystem()); } } diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/RunnableTimedCounter.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/RunnableTimedCounter.java new file mode 100644 index 00000000000..dea94250d65 --- /dev/null +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/RunnableTimedCounter.java @@ -0,0 +1,69 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.metrics; + +import org.hyperledger.besu.plugin.services.metrics.Counter; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** Counter that triggers a specific task if the specified interval has elapsed. */ +public class RunnableTimedCounter implements Counter { + + private final Counter backedCounter; + private final Runnable task; + private final long intervalMillis; + private final AtomicLong stepCounter; + private volatile long nextExecutionAtMillis; + + public RunnableTimedCounter( + final Counter backedCounter, final Runnable task, final long interval, final TimeUnit unit) { + this.backedCounter = backedCounter; + this.task = task; + this.stepCounter = new AtomicLong(0); + this.intervalMillis = unit.toMillis(interval); + this.nextExecutionAtMillis = System.currentTimeMillis() + intervalMillis; + } + + /** + * Increments the stepCounter by 1 + * + *

{@link #inc(long) inc} method + */ + @Override + public void inc() { + this.inc(1); + } + + /** + * Increments the stepCounter by amount. Triggers the runnable if interval has elapsed + * + * @param amount the value to add to the stepCounter. + */ + @Override + public void inc(final long amount) { + backedCounter.inc(amount); + stepCounter.addAndGet(amount); + final long now = System.currentTimeMillis(); + if (nextExecutionAtMillis < now) { + task.run(); + nextExecutionAtMillis = now + intervalMillis; + } + } + + public long get() { + return stepCounter.get(); + } +} diff --git a/metrics/core/src/test/java/org/hyperledger/besu/metrics/RunnableTimedCounterTest.java b/metrics/core/src/test/java/org/hyperledger/besu/metrics/RunnableTimedCounterTest.java new file mode 100644 index 00000000000..18c61a7c462 --- /dev/null +++ b/metrics/core/src/test/java/org/hyperledger/besu/metrics/RunnableTimedCounterTest.java @@ -0,0 +1,61 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.metrics; + +import static org.assertj.core.api.Assertions.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import org.hyperledger.besu.plugin.services.metrics.Counter; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class RunnableTimedCounterTest { + + @Mock Counter backedCounter; + + @Test + public void shouldNotRunTaskIfIntervalNotElapsed() { + + RunnableTimedCounter rtc = + new RunnableTimedCounter( + backedCounter, () -> fail("Must not be called"), 1L, TimeUnit.MINUTES); + + rtc.inc(); + verify(backedCounter).inc(1L); + } + + @Test + public void shouldRunTaskIfIntervalElapsed() throws InterruptedException { + + Runnable task = mock(Runnable.class); + + RunnableTimedCounter rtc = + new RunnableTimedCounter(backedCounter, task, 1L, TimeUnit.MICROSECONDS); + + Thread.sleep(1L); + + rtc.inc(); + + verify(backedCounter).inc(1L); + verify(task).run(); + } +} From c0dafd6767a197d16dfae3b341167850a9caf7cd Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Tue, 26 Apr 2022 08:50:58 +1000 Subject: [PATCH 8/8] Log full peers list on connect/disconnect (#3745) * added logging of EthPeers * RlpxAgent logging of peers Signed-off-by: Sally MacFarlane --- .../besu/ethereum/eth/manager/EthPeer.java | 34 ++++++++++++++++-- .../besu/ethereum/eth/manager/EthPeers.java | 10 ++++-- .../eth/manager/EthProtocolManager.java | 4 ++- .../ethereum/eth/manager/PeerReputation.java | 23 +++++++++++- .../ethereum/eth/messages/StatusMessage.java | 23 ++++++++++++ .../ethereum/eth/manager/EthPeerTest.java | 35 +++++++++++++++++++ .../ethereum/eth/manager/EthPeersTest.java | 13 +++++++ .../eth/messages/StatusMessageTest.java | 19 ++++++++++ .../besu/ethereum/p2p/rlpx/RlpxAgent.java | 17 ++++++++- .../p2p/rlpx/connections/RlpxConnection.java | 18 ++++++++++ .../besu/ethereum/p2p/rlpx/wire/PeerInfo.java | 8 ++++- .../besu/ethereum/p2p/rlpx/RlpxAgentTest.java | 2 +- 12 files changed, 197 insertions(+), 9 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java index 6fed1ff423f..c3a1d43960e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.manager; import static com.google.common.base.Preconditions.checkArgument; +import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.Difficulty; @@ -54,6 +55,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import javax.annotation.Nonnull; import com.google.common.annotations.VisibleForTesting; import org.apache.tuweni.bytes.Bytes; @@ -61,7 +63,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class EthPeer { +public class EthPeer implements Comparable { private static final Logger LOG = LoggerFactory.getLogger(EthPeer.class); private static final int MAX_OUTSTANDING_REQUESTS = 5; @@ -395,7 +397,14 @@ public Map timeoutCounts() { return reputation.timeoutCounts(); } + public PeerReputation getReputation() { + return reputation; + } + void handleDisconnect() { + traceLambda( + LOG, "handleDisconnect - peer... {}, {}", this::getShortNodeId, this::getReputation); + requestManagers.forEach( (protocolName, map) -> map.forEach((code, requestManager) -> requestManager.close())); } @@ -522,7 +531,28 @@ public boolean hasSupportForMessage(final int messageCode) { @Override public String toString() { - return String.format("Peer %s...", nodeId().toString().substring(0, 20)); + return String.format( + "Peer %s... %s, validated? %s, disconnected? %s", + getShortNodeId(), reputation, isFullyValidated(), isDisconnected()); + } + + @Nonnull + public String getShortNodeId() { + return nodeId().toString().substring(0, 20); + } + + @Override + public int compareTo(final @Nonnull EthPeer ethPeer) { + int repCompare = this.reputation.compareTo(ethPeer.reputation); + if (repCompare != 0) return repCompare; + + int headStateCompare = + Long.compare( + this.chainHeadState.getBestBlock().getNumber(), + ethPeer.chainHeadState.getBestBlock().getNumber()); + if (headStateCompare != 0) return headStateCompare; + + return getConnection().getPeerInfo().compareTo(ethPeer.getConnection().getPeerInfo()); } @FunctionalInterface diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index 9268def8ded..b35c2aab6b8 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -204,9 +204,15 @@ public interface ConnectCallback { @Override public String toString() { + if (connections.isEmpty()) { + return "0 EthPeers {}"; + } final String connectionsList = - connections.values().stream().map(EthPeer::toString).collect(Collectors.joining(",")); - return "EthPeers{connections=" + connectionsList + '}'; + connections.values().stream() + .sorted() + .map(EthPeer::toString) + .collect(Collectors.joining(", \n")); + return connections.size() + " EthPeers {\n" + connectionsList + '}'; } private void invokeConnectionCallbacks(final EthPeer peer) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index d90503a14eb..5cd38791543 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -331,6 +331,7 @@ public void handleNewConnection(final PeerConnection connection) { } catch (final PeerNotConnected peerNotConnected) { // Nothing to do. } + LOG.trace("{}", ethPeers); } @Override @@ -345,6 +346,7 @@ public void handleDisconnect( reason, connection.getPeerInfo(), ethPeers.peerCount()); + LOG.trace("{}", ethPeers); } private void handleStatusMessage(final EthPeer peer, final MessageData data) { @@ -374,7 +376,7 @@ private void handleStatusMessage(final EthPeer peer, final MessageData data) { } } catch (final RLPException e) { LOG.debug("Unable to parse status message.", e); - // Parsing errors can happen when clients broadcast network ids outside of the int range, + // Parsing errors can happen when clients broadcast network ids outside the int range, // So just disconnect with "subprotocol" error rather than "breach of protocol". peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerReputation.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerReputation.java index 2955c713eda..8293f0dfa09 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerReputation.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerReputation.java @@ -24,11 +24,12 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PeerReputation { +public class PeerReputation implements Comparable { private static final Logger LOG = LoggerFactory.getLogger(PeerReputation.class); private static final int TIMEOUT_THRESHOLD = 3; private static final int USELESS_RESPONSE_THRESHOLD = 5; @@ -39,12 +40,20 @@ public class PeerReputation { new ConcurrentHashMap<>(); private final Queue uselessResponseTimes = new ConcurrentLinkedQueue<>(); + private static final int DEFAULT_SCORE = 100; + private static final int SMALL_ADJUSTMENT = 1; + private static final int LARGE_ADJUSTMENT = 10; + + private int score = DEFAULT_SCORE; + public Optional recordRequestTimeout(final int requestCode) { final int newTimeoutCount = getOrCreateTimeoutCount(requestCode).incrementAndGet(); if (newTimeoutCount >= TIMEOUT_THRESHOLD) { LOG.debug("Disconnection triggered by repeated timeouts"); + score -= LARGE_ADJUSTMENT; return Optional.of(DisconnectReason.TIMEOUT); } else { + score -= SMALL_ADJUSTMENT; return Optional.empty(); } } @@ -67,9 +76,11 @@ public Optional recordUselessResponse(final long timestamp) { uselessResponseTimes.poll(); } if (uselessResponseTimes.size() >= USELESS_RESPONSE_THRESHOLD) { + score -= LARGE_ADJUSTMENT; LOG.debug("Disconnection triggered by exceeding useless response threshold"); return Optional.of(DisconnectReason.USELESS_PEER); } else { + score -= SMALL_ADJUSTMENT; return Optional.empty(); } } @@ -77,4 +88,14 @@ public Optional recordUselessResponse(final long timestamp) { private boolean shouldRemove(final Long timestamp, final long currentTimestamp) { return timestamp != null && timestamp + USELESS_RESPONSE_WINDOW_IN_MILLIS < currentTimestamp; } + + @Override + public String toString() { + return String.format("PeerReputation " + score); + } + + @Override + public int compareTo(final @Nonnull PeerReputation otherReputation) { + return Integer.compare(this.score, otherReputation.score); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessage.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessage.java index 565ed706a0b..d3b7d076445 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessage.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessage.java @@ -215,5 +215,28 @@ public static EthStatus readFrom(final RLPInput in) { return new EthStatus( protocolVersion, networkId, totalDifficulty, bestHash, genesisHash, forkId); } + + @Override + public String toString() { + return "EthStatus{" + + "protocolVersion=" + + protocolVersion + + ", networkId=" + + networkId + + ", totalDifficulty=" + + totalDifficulty + + ", bestHash=" + + bestHash + + ", genesisHash=" + + genesisHash + + ", forkId=" + + forkId + + '}'; + } + } + + @Override + public String toString() { + return status().toString(); } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java index 8ed9c715bc4..9ac7948febc 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java @@ -35,6 +35,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerInfo; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.PingMessage; import org.hyperledger.besu.plugin.services.permissioning.NodeMessagePermissioningProvider; import org.hyperledger.besu.testutil.TestClock; @@ -47,11 +48,13 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.tuweni.bytes.Bytes; import org.junit.Test; public class EthPeerTest { private static final BlockDataGenerator gen = new BlockDataGenerator(); private final TestClock clock = new TestClock(); + private static final Bytes NODE_ID = Bytes.random(32); @Test public void getHeadersStream() throws PeerNotConnected { @@ -342,6 +345,22 @@ public void message_permissioning_any_false_permission_preventsMessageFromSendin verify(peer.getConnection(), times(0)).sendForProtocol(any(), eq(PingMessage.get())); } + @Test + public void compareTo_withSameNodeId() { + final EthPeer peer1 = createPeerWithPeerInfo(NODE_ID); + final EthPeer peer2 = createPeerWithPeerInfo(NODE_ID); + assertThat(peer1.compareTo(peer2)).isEqualTo(0); + assertThat(peer2.compareTo(peer1)).isEqualTo(0); + } + + @Test + public void compareTo_withDifferentNodeId() { + final EthPeer peer1 = createPeerWithPeerInfo(NODE_ID); + final EthPeer peer2 = createPeerWithPeerInfo(Bytes.fromHexString("0x00")); + assertThat(peer1.compareTo(peer2)).isEqualTo(1); + assertThat(peer2.compareTo(peer1)).isEqualTo(-1); + } + private void messageStream( final ResponseStreamSupplier getStream, final MessageData targetMessage, @@ -431,6 +450,22 @@ private EthPeer createPeer(final List peerValidators) { return createPeer(peerValidators, Collections.emptyList()); } + private EthPeer createPeerWithPeerInfo(final Bytes nodeId) { + final PeerConnection peerConnection = mock(PeerConnection.class); + final Consumer onPeerReady = (peer) -> {}; + // Use a non-eth protocol name to ensure that EthPeer with sub-protocols such as Istanbul + // that extend the sub-protocol work correctly + PeerInfo peerInfo = new PeerInfo(1, "clientId", Collections.emptyList(), 30303, nodeId); + when(peerConnection.getPeerInfo()).thenReturn(peerInfo); + return new EthPeer( + peerConnection, + "foo", + onPeerReady, + Collections.emptyList(), + clock, + Collections.emptyList()); + } + private EthPeer createPeer( final List peerValidators, final List permissioningProviders) { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java index ddf1edb13de..e696f78d8cb 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java @@ -32,6 +32,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; +import java.util.Collections; import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.CancellationException; @@ -264,6 +265,18 @@ public void shouldFailRequestWithBusyDisconnectedAssignedPeer() throws Exception assertRequestFailure(pendingRequest, CancellationException.class); } + @Test + public void toString_hasExpectedInfo() { + assertThat(ethPeers.toString()).isEqualTo("0 EthPeers {}"); + + final EthPeer peerA = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(50), 20) + .getEthPeer(); + ethPeers.registerConnection(peerA.getConnection(), Collections.emptyList()); + assertThat(ethPeers.toString()).contains("1 EthPeers {"); + assertThat(ethPeers.toString()).contains(peerA.getShortNodeId()); + } + private void freeUpCapacity(final EthPeer ethPeer) { ethPeers.dispatchMessage(ethPeer, new EthMessage(ethPeer, NodeDataMessage.create(emptyList()))); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessageTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessageTest.java index 44fd1e42ce4..e84c795f4b7 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessageTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/StatusMessageTest.java @@ -90,6 +90,25 @@ public void serializeDeserializeWithForkId() { assertThat(copy.forkId()).isEqualTo(forkId); } + @Test + public void toStringHasExpectedInfo() { + final int version = EthProtocol.EthVersion.V64; + final BigInteger networkId = BigInteger.ONE; + final Difficulty td = Difficulty.of(1000L); + final Hash bestHash = randHash(1L); + final Hash genesisHash = randHash(2L); + final ForkId forkId = new ForkId(Bytes.fromHexString("0xa00bc334"), 0L); + + final MessageData msg = + StatusMessage.create(version, networkId, td, bestHash, genesisHash, forkId); + + final StatusMessage copy = new StatusMessage(msg.getData()); + final String copyToString = copy.toString(); + + assertThat(copyToString).contains("bestHash=" + bestHash); + assertThat(copyToString).contains("genesisHash=" + genesisHash); + } + private Hash randHash(final long seed) { final Random random = new Random(seed); final byte[] bytes = new byte[32]; diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java index 1e16e1d692f..5691c63bdba 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java @@ -17,6 +17,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.isNull; +import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; import org.hyperledger.besu.crypto.NodeKey; import org.hyperledger.besu.crypto.SECPPublicKey; @@ -180,6 +181,14 @@ public void connect(final Stream peerStream) { .forEach(this::connect); } + private String logConnectionsByIdToString() { + final String connectionsList = + connectionsById.values().stream() + .map(RlpxConnection::toString) + .collect(Collectors.joining(",\n")); + return connectionsById.size() + " ConnectionsById {\n" + connectionsList + "}"; + } + public void disconnect(final Bytes peerId, final DisconnectReason reason) { final RlpxConnection connection = connectionsById.remove(peerId); if (connection != null) { @@ -227,7 +236,7 @@ public CompletableFuture connect(final Peer peer) { // Check max peers if (!peerPrivileges.canExceedConnectionLimits(peer) && getConnectionCount() >= maxConnections) { final String errorMsg = - "Max peer peer connections established (" + "Max peer connections established (" + maxConnections + "). Cannot connect to peer: " + peer; @@ -263,6 +272,8 @@ public CompletableFuture connect(final Peer peer) { } }); + traceLambda(LOG, "{}", this::logConnectionsByIdToString); + return connectionFuture.get(); } @@ -276,6 +287,7 @@ private void handleDisconnect( final PeerConnection peerConnection, final DisconnectReason disconnectReason, final boolean initiatedByPeer) { + traceLambda(LOG, "{}", this::logConnectionsByIdToString); cleanUpPeerConnection(peerConnection.getPeer().getId()); } @@ -416,6 +428,7 @@ && getConnectionCount() >= maxConnections) { // Check remote connections again to control for race conditions enforceRemoteConnectionLimits(); enforceConnectionLimits(); + traceLambda(LOG, "{}", this::logConnectionsByIdToString); } private boolean shouldLimitRemoteConnections() { @@ -518,6 +531,7 @@ private int compareDuplicateConnections(final RlpxConnection a, final RlpxConnec final Bytes peerId = a.getPeer().getId(); final Bytes localId = localNode.getPeer().getId(); + // at this point a.Id == b.Id if (a.initiatedRemotely() != b.initiatedRemotely()) { // If we have connections initiated in different directions, keep the connection initiated // by the node with the lower id @@ -528,6 +542,7 @@ private int compareDuplicateConnections(final RlpxConnection a, final RlpxConnec } } // Otherwise, keep older connection + LOG.info("comparing timestamps " + a.getInitiatedAt() + " with " + b.getInitiatedAt()); return Math.toIntExact(a.getInitiatedAt() - b.getInitiatedAt()); } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/RlpxConnection.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/RlpxConnection.java index a56e9b8fef6..ae0ba7f4b4a 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/RlpxConnection.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/RlpxConnection.java @@ -143,6 +143,14 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(peerConnection); } + + @Override + public String toString() { + return "RemotelyInitiatedRlpxConnection initiatedAt:" + + getInitiatedAt() + + " to " + + peerConnection.getPeer().getId(); + } } private static class LocallyInitiatedRlpxConnection extends RlpxConnection { @@ -213,6 +221,16 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(peer, future); } + + @Override + public String toString() { + return "LocallyInitiatedRlpxConnection initiatedAt:" + + getInitiatedAt() + + " to " + + getPeer().getId() + + " disconnected? " + + isFailedOrDisconnected(); + } } public static class ConnectionNotEstablishedException extends IllegalStateException { diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/PeerInfo.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/PeerInfo.java index dc9f3268843..df25746837e 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/PeerInfo.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/PeerInfo.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import javax.annotation.Nonnull; import org.apache.tuweni.bytes.Bytes; @@ -36,7 +37,7 @@ * *

The peer info is shared between peers during the HELLO wire protocol handshake. */ -public class PeerInfo { +public class PeerInfo implements Comparable { private final int version; private final String clientId; private final List capabilities; @@ -143,4 +144,9 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(version, clientId, capabilities, port, nodeId); } + + @Override + public int compareTo(final @Nonnull PeerInfo peerInfo) { + return this.nodeId.compareTo(peerInfo.nodeId); + } } diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java index 9872ce5fc60..8adc5cc3505 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java @@ -294,7 +294,7 @@ public void connect_failsWhenMaxPeersConnected() { assertThatThrownBy(connection::get) .hasCauseInstanceOf(IllegalStateException.class) - .hasMessageContaining("Max peer peer connections established (1). Cannot connect to peer"); + .hasMessageContaining("Max peer connections established (1). Cannot connect to peer"); assertPeerConnectionNotTracked(peer); assertThat(agent.getConnectionCount()).isEqualTo(1); }