Skip to content

Commit

Permalink
[PAN-2819] [PAN-2820] Mark Sweep Pruner
Browse files Browse the repository at this point in the history
* Mark Sweep Pruner 
* add `unload` method on Node interface, which is a noop everywhere but on `StoredNode`
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
  • Loading branch information
RatanRSur authored Aug 7, 2019
1 parent 898f616 commit b20147c
Show file tree
Hide file tree
Showing 23 changed files with 697 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import tech.pegasys.pantheon.ethereum.privacy.PrivateTransactionStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStatePreimageStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage;

import java.io.Closeable;

Expand All @@ -32,4 +33,6 @@ public interface StorageProvider extends Closeable {
PrivateTransactionStorage createPrivateTransactionStorage();

PrivateStateStorage createPrivateStateStorage();

KeyValueStorage createPruningStorage();
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public PrivateStateStorage createPrivateStateStorage() {
return new PrivateStateKeyValueStorage(privateStateStorage);
}

@Override
public KeyValueStorage createPruningStorage() {
return pruningStorage;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.worldstate;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.trie.StoredMerklePatriciaTrie;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.PantheonMetricCategory;
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage;
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage.Transaction;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class MarkSweepPruner {
private static final Logger LOG = LogManager.getLogger();
private static final BytesValue IN_USE = BytesValue.of(1);
private static final int MARKS_PER_TRANSACTION = 1000;
private final WorldStateStorage worldStateStorage;
private final KeyValueStorage markStorage;
private final Counter markedNodesCounter;
private final Counter markOperationCounter;
private final Counter sweepOperationCounter;
private final Counter sweptNodesCounter;
private volatile long nodeAddedListenerId;
private final ReentrantLock markLock = new ReentrantLock(true);
private final Set<BytesValue> pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>());

public MarkSweepPruner(
final WorldStateStorage worldStateStorage,
final KeyValueStorage markStorage,
final MetricsSystem metricsSystem) {
this.worldStateStorage = worldStateStorage;
this.markStorage = markStorage;

markedNodesCounter =
metricsSystem.createCounter(
PantheonMetricCategory.PRUNER,
"marked_nodes_total",
"Total number of nodes marked as in use");
markOperationCounter =
metricsSystem.createCounter(
PantheonMetricCategory.PRUNER,
"mark_operations_total",
"Total number of mark operations performed");

sweptNodesCounter =
metricsSystem.createCounter(
PantheonMetricCategory.PRUNER,
"swept_nodes_total",
"Total number of unused nodes removed");
sweepOperationCounter =
metricsSystem.createCounter(
PantheonMetricCategory.PRUNER,
"sweep_operations_total",
"Total number of sweep operations performed");
}

public void prepare() {
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case.
nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNewNodes);
}

public void cleanup() {
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
}

public void mark(final Hash rootHash) {
markOperationCounter.inc();
markStorage.clear();
createStateTrie(rootHash)
.visitAll(
node -> {
if (Thread.interrupted()) {
// Since we don't expect to abort marking ourselves,
// our abort process consists only of handling interrupts
throw new RuntimeException("Interrupted while marking");
}
markNode(node.getHash());
node.getValue().ifPresent(this::processAccountState);
});
LOG.info("Completed marking used nodes for pruning");
}

public void sweep() {
flushPendingMarks();
sweepOperationCounter.inc();
LOG.info("Sweeping unused nodes");
final long prunedNodeCount = worldStateStorage.prune(markStorage::containsKey);
sweptNodesCounter.inc(prunedNodeCount);
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
markStorage.clear();
LOG.info("Completed sweeping unused nodes");
}

private MerklePatriciaTrie<Bytes32, BytesValue> createStateTrie(final Bytes32 rootHash) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStateTrieNode,
rootHash,
Function.identity(),
Function.identity());
}

private MerklePatriciaTrie<Bytes32, BytesValue> createStorageTrie(final Bytes32 rootHash) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStorageTrieNode,
rootHash,
Function.identity(),
Function.identity());
}

private void processAccountState(final BytesValue value) {
final StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(RLP.input(value));
markNode(accountValue.getCodeHash());

createStorageTrie(accountValue.getStorageRoot())
.visitAll(storageNode -> markNode(storageNode.getHash()));
}

private void markNode(final Bytes32 hash) {
markedNodesCounter.inc();
markLock.lock();
try {
pendingMarks.add(hash);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
}
}

