Skip to content

Commit

Permalink
Cache last blocks data (block headers, block bodies, transactions' re…
Browse files Browse the repository at this point in the history
…ceipts and total difficulty) (#6009)

* Add a flag --cache-last-blocks to cache last n blocks, The default value is 0

Signed-off-by: Ameziane H <ameziane.hamlat@consensys.net>
  • Loading branch information
ahamlat authored Oct 13, 2023
1 parent 8507bb7 commit e0b3316
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 13 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Next Release
## Next release
- Cache last n blocks by using a new Besu flag --cache-last-blocks=n

### Breaking Changes

Expand Down
8 changes: 7 additions & 1 deletion besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,11 @@ static class PermissionsOptionGroup {
"Specifies the maximum number of blocks to retrieve logs from via RPC. Must be >=0. 0 specifies no limit (default: ${DEFAULT-VALUE})")
private final Long rpcMaxLogsRange = 5000L;

@CommandLine.Option(
names = {"--cache-last-blocks"},
description = "Specifies the number of last blocks to cache (default: ${DEFAULT-VALUE})")
private final Integer numberOfblocksToCache = 0;

@Mixin private P2PTLSConfigOptions p2pTLSConfigOptions;

@Mixin private PkiBlockCreationOptions pkiBlockCreationOptions;
Expand Down Expand Up @@ -2290,7 +2295,8 @@ public BesuControllerBuilder getControllerBuilder() {
.lowerBoundPeers(peersLowerBound)
.maxRemotelyInitiatedPeers(maxRemoteInitiatedPeers)
.randomPeerPriority(p2PDiscoveryOptionGroup.randomPeerPriority)
.chainPruningConfiguration(unstableChainPruningOptions.toDomainObject());
.chainPruningConfiguration(unstableChainPruningOptions.toDomainObject())
.cacheLastBlocks(numberOfblocksToCache);
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides

private PluginTransactionValidatorFactory pluginTransactionValidatorFactory;

private int numberOfBlocksToCache = 0;

/**
* Provide a BesuComponent which can be used to get other dependencies
*
Expand Down Expand Up @@ -505,6 +507,17 @@ public BesuControllerBuilder chainPruningConfiguration(
return this;
}

/**
* Chain pruning configuration besu controller builder.
*
* @param numberOfBlocksToCache the number of blocks to cache
* @return the besu controller builder
*/
public BesuControllerBuilder cacheLastBlocks(final Integer numberOfBlocksToCache) {
this.numberOfBlocksToCache = numberOfBlocksToCache;
return this;
}

/**
* sets the networkConfiguration in the builder
*
Expand Down Expand Up @@ -592,7 +605,8 @@ public BesuController build() {
blockchainStorage,
metricsSystem,
reorgLoggingThreshold,
dataDirectory.toString());
dataDirectory.toString(),
numberOfBlocksToCache);

final CachedMerkleTrieLoader cachedMerkleTrieLoader =
besuComponent
Expand Down
12 changes: 12 additions & 0 deletions besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5585,4 +5585,16 @@ public void snapsyncForHealingFeaturesShouldFailWhenHealingIsNotEnabled() {
.contains(
"--Xsnapsync-synchronizer-flat option can only be used when -Xsnapsync-synchronizer-flat-db-healing-enabled is true");
}

@Test
public void cacheLastBlocksOptionShouldWork() {
int numberOfBlocksToCache = 512;
parseCommand("--cache-last-blocks", String.valueOf(numberOfBlocksToCache));
verify(mockControllerBuilder).cacheLastBlocks(intArgumentCaptor.capture());
verify(mockControllerBuilder).build();

assertThat(intArgumentCaptor.getValue()).isEqualTo(numberOfBlocksToCache);
assertThat(commandOutput.toString(UTF_8)).isEmpty();
assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ public void initMocks() throws Exception {
.thenReturn(mockControllerBuilder);
when(mockControllerBuilder.besuComponent(any(BesuComponent.class)))
.thenReturn(mockControllerBuilder);
when(mockControllerBuilder.cacheLastBlocks(any())).thenReturn(mockControllerBuilder);

// doReturn used because of generic BesuController
doReturn(mockController).when(mockControllerBuilder).build();
lenient().when(mockController.getProtocolManager()).thenReturn(mockEthProtocolManager);
Expand Down
3 changes: 2 additions & 1 deletion besu/src/test/resources/everything_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ rpc-http-max-batch-size=1
rpc-http-max-request-content-length = 5242880
rpc-max-logs-range=100
json-pretty-print-enabled=false
cache-last-blocks=512

# PRIVACY TLS
privacy-tls-enabled=false
Expand Down Expand Up @@ -226,4 +227,4 @@ Xp2p-tls-crl-file="none.file"
Xp2p-tls-clienthello-sni=false

#contracts
Xevm-jumpdest-cache-weight-kb=32000
Xevm-jumpdest-cache-weight-kb=32000
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.prometheus.PrometheusMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.InvalidConfigurationException;
import org.hyperledger.besu.util.Subscribers;
Expand All @@ -49,8 +50,11 @@
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import io.prometheus.client.guava.cache.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -73,20 +77,27 @@ public class DefaultBlockchain implements MutableBlockchain {

private Comparator<BlockHeader> blockChoiceRule;

private final int numberOfBlocksToCache;
private final Optional<Cache<Hash, BlockHeader>> blockHeadersCache;
private final Optional<Cache<Hash, BlockBody>> blockBodiesCache;
private final Optional<Cache<Hash, List<TransactionReceipt>>> transactionReceiptsCache;
private final Optional<Cache<Hash, Difficulty>> totalDifficultyCache;

private DefaultBlockchain(
final Optional<Block> genesisBlock,
final BlockchainStorage blockchainStorage,
final MetricsSystem metricsSystem,
final long reorgLoggingThreshold) {
this(genesisBlock, blockchainStorage, metricsSystem, reorgLoggingThreshold, null);
this(genesisBlock, blockchainStorage, metricsSystem, reorgLoggingThreshold, null, 0);
}

private DefaultBlockchain(
final Optional<Block> genesisBlock,
final BlockchainStorage blockchainStorage,
final MetricsSystem metricsSystem,
final long reorgLoggingThreshold,
final String dataDirectory) {
final String dataDirectory,
final int numberOfBlocksToCache) {
checkNotNull(genesisBlock);
checkNotNull(blockchainStorage);
checkNotNull(metricsSystem);
Expand Down Expand Up @@ -144,6 +155,34 @@ private DefaultBlockchain(

this.reorgLoggingThreshold = reorgLoggingThreshold;
this.blockChoiceRule = heaviestChainBlockChoiceRule;
this.numberOfBlocksToCache = numberOfBlocksToCache;

if (numberOfBlocksToCache != 0) {
blockHeadersCache =
Optional.of(
CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
blockBodiesCache =
Optional.of(
CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
transactionReceiptsCache =
Optional.of(
CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
totalDifficultyCache =
Optional.of(
CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
CacheMetricsCollector cacheMetrics = new CacheMetricsCollector();
cacheMetrics.addCache("blockHeaders", blockHeadersCache.get());
cacheMetrics.addCache("blockBodies", blockBodiesCache.get());
cacheMetrics.addCache("transactionReceipts", transactionReceiptsCache.get());
cacheMetrics.addCache("totalDifficulty", totalDifficultyCache.get());
if (metricsSystem instanceof PrometheusMetricsSystem prometheusMetricsSystem)
prometheusMetricsSystem.addCollector(BesuMetricCategory.BLOCKCHAIN, () -> cacheMetrics);
} else {
blockHeadersCache = Optional.empty();
blockBodiesCache = Optional.empty();
transactionReceiptsCache = Optional.empty();
totalDifficultyCache = Optional.empty();
}
}

public static MutableBlockchain createMutable(
Expand All @@ -153,7 +192,12 @@ public static MutableBlockchain createMutable(
final long reorgLoggingThreshold) {
checkNotNull(genesisBlock);
return new DefaultBlockchain(
Optional.of(genesisBlock), blockchainStorage, metricsSystem, reorgLoggingThreshold);
Optional.of(genesisBlock),
blockchainStorage,
metricsSystem,
reorgLoggingThreshold,
null,
0);
}

public static MutableBlockchain createMutable(
Expand All @@ -168,7 +212,25 @@ public static MutableBlockchain createMutable(
blockchainStorage,
metricsSystem,
reorgLoggingThreshold,
dataDirectory);
dataDirectory,
0);
}

public static MutableBlockchain createMutable(
final Block genesisBlock,
final BlockchainStorage blockchainStorage,
final MetricsSystem metricsSystem,
final long reorgLoggingThreshold,
final String dataDirectory,
final int numberOfBlocksToCache) {
checkNotNull(genesisBlock);
return new DefaultBlockchain(
Optional.of(genesisBlock),
blockchainStorage,
metricsSystem,
reorgLoggingThreshold,
dataDirectory,
numberOfBlocksToCache);
}

public static Blockchain create(
Expand Down Expand Up @@ -227,22 +289,37 @@ public Block getChainHeadBlock() {

@Override
public Optional<BlockHeader> getBlockHeader(final long blockNumber) {
return blockchainStorage.getBlockHash(blockNumber).flatMap(blockchainStorage::getBlockHeader);
return blockchainStorage.getBlockHash(blockNumber).flatMap(this::getBlockHeader);
}

@Override
public Optional<BlockHeader> getBlockHeader(final Hash blockHeaderHash) {
return blockchainStorage.getBlockHeader(blockHeaderHash);
return blockHeadersCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getBlockHeader(blockHeaderHash)))
.orElseGet(() -> blockchainStorage.getBlockHeader(blockHeaderHash));
}

@Override
public Optional<BlockBody> getBlockBody(final Hash blockHeaderHash) {
return blockchainStorage.getBlockBody(blockHeaderHash);
return blockBodiesCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getBlockBody(blockHeaderHash)))
.orElseGet(() -> blockchainStorage.getBlockBody(blockHeaderHash));
}

@Override
public Optional<List<TransactionReceipt>> getTxReceipts(final Hash blockHeaderHash) {
return blockchainStorage.getTransactionReceipts(blockHeaderHash);
return transactionReceiptsCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getTransactionReceipts(blockHeaderHash)))
.orElseGet(() -> blockchainStorage.getTransactionReceipts(blockHeaderHash));
}

@Override
Expand All @@ -252,7 +329,12 @@ public Optional<Hash> getBlockHashByNumber(final long number) {

@Override
public Optional<Difficulty> getTotalDifficultyByHash(final Hash blockHeaderHash) {
return blockchainStorage.getTotalDifficulty(blockHeaderHash);
return totalDifficultyCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getTotalDifficulty(blockHeaderHash)))
.orElseGet(() -> blockchainStorage.getTotalDifficulty(blockHeaderHash));
}

@Override
Expand Down Expand Up @@ -283,14 +365,24 @@ public void setBlockChoiceRule(final Comparator<BlockHeader> blockChoiceRule) {

@Override
public synchronized void appendBlock(final Block block, final List<TransactionReceipt> receipts) {
if (numberOfBlocksToCache != 0) cacheBlockData(block, receipts);
appendBlockHelper(new BlockWithReceipts(block, receipts), false);
}

@Override
public synchronized void storeBlock(final Block block, final List<TransactionReceipt> receipts) {
if (numberOfBlocksToCache != 0) cacheBlockData(block, receipts);
appendBlockHelper(new BlockWithReceipts(block, receipts), true);
}

private void cacheBlockData(final Block block, final List<TransactionReceipt> receipts) {
blockHeadersCache.ifPresent(cache -> cache.put(block.getHash(), block.getHeader()));
blockBodiesCache.ifPresent(cache -> cache.put(block.getHash(), block.getBody()));
transactionReceiptsCache.ifPresent(cache -> cache.put(block.getHash(), receipts));
totalDifficultyCache.ifPresent(
cache -> cache.put(block.getHash(), block.getHeader().getDifficulty()));
}

private boolean blockShouldBeProcessed(
final Block block, final List<TransactionReceipt> receipts) {
checkArgument(
Expand Down Expand Up @@ -768,4 +860,20 @@ int observerCount() {
private void notifyChainReorgBlockAdded(final BlockWithReceipts blockWithReceipts) {
blockReorgObservers.forEach(observer -> observer.onBlockAdded(blockWithReceipts, this));
}

public Optional<Cache<Hash, BlockHeader>> getBlockHeadersCache() {
return blockHeadersCache;
}

public Optional<Cache<Hash, BlockBody>> getBlockBodiesCache() {
return blockBodiesCache;
}

public Optional<Cache<Hash, List<TransactionReceipt>>> getTransactionReceiptsCache() {
return transactionReceiptsCache;
}

public Optional<Cache<Hash, Difficulty>> getTotalDifficultyCache() {
return totalDifficultyCache;
}
}
Loading

0 comments on commit e0b3316

Please sign in to comment.