From bbcc438244b9966e95ee693150323208c42a87f6 Mon Sep 17 00:00:00 2001 From: matkt Date: Wed, 14 Oct 2020 15:24:31 +0200 Subject: [PATCH] Add cache limit for pending blocks (#1406) If a peer exceeds the authorized number of pending blocks, Besu will replace the lowest priority block in the cache from this peer by this new one until the local node sync a new block and maybe purges one of the blocks of this peer. The highest priority blocks are those that are lowest in block height and then higher priority if they were sent more recently. Other peers will not be impacted and will be able to continue sending pending blocks. The cache size limit is the distance between the minimum and maximum value of the BlockPropagationRange parameter. Besu automatically purges blocks outside this range. Signed-off-by: Karim TAAM --- .../ethereum/core/BlockDataGenerator.java | 16 +- ethereum/eth/build.gradle | 3 + .../eth/sync/BlockPropagationManager.java | 33 +-- .../eth/sync/DefaultSynchronizer.java | 4 +- ...gBlocks.java => PendingBlocksManager.java} | 41 +++- .../eth/sync/state/cache/PendingBlock.java | 28 +++ .../sync/state/cache/PendingBlockCache.java | 75 ++++++ .../eth/sync/BlockPropagationManagerTest.java | 37 +-- .../sync/state/PendingBlocksManagerTest.java | 230 ++++++++++++++++++ .../eth/sync/state/PendingBlocksTest.java | 127 ---------- .../state/cache/PendingBlockCacheTest.java | 140 +++++++++++ 11 files changed, 562 insertions(+), 172 deletions(-) rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/{PendingBlocks.java => PendingBlocksManager.java} (67%) create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/cache/PendingBlock.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/cache/PendingBlockCache.java create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManagerTest.java delete mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksTest.java create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/cache/PendingBlockCacheTest.java diff --git a/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java b/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java index a6dae8f4ed3..ed40b3e0e89 100644 --- a/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java +++ b/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java @@ -265,6 +265,7 @@ public BlockHeader header(final long number, final BlockBody body, final BlockOp final int gasLimit = random.nextInt() & Integer.MAX_VALUE; final int gasUsed = Math.max(0, gasLimit - 1); final long blockNonce = random.nextLong(); + return BlockHeaderBuilder.create() .parentHash(options.getParentHash(hash())) .ommersHash(BodyValidation.ommersHash(body.getOmmers())) @@ -277,7 +278,10 @@ public BlockHeader header(final long number, final BlockBody body, final BlockOp .number(number) .gasLimit(gasLimit) .gasUsed(options.getGasUsed(gasUsed)) - .timestamp(Instant.now().truncatedTo(ChronoUnit.SECONDS).getEpochSecond()) + .timestamp( + options + .getTimestamp() + .orElse(Instant.now().truncatedTo(ChronoUnit.SECONDS).getEpochSecond())) .extraData(options.getExtraData(bytes32())) .mixHash(hash()) .nonce(blockNonce) @@ -511,6 +515,7 @@ public static class BlockOptions { private Optional receiptsRoot = Optional.empty(); private Optional gasUsed = Optional.empty(); private Optional logsBloom = Optional.empty(); + private Optional timestamp = Optional.empty(); private boolean hasOmmers = true; private boolean hasTransactions = true; @@ -562,6 +567,10 @@ public LogsBloomFilter getLogsBloom(final LogsBloomFilter defaultValue) { return logsBloom.orElse(defaultValue); } + public Optional getTimestamp() { + return timestamp; + } + public boolean hasTransactions() { return hasTransactions; } @@ -638,5 +647,10 @@ public BlockOptions hasOmmers(final boolean hasOmmers) { this.hasOmmers = hasOmmers; return this; } + + public BlockOptions setTimestamp(final Long timestamp) { + this.timestamp = Optional.of(timestamp); + return this; + } } } diff --git a/ethereum/eth/build.gradle b/ethereum/eth/build.gradle index 4b5aa1a54c5..051d462267c 100644 --- a/ethereum/eth/build.gradle +++ b/ethereum/eth/build.gradle @@ -45,6 +45,9 @@ dependencies { implementation 'org.apache.tuweni:tuweni-bytes' implementation 'org.apache.tuweni:tuweni-units' + annotationProcessor "org.immutables:value" + implementation "org.immutables:value-annotations" + testImplementation project(':config') testImplementation project(path: ':config', configuration: 'testSupportArtifacts') testImplementation project(':crypto') diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java index 4c3f23974fd..1d7b841f166 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java @@ -32,7 +32,7 @@ import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage; import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage.NewBlockHash; import org.hyperledger.besu.ethereum.eth.messages.NewBlockMessage; -import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocks; +import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.tasks.PersistBlockTask; import org.hyperledger.besu.ethereum.mainnet.BlockHeaderValidator; @@ -58,6 +58,7 @@ import com.google.common.collect.Range; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes; public class BlockPropagationManager { private static final Logger LOG = LogManager.getLogger(); @@ -74,7 +75,7 @@ public class BlockPropagationManager { private final Set requestedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set importingBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final PendingBlocks pendingBlocks; + private final PendingBlocksManager pendingBlocksManager; BlockPropagationManager( final SynchronizerConfiguration config, @@ -82,7 +83,7 @@ public class BlockPropagationManager { final ProtocolContext protocolContext, final EthContext ethContext, final SyncState syncState, - final PendingBlocks pendingBlocks, + final PendingBlocksManager pendingBlocksManager, final MetricsSystem metricsSystem, final BlockBroadcaster blockBroadcaster) { this.config = config; @@ -92,7 +93,7 @@ public class BlockPropagationManager { this.metricsSystem = metricsSystem; this.blockBroadcaster = blockBroadcaster; this.syncState = syncState; - this.pendingBlocks = pendingBlocks; + this.pendingBlocksManager = pendingBlocksManager; } public void start() { @@ -117,12 +118,12 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent) { final Block newBlock = blockAddedEvent.getBlock(); final List readyForImport; - synchronized (pendingBlocks) { + synchronized (pendingBlocksManager) { // Remove block from pendingBlocks list - pendingBlocks.deregisterPendingBlock(newBlock); + pendingBlocksManager.deregisterPendingBlock(newBlock); // Import any pending blocks that are children of the newly added block - readyForImport = pendingBlocks.childrenOf(newBlock.getHash()); + readyForImport = pendingBlocksManager.childrenOf(newBlock.getHash()); } if (!readyForImport.isEmpty()) { @@ -148,7 +149,7 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent) { if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) { final long head = protocolContext.getBlockchain().getChainHeadBlockNumber(); final long cutoff = head + config.getBlockPropagationRange().lowerEndpoint(); - pendingBlocks.purgeBlocksOlderThan(cutoff); + pendingBlocksManager.purgeBlocksOlderThan(cutoff); } } @@ -168,14 +169,14 @@ private void handleNewBlockFromNetwork(final EthMessage message) { block.getHeader().getNumber(), localChainHeight, bestChainHeight)) { return; } - if (pendingBlocks.contains(block.getHash())) { + if (pendingBlocksManager.contains(block.getHash())) { return; } if (blockchain.contains(block.getHash())) { return; } - importOrSavePendingBlock(block); + importOrSavePendingBlock(block, message.getPeer().nodeId()); } catch (final RLPException e) { LOG.debug( "Malformed NEW_BLOCK message received from peer, disconnecting: {}", @@ -212,7 +213,7 @@ private void handleNewBlockHashesFromNetwork(final EthMessage message) { if (requestedBlocks.contains(announcedBlock.hash())) { continue; } - if (pendingBlocks.contains(announcedBlock.hash())) { + if (pendingBlocksManager.contains(announcedBlock.hash())) { continue; } if (importingBlocks.contains(announcedBlock.hash())) { @@ -247,7 +248,9 @@ private CompletableFuture processAnnouncedBlock( protocolSchedule, ethContext, newBlock.hash(), newBlock.number(), metricsSystem) .assignPeer(peer); - return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult())); + return getBlockTask + .run() + .thenCompose((r) -> importOrSavePendingBlock(r.getResult(), peer.nodeId())); } private void broadcastBlock(final Block block, final BlockHeader parent) { @@ -261,14 +264,14 @@ private void broadcastBlock(final Block block, final BlockHeader parent) { } @VisibleForTesting - CompletableFuture importOrSavePendingBlock(final Block block) { + CompletableFuture importOrSavePendingBlock(final Block block, final Bytes nodeId) { // Synchronize to avoid race condition where block import event fires after the // blockchain.contains() check and before the block is registered, causing onBlockAdded() to be // invoked for the parent of this block before we are able to register it. - synchronized (pendingBlocks) { + synchronized (pendingBlocksManager) { if (!protocolContext.getBlockchain().contains(block.getHeader().getParentHash())) { // Block isn't connected to local chain, save it to pending blocks collection - if (pendingBlocks.registerPendingBlock(block)) { + if (pendingBlocksManager.registerPendingBlock(block, nodeId)) { LOG.info( "Saving announced block {} ({}) for future import", block.getHeader().getNumber(), diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 972187ab2cf..fdd229c7117 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -24,7 +24,7 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncException; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; import org.hyperledger.besu.ethereum.eth.sync.fullsync.FullSyncDownloader; -import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocks; +import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.worldstate.Pruner; @@ -83,7 +83,7 @@ public DefaultSynchronizer( protocolContext, ethContext, syncState, - new PendingBlocks(), + new PendingBlocksManager(syncConfig), metricsSystem, blockBroadcaster); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocks.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManager.java similarity index 67% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocks.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManager.java index 50d20ebdbe7..84fb7274791 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocks.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManager.java @@ -18,6 +18,9 @@ import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.state.cache.ImmutablePendingBlock; +import org.hyperledger.besu.ethereum.eth.sync.state.cache.PendingBlockCache; import java.util.Collections; import java.util.List; @@ -27,35 +30,49 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -public class PendingBlocks { +import org.apache.tuweni.bytes.Bytes; + +public class PendingBlocksManager { + + private final PendingBlockCache pendingBlocks; - private final Map pendingBlocks = new ConcurrentHashMap<>(); private final Map> pendingBlocksByParentHash = new ConcurrentHashMap<>(); + public PendingBlocksManager(final SynchronizerConfiguration synchronizerConfiguration) { + + pendingBlocks = + new PendingBlockCache( + (Math.abs(synchronizerConfiguration.getBlockPropagationRange().lowerEndpoint()) + + Math.abs(synchronizerConfiguration.getBlockPropagationRange().upperEndpoint()))); + } + /** * Track the given block. * - * @param pendingBlock the block to track + * @param block the block to track + * @param nodeId node that sent the block * @return true if the block was added (was not previously present) */ - public boolean registerPendingBlock(final Block pendingBlock) { - final Block previousValue = - this.pendingBlocks.putIfAbsent(pendingBlock.getHash(), pendingBlock); + public boolean registerPendingBlock(final Block block, final Bytes nodeId) { + + final ImmutablePendingBlock previousValue = + this.pendingBlocks.putIfAbsent( + block.getHash(), ImmutablePendingBlock.builder().block(block).nodeId(nodeId).build()); if (previousValue != null) { return false; } pendingBlocksByParentHash .computeIfAbsent( - pendingBlock.getHeader().getParentHash(), + block.getHeader().getParentHash(), h -> { final Set set = newSetFromMap(new ConcurrentHashMap<>()); // Go ahead and add our value at construction, so that we don't set an empty set which // could be removed in deregisterPendingBlock - set.add(pendingBlock.getHash()); + set.add(block.getHash()); return set; }) - .add(pendingBlock.getHash()); + .add(block.getHash()); return true; } @@ -68,7 +85,7 @@ public boolean registerPendingBlock(final Block pendingBlock) { */ public boolean deregisterPendingBlock(final Block block) { final Hash parentHash = block.getHeader().getParentHash(); - final Block removed = pendingBlocks.remove(block.getHash()); + final ImmutablePendingBlock removed = pendingBlocks.remove(block.getHash()); final Set blocksForParent = pendingBlocksByParentHash.get(parentHash); if (blocksForParent != null) { blocksForParent.remove(block.getHash()); @@ -79,7 +96,8 @@ public boolean deregisterPendingBlock(final Block block) { public void purgeBlocksOlderThan(final long blockNumber) { pendingBlocks.values().stream() - .filter(b -> b.getHeader().getNumber() < blockNumber) + .filter(b -> b.block().getHeader().getNumber() < blockNumber) + .map(ImmutablePendingBlock::block) .forEach(this::deregisterPendingBlock); } @@ -95,6 +113,7 @@ public List childrenOf(final Hash parentBlock) { return blocksByParent.stream() .map(pendingBlocks::get) .filter(Objects::nonNull) + .map(ImmutablePendingBlock::block) .collect(Collectors.toList()); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/cache/PendingBlock.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/cache/PendingBlock.java new file mode 100644 index 00000000000..ff48c95643e --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/cache/PendingBlock.java @@ -0,0 +1,28 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.state.cache; + +import org.hyperledger.besu.ethereum.core.Block; + +import org.apache.tuweni.bytes.Bytes; +import org.immutables.value.Value; + +@Value.Immutable +public interface PendingBlock { + + Block block(); + + Bytes nodeId(); +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/cache/PendingBlockCache.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/cache/PendingBlockCache.java new file mode 100644 index 00000000000..c568eaf01a3 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/cache/PendingBlockCache.java @@ -0,0 +1,75 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.state.cache; + +import org.hyperledger.besu.ethereum.core.Hash; + +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import org.apache.tuweni.bytes.Bytes; + +public class PendingBlockCache extends ConcurrentHashMap { + + private final long cacheSizePerPeer; + + public PendingBlockCache(final long cacheSizePerPeer) { + this.cacheSizePerPeer = cacheSizePerPeer; + } + + /** + * Adds the specified hash in the cache if it is not already associated with a value. Otherwise + * returns the current value. + * + * @return the previous value associated with the specified key, or {@code null} if there was no + * mapping for the hash + */ + @Override + public ImmutablePendingBlock putIfAbsent( + final Hash hash, final ImmutablePendingBlock pendingBlock) { + final ImmutablePendingBlock foundBlock = super.putIfAbsent(hash, pendingBlock); + if (foundBlock == null) { + removeLowestPriorityBlockWhenCacheFull(pendingBlock.nodeId()); + } + return foundBlock; + } + + /** + * Removes the lowest priority block if a peer has reached the cache limit it is allowed to use + * The highest priority blocks are those that are lowest in block height and then higher priority + * if they were sent more recently. + * + * @param nodeId id of the peer + */ + private void removeLowestPriorityBlockWhenCacheFull(final Bytes nodeId) { + final List blockByNodeId = + values().stream().filter(value -> value.nodeId() == nodeId).collect(Collectors.toList()); + if (blockByNodeId.size() > cacheSizePerPeer) { + blockByNodeId.stream() + .min(getComparatorByBlockNumber().reversed().thenComparing(getComparatorByTimeStamp())) + .ifPresent(value -> remove(value.block().getHash())); + } + } + + private Comparator getComparatorByBlockNumber() { + return Comparator.comparing(s -> s.block().getHeader().getNumber()); + } + + private Comparator getComparatorByTimeStamp() { + return Comparator.comparing(s -> s.block().getHeader().getTimestamp()); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManagerTest.java index 2d0f9de7d00..68c618a365b 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManagerTest.java @@ -46,7 +46,7 @@ import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer.Responder; import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage; import org.hyperledger.besu.ethereum.eth.messages.NewBlockMessage; -import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocks; +import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; @@ -59,6 +59,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import org.apache.tuweni.bytes.Bytes; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -67,6 +68,8 @@ public class BlockPropagationManagerTest { + private static final Bytes NODE_ID_1 = Bytes.fromHexString("0x00"); + private static Blockchain fullBlockchain; private BlockchainSetupUtil blockchainUtil; @@ -77,7 +80,9 @@ public class BlockPropagationManagerTest { private EthProtocolManager ethProtocolManager; private BlockPropagationManager blockPropagationManager; private SynchronizerConfiguration syncConfig; - private final PendingBlocks pendingBlocks = new PendingBlocks(); + private final PendingBlocksManager pendingBlocksManager = + new PendingBlocksManager( + SynchronizerConfiguration.builder().blockPropagationRange(-10, 30).build()); private SyncState syncState; private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); @@ -113,7 +118,7 @@ public void setup() { protocolContext, ethProtocolManager.ethContext(), syncState, - pendingBlocks, + pendingBlocksManager, metricsSystem, blockBroadcaster); } @@ -324,7 +329,7 @@ public void handlesDuplicateAnnouncements() { protocolContext, ethProtocolManager.ethContext(), syncState, - pendingBlocks, + pendingBlocksManager, metricsSystem, blockBroadcaster); @@ -376,7 +381,7 @@ public void handlesPendingDuplicateAnnouncements() { protocolContext, ethProtocolManager.ethContext(), syncState, - pendingBlocks, + pendingBlocksManager, metricsSystem, blockBroadcaster); blockchainUtil.importFirstBlocks(2); @@ -486,7 +491,7 @@ public void ignoresOldNewBlockHashAnnouncement() { final Responder responder = RespondingEthPeer.blockchainResponder(fullBlockchain); peer.respondWhile(responder, peer::hasOutstandingRequests); - verify(propManager, times(0)).importOrSavePendingBlock(any()); + verify(propManager, times(0)).importOrSavePendingBlock(any(), any(Bytes.class)); assertThat(blockchain.contains(oldBlock.getHash())).isFalse(); } @@ -512,7 +517,7 @@ public void ignoresOldNewBlockAnnouncement() { final Responder responder = RespondingEthPeer.blockchainResponder(fullBlockchain); peer.respondWhile(responder, peer::hasOutstandingRequests); - verify(propManager, times(0)).importOrSavePendingBlock(any()); + verify(propManager, times(0)).importOrSavePendingBlock(any(), any(Bytes.class)); assertThat(blockchain.contains(oldBlock.getHash())).isFalse(); } @@ -528,7 +533,7 @@ public void purgesOldBlocks() { protocolContext, ethProtocolManager.ethContext(), syncState, - pendingBlocks, + pendingBlocksManager, metricsSystem, blockBroadcaster); @@ -551,20 +556,20 @@ public void purgesOldBlocks() { // Check that we pushed our block into the pending collection assertThat(blockchain.contains(blockToPurge.getHash())).isFalse(); - assertThat(pendingBlocks.contains(blockToPurge.getHash())).isTrue(); + assertThat(pendingBlocksManager.contains(blockToPurge.getHash())).isTrue(); // Import blocks until we bury the target block far enough to be cleaned up for (int i = 0; i < oldBlocksToImport; i++) { blockchainUtil.importBlockAtIndex((int) blockchain.getChainHeadBlockNumber() + 1); assertThat(blockchain.contains(blockToPurge.getHash())).isFalse(); - assertThat(pendingBlocks.contains(blockToPurge.getHash())).isTrue(); + assertThat(pendingBlocksManager.contains(blockToPurge.getHash())).isTrue(); } // Import again to trigger cleanup blockchainUtil.importBlockAtIndex((int) blockchain.getChainHeadBlockNumber() + 1); assertThat(blockchain.contains(blockToPurge.getHash())).isFalse(); - assertThat(pendingBlocks.contains(blockToPurge.getHash())).isFalse(); + assertThat(pendingBlocksManager.contains(blockToPurge.getHash())).isFalse(); } @Test @@ -611,15 +616,15 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() { protocolContext, ethContext, syncState, - pendingBlocks, + pendingBlocksManager, metricsSystem, blockBroadcaster); blockchainUtil.importFirstBlocks(2); final Block nextBlock = blockchainUtil.getBlock(2); - blockPropagationManager.importOrSavePendingBlock(nextBlock); - blockPropagationManager.importOrSavePendingBlock(nextBlock); + blockPropagationManager.importOrSavePendingBlock(nextBlock, NODE_ID_1); + blockPropagationManager.importOrSavePendingBlock(nextBlock, NODE_ID_1); verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class)); } @@ -668,7 +673,7 @@ public Object answer(final InvocationOnMock invocation) throws Throwable { protocolContext, ethContext, syncState, - pendingBlocks, + pendingBlocksManager, metricsSystem, blockBroadcaster); @@ -685,7 +690,7 @@ public Object answer(final InvocationOnMock invocation) throws Throwable { .setBlockHeaderFunctions(new MainnetBlockHeaderFunctions())); assertThat(badBlocksManager.getBadBlocks()).isEmpty(); - blockPropagationManager.importOrSavePendingBlock(badBlock); + blockPropagationManager.importOrSavePendingBlock(badBlock, NODE_ID_1); assertThat(badBlocksManager.getBadBlocks().size()).isEqualTo(1); verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class)); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManagerTest.java new file mode 100644 index 00000000000..024252b40b6 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManagerTest.java @@ -0,0 +1,230 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.state; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockDataGenerator; +import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; + +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import org.apache.tuweni.bytes.Bytes; +import org.junit.Before; +import org.junit.Test; + +public class PendingBlocksManagerTest { + + private static final Bytes NODE_ID_1 = Bytes.fromHexString("0x00"); + private static final Bytes NODE_ID_2 = Bytes.fromHexString("0x01"); + + private PendingBlocksManager pendingBlocksManager; + private BlockDataGenerator gen; + + @Before + public void setup() { + pendingBlocksManager = + new PendingBlocksManager( + SynchronizerConfiguration.builder().blockPropagationRange(-10, 30).build()); + gen = new BlockDataGenerator(); + } + + @Test + public void registerPendingBlock() { + final Block block = gen.block(); + + // Sanity check + assertThat(pendingBlocksManager.contains(block.getHash())).isFalse(); + + pendingBlocksManager.registerPendingBlock(block, NODE_ID_1); + + assertThat(pendingBlocksManager.contains(block.getHash())).isTrue(); + final List pendingBlocksForParent = + pendingBlocksManager.childrenOf(block.getHeader().getParentHash()); + assertThat(pendingBlocksForParent).isEqualTo(Collections.singletonList(block)); + } + + @Test + public void deregisterPendingBlock() { + final Block block = gen.block(); + pendingBlocksManager.registerPendingBlock(block, NODE_ID_1); + pendingBlocksManager.deregisterPendingBlock(block); + + assertThat(pendingBlocksManager.contains(block.getHash())).isFalse(); + final List pendingBlocksForParent = + pendingBlocksManager.childrenOf(block.getHeader().getParentHash()); + assertThat(pendingBlocksForParent).isEqualTo(Collections.emptyList()); + } + + @Test + public void registerSiblingBlocks() { + final BlockDataGenerator gen = new BlockDataGenerator(); + final Block parentBlock = gen.block(); + final Block childBlock = gen.nextBlock(parentBlock); + final Block childBlock2 = gen.nextBlock(parentBlock); + final List children = Arrays.asList(childBlock, childBlock2); + + pendingBlocksManager.registerPendingBlock(childBlock, NODE_ID_1); + pendingBlocksManager.registerPendingBlock(childBlock2, NODE_ID_1); + + assertThat(pendingBlocksManager.contains(childBlock.getHash())).isTrue(); + assertThat(pendingBlocksManager.contains(childBlock2.getHash())).isTrue(); + + final List pendingBlocksForParent = + pendingBlocksManager.childrenOf(parentBlock.getHash()); + assertThat(pendingBlocksForParent.size()).isEqualTo(2); + assertThat(new HashSet<>(pendingBlocksForParent)).isEqualTo(new HashSet<>(children)); + } + + @Test + public void deregisterSubsetOfSiblingBlocks() { + final BlockDataGenerator gen = new BlockDataGenerator(); + final Block parentBlock = gen.block(); + final Block childBlock = gen.nextBlock(parentBlock); + final Block childBlock2 = gen.nextBlock(parentBlock); + + pendingBlocksManager.registerPendingBlock(childBlock, NODE_ID_1); + pendingBlocksManager.registerPendingBlock(childBlock2, NODE_ID_1); + pendingBlocksManager.deregisterPendingBlock(childBlock); + + assertThat(pendingBlocksManager.contains(childBlock.getHash())).isFalse(); + assertThat(pendingBlocksManager.contains(childBlock2.getHash())).isTrue(); + + final List pendingBlocksForParent = + pendingBlocksManager.childrenOf(parentBlock.getHash()); + assertThat(pendingBlocksForParent).isEqualTo(Collections.singletonList(childBlock2)); + } + + @Test + public void purgeBlocks() { + pendingBlocksManager = + new PendingBlocksManager( + SynchronizerConfiguration.builder().blockPropagationRange(0, 15).build()); + final List blocks = gen.blockSequence(10); + + for (final Block block : blocks) { + pendingBlocksManager.registerPendingBlock(block, NODE_ID_1); + assertThat(pendingBlocksManager.contains(block.getHash())).isTrue(); + } + + final List blocksToPurge = blocks.subList(0, 5); + final List blocksToKeep = blocks.subList(5, blocks.size()); + pendingBlocksManager.purgeBlocksOlderThan(blocksToKeep.get(0).getHeader().getNumber()); + + for (final Block block : blocksToPurge) { + assertThat(pendingBlocksManager.contains(block.getHash())).isFalse(); + assertThat(pendingBlocksManager.childrenOf(block.getHeader().getParentHash()).size()) + .isEqualTo(0); + } + for (final Block block : blocksToKeep) { + assertThat(pendingBlocksManager.contains(block.getHash())).isTrue(); + assertThat(pendingBlocksManager.childrenOf(block.getHeader().getParentHash()).size()) + .isEqualTo(1); + } + } + + @Test + public void shouldPreventNodeFromFillingCache() { + final int nbBlocks = 4; + pendingBlocksManager = + new PendingBlocksManager( + SynchronizerConfiguration.builder().blockPropagationRange(-1, 2).build()); + final BlockDataGenerator gen = new BlockDataGenerator(); + final Block parentBlock = gen.block(); + + // add new blocks from node 1 + final ArrayDeque childBlockFromNodeOne = new ArrayDeque<>(); + for (int i = 0; i < nbBlocks; i++) { + final Block generatedBlock = + gen.block(gen.nextBlockOptions(parentBlock).setTimestamp((long) i)); + childBlockFromNodeOne.add(generatedBlock); + pendingBlocksManager.registerPendingBlock(generatedBlock, NODE_ID_1); + } + + // add new block from node 2 + final Block childBlockFromNodeTwo = gen.nextBlock(parentBlock); + pendingBlocksManager.registerPendingBlock(childBlockFromNodeTwo, NODE_ID_2); + + // check blocks from node 1 in the cache (node 1 should replace the lowest priority block) + List pendingBlocksForParent = pendingBlocksManager.childrenOf(parentBlock.getHash()); + for (int i = 0; i < nbBlocks; i++) { + final Block foundBlock = childBlockFromNodeOne.poll(); + if (i != 0) { + assertThat(pendingBlocksManager.contains(foundBlock.getHash())).isTrue(); + assertThat(pendingBlocksForParent).contains(foundBlock); + } else { + assertThat(pendingBlocksManager.contains(foundBlock.getHash())).isFalse(); + assertThat(pendingBlocksForParent).doesNotContain(foundBlock); + } + } + // check blocks from node 2 in the cache (node 1 could not prevent node 2 from adding its + // blocks) + assertThat(pendingBlocksManager.contains(childBlockFromNodeTwo.getHash())).isTrue(); + assertThat(pendingBlocksForParent).contains(childBlockFromNodeTwo); + } + + @Test + public void shouldReplaceLowestPriorityBlockWhenCacheIsFull() { + final int nbBlocks = 3; + pendingBlocksManager = + new PendingBlocksManager( + SynchronizerConfiguration.builder().blockPropagationRange(-1, 3).build()); + final BlockDataGenerator gen = new BlockDataGenerator(); + final List childBlockFromNodeOne = gen.blockSequence(nbBlocks); + Block reorgBlock = null; + + // add new blocks from node 1 + for (Block block : childBlockFromNodeOne) { + pendingBlocksManager.registerPendingBlock(block, NODE_ID_1); + if (block.getHeader().getNumber() == 1) { + // add reorg block + reorgBlock = + gen.block( + gen.nextBlockOptions(block).setTimestamp(block.getHeader().getTimestamp() + 1)); + pendingBlocksManager.registerPendingBlock(reorgBlock, NODE_ID_1); + } + } + // BLOCK 0 , BLOCK 1, BLOCK 2, BLOCK 2-reorg + + // try to add a new block (not added because low priority : block number too high) + final Block lowPriorityBlock = + gen.block(BlockDataGenerator.BlockOptions.create().setBlockNumber(10)); + pendingBlocksManager.registerPendingBlock(lowPriorityBlock, NODE_ID_1); + assertThat(pendingBlocksManager.contains(lowPriorityBlock.getHash())).isFalse(); + + // try to add a new block (added because high priority : low block number and high timestamp) + final Block highPriorityBlock = + gen.block(gen.nextBlockOptions(childBlockFromNodeOne.get(0)).setTimestamp(Long.MAX_VALUE)); + pendingBlocksManager.registerPendingBlock(highPriorityBlock, NODE_ID_1); + assertThat(pendingBlocksManager.contains(highPriorityBlock.getHash())).isTrue(); + // BLOCK 0 , BLOCK 1, BLOCK 1-reorg, BLOCK 2-reorg + + // check blocks in the cache + // and verify remove the block with the lowest priority (BLOCK-2) + for (Block block : childBlockFromNodeOne) { + if (block.getHeader().getNumber() == 2) { + assertThat(pendingBlocksManager.contains(block.getHash())).isFalse(); + } else { + assertThat(pendingBlocksManager.contains(block.getHash())).isTrue(); + } + } + assertThat(pendingBlocksManager.contains(reorgBlock.getHash())).isTrue(); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksTest.java deleted file mode 100644 index cacd143d384..00000000000 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.sync.state; - -import static org.assertj.core.api.Assertions.assertThat; - -import org.hyperledger.besu.ethereum.core.Block; -import org.hyperledger.besu.ethereum.core.BlockDataGenerator; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; - -import org.junit.Before; -import org.junit.Test; - -public class PendingBlocksTest { - - private PendingBlocks pendingBlocks; - private BlockDataGenerator gen; - - @Before - public void setup() { - pendingBlocks = new PendingBlocks(); - gen = new BlockDataGenerator(); - } - - @Test - public void registerPendingBlock() { - final Block block = gen.block(); - - // Sanity check - assertThat(pendingBlocks.contains(block.getHash())).isFalse(); - - pendingBlocks.registerPendingBlock(block); - - assertThat(pendingBlocks.contains(block.getHash())).isTrue(); - final List pendingBlocksForParent = - pendingBlocks.childrenOf(block.getHeader().getParentHash()); - assertThat(pendingBlocksForParent).isEqualTo(Collections.singletonList(block)); - } - - @Test - public void deregisterPendingBlock() { - final Block block = gen.block(); - pendingBlocks.registerPendingBlock(block); - pendingBlocks.deregisterPendingBlock(block); - - assertThat(pendingBlocks.contains(block.getHash())).isFalse(); - final List pendingBlocksForParent = - pendingBlocks.childrenOf(block.getHeader().getParentHash()); - assertThat(pendingBlocksForParent).isEqualTo(Collections.emptyList()); - } - - @Test - public void registerSiblingBlocks() { - final BlockDataGenerator gen = new BlockDataGenerator(); - final Block parentBlock = gen.block(); - final Block childBlock = gen.nextBlock(parentBlock); - final Block childBlock2 = gen.nextBlock(parentBlock); - final List children = Arrays.asList(childBlock, childBlock2); - - pendingBlocks.registerPendingBlock(childBlock); - pendingBlocks.registerPendingBlock(childBlock2); - - assertThat(pendingBlocks.contains(childBlock.getHash())).isTrue(); - assertThat(pendingBlocks.contains(childBlock2.getHash())).isTrue(); - - final List pendingBlocksForParent = pendingBlocks.childrenOf(parentBlock.getHash()); - assertThat(pendingBlocksForParent.size()).isEqualTo(2); - assertThat(new HashSet<>(pendingBlocksForParent)).isEqualTo(new HashSet<>(children)); - } - - @Test - public void deregisterSubsetOfSiblingBlocks() { - final BlockDataGenerator gen = new BlockDataGenerator(); - final Block parentBlock = gen.block(); - final Block childBlock = gen.nextBlock(parentBlock); - final Block childBlock2 = gen.nextBlock(parentBlock); - - pendingBlocks.registerPendingBlock(childBlock); - pendingBlocks.registerPendingBlock(childBlock2); - pendingBlocks.deregisterPendingBlock(childBlock); - - assertThat(pendingBlocks.contains(childBlock.getHash())).isFalse(); - assertThat(pendingBlocks.contains(childBlock2.getHash())).isTrue(); - - final List pendingBlocksForParent = pendingBlocks.childrenOf(parentBlock.getHash()); - assertThat(pendingBlocksForParent).isEqualTo(Collections.singletonList(childBlock2)); - } - - @Test - public void purgeBlocks() { - final List blocks = gen.blockSequence(10); - - for (final Block block : blocks) { - pendingBlocks.registerPendingBlock(block); - assertThat(pendingBlocks.contains(block.getHash())).isTrue(); - } - - final List blocksToPurge = blocks.subList(0, 5); - final List blocksToKeep = blocks.subList(5, blocks.size()); - pendingBlocks.purgeBlocksOlderThan(blocksToKeep.get(0).getHeader().getNumber()); - - for (final Block block : blocksToPurge) { - assertThat(pendingBlocks.contains(block.getHash())).isFalse(); - assertThat(pendingBlocks.childrenOf(block.getHeader().getParentHash()).size()).isEqualTo(0); - } - for (final Block block : blocksToKeep) { - assertThat(pendingBlocks.contains(block.getHash())).isTrue(); - assertThat(pendingBlocks.childrenOf(block.getHeader().getParentHash()).size()).isEqualTo(1); - } - } -} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/cache/PendingBlockCacheTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/cache/PendingBlockCacheTest.java new file mode 100644 index 00000000000..dee3ab8881f --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/cache/PendingBlockCacheTest.java @@ -0,0 +1,140 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.state.cache; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockDataGenerator; + +import org.apache.tuweni.bytes.Bytes; +import org.junit.Before; +import org.junit.Test; + +public class PendingBlockCacheTest { + + private static final Bytes NODE_ID = Bytes.of(0); + private static final Bytes NODE_ID_2 = Bytes.of(1); + + private BlockDataGenerator gen; + + private PendingBlockCache pendingBlockCache; + + private Block parentBlock; + + @Before + public void setup() { + gen = new BlockDataGenerator(); + parentBlock = gen.block(); + pendingBlockCache = new PendingBlockCache(2); + } + + @Test + public void shouldReplaceLowestPriorityBlockWhenCacheFull() { + final ImmutablePendingBlock firstBlock = generateBlock(parentBlock, NODE_ID); + + final ImmutablePendingBlock firstBlockFromSecondNode = + generateBlock(firstBlock.block(), NODE_ID_2); + pendingBlockCache.putIfAbsent( + firstBlockFromSecondNode.block().getHash(), firstBlockFromSecondNode); + + final ImmutablePendingBlock secondBlock = generateBlock(firstBlock.block(), NODE_ID); + final ImmutablePendingBlock thirdBlock = generateBlock(firstBlock.block(), NODE_ID); + final ImmutablePendingBlock fourthBlock = generateBlock(firstBlock.block(), NODE_ID); + final ImmutablePendingBlock fifthBlock = generateBlock(parentBlock, NODE_ID); + + pendingBlockCache.putIfAbsent(thirdBlock.block().getHash(), thirdBlock); + pendingBlockCache.putIfAbsent(secondBlock.block().getHash(), secondBlock); + + // add more recent block (timestamp) + pendingBlockCache.putIfAbsent(fourthBlock.block().getHash(), fourthBlock); + assertThat(pendingBlockCache.values()) + .containsExactlyInAnyOrder(thirdBlock, fourthBlock, firstBlockFromSecondNode); + + // add a block with a number lower than the others already present + pendingBlockCache.putIfAbsent(firstBlock.block().getHash(), firstBlock); + assertThat(pendingBlockCache.values()) + .containsExactlyInAnyOrder(firstBlock, fourthBlock, firstBlockFromSecondNode); + + // add a block with a number lower than the others already present and higher timestamp + pendingBlockCache.putIfAbsent(fifthBlock.block().getHash(), fifthBlock); + assertThat(pendingBlockCache.values()) + .containsExactlyInAnyOrder(firstBlock, fifthBlock, firstBlockFromSecondNode); + } + + @Test + public void shouldNotAddAlreadyPresentBlock() { + final ImmutablePendingBlock firstBlock = generateBlock(parentBlock, NODE_ID); + final ImmutablePendingBlock secondBlock = generateBlock(parentBlock, NODE_ID); + + assertThat(pendingBlockCache.putIfAbsent(firstBlock.block().getHash(), firstBlock)).isNull(); + assertThat(pendingBlockCache.putIfAbsent(secondBlock.block().getHash(), secondBlock)).isNull(); + assertThat(pendingBlockCache.putIfAbsent(firstBlock.block().getHash(), firstBlock)) + .isEqualTo(firstBlock); + + assertThat(pendingBlockCache.values()).containsExactlyInAnyOrder(firstBlock, secondBlock); + } + + @Test + public void shouldAcceptBlockFromMultipleNode() { + final ImmutablePendingBlock firstBlockFromFirstNode = generateBlock(parentBlock, NODE_ID); + final ImmutablePendingBlock secondBlockFromFirstNode = generateBlock(parentBlock, NODE_ID); + + pendingBlockCache.putIfAbsent( + firstBlockFromFirstNode.block().getHash(), firstBlockFromFirstNode); + pendingBlockCache.putIfAbsent( + secondBlockFromFirstNode.block().getHash(), secondBlockFromFirstNode); + + final ImmutablePendingBlock firstBlockFromSecondNode = generateBlock(parentBlock, NODE_ID_2); + pendingBlockCache.putIfAbsent( + firstBlockFromSecondNode.block().getHash(), firstBlockFromSecondNode); + + assertThat(pendingBlockCache.values()) + .containsExactlyInAnyOrder( + firstBlockFromFirstNode, secondBlockFromFirstNode, firstBlockFromSecondNode); + } + + @Test + public void shouldNotAddBlockWhenCacheFullAndNewBlockNumberTooHigh() { + final ImmutablePendingBlock firstBlock = generateBlock(parentBlock, NODE_ID); + final ImmutablePendingBlock secondBlock = generateBlock(parentBlock, NODE_ID); + final ImmutablePendingBlock thirdBlock = generateBlock(secondBlock.block(), NODE_ID); + + pendingBlockCache.putIfAbsent(firstBlock.block().getHash(), firstBlock); + pendingBlockCache.putIfAbsent(secondBlock.block().getHash(), secondBlock); + pendingBlockCache.putIfAbsent(thirdBlock.block().getHash(), thirdBlock); + + assertThat(pendingBlockCache.values()).containsExactlyInAnyOrder(firstBlock, secondBlock); + } + + @Test + public void shouldNotAddBlockWhenCacheFullAndNewBlockTimestampTooLow() { + final ImmutablePendingBlock firstBlock = generateBlock(parentBlock, NODE_ID); + final ImmutablePendingBlock secondBlock = generateBlock(parentBlock, NODE_ID); + final ImmutablePendingBlock thirdBlock = generateBlock(parentBlock, NODE_ID); + + pendingBlockCache.putIfAbsent(secondBlock.block().getHash(), secondBlock); + pendingBlockCache.putIfAbsent(thirdBlock.block().getHash(), thirdBlock); + pendingBlockCache.putIfAbsent(firstBlock.block().getHash(), firstBlock); + + assertThat(pendingBlockCache.values()).containsExactlyInAnyOrder(secondBlock, thirdBlock); + } + + private ImmutablePendingBlock generateBlock(final Block parentBlock, final Bytes nodeId) { + final Block block = + gen.block(gen.nextBlockOptions(parentBlock).setTimestamp(System.currentTimeMillis())); + return ImmutablePendingBlock.builder().block(block).nodeId(nodeId).build(); + } +}