private void maybeFlushPendingMarks() {
if (pendingMarks.size() > MARKS_PER_TRANSACTION) {
flushPendingMarks();
}
}

void flushPendingMarks() {
markLock.lock();
try {
final Transaction transaction = markStorage.startTransaction();
pendingMarks.forEach(node -> transaction.put(node, IN_USE));
transaction.commit();
pendingMarks.clear();
} finally {
markLock.unlock();
}
}

private void markNewNodes(final Collection<Bytes32> nodeHashes) {
markedNodesCounter.inc(nodeHashes.size());
markLock.lock();
try {
pendingMarks.addAll(nodeHashes);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.worldstate;

import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Pruner {
private static final Logger LOG = LogManager.getLogger();

private final MarkSweepPruner pruningStrategy;
private final Blockchain blockchain;
private final ExecutorService executorService;
private final long retentionPeriodInBlocks;
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
private volatile long markBlockNumber = 0;
private volatile BlockHeader markedBlockHeader;
private long transientForkThreshold;

public Pruner(
final MarkSweepPruner pruningStrategy,
final Blockchain blockchain,
final ExecutorService executorService,
final long transientForkThreshold,
final long retentionPeriodInBlocks) {
this.pruningStrategy = pruningStrategy;
this.executorService = executorService;
this.blockchain = blockchain;
if (transientForkThreshold < 0 || retentionPeriodInBlocks < 0) {
throw new IllegalArgumentException(
String.format(
"TransientForkThreshold and RetentionPeriodInBlocks must be non-negative. transientForkThreshold=%d, retentionPeriodInBlocks=%d",
transientForkThreshold, retentionPeriodInBlocks));
}
this.retentionPeriodInBlocks = retentionPeriodInBlocks;
this.transientForkThreshold = transientForkThreshold;
}

public void start() {
blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event));
}

public void stop() throws InterruptedException {
pruningStrategy.cleanup();
executorService.awaitTermination(10, TimeUnit.SECONDS);
}

private void handleNewBlock(final BlockAddedEvent event) {
if (!event.isNewCanonicalHead()) {
return;
}

final long blockNumber = event.getBlock().getHeader().getNumber();
if (state.compareAndSet(State.IDLE, State.TRANSIENT_FORK_OUTLIVING)) {
pruningStrategy.prepare();
markBlockNumber = blockNumber;
} else if (blockNumber >= markBlockNumber + transientForkThreshold
&& state.compareAndSet(State.TRANSIENT_FORK_OUTLIVING, State.MARKING)) {
markedBlockHeader = blockchain.getBlockHeader(markBlockNumber).get();
mark(markedBlockHeader);
} else if (blockNumber >= markBlockNumber + retentionPeriodInBlocks
&& blockchain.blockIsOnCanonicalChain(markedBlockHeader.getHash())
&& state.compareAndSet(State.MARKING_COMPLETE, State.SWEEPING)) {
sweep();
}
}

private void mark(final BlockHeader header) {
markBlockNumber = header.getNumber();
final Hash stateRoot = header.getStateRoot();
LOG.info(
"Begin marking used nodes for pruning. Block number: {} State root: {}",
markBlockNumber,
stateRoot);
execute(
() -> {
pruningStrategy.mark(stateRoot);
state.compareAndSet(State.MARKING, State.MARKING_COMPLETE);
});
}

private void sweep() {
LOG.info(
"Begin sweeping unused nodes for pruning. Retention period: {}", retentionPeriodInBlocks);
execute(
() -> {
pruningStrategy.sweep();
state.compareAndSet(State.SWEEPING, State.IDLE);
});
}

private void execute(final Runnable action) {
try {
executorService.execute(action);
} catch (final Throwable t) {
LOG.error("Pruning failed", t);
state.set(State.IDLE);
}
}

private enum State {
IDLE,
TRANSIENT_FORK_OUTLIVING,
MARKING,
MARKING_COMPLETE,
SWEEPING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage;

public class InMemoryStorageProvider implements StorageProvider {

Expand Down Expand Up @@ -86,6 +87,11 @@ public PrivateStateStorage createPrivateStateStorage() {
return new PrivateStateKeyValueStorage(new InMemoryKeyValueStorage());
}

@Override
public KeyValueStorage createPruningStorage() {
return new InMemoryKeyValueStorage();
}

@Override
public void close() {}
}
Loading

0 comments on commit b20147c

Please sign in to comment.