diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index 62a217f7214..972adf07ebb 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -98,6 +98,7 @@ import java.util.OptionalLong; import java.util.function.Supplier; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -359,11 +360,15 @@ public BesuController build() { Optional.of( ImmutableCheckpoint.builder() .blockHash( - Hash.fromHexString(configOptions.getCheckpointOptions().getHash().get())) + Hash.fromHexString( + configOptions.getCheckpointOptions().getHash().get())) // NOSONAR .blockNumber(configOptions.getCheckpointOptions().getNumber().getAsLong()) .totalDifficulty( Difficulty.fromHexString( - configOptions.getCheckpointOptions().getTotalDifficulty().get())) + configOptions + .getCheckpointOptions() + .getTotalDifficulty() + .get())) // NOSONAR .build()); } @@ -403,19 +408,14 @@ public BesuController build() { final PivotBlockSelector pivotBlockSelector = createPivotSelector(protocolContext); final Synchronizer synchronizer = - new DefaultSynchronizer( - syncConfig, + createSynchronizer( protocolSchedule, - protocolContext, worldStateStorage, - ethProtocolManager.getBlockBroadcaster(), + protocolContext, maybePruner, ethContext, syncState, - dataDirectory, - clock, - metricsSystem, - getFullSyncTerminationCondition(protocolContext.getBlockchain()), + ethProtocolManager, pivotBlockSelector); final MiningCoordinator miningCoordinator = @@ -432,7 +432,6 @@ public BesuController build() { final SubProtocolConfiguration subProtocolConfiguration = createSubProtocolConfiguration(ethProtocolManager, maybeSnapProtocolManager); - ; final JsonRpcMethods additionalJsonRpcMethodFactory = createAdditionalJsonRpcMethodFactory(protocolContext); @@ -461,6 +460,32 @@ public BesuController build() { additionalPluginServices); } + @NotNull + protected DefaultSynchronizer createSynchronizer( + final ProtocolSchedule protocolSchedule, + final WorldStateStorage worldStateStorage, + final ProtocolContext protocolContext, + final Optional maybePruner, + final EthContext ethContext, + final SyncState syncState, + final EthProtocolManager ethProtocolManager, + final PivotBlockSelector pivotBlockSelector) { + return new DefaultSynchronizer( + syncConfig, + protocolSchedule, + protocolContext, + worldStateStorage, + ethProtocolManager.getBlockBroadcaster(), + maybePruner, + ethContext, + syncState, + dataDirectory, + clock, + metricsSystem, + getFullSyncTerminationCondition(protocolContext.getBlockchain()), + pivotBlockSelector); + } + private PivotBlockSelector createPivotSelector(final ProtocolContext protocolContext) { final PivotSelectorFromPeers pivotSelectorFromPeers = new PivotSelectorFromPeers(syncConfig); @@ -507,7 +532,7 @@ protected SyncTerminationCondition getFullSyncTerminationCondition(final Blockch protected void prepForBuild() {} protected JsonRpcMethods createAdditionalJsonRpcMethodFactory( - final ProtocolContext protocolContext) { + final ProtocolContext protocolContext) { // NOSONAR return apis -> Collections.emptyMap(); } @@ -517,9 +542,8 @@ protected SubProtocolConfiguration createSubProtocolConfiguration( final SubProtocolConfiguration subProtocolConfiguration = new SubProtocolConfiguration().withSubProtocol(EthProtocol.get(), ethProtocolManager); maybeSnapProtocolManager.ifPresent( - snapProtocolManager -> { - subProtocolConfiguration.withSubProtocol(SnapProtocol.get(), snapProtocolManager); - }); + snapProtocolManager -> + subProtocolConfiguration.withSubProtocol(SnapProtocol.get(), snapProtocolManager)); return subProtocolConfiguration; } diff --git a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java index 3e257cbc30c..7f5d3c6c06c 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.controller; import org.hyperledger.besu.config.GenesisConfigFile; +import org.hyperledger.besu.consensus.merge.MergeContext; import org.hyperledger.besu.consensus.merge.PostMergeContext; import org.hyperledger.besu.consensus.merge.TransitionBackwardSyncContext; import org.hyperledger.besu.consensus.merge.TransitionContext; @@ -31,7 +32,14 @@ import org.hyperledger.besu.ethereum.core.MiningParameters; import org.hyperledger.besu.ethereum.core.PrivacyParameters; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.EthMessages; +import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; +import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer; +import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; @@ -40,8 +48,10 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration; +import org.hyperledger.besu.ethereum.worldstate.Pruner; import org.hyperledger.besu.ethereum.worldstate.PrunerConfiguration; import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.evm.internal.EvmConfiguration; import org.hyperledger.besu.metrics.ObservableMetricsSystem; import org.hyperledger.besu.plugin.services.permissioning.NodeMessagePermissioningProvider; @@ -145,13 +155,79 @@ protected PluginServiceFactory createAdditionalPluginServices( return new NoopPluginServiceFactory(); } + @Override + protected DefaultSynchronizer createSynchronizer( + final ProtocolSchedule protocolSchedule, + final WorldStateStorage worldStateStorage, + final ProtocolContext protocolContext, + final Optional maybePruner, + final EthContext ethContext, + final SyncState syncState, + final EthProtocolManager ethProtocolManager, + final PivotBlockSelector pivotBlockSelector) { + + DefaultSynchronizer sync = + super.createSynchronizer( + protocolSchedule, + worldStateStorage, + protocolContext, + maybePruner, + ethContext, + syncState, + ethProtocolManager, + pivotBlockSelector); + + ConsensusContext cc = protocolContext.getConsensusContext(ConsensusContext.class); + if (cc instanceof MergeContext) { + protocolContext.getConsensusContext(MergeContext.class).addNewForkchoiceMessageListener(sync); + } + return sync; + } + + @Override + protected EthProtocolManager createEthProtocolManager( + final ProtocolContext protocolContext, + final boolean fastSyncEnabled, + final TransactionPool transactionPool, + final EthProtocolConfiguration ethereumWireProtocolConfiguration, + final EthPeers ethPeers, + final EthContext ethContext, + final EthMessages ethMessages, + final EthScheduler scheduler, + final List peerValidators) { + + EthProtocolManager ethProtocolManager = + super.createEthProtocolManager( + protocolContext, + fastSyncEnabled, + transactionPool, + ethereumWireProtocolConfiguration, + ethPeers, + ethContext, + ethMessages, + scheduler, + peerValidators); + + ConsensusContext cc = protocolContext.getConsensusContext(ConsensusContext.class); + if (cc instanceof MergeContext) { + protocolContext + .getConsensusContext(MergeContext.class) + .observeNewIsPostMergeState(ethProtocolManager); + protocolContext + .getConsensusContext(MergeContext.class) + .addNewForkchoiceMessageListener(ethProtocolManager); + } + + return ethProtocolManager; + } + private void initTransitionWatcher( final ProtocolContext protocolContext, final TransitionCoordinator composedCoordinator) { PostMergeContext postMergeContext = protocolContext.getConsensusContext(PostMergeContext.class); postMergeContext.observeNewIsPostMergeState( - newIsPostMergeState -> { - if (newIsPostMergeState) { + (isPoS, difficultyStoppedAt) -> { + if (isPoS) { // if we transitioned to post-merge, stop and disable any mining composedCoordinator.getPreMergeObject().disable(); composedCoordinator.getPreMergeObject().stop(); diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/FinalizedBlockHashSupplier.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/FinalizedBlockHashSupplier.java index 46d9172cabc..03b26ede452 100644 --- a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/FinalizedBlockHashSupplier.java +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/FinalizedBlockHashSupplier.java @@ -14,7 +14,6 @@ */ package org.hyperledger.besu.consensus.merge; -import org.hyperledger.besu.consensus.merge.MergeContext.NewForkchoiceMessageListener; import org.hyperledger.besu.datatypes.Hash; import java.util.Optional; diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java index ba6af5b0dd8..e71158b4de2 100644 --- a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java @@ -67,15 +67,4 @@ void fireNewUnverifiedForkchoiceMessageEvent( final Hash headBlockHash, final Optional maybeFinalizedBlockHash, final Hash safeBlockHash); - - interface NewMergeStateCallback { - void onNewIsPostMergeState(final boolean newIsPostMergeState); - } - - interface NewForkchoiceMessageListener { - void onNewForkchoiceMessage( - final Hash headBlockHash, - final Optional maybeFinalizedBlockHash, - final Hash safeBlockHash); - } } diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java index 3babd1bf85e..9036177d587 100644 --- a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java @@ -83,13 +83,13 @@ public PostMergeContext setTerminalTotalDifficulty(final Difficulty newTerminalT } @Override - public void setIsPostMerge(final Difficulty totalDifficulty) { + public void setIsPostMerge(final Difficulty difficultyStoppedAt) { if (isPostMerge.get().orElse(Boolean.FALSE) && lastFinalized.get() != null) { // if we have finalized, we never switch back to a pre-merge once we have transitioned // post-TTD. return; } - final boolean newState = terminalTotalDifficulty.get().lessOrEqualThan(totalDifficulty); + final boolean newState = terminalTotalDifficulty.get().lessOrEqualThan(difficultyStoppedAt); final Optional oldState = isPostMerge.getAndSet(Optional.of(newState)); // if we are past TTD, set it: @@ -99,7 +99,9 @@ public void setIsPostMerge(final Difficulty totalDifficulty) { if (oldState.isEmpty() || oldState.get() != newState) { newMergeStateCallbackSubscribers.forEach( - newMergeStateCallback -> newMergeStateCallback.onNewIsPostMergeState(newState)); + newMergeStateCallback -> + newMergeStateCallback.onCrossingMergeBoundary( + newState, Optional.of(difficultyStoppedAt))); } } diff --git a/consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/PostMergeContextTest.java b/consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/PostMergeContextTest.java index 4c548efc728..afaab9debff 100644 --- a/consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/PostMergeContextTest.java +++ b/consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/PostMergeContextTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import org.hyperledger.besu.consensus.merge.MergeContext.NewMergeStateCallback; import org.hyperledger.besu.consensus.merge.blockcreation.PayloadIdentifier; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -173,8 +172,9 @@ private static class MergeStateChangeCollector implements NewMergeStateCallback final List stateChanges = new ArrayList<>(); @Override - public void onNewIsPostMergeState(final boolean newIsPostMergeState) { - stateChanges.add(newIsPostMergeState); + public void onCrossingMergeBoundary( + final boolean isPoS, final Optional difficultyStoppedAt) { + stateChanges.add(isPoS); } public void reset() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/consensus/merge/NewForkchoiceMessageListener.java b/ethereum/eth/src/main/java/org/hyperledger/besu/consensus/merge/NewForkchoiceMessageListener.java new file mode 100644 index 00000000000..3ec0e611e50 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/consensus/merge/NewForkchoiceMessageListener.java @@ -0,0 +1,28 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.hyperledger.besu.consensus.merge; + +import org.hyperledger.besu.datatypes.Hash; + +import java.util.Optional; + +public interface NewForkchoiceMessageListener { + void onNewForkchoiceMessage( + final Hash headBlockHash, + final Optional maybeFinalizedBlockHash, + final Hash safeBlockHash); +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/consensus/merge/NewMergeStateCallback.java b/ethereum/eth/src/main/java/org/hyperledger/besu/consensus/merge/NewMergeStateCallback.java new file mode 100644 index 00000000000..4e1717e7a97 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/consensus/merge/NewMergeStateCallback.java @@ -0,0 +1,26 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.hyperledger.besu.consensus.merge; + +import org.hyperledger.besu.ethereum.core.Difficulty; + +import java.util.Optional; + +public interface NewMergeStateCallback { + + void onCrossingMergeBoundary(final boolean isPoS, final Optional difficultyStoppedAt); +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index 5cd38791543..fa16c6ec9a2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -16,6 +16,8 @@ import static com.google.common.base.Preconditions.checkArgument; +import org.hyperledger.besu.consensus.merge.NewForkchoiceMessageListener; +import org.hyperledger.besu.consensus.merge.NewMergeStateCallback; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.MinedBlockObserver; @@ -47,6 +49,8 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.StampedLock; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -54,13 +58,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { +public class EthProtocolManager + implements ProtocolManager, + MinedBlockObserver, + NewMergeStateCallback, + NewForkchoiceMessageListener { private static final Logger LOG = LoggerFactory.getLogger(EthProtocolManager.class); private final EthScheduler scheduler; private final CountDownLatch shutdown; private final AtomicBoolean stopped = new AtomicBoolean(false); - private final Hash genesisHash; private final ForkIdManager forkIdManager; private final BigInteger networkId; @@ -71,6 +78,10 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { private final Blockchain blockchain; private final BlockBroadcaster blockBroadcaster; private final List peerValidators; + private Optional powTerminalDifficulty; + private final StampedLock powTerminalDifficultyLock = new StampedLock(); + private Hash lastFinalized = Hash.ZERO; + private final AtomicLong numFinalizedSeen = new AtomicLong(0); public EthProtocolManager( final Blockchain blockchain, @@ -91,7 +102,7 @@ public EthProtocolManager( this.blockchain = blockchain; this.shutdown = new CountDownLatch(1); - genesisHash = blockchain.getBlockHashByNumber(0L).get(); + this.genesisHash = blockchain.getBlockHashByNumber(0L).orElse(Hash.ZERO); this.forkIdManager = forkIdManager; @@ -237,7 +248,7 @@ public void processMessage(final Capability cap, final Message message) { final EthPeer ethPeer = ethPeers.peer(message.getConnection()); if (ethPeer == null) { LOG.debug( - "Ignoring message received from unknown peer connection: " + message.getConnection()); + "Ignoring message received from unknown peer connection: {}", message.getConnection()); return; } @@ -271,6 +282,11 @@ public void processMessage(final Capability cap, final Message message) { return; } + if (isFinalized() && (code == EthPV62.NEW_BLOCK || code == EthPV62.NEW_BLOCK_HASHES)) { + LOG.debug("disconnecting peer for sending new blocks after transition to PoS"); + ethPeer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED); + } + // This will handle responses ethPeers.dispatchMessage(ethPeer, ethMessage, getSupportedProtocol()); @@ -297,7 +313,7 @@ public void processMessage(final Capability cap, final Message message) { responseData -> { try { ethPeer.send(responseData, getSupportedProtocol()); - } catch (final PeerNotConnected __) { + } catch (final PeerNotConnected missingPeerException) { // Peer disconnected before we could respond - nothing to do } }); @@ -369,6 +385,19 @@ private void handleStatusMessage(final EthPeer peer, final MessageData data) { networkId, status.genesisHash()); peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED); + } else if (isFinalized()) { + long lockStamp = this.powTerminalDifficultyLock.readLock(); + try { + if (this.powTerminalDifficulty.isPresent() + && status.totalDifficulty().greaterThan(this.powTerminalDifficulty.get())) { + LOG.debug( + "Disconnecting peer with difficulty {}, likely still on PoW chain", + status.totalDifficulty()); + peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED); + } + } finally { + this.powTerminalDifficultyLock.unlockRead(lockStamp); + } } else { LOG.debug("Received status message from {}: {}", peer, status); peer.registerStatusReceived( @@ -382,6 +411,10 @@ private void handleStatusMessage(final EthPeer peer, final MessageData data) { } } + private boolean isFinalized() { + return this.numFinalizedSeen.get() > 1; + } + @Override public void blockMined(final Block block) { // This assumes the block has already been included in the chain @@ -401,4 +434,55 @@ public List getForkIdAsBytesList() { ? Collections.emptyList() : chainHeadForkId.getForkIdAsBytesList(); } + + @Override + public void onCrossingMergeBoundary( + final boolean isPoS, final Optional difficultyStoppedAt) { + if (isPoS) { + long lockStamp = this.powTerminalDifficultyLock.writeLock(); + try { + this.powTerminalDifficulty = difficultyStoppedAt; + } finally { + this.powTerminalDifficultyLock.unlockWrite(lockStamp); + } + } + } + + private void disconnectKnownPowPeers() { + long lockStamp = powTerminalDifficultyLock.readLock(); + try { + if (powTerminalDifficulty.isPresent()) { + LOG.info( + "disconnecting peers with total difficulty over {}", + powTerminalDifficulty.get().toBigInteger()); + ethPeers + .streamAllPeers() + .filter( + ethPeer -> + ethPeer + .chainState() + .getBestBlock() + .totalDifficulty + .greaterThan(powTerminalDifficulty.get())) + .forEach(ethPeer -> ethPeer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED)); + } + } finally { + powTerminalDifficultyLock.unlockRead(lockStamp); + } + } + + @Override + public void onNewForkchoiceMessage( + final Hash headBlockHash, + final Optional maybeFinalizedBlockHash, + final Hash safeBlockHash) { + if (maybeFinalizedBlockHash.isPresent() + && !maybeFinalizedBlockHash.get().equals(this.lastFinalized)) { + this.lastFinalized = maybeFinalizedBlockHash.get(); + this.numFinalizedSeen.getAndIncrement(); + if (isFinalized()) { + disconnectKnownPowPeers(); + } + } + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index dd1e19e2c70..188e959992f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -16,6 +16,8 @@ import static com.google.common.base.Preconditions.checkNotNull; +import org.hyperledger.besu.consensus.merge.NewForkchoiceMessageListener; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.Synchronizer; import org.hyperledger.besu.ethereum.eth.manager.EthContext; @@ -41,11 +43,12 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DefaultSynchronizer implements Synchronizer { +public class DefaultSynchronizer implements Synchronizer, NewForkchoiceMessageListener { private static final Logger LOG = LoggerFactory.getLogger(DefaultSynchronizer.class); @@ -58,6 +61,8 @@ public class DefaultSynchronizer implements Synchronizer { private final ProtocolContext protocolContext; private final PivotBlockSelector pivotBlockSelector; private final SyncTerminationCondition terminationCondition; + private Hash lastFinalized = Hash.ZERO; + private final AtomicLong numFinalizedSeen = new AtomicLong(0); public DefaultSynchronizer( final SynchronizerConfiguration syncConfig, @@ -288,4 +293,21 @@ private Void finalizeSync(final Void unused) { running.set(false); return null; } + + @Override + public void onNewForkchoiceMessage( + final Hash headBlockHash, + final Optional maybeFinalizedBlockHash, + final Hash safeBlockHash) { + if (maybeFinalizedBlockHash.isPresent() + && !maybeFinalizedBlockHash.get().equals(this.lastFinalized)) { + this.lastFinalized = maybeFinalizedBlockHash.get(); + this.numFinalizedSeen.getAndIncrement(); + if (this.numFinalizedSeen.get() > 1 + && blockPropagationManager.isPresent() + && blockPropagationManager.get().isRunning()) { + blockPropagationManager.get().stop(); + } + } + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java index 8d67d051d7a..b8665df67a2 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java @@ -66,6 +66,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.DefaultMessage; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; @@ -78,6 +79,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -201,6 +203,101 @@ public void disconnectOnWrongChainId() { } } + @Test + public void disconnectPoWPeersAfterTransition() { + try (final EthProtocolManager ethManager = + EthProtocolManagerTestUtil.create( + blockchain, + () -> false, + protocolContext.getWorldStateArchive(), + transactionPool, + EthProtocolConfiguration.defaultConfig())) { + + final MockPeerConnection workPeer = setupPeer(ethManager, (cap, msg, conn) -> {}); + final MockPeerConnection stakePeer = setupPeer(ethManager, (cap, msg, conn) -> {}); + + final StatusMessage workPeerStatus = + StatusMessage.create( + EthProtocol.EthVersion.V63, + BigInteger.ONE, + blockchain.getChainHead().getTotalDifficulty().add(20), + blockchain.getChainHeadHash(), + blockchain.getBlockHeader(BlockHeader.GENESIS_BLOCK_NUMBER).get().getHash()); + + final StatusMessage stakePeerStatus = + StatusMessage.create( + EthProtocol.EthVersion.V63, + BigInteger.ONE, + blockchain.getChainHead().getTotalDifficulty(), + blockchain.getChainHeadHash(), + blockchain.getBlockHeader(BlockHeader.GENESIS_BLOCK_NUMBER).get().getHash()); + + ethManager.processMessage(EthProtocol.ETH63, new DefaultMessage(workPeer, workPeerStatus)); + ethManager.processMessage(EthProtocol.ETH63, new DefaultMessage(stakePeer, stakePeerStatus)); + + ethManager.onCrossingMergeBoundary( + true, Optional.of(blockchain.getChainHead().getTotalDifficulty())); + ethManager.onNewForkchoiceMessage( + Hash.EMPTY, Optional.of(Hash.hash(Bytes.of(1))), Hash.EMPTY); + ethManager.onNewForkchoiceMessage( + Hash.EMPTY, Optional.of(Hash.hash(Bytes.of(2))), Hash.EMPTY); + assertThat(workPeer.isDisconnected()).isTrue(); + assertThat(workPeer.getDisconnectReason()).isPresent(); + assertThat(workPeer.getDisconnectReason()) + .get() + .isEqualTo(DisconnectReason.SUBPROTOCOL_TRIGGERED); + assertThat(stakePeer.isDisconnected()).isFalse(); + } + } + + @Test + public void disconnectNewPoWPeers() { + try (final EthProtocolManager ethManager = + EthProtocolManagerTestUtil.create( + blockchain, + () -> false, + protocolContext.getWorldStateArchive(), + transactionPool, + EthProtocolConfiguration.defaultConfig())) { + + final MockPeerConnection workPeer = setupPeer(ethManager, (cap, msg, conn) -> {}); + final MockPeerConnection stakePeer = setupPeer(ethManager, (cap, msg, conn) -> {}); + + final StatusMessage workPeerStatus = + StatusMessage.create( + EthProtocol.EthVersion.V63, + BigInteger.ONE, + blockchain.getChainHead().getTotalDifficulty().add(20), + blockchain.getChainHeadHash(), + blockchain.getBlockHeader(BlockHeader.GENESIS_BLOCK_NUMBER).get().getHash()); + + final StatusMessage stakePeerStatus = + StatusMessage.create( + EthProtocol.EthVersion.V63, + BigInteger.ONE, + blockchain.getChainHead().getTotalDifficulty(), + blockchain.getChainHeadHash(), + blockchain.getBlockHeader(BlockHeader.GENESIS_BLOCK_NUMBER).get().getHash()); + + ethManager.processMessage(EthProtocol.ETH63, new DefaultMessage(stakePeer, stakePeerStatus)); + + ethManager.onCrossingMergeBoundary( + true, Optional.of(blockchain.getChainHead().getTotalDifficulty())); + ethManager.onNewForkchoiceMessage( + Hash.EMPTY, Optional.of(Hash.hash(Bytes.of(1))), Hash.EMPTY); + ethManager.onNewForkchoiceMessage( + Hash.EMPTY, Optional.of(Hash.hash(Bytes.of(2))), Hash.EMPTY); + + ethManager.processMessage(EthProtocol.ETH63, new DefaultMessage(workPeer, workPeerStatus)); + assertThat(workPeer.isDisconnected()).isTrue(); + assertThat(workPeer.getDisconnectReason()).isPresent(); + assertThat(workPeer.getDisconnectReason()) + .get() + .isEqualTo(DisconnectReason.SUBPROTOCOL_TRIGGERED); + assertThat(stakePeer.isDisconnected()).isFalse(); + } + } + @Test public void disconnectOnVeryLargeMessage() { try (final EthProtocolManager ethManager = @@ -293,7 +390,7 @@ public void respondToGetHeaders() throws ExecutionException, InterruptedExceptio final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message); final List headers = Lists.newArrayList(headersMsg.getHeaders(protocolSchedule)); - assertThat(headers.size()).isEqualTo(blockCount); + assertThat(headers).hasSize(blockCount); for (int i = 0; i < blockCount; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i); } @@ -330,7 +427,7 @@ public void respondToGetHeadersWithinLimits() throws ExecutionException, Interru final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message); final List headers = Lists.newArrayList(headersMsg.getHeaders(protocolSchedule)); - assertThat(headers.size()).isEqualTo(limit); + assertThat(headers).hasSize(limit); for (int i = 0; i < limit; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i); } @@ -366,7 +463,7 @@ public void respondToGetHeadersReversed() throws ExecutionException, Interrupted final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message); final List headers = Lists.newArrayList(headersMsg.getHeaders(protocolSchedule)); - assertThat(headers.size()).isEqualTo(blockCount); + assertThat(headers).hasSize(blockCount); for (int i = 0; i < blockCount; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(endBlock - i); } @@ -404,7 +501,7 @@ public void respondToGetHeadersWithSkip() throws ExecutionException, Interrupted final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message); final List headers = Lists.newArrayList(headersMsg.getHeaders(protocolSchedule)); - assertThat(headers.size()).isEqualTo(blockCount); + assertThat(headers).hasSize(blockCount); for (int i = 0; i < blockCount; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i * (skip + 1)); } @@ -443,7 +540,7 @@ public void respondToGetHeadersReversedWithSkip() final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message); final List headers = Lists.newArrayList(headersMsg.getHeaders(protocolSchedule)); - assertThat(headers.size()).isEqualTo(blockCount); + assertThat(headers).hasSize(blockCount); for (int i = 0; i < blockCount; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(endBlock - i * (skip + 1)); } @@ -502,7 +599,7 @@ public void respondToGetHeadersPartial() throws ExecutionException, InterruptedE final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message); final List headers = Lists.newArrayList(headersMsg.getHeaders(protocolSchedule)); - assertThat(headers.size()).isEqualTo(2); + assertThat(headers).hasSize(2); for (int i = 0; i < 2; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i); } @@ -539,7 +636,7 @@ public void respondToGetHeadersEmpty() throws ExecutionException, InterruptedExc final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message); final List headers = Lists.newArrayList(headersMsg.getHeaders(protocolSchedule)); - assertThat(headers.size()).isEqualTo(0); + assertThat(headers).isEmpty(); done.complete(null); }; final PeerConnection peer = setupPeer(ethManager, onSend); @@ -583,7 +680,7 @@ public void respondToGetBodies() throws ExecutionException, InterruptedException final BlockBodiesMessage blocksMessage = BlockBodiesMessage.readFrom(message); final List bodies = Lists.newArrayList(blocksMessage.bodies(protocolSchedule)); - assertThat(bodies.size()).isEqualTo(blockCount); + assertThat(bodies).hasSize(blockCount); for (int i = 0; i < blockCount; i++) { assertThat(expectedBlocks[i].getBody()).isEqualTo(bodies.get(i)); } @@ -632,7 +729,7 @@ public void respondToGetBodiesWithinLimits() throws ExecutionException, Interrup final BlockBodiesMessage blocksMessage = BlockBodiesMessage.readFrom(message); final List bodies = Lists.newArrayList(blocksMessage.bodies(protocolSchedule)); - assertThat(bodies.size()).isEqualTo(limit); + assertThat(bodies).hasSize(limit); for (int i = 0; i < limit; i++) { assertThat(expectedBlocks[i].getBody()).isEqualTo(bodies.get(i)); } @@ -676,7 +773,7 @@ public void respondToGetBodiesPartial() throws ExecutionException, InterruptedEx final BlockBodiesMessage blocksMessage = BlockBodiesMessage.readFrom(message); final List bodies = Lists.newArrayList(blocksMessage.bodies(protocolSchedule)); - assertThat(bodies.size()).isEqualTo(1); + assertThat(bodies).hasSize(1); assertThat(expectedBlock.getBody()).isEqualTo(bodies.get(0)); done.complete(null); }; @@ -721,7 +818,7 @@ public void respondToGetReceipts() throws ExecutionException, InterruptedExcepti final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(message); final List> receipts = Lists.newArrayList(receiptsMessage.receipts()); - assertThat(receipts.size()).isEqualTo(blockCount); + assertThat(receipts).hasSize(blockCount); for (int i = 0; i < blockCount; i++) { assertThat(expectedReceipts.get(i)).isEqualTo(receipts.get(i)); } @@ -769,7 +866,7 @@ public void respondToGetReceiptsWithinLimits() throws ExecutionException, Interr final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(message); final List> receipts = Lists.newArrayList(receiptsMessage.receipts()); - assertThat(receipts.size()).isEqualTo(limit); + assertThat(receipts).hasSize(limit); for (int i = 0; i < limit; i++) { assertThat(expectedReceipts.get(i)).isEqualTo(receipts.get(i)); } @@ -813,7 +910,7 @@ public void respondToGetReceiptsPartial() throws ExecutionException, Interrupted final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(message); final List> receipts = Lists.newArrayList(receiptsMessage.receipts()); - assertThat(receipts.size()).isEqualTo(1); + assertThat(receipts).hasSize(1); assertThat(expectedReceipts).isEqualTo(receipts.get(0)); done.complete(null); }; @@ -861,7 +958,7 @@ public void respondToGetNodeData() throws Exception { assertThat(message.getCode()).isEqualTo(EthPV63.NODE_DATA); final NodeDataMessage receiptsMessage = NodeDataMessage.readFrom(message); final List nodeData = receiptsMessage.nodeData(); - assertThat(nodeData.size()).isEqualTo(blockCount); + assertThat(nodeData).hasSize(blockCount); for (int i = 0; i < blockCount; i++) { assertThat(expectedResults.get(i)).isEqualTo(nodeData.get(i)); } @@ -928,7 +1025,7 @@ public void newBlockMinedSendsNewBlockMessageToAllPeers() { assertThat(msg.totalDifficulty(protocolSchdeule)).isEqualTo(expectedTotalDifficulty); } - assertThat(receivingPeerCaptor.getAllValues().containsAll(peers)).isTrue(); + assertThat(receivingPeerCaptor.getAllValues()).containsAll(peers); } } @@ -970,7 +1067,7 @@ public void shouldSuccessfullyRespondToGetHeadersRequestLessThanZero() final BlockHeadersMessage headersMsg = BlockHeadersMessage.readFrom(message); final List headers = Lists.newArrayList(headersMsg.getHeaders(protocolSchedule)); - assertThat(headers.size()).isEqualTo(receivedBlockCount); + assertThat(headers).hasSize(receivedBlockCount); for (int i = 0; i < receivedBlockCount; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(receivedBlockCount - 1 - i); }