Skip to content

Commit

Permalink
Decouple TrieLogManager and CachedWorldStorageManager (#6072)
Browse files Browse the repository at this point in the history
Separate out the concepts of world state caching and trie log management.
Remove AbstractTrieLogManager and make TrieLogManager the concrete implementation.

Signed-off-by: Simon Dudley <simon.dudley@consensys.net>
  • Loading branch information
siladu authored Oct 25, 2023
1 parent a2dbb82 commit 20f518e
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 344 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class BonsaiWorldStateProvider implements WorldStateArchive {

private final Blockchain blockchain;

private final CachedWorldStorageManager cachedWorldStorageManager;
private final TrieLogManager trieLogManager;
private final BonsaiWorldState persistedState;
private final BonsaiWorldStateKeyValueStorage worldStateStorage;
Expand Down Expand Up @@ -91,15 +92,12 @@ public BonsaiWorldStateProvider(
final ObservableMetricsSystem metricsSystem,
final BesuContext pluginContext) {

this.cachedWorldStorageManager =
new CachedWorldStorageManager(this, worldStateStorage, metricsSystem);
// TODO: de-dup constructors
this.trieLogManager =
new CachedWorldStorageManager(
this,
blockchain,
worldStateStorage,
metricsSystem,
maxLayersToLoad.orElse(RETAINED_LAYERS),
pluginContext);
new TrieLogManager(
blockchain, worldStateStorage, maxLayersToLoad.orElse(RETAINED_LAYERS), pluginContext);
this.blockchain = blockchain;
this.worldStateStorage = worldStateStorage;
this.cachedMerkleTrieLoader = cachedMerkleTrieLoader;
Expand All @@ -108,16 +106,18 @@ public BonsaiWorldStateProvider(
.getBlockHeader(persistedState.getWorldStateBlockHash())
.ifPresent(
blockHeader ->
this.trieLogManager.addCachedLayer(
this.cachedWorldStorageManager.addCachedLayer(
blockHeader, persistedState.getWorldStateRootHash(), persistedState));
}

@VisibleForTesting
BonsaiWorldStateProvider(
final CachedWorldStorageManager cachedWorldStorageManager,
final TrieLogManager trieLogManager,
final BonsaiWorldStateKeyValueStorage worldStateStorage,
final Blockchain blockchain,
final CachedMerkleTrieLoader cachedMerkleTrieLoader) {
this.cachedWorldStorageManager = cachedWorldStorageManager;
this.trieLogManager = trieLogManager;
this.blockchain = blockchain;
this.worldStateStorage = worldStateStorage;
Expand All @@ -127,13 +127,13 @@ public BonsaiWorldStateProvider(
.getBlockHeader(persistedState.getWorldStateBlockHash())
.ifPresent(
blockHeader ->
this.trieLogManager.addCachedLayer(
this.cachedWorldStorageManager.addCachedLayer(
blockHeader, persistedState.getWorldStateRootHash(), persistedState));
}

@Override
public Optional<WorldState> get(final Hash rootHash, final Hash blockHash) {
return trieLogManager
return cachedWorldStorageManager
.getWorldState(blockHash)
.or(
() -> {
Expand All @@ -148,7 +148,7 @@ public Optional<WorldState> get(final Hash rootHash, final Hash blockHash) {

@Override
public boolean isWorldStateAvailable(final Hash rootHash, final Hash blockHash) {
return trieLogManager.containWorldStateStorage(blockHash)
return cachedWorldStorageManager.containWorldStateStorage(blockHash)
|| persistedState.blockHash().equals(blockHash)
|| worldStateStorage.isWorldStateAvailable(rootHash, blockHash);
}
Expand All @@ -167,10 +167,10 @@ public Optional<MutableWorldState> getMutable(
trieLogManager.getMaxLayersToLoad());
return Optional.empty();
}
return trieLogManager
return cachedWorldStorageManager
.getWorldState(blockHeader.getHash())
.or(() -> trieLogManager.getNearestWorldState(blockHeader))
.or(() -> trieLogManager.getHeadWorldState(blockchain::getBlockHeader))
.or(() -> cachedWorldStorageManager.getNearestWorldState(blockHeader))
.or(() -> cachedWorldStorageManager.getHeadWorldState(blockchain::getBlockHeader))
.flatMap(
bonsaiWorldState ->
rollMutableStateToBlockHash(bonsaiWorldState, blockHeader.getHash()))
Expand Down Expand Up @@ -354,11 +354,15 @@ public TrieLogManager getTrieLogManager() {
return trieLogManager;
}

public CachedWorldStorageManager getCachedWorldStorageManager() {
return cachedWorldStorageManager;
}

@Override
public void resetArchiveStateTo(final BlockHeader blockHeader) {
persistedState.resetWorldStateTo(blockHeader);
this.trieLogManager.reset();
this.trieLogManager.addCachedLayer(
this.cachedWorldStorageManager.reset();
this.cachedWorldStorageManager.addCachedLayer(
blockHeader, persistedState.getWorldStateRootHash(), persistedState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,9 @@
import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateKeyValueStorage.BonsaiStorageSubscriber;
import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateLayerStorage;
import org.hyperledger.besu.ethereum.bonsai.trielog.AbstractTrieLogManager;
import org.hyperledger.besu.ethereum.bonsai.trielog.TrieLogFactoryImpl;
import org.hyperledger.besu.ethereum.bonsai.worldview.BonsaiWorldState;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.metrics.ObservableMetricsSystem;
import org.hyperledger.besu.plugin.BesuContext;
import org.hyperledger.besu.plugin.services.TrieLogService;
import org.hyperledger.besu.plugin.services.trielogs.TrieLog;
import org.hyperledger.besu.plugin.services.trielogs.TrieLogFactory;
import org.hyperledger.besu.plugin.services.trielogs.TrieLogProvider;

import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -39,52 +31,39 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CachedWorldStorageManager extends AbstractTrieLogManager
implements BonsaiStorageSubscriber {
public class CachedWorldStorageManager implements BonsaiStorageSubscriber {
public static final long RETAINED_LAYERS = 512; // at least 256 + typical rollbacks
private static final Logger LOG = LoggerFactory.getLogger(CachedWorldStorageManager.class);
private final BonsaiWorldStateProvider archive;
private final ObservableMetricsSystem metricsSystem;

CachedWorldStorageManager(
private final BonsaiWorldStateKeyValueStorage rootWorldStateStorage;
private final Map<Bytes32, CachedBonsaiWorldView> cachedWorldStatesByHash;

private CachedWorldStorageManager(
final BonsaiWorldStateProvider archive,
final Blockchain blockchain,
final BonsaiWorldStateKeyValueStorage worldStateStorage,
final long maxLayersToLoad,
final Map<Bytes32, CachedBonsaiWorldView> cachedWorldStatesByHash,
final BesuContext pluginContext,
final ObservableMetricsSystem metricsSystem) {
super(blockchain, worldStateStorage, maxLayersToLoad, cachedWorldStatesByHash, pluginContext);
worldStateStorage.subscribe(this);
this.rootWorldStateStorage = worldStateStorage;
this.cachedWorldStatesByHash = cachedWorldStatesByHash;
this.archive = archive;
this.metricsSystem = metricsSystem;
}

public CachedWorldStorageManager(
final BonsaiWorldStateProvider archive,
final Blockchain blockchain,
final BonsaiWorldStateKeyValueStorage worldStateStorage,
final ObservableMetricsSystem metricsSystem,
final long maxLayersToLoad,
final BesuContext pluginContext) {
this(
archive,
blockchain,
worldStateStorage,
maxLayersToLoad,
new ConcurrentHashMap<>(),
pluginContext,
metricsSystem);
final ObservableMetricsSystem metricsSystem) {
this(archive, worldStateStorage, new ConcurrentHashMap<>(), metricsSystem);
}

@Override
public synchronized void addCachedLayer(
final BlockHeader blockHeader,
final Hash worldStateRootHash,
Expand Down Expand Up @@ -132,7 +111,20 @@ public synchronized void addCachedLayer(
scrubCachedLayers(blockHeader.getNumber());
}

@Override
private synchronized void scrubCachedLayers(final long newMaxHeight) {
if (cachedWorldStatesByHash.size() > RETAINED_LAYERS) {
final long waterline = newMaxHeight - RETAINED_LAYERS;
cachedWorldStatesByHash.values().stream()
.filter(layer -> layer.getBlockNumber() < waterline)
.toList()
.forEach(
layer -> {
cachedWorldStatesByHash.remove(layer.getBlockHash());
layer.close();
});
}
}

public Optional<BonsaiWorldState> getWorldState(final Hash blockHash) {
if (cachedWorldStatesByHash.containsKey(blockHash)) {
// return a new worldstate using worldstate storage and an isolated copy of the updater
Expand All @@ -150,7 +142,6 @@ public Optional<BonsaiWorldState> getWorldState(final Hash blockHash) {
return Optional.empty();
}

@Override
public Optional<BonsaiWorldState> getNearestWorldState(final BlockHeader blockHeader) {
LOG.atDebug()
.setMessage("getting nearest worldstate for {}")
Expand Down Expand Up @@ -183,7 +174,6 @@ public Optional<BonsaiWorldState> getNearestWorldState(final BlockHeader blockHe
archive, new BonsaiWorldStateLayerStorage(storage)));
}

@Override
public Optional<BonsaiWorldState> getHeadWorldState(
final Function<Hash, Optional<BlockHeader>> hashBlockHeaderFunction) {

Expand All @@ -203,7 +193,10 @@ public Optional<BonsaiWorldState> getHeadWorldState(
});
}

@Override
public boolean containWorldStateStorage(final Hash blockHash) {
return cachedWorldStatesByHash.containsKey(blockHash);
}

public void reset() {
this.cachedWorldStatesByHash.clear();
}
Expand All @@ -227,76 +220,4 @@ public void onClearTrieLog() {
public void onCloseStorage() {
this.cachedWorldStatesByHash.clear();
}

@VisibleForTesting
@Override
protected TrieLogFactory setupTrieLogFactory(final BesuContext pluginContext) {
// if we have a TrieLogService from pluginContext, use it.
var trieLogServicez =
Optional.ofNullable(pluginContext)
.flatMap(context -> context.getService(TrieLogService.class));

if (trieLogServicez.isPresent()) {
var trieLogService = trieLogServicez.get();
// push the TrieLogProvider into the TrieLogService
trieLogService.configureTrieLogProvider(getTrieLogProvider());

// configure plugin observers:
trieLogService.getObservers().forEach(trieLogObservers::subscribe);

// return the TrieLogFactory implementation from the TrieLogService
return trieLogService.getTrieLogFactory();
} else {
// Otherwise default to TrieLogFactoryImpl
return new TrieLogFactoryImpl();
}
}

@VisibleForTesting
TrieLogProvider getTrieLogProvider() {
return new TrieLogProvider() {
@Override
public Optional<TrieLog> getTrieLogLayer(final Hash blockHash) {
return CachedWorldStorageManager.this.getTrieLogLayer(blockHash);
}

@Override
public Optional<TrieLog> getTrieLogLayer(final long blockNumber) {
return CachedWorldStorageManager.this
.blockchain
.getBlockHeader(blockNumber)
.map(BlockHeader::getHash)
.flatMap(CachedWorldStorageManager.this::getTrieLogLayer);
}

@Override
public List<TrieLogRangeTuple> getTrieLogsByRange(
final long fromBlockNumber, final long toBlockNumber) {
return rangeAsStream(fromBlockNumber, toBlockNumber)
.map(blockchain::getBlockHeader)
.map(
headerOpt ->
headerOpt.flatMap(
header ->
CachedWorldStorageManager.this
.getTrieLogLayer(header.getBlockHash())
.map(
layer ->
new TrieLogRangeTuple(
header.getBlockHash(), header.getNumber(), layer))))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
}

Stream<Long> rangeAsStream(final long fromBlockNumber, final long toBlockNumber) {
if (Math.abs(toBlockNumber - fromBlockNumber) > LOG_RANGE_LIMIT) {
throw new IllegalArgumentException("Requested Range too large");
}
long left = Math.min(fromBlockNumber, toBlockNumber);
long right = Math.max(fromBlockNumber, toBlockNumber);
return LongStream.range(left, right).boxed();
}
};
}
}
Loading

0 comments on commit 20f518e

Please sign in to comment.