Skip to content

Commit

Permalink
Cache last blocks data (block headers, block bodies, transactions' re…
Browse files Browse the repository at this point in the history
…ceipts and total difficulty) (hyperledger#6009)

* Add a flag --cache-last-blocks to cache last n blocks, The default value is 0

Signed-off-by: Ameziane H <ameziane.hamlat@consensys.net>
  • Loading branch information
ahamlat authored and Gabriel-Trintinalia committed Oct 20, 2023
1 parent 3ad04f8 commit 637e90d
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 13 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Next Release
## Next release
- Cache last n blocks by using a new Besu flag --cache-last-blocks=n

### Breaking Changes

Expand Down
8 changes: 7 additions & 1 deletion besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,11 @@ static class PermissionsOptionGroup {
"Specifies the maximum number of blocks to retrieve logs from via RPC. Must be >=0. 0 specifies no limit (default: ${DEFAULT-VALUE})")
private final Long rpcMaxLogsRange = 5000L;

@CommandLine.Option(
names = {"--cache-last-blocks"},
description = "Specifies the number of last blocks to cache (default: ${DEFAULT-VALUE})")
private final Integer numberOfblocksToCache = 0;

@Mixin private P2PTLSConfigOptions p2pTLSConfigOptions;

@Mixin private PkiBlockCreationOptions pkiBlockCreationOptions;
Expand Down Expand Up @@ -2290,7 +2295,8 @@ public BesuControllerBuilder getControllerBuilder() {
.lowerBoundPeers(peersLowerBound)
.maxRemotelyInitiatedPeers(maxRemoteInitiatedPeers)
.randomPeerPriority(p2PDiscoveryOptionGroup.randomPeerPriority)
.chainPruningConfiguration(unstableChainPruningOptions.toDomainObject());
.chainPruningConfiguration(unstableChainPruningOptions.toDomainObject())
.cacheLastBlocks(numberOfblocksToCache);
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides

private PluginTransactionValidatorFactory pluginTransactionValidatorFactory;

private int numberOfBlocksToCache = 0;

/**
* Provide a BesuComponent which can be used to get other dependencies
*
Expand Down Expand Up @@ -505,6 +507,17 @@ public BesuControllerBuilder chainPruningConfiguration(
return this;
}

/**
* Chain pruning configuration besu controller builder.
*
* @param numberOfBlocksToCache the number of blocks to cache
* @return the besu controller builder
*/
public BesuControllerBuilder cacheLastBlocks(final Integer numberOfBlocksToCache) {
this.numberOfBlocksToCache = numberOfBlocksToCache;
return this;
}

/**
* sets the networkConfiguration in the builder
*
Expand Down Expand Up @@ -592,7 +605,8 @@ public BesuController build() {
blockchainStorage,
metricsSystem,
reorgLoggingThreshold,
dataDirectory.toString());
dataDirectory.toString(),
numberOfBlocksToCache);

final CachedMerkleTrieLoader cachedMerkleTrieLoader =
besuComponent
Expand Down
12 changes: 12 additions & 0 deletions besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5585,4 +5585,16 @@ public void snapsyncForHealingFeaturesShouldFailWhenHealingIsNotEnabled() {
.contains(
"--Xsnapsync-synchronizer-flat option can only be used when -Xsnapsync-synchronizer-flat-db-healing-enabled is true");
}

@Test
public void cacheLastBlocksOptionShouldWork() {
int numberOfBlocksToCache = 512;
parseCommand("--cache-last-blocks", String.valueOf(numberOfBlocksToCache));
verify(mockControllerBuilder).cacheLastBlocks(intArgumentCaptor.capture());
verify(mockControllerBuilder).build();

assertThat(intArgumentCaptor.getValue()).isEqualTo(numberOfBlocksToCache);
assertThat(commandOutput.toString(UTF_8)).isEmpty();
assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ public void initMocks() throws Exception {
.thenReturn(mockControllerBuilder);
when(mockControllerBuilder.besuComponent(any(BesuComponent.class)))
.thenReturn(mockControllerBuilder);
when(mockControllerBuilder.cacheLastBlocks(any())).thenReturn(mockControllerBuilder);

// doReturn used because of generic BesuController
doReturn(mockController).when(mockControllerBuilder).build();
lenient().when(mockController.getProtocolManager()).thenReturn(mockEthProtocolManager);
Expand Down
3 changes: 2 additions & 1 deletion besu/src/test/resources/everything_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ rpc-http-max-batch-size=1
rpc-http-max-request-content-length = 5242880
rpc-max-logs-range=100
json-pretty-print-enabled=false
cache-last-blocks=512

# PRIVACY TLS
privacy-tls-enabled=false
Expand Down Expand Up @@ -226,4 +227,4 @@ Xp2p-tls-crl-file="none.file"
Xp2p-tls-clienthello-sni=false

#contracts
Xevm-jumpdest-cache-weight-kb=32000
Xevm-jumpdest-cache-weight-kb=32000
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.prometheus.PrometheusMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.InvalidConfigurationException;
import org.hyperledger.besu.util.Subscribers;
Expand All @@ -49,8 +50,11 @@
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import io.prometheus.client.guava.cache.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -73,20 +77,27 @@ public class DefaultBlockchain implements MutableBlockchain {

private Comparator<BlockHeader> blockChoiceRule;

private final int numberOfBlocksToCache;
private final Optional<Cache<Hash, BlockHeader>> blockHeadersCache;
private final Optional<Cache<Hash, BlockBody>> blockBodiesCache;
private final Optional<Cache<Hash, List<TransactionReceipt>>> transactionReceiptsCache;
private final Optional<Cache<Hash, Difficulty>> totalDifficultyCache;

private DefaultBlockchain(
final Optional<Block> genesisBlock,
final BlockchainStorage blockchainStorage,
final MetricsSystem metricsSystem,
final long reorgLoggingThreshold) {
this(genesisBlock, blockchainStorage, metricsSystem, reorgLoggingThreshold, null);
this(genesisBlock, blockchainStorage, metricsSystem, reorgLoggingThreshold, null, 0);
}

private DefaultBlockchain(
final Optional<Block> genesisBlock,
final BlockchainStorage blockchainStorage,
final MetricsSystem metricsSystem,
final long reorgLoggingThreshold,
final String dataDirectory) {
final String dataDirectory,
final int numberOfBlocksToCache) {
checkNotNull(genesisBlock);
checkNotNull(blockchainStorage);
checkNotNull(metricsSystem);
Expand Down Expand Up @@ -144,6 +155,34 @@ private DefaultBlockchain(

this.reorgLoggingThreshold = reorgLoggingThreshold;
this.blockChoiceRule = heaviestChainBlockChoiceRule;
this.numberOfBlocksToCache = numberOfBlocksToCache;

if (numberOfBlocksToCache != 0) {
blockHeadersCache =
Optional.of(
CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
blockBodiesCache =
Optional.of(
CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
transactionReceiptsCache =
Optional.of(
CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
totalDifficultyCache =
Optional.of(
CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
CacheMetricsCollector cacheMetrics = new CacheMetricsCollector();
cacheMetrics.addCache("blockHeaders", blockHeadersCache.get());
cacheMetrics.addCache("blockBodies", blockBodiesCache.get());
cacheMetrics.addCache("transactionReceipts", transactionReceiptsCache.get());
cacheMetrics.addCache("totalDifficulty", totalDifficultyCache.get());
if (metricsSystem instanceof PrometheusMetricsSystem prometheusMetricsSystem)
prometheusMetricsSystem.addCollector(BesuMetricCategory.BLOCKCHAIN, () -> cacheMetrics);
} else {
blockHeadersCache = Optional.empty();
blockBodiesCache = Optional.empty();
transactionReceiptsCache = Optional.empty();
totalDifficultyCache = Optional.empty();
}
}

public static MutableBlockchain createMutable(
Expand All @@ -153,7 +192,12 @@ public static MutableBlockchain createMutable(
final long reorgLoggingThreshold) {
checkNotNull(genesisBlock);
return new DefaultBlockchain(
Optional.of(genesisBlock), blockchainStorage, metricsSystem, reorgLoggingThreshold);
Optional.of(genesisBlock),
blockchainStorage,
metricsSystem,
reorgLoggingThreshold,
null,
0);
}

public static MutableBlockchain createMutable(
Expand All @@ -168,7 +212,25 @@ public static MutableBlockchain createMutable(
blockchainStorage,
metricsSystem,
reorgLoggingThreshold,
dataDirectory);
dataDirectory,
0);
}

public static MutableBlockchain createMutable(
final Block genesisBlock,
final BlockchainStorage blockchainStorage,
final MetricsSystem metricsSystem,
final long reorgLoggingThreshold,
final String dataDirectory,
final int numberOfBlocksToCache) {
checkNotNull(genesisBlock);
return new DefaultBlockchain(
Optional.of(genesisBlock),
blockchainStorage,
metricsSystem,
reorgLoggingThreshold,
dataDirectory,
numberOfBlocksToCache);
}

public static Blockchain create(
Expand Down Expand Up @@ -227,22 +289,37 @@ public Block getChainHeadBlock() {

@Override
public Optional<BlockHeader> getBlockHeader(final long blockNumber) {
return blockchainStorage.getBlockHash(blockNumber).flatMap(blockchainStorage::getBlockHeader);
return blockchainStorage.getBlockHash(blockNumber).flatMap(this::getBlockHeader);
}

@Override
public Optional<BlockHeader> getBlockHeader(final Hash blockHeaderHash) {
return blockchainStorage.getBlockHeader(blockHeaderHash);
return blockHeadersCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getBlockHeader(blockHeaderHash)))
.orElseGet(() -> blockchainStorage.getBlockHeader(blockHeaderHash));
}

@Override
public Optional<BlockBody> getBlockBody(final Hash blockHeaderHash) {
return blockchainStorage.getBlockBody(blockHeaderHash);
return blockBodiesCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getBlockBody(blockHeaderHash)))
.orElseGet(() -> blockchainStorage.getBlockBody(blockHeaderHash));
}

@Override
public Optional<List<TransactionReceipt>> getTxReceipts(final Hash blockHeaderHash) {
return blockchainStorage.getTransactionReceipts(blockHeaderHash);
return transactionReceiptsCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getTransactionReceipts(blockHeaderHash)))
.orElseGet(() -> blockchainStorage.getTransactionReceipts(blockHeaderHash));
}

@Override
Expand All @@ -252,7 +329,12 @@ public Optional<Hash> getBlockHashByNumber(final long number) {

@Override
public Optional<Difficulty> getTotalDifficultyByHash(final Hash blockHeaderHash) {
return blockchainStorage.getTotalDifficulty(blockHeaderHash);
return totalDifficultyCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getTotalDifficulty(blockHeaderHash)))
.orElseGet(() -> blockchainStorage.getTotalDifficulty(blockHeaderHash));
}

@Override
Expand Down Expand Up @@ -283,14 +365,24 @@ public void setBlockChoiceRule(final Comparator<BlockHeader> blockChoiceRule) {

@Override
public synchronized void appendBlock(final Block block, final List<TransactionReceipt> receipts) {
if (numberOfBlocksToCache != 0) cacheBlockData(block, receipts);
appendBlockHelper(new BlockWithReceipts(block, receipts), false);
}

@Override
public synchronized void storeBlock(final Block block, final List<TransactionReceipt> receipts) {
if (numberOfBlocksToCache != 0) cacheBlockData(block, receipts);
appendBlockHelper(new BlockWithReceipts(block, receipts), true);
}

private void cacheBlockData(final Block block, final List<TransactionReceipt> receipts) {
blockHeadersCache.ifPresent(cache -> cache.put(block.getHash(), block.getHeader()));
blockBodiesCache.ifPresent(cache -> cache.put(block.getHash(), block.getBody()));
transactionReceiptsCache.ifPresent(cache -> cache.put(block.getHash(), receipts));
totalDifficultyCache.ifPresent(
cache -> cache.put(block.getHash(), block.getHeader().getDifficulty()));
}

private boolean blockShouldBeProcessed(
final Block block, final List<TransactionReceipt> receipts) {
checkArgument(
Expand Down Expand Up @@ -768,4 +860,20 @@ int observerCount() {
private void notifyChainReorgBlockAdded(final BlockWithReceipts blockWithReceipts) {
blockReorgObservers.forEach(observer -> observer.onBlockAdded(blockWithReceipts, this));
}

public Optional<Cache<Hash, BlockHeader>> getBlockHeadersCache() {
return blockHeadersCache;
}

public Optional<Cache<Hash, BlockBody>> getBlockBodiesCache() {
return blockBodiesCache;
}

public Optional<Cache<Hash, List<TransactionReceipt>>> getTransactionReceiptsCache() {
return transactionReceiptsCache;
}

public Optional<Cache<Hash, Difficulty>> getTotalDifficultyCache() {
return totalDifficultyCache;
}
}
Loading

0 comments on commit 637e90d

Please sign in to comment.