Skip to content

Commit

Permalink
Add cache limit for pending blocks (hyperledger#1406)
Browse files Browse the repository at this point in the history
If a peer exceeds the authorized number of pending blocks, Besu will replace the lowest priority block in the cache from this peer by this new one until the local node sync a new block and maybe purges one of the blocks of this peer.

The highest priority blocks are those that are lowest in block height and then higher priority if they were sent more recently.

Other peers will not be impacted and will be able to continue sending pending blocks.

The cache size limit is the distance between the minimum and maximum value of the BlockPropagationRange parameter. Besu automatically purges blocks outside this range.

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
  • Loading branch information
matkt authored Oct 14, 2020
1 parent 75d6016 commit bbcc438
Show file tree
Hide file tree
Showing 11 changed files with 562 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ public BlockHeader header(final long number, final BlockBody body, final BlockOp
final int gasLimit = random.nextInt() & Integer.MAX_VALUE;
final int gasUsed = Math.max(0, gasLimit - 1);
final long blockNonce = random.nextLong();

return BlockHeaderBuilder.create()
.parentHash(options.getParentHash(hash()))
.ommersHash(BodyValidation.ommersHash(body.getOmmers()))
Expand All @@ -277,7 +278,10 @@ public BlockHeader header(final long number, final BlockBody body, final BlockOp
.number(number)
.gasLimit(gasLimit)
.gasUsed(options.getGasUsed(gasUsed))
.timestamp(Instant.now().truncatedTo(ChronoUnit.SECONDS).getEpochSecond())
.timestamp(
options
.getTimestamp()
.orElse(Instant.now().truncatedTo(ChronoUnit.SECONDS).getEpochSecond()))
.extraData(options.getExtraData(bytes32()))
.mixHash(hash())
.nonce(blockNonce)
Expand Down Expand Up @@ -511,6 +515,7 @@ public static class BlockOptions {
private Optional<Hash> receiptsRoot = Optional.empty();
private Optional<Long> gasUsed = Optional.empty();
private Optional<LogsBloomFilter> logsBloom = Optional.empty();
private Optional<Long> timestamp = Optional.empty();
private boolean hasOmmers = true;
private boolean hasTransactions = true;

Expand Down Expand Up @@ -562,6 +567,10 @@ public LogsBloomFilter getLogsBloom(final LogsBloomFilter defaultValue) {
return logsBloom.orElse(defaultValue);
}

public Optional<Long> getTimestamp() {
return timestamp;
}

public boolean hasTransactions() {
return hasTransactions;
}
Expand Down Expand Up @@ -638,5 +647,10 @@ public BlockOptions hasOmmers(final boolean hasOmmers) {
this.hasOmmers = hasOmmers;
return this;
}

public BlockOptions setTimestamp(final Long timestamp) {
this.timestamp = Optional.of(timestamp);
return this;
}
}
}
3 changes: 3 additions & 0 deletions ethereum/eth/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ dependencies {
implementation 'org.apache.tuweni:tuweni-bytes'
implementation 'org.apache.tuweni:tuweni-units'

annotationProcessor "org.immutables:value"
implementation "org.immutables:value-annotations"

testImplementation project(':config')
testImplementation project(path: ':config', configuration: 'testSupportArtifacts')
testImplementation project(':crypto')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage.NewBlockHash;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockMessage;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocks;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.tasks.PersistBlockTask;
import org.hyperledger.besu.ethereum.mainnet.BlockHeaderValidator;
Expand All @@ -58,6 +58,7 @@
import com.google.common.collect.Range;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;

public class BlockPropagationManager {
private static final Logger LOG = LogManager.getLogger();
Expand All @@ -74,15 +75,15 @@ public class BlockPropagationManager {

private final Set<Hash> requestedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Hash> importingBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final PendingBlocks pendingBlocks;
private final PendingBlocksManager pendingBlocksManager;

BlockPropagationManager(
final SynchronizerConfiguration config,
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final SyncState syncState,
final PendingBlocks pendingBlocks,
final PendingBlocksManager pendingBlocksManager,
final MetricsSystem metricsSystem,
final BlockBroadcaster blockBroadcaster) {
this.config = config;
Expand All @@ -92,7 +93,7 @@ public class BlockPropagationManager {
this.metricsSystem = metricsSystem;
this.blockBroadcaster = blockBroadcaster;
this.syncState = syncState;
this.pendingBlocks = pendingBlocks;
this.pendingBlocksManager = pendingBlocksManager;
}

public void start() {
Expand All @@ -117,12 +118,12 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {
final Block newBlock = blockAddedEvent.getBlock();

final List<Block> readyForImport;
synchronized (pendingBlocks) {
synchronized (pendingBlocksManager) {
// Remove block from pendingBlocks list
pendingBlocks.deregisterPendingBlock(newBlock);
pendingBlocksManager.deregisterPendingBlock(newBlock);

// Import any pending blocks that are children of the newly added block
readyForImport = pendingBlocks.childrenOf(newBlock.getHash());
readyForImport = pendingBlocksManager.childrenOf(newBlock.getHash());
}

if (!readyForImport.isEmpty()) {
Expand All @@ -148,7 +149,7 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {
if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
final long head = protocolContext.getBlockchain().getChainHeadBlockNumber();
final long cutoff = head + config.getBlockPropagationRange().lowerEndpoint();
pendingBlocks.purgeBlocksOlderThan(cutoff);
pendingBlocksManager.purgeBlocksOlderThan(cutoff);
}
}

Expand All @@ -168,14 +169,14 @@ private void handleNewBlockFromNetwork(final EthMessage message) {
block.getHeader().getNumber(), localChainHeight, bestChainHeight)) {
return;
}
if (pendingBlocks.contains(block.getHash())) {
if (pendingBlocksManager.contains(block.getHash())) {
return;
}
if (blockchain.contains(block.getHash())) {
return;
}

importOrSavePendingBlock(block);
importOrSavePendingBlock(block, message.getPeer().nodeId());
} catch (final RLPException e) {
LOG.debug(
"Malformed NEW_BLOCK message received from peer, disconnecting: {}",
Expand Down Expand Up @@ -212,7 +213,7 @@ private void handleNewBlockHashesFromNetwork(final EthMessage message) {
if (requestedBlocks.contains(announcedBlock.hash())) {
continue;
}
if (pendingBlocks.contains(announcedBlock.hash())) {
if (pendingBlocksManager.contains(announcedBlock.hash())) {
continue;
}
if (importingBlocks.contains(announcedBlock.hash())) {
Expand Down Expand Up @@ -247,7 +248,9 @@ private CompletableFuture<Block> processAnnouncedBlock(
protocolSchedule, ethContext, newBlock.hash(), newBlock.number(), metricsSystem)
.assignPeer(peer);

return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult()));
return getBlockTask
.run()
.thenCompose((r) -> importOrSavePendingBlock(r.getResult(), peer.nodeId()));
}

private void broadcastBlock(final Block block, final BlockHeader parent) {
Expand All @@ -261,14 +264,14 @@ private void broadcastBlock(final Block block, final BlockHeader parent) {
}

@VisibleForTesting
CompletableFuture<Block> importOrSavePendingBlock(final Block block) {
CompletableFuture<Block> importOrSavePendingBlock(final Block block, final Bytes nodeId) {
// Synchronize to avoid race condition where block import event fires after the
// blockchain.contains() check and before the block is registered, causing onBlockAdded() to be
// invoked for the parent of this block before we are able to register it.
synchronized (pendingBlocks) {
synchronized (pendingBlocksManager) {
if (!protocolContext.getBlockchain().contains(block.getHeader().getParentHash())) {
// Block isn't connected to local chain, save it to pending blocks collection
if (pendingBlocks.registerPendingBlock(block)) {
if (pendingBlocksManager.registerPendingBlock(block, nodeId)) {
LOG.info(
"Saving announced block {} ({}) for future import",
block.getHeader().getNumber(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncException;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.FullSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocks;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.Pruner;
Expand Down Expand Up @@ -83,7 +83,7 @@ public DefaultSynchronizer(
protocolContext,
ethContext,
syncState,
new PendingBlocks(),
new PendingBlocksManager(syncConfig),
metricsSystem,
blockBroadcaster);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.cache.ImmutablePendingBlock;
import org.hyperledger.besu.ethereum.eth.sync.state.cache.PendingBlockCache;

import java.util.Collections;
import java.util.List;
Expand All @@ -27,35 +30,49 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class PendingBlocks {
import org.apache.tuweni.bytes.Bytes;

public class PendingBlocksManager {

private final PendingBlockCache pendingBlocks;

private final Map<Hash, Block> pendingBlocks = new ConcurrentHashMap<>();
private final Map<Hash, Set<Hash>> pendingBlocksByParentHash = new ConcurrentHashMap<>();

public PendingBlocksManager(final SynchronizerConfiguration synchronizerConfiguration) {

pendingBlocks =
new PendingBlockCache(
(Math.abs(synchronizerConfiguration.getBlockPropagationRange().lowerEndpoint())
+ Math.abs(synchronizerConfiguration.getBlockPropagationRange().upperEndpoint())));
}

/**
* Track the given block.
*
* @param pendingBlock the block to track
* @param block the block to track
* @param nodeId node that sent the block
* @return true if the block was added (was not previously present)
*/
public boolean registerPendingBlock(final Block pendingBlock) {
final Block previousValue =
this.pendingBlocks.putIfAbsent(pendingBlock.getHash(), pendingBlock);
public boolean registerPendingBlock(final Block block, final Bytes nodeId) {

final ImmutablePendingBlock previousValue =
this.pendingBlocks.putIfAbsent(
block.getHash(), ImmutablePendingBlock.builder().block(block).nodeId(nodeId).build());
if (previousValue != null) {
return false;
}

pendingBlocksByParentHash
.computeIfAbsent(
pendingBlock.getHeader().getParentHash(),
block.getHeader().getParentHash(),
h -> {
final Set<Hash> set = newSetFromMap(new ConcurrentHashMap<>());
// Go ahead and add our value at construction, so that we don't set an empty set which
// could be removed in deregisterPendingBlock
set.add(pendingBlock.getHash());
set.add(block.getHash());
return set;
})
.add(pendingBlock.getHash());
.add(block.getHash());

return true;
}
Expand All @@ -68,7 +85,7 @@ public boolean registerPendingBlock(final Block pendingBlock) {
*/
public boolean deregisterPendingBlock(final Block block) {
final Hash parentHash = block.getHeader().getParentHash();
final Block removed = pendingBlocks.remove(block.getHash());
final ImmutablePendingBlock removed = pendingBlocks.remove(block.getHash());
final Set<Hash> blocksForParent = pendingBlocksByParentHash.get(parentHash);
if (blocksForParent != null) {
blocksForParent.remove(block.getHash());
Expand All @@ -79,7 +96,8 @@ public boolean deregisterPendingBlock(final Block block) {

public void purgeBlocksOlderThan(final long blockNumber) {
pendingBlocks.values().stream()
.filter(b -> b.getHeader().getNumber() < blockNumber)
.filter(b -> b.block().getHeader().getNumber() < blockNumber)
.map(ImmutablePendingBlock::block)
.forEach(this::deregisterPendingBlock);
}

Expand All @@ -95,6 +113,7 @@ public List<Block> childrenOf(final Hash parentBlock) {
return blocksByParent.stream()
.map(pendingBlocks::get)
.filter(Objects::nonNull)
.map(ImmutablePendingBlock::block)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.state.cache;

import org.hyperledger.besu.ethereum.core.Block;

import org.apache.tuweni.bytes.Bytes;
import org.immutables.value.Value;

@Value.Immutable
public interface PendingBlock {

Block block();

Bytes nodeId();
}
Loading

0 comments on commit bbcc438

Please sign in to comment.