Introduce Parallel Marking for Pruning (#1219)
The parallel task production is by sub-trie, so calling `visitAll` on a 
root node will eventually spawn up to 16 tasks (for a hexary trie).

If we marked each sub-trie in its own thread, with no common queue of 
tasks, our mark speed would be limited by the sub-trie with the maximum 
number of nodes. In practice, on Ethereum mainnet we see a large 
imbalance in sub-trie sizes, so without a common task pool a substantial 
amount of time would be spent with only one thread left marking its big 
sub-trie.

If we were to leave all threads to produce mark tasks before starting 
to mark, we would run out of memory quickly.

If we were to have a constant number of threads producing the mark 
tasks with the others consuming them, we would have to optimize the 
production/consumption balance.

To get the best of both worlds, the marking executor has a 
ThreadPoolExecutor.CallerRunsPolicy, which causes producing threads to 
run their own mark tasks immediately when the task queue is full. The 
resulting behavior is that threads mark their own sub-trie until they 
finish it, at which point they switch to marking the sub-trie tasks 
produced by other threads.
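
For readers less familiar with `ThreadPoolExecutor.CallerRunsPolicy`, below is a minimal, self-contained Java sketch (not part of this commit) of the saturation behavior described above: once the bounded queue fills, the submitting thread runs the task itself. The pool size, queue capacity, and task bodies here are illustrative only; the executor the pruner actually builds is in the `MarkSweepPruner.mark()` changes further down.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
  public static void main(final String[] args) throws InterruptedException {
    final ThreadPoolExecutor executor =
        new ThreadPoolExecutor(
            0, // no core threads; workers are created on demand
            2, // at most 2 worker threads (cf. MAX_MARKING_THREAD_POOL_SIZE)
            5L,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(4), // small bounded task queue
            new ThreadPoolExecutor.CallerRunsPolicy()); // full queue => submitter runs the task

    for (int i = 0; i < 20; i++) {
      final int taskId = i;
      // When the queue is full, execute() neither rejects nor blocks: the calling
      // (producing) thread runs the task inline, which naturally throttles production.
      executor.execute(
          () ->
              System.out.println(
                  "task " + taskId + " ran on " + Thread.currentThread().getName()));
    }

    executor.shutdown();
    executor.awaitTermination(10, TimeUnit.SECONDS);
  }
}
```

Running this prints a mix of pool-thread and `main` thread names, which is exactly the "producers become consumers when the queue is full" behavior the pruner relies on.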

Signed-off-by: Ratan Rai Sur <ratan.r.sur@gmail.com>
RatanRSur authored Oct 23, 2020
1 parent ef9cc4f commit a25d3f1
Showing 9 changed files with 199 additions and 88 deletions.
@@ -17,39 +17,34 @@

import picocli.CommandLine;

import java.util.Objects;

public class ExperimentalCliOptionMustBeCorrectlyDisplayedNegativeCases {

@CommandLine.Option(
hidden = true,
names = {"--Xexperimental"})
hidden = true,
names = {"--Xexperimental"})
private String experimental = "";

@CommandLine.Option(
hidden = false,
names = {"--notExperimental"})
hidden = false,
names = {"--notExperimental"})
private String notExperimental = "";

@CommandLine.Option(
names = {"--notExperimental2"})
@CommandLine.Option(names = {"--notExperimental2"})
private String notExperimental2 = "";

private class AnotherClass {
@CommandLine.Option(
names = {"--notExperimentalInAnotherClass"})
@CommandLine.Option(names = {"--notExperimentalInAnotherClass"})
private String notExperimentalInAnotherClass = "";

@CommandLine.Option(
hidden = true,
names = {"--XexperimentalInAnotherClass"})
hidden = true,
names = {"--XexperimentalInAnotherClass"})
private String experimentalInAnotherClass = "";
}

private class BesuCommand {

@CommandLine.Option(
names = {"--notExperimentalInBesuCommandClass"})
@CommandLine.Option(names = {"--notExperimentalInBesuCommandClass"})
private String notExperimentalInBesuCommandClass = "";
}
}
@@ -17,27 +17,25 @@

import picocli.CommandLine;

import java.util.Objects;

public class ExperimentalCliOptionMustBeCorrectlyDisplayedPositiveCases {

// BUG: Diagnostic contains: Experimental options must be hidden and not present in the BesuCommand class.
// BUG: Diagnostic contains: Experimental options must be hidden and not present in the
// BesuCommand class.
@CommandLine.Option(
hidden = false,
names = {"--Xexperimental"})
hidden = false,
names = {"--Xexperimental"})
private String experimental = "";

// BUG: Diagnostic contains: Experimental options must be hidden and not present in the BesuCommand class.
@CommandLine.Option(
names = {"--Xexperimental2"})
// BUG: Diagnostic contains: Experimental options must be hidden and not present in the
// BesuCommand class.
@CommandLine.Option(names = {"--Xexperimental2"})
private String experimental2 = "";

private class BesuCommand {

// BUG: Diagnostic contains: Experimental options must be hidden and not present in the BesuCommand class.
@CommandLine.Option(
names = {"--XexperimentalInBesuCommandClass"})
// BUG: Diagnostic contains: Experimental options must be hidden and not present in the
// BesuCommand class.
@CommandLine.Option(names = {"--XexperimentalInBesuCommandClass"})
private String experimentalInBesuCommandClass = "";
}

}
@@ -65,7 +65,7 @@ public class PrunerIntegrationTest {

@Test
public void pruner_smallState_manyOpsPerTx() {
testPruner(3, 1, 1, 4, 1000);
testPruner(3, 1, 1, 4, 100_000);
}

@Test
@@ -29,23 +29,31 @@
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;

public class MarkSweepPruner {

private static final int DEFAULT_OPS_PER_TRANSACTION = 1000;
private static final Logger LOG = LogManager.getLogger();
private static final byte[] IN_USE = Bytes.of(1).toArrayUnsafe();

private static final int DEFAULT_OPS_PER_TRANSACTION = 10_000;
private static final int MAX_MARKING_THREAD_POOL_SIZE = 2;

private final int operationsPerTransaction;
private final WorldStateStorage worldStateStorage;
private final MutableBlockchain blockchain;
@@ -56,7 +64,7 @@ public class MarkSweepPruner {
private final Counter sweptNodesCounter;
private final Stopwatch markStopwatch;
private volatile long nodeAddedListenerId;
private final ReentrantLock markLock = new ReentrantLock(true);
private final ReadWriteLock pendingMarksLock = new ReentrantReadWriteLock();
private final Set<Bytes32> pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>());

public MarkSweepPruner(
@@ -104,6 +112,8 @@ public MarkSweepPruner(
"mark_time_duration",
"Cumulative number of seconds spent marking the state trie across all pruning cycles",
() -> markStopwatch.elapsed(TimeUnit.SECONDS));

LOG.debug("Using {} pruner threads", MAX_MARKING_THREAD_POOL_SIZE);
}

public void prepare() {
@@ -115,20 +125,64 @@ public void prepare() {
nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes);
}

/**
* This is a parallel mark implementation.
*
* <p>The parallel task production is by sub-trie, so calling `visitAll` on a root node will
* eventually spawn up to 16 tasks (for a hexary trie).
*
* <p>If we marked each sub-trie in its own thread, with no common queue of tasks, our mark speed
* would be limited by the sub-trie with the maximum number of nodes. In practice, on Ethereum
* mainnet we see a large imbalance in sub-trie sizes, so without a common task pool a substantial
* amount of time would be spent with only one thread left marking its big sub-trie.
*
* <p>If we were to leave all threads to produce mark tasks before starting to mark, we would run
* out of memory quickly.
*
* <p>If we were to have a constant number of threads producing the mark tasks with the others
* consuming them, we would have to optimize the production/consumption balance.
*
* <p>To get the best of both worlds, the marking executor has a {@link
* ThreadPoolExecutor.CallerRunsPolicy} which causes producing threads to run their own mark tasks
* immediately when the task queue is full. The resulting behavior is that threads mark their own
* sub-trie until they finish it, at which point they switch to marking the sub-trie tasks produced
* by other threads.
*
* @param rootHash The root hash of the whole state trie. Roots of storage tries will be
*     discovered through traversal.
*/
public void mark(final Hash rootHash) {
markOperationCounter.inc();
markStopwatch.start();
final ExecutorService markingExecutorService =
new ThreadPoolExecutor(
0,
MAX_MARKING_THREAD_POOL_SIZE,
5L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(16),
new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)
.setNameFormat(this.getClass().getSimpleName() + "-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy());
createStateTrie(rootHash)
.visitAll(
node -> {
if (Thread.interrupted()) {
// Since we don't expect to abort marking ourselves,
// our abort process consists only of handling interrupts
throw new RuntimeException("Interrupted while marking");
}
markNode(node.getHash());
node.getValue().ifPresent(this::processAccountState);
});
node.getValue()
.ifPresent(value -> processAccountState(value, markingExecutorService));
},
markingExecutorService)
.join() /* This blocks until all the marking tasks have been _produced_, but does not guarantee that they have completed. */;
markingExecutorService.shutdown();
try {
// This ensures that the marking tasks complete.
markingExecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.info("Interrupted while marking", e);
}
markStopwatch.stop();
LOG.debug("Completed marking used nodes for pruning");
}
@@ -199,52 +253,57 @@ private MerklePatriciaTrie<Bytes32, Bytes> createStorageTrie(final Bytes32 rootH
Function.identity());
}

private void processAccountState(final Bytes value) {
private void processAccountState(final Bytes value, final ExecutorService executorService) {
final StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(RLP.input(value));
markNode(accountValue.getCodeHash());

createStorageTrie(accountValue.getStorageRoot())
.visitAll(storageNode -> markNode(storageNode.getHash()));
.visitAll(storageNode -> markNode(storageNode.getHash()), executorService);
}

@VisibleForTesting
void markNode(final Bytes32 hash) {
markedNodesCounter.inc();
markLock.lock();
try {
pendingMarks.add(hash);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
}
markThenMaybeFlush(() -> pendingMarks.add(hash), 1);
}

private void markNodes(final Collection<Bytes32> nodeHashes) {
markedNodesCounter.inc(nodeHashes.size());
markThenMaybeFlush(() -> pendingMarks.addAll(nodeHashes), nodeHashes.size());
}

private void markThenMaybeFlush(final Runnable nodeMarker, final int numberOfNodes) {
// We use the read lock here because pendingMarks is thread-safe and we want to allow all the
// marking threads to access it simultaneously.
final Lock markLock = pendingMarksLock.readLock();
markLock.lock();
try {
pendingMarks.addAll(nodeHashes);
maybeFlushPendingMarks();
nodeMarker.run();
} finally {
markLock.unlock();
}
}
markedNodesCounter.inc(numberOfNodes);

private void maybeFlushPendingMarks() {
if (pendingMarks.size() > operationsPerTransaction) {
flushPendingMarks();
// However, when the size of pendingMarks grows too large, we want all the threads to stop
// adding because we're going to clear the set.
// Therefore, we need to take out a write lock.
if (pendingMarks.size() >= operationsPerTransaction) {
final Lock flushLock = pendingMarksLock.writeLock();
flushLock.lock();
try {
// Check once again that the condition holds. If it doesn't, that means another thread
// already flushed them.
if (pendingMarks.size() >= operationsPerTransaction) {
flushPendingMarks();
}
} finally {
flushLock.unlock();
}
}
}

private void flushPendingMarks() {
markLock.lock();
try {
final KeyValueStorageTransaction transaction = markStorage.startTransaction();
pendingMarks.forEach(node -> transaction.put(node.toArrayUnsafe(), IN_USE));
transaction.commit();
pendingMarks.clear();
} finally {
markLock.unlock();
}
final KeyValueStorageTransaction transaction = markStorage.startTransaction();
pendingMarks.forEach(node -> transaction.put(node.toArrayUnsafe(), IN_USE));
transaction.commit();
pendingMarks.clear();
}
}
@@ -20,6 +20,7 @@
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.trie.MerkleTrieException;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -137,7 +138,7 @@ private void handleNewBlock(final BlockAddedEvent event) {

private void mark(final BlockHeader header) {
final Hash stateRoot = header.getStateRoot();
LOG.debug(
LOG.info(
"Begin marking used nodes for pruning. Block number: {} State root: {}",
markBlockNumber,
stateRoot);
@@ -149,7 +150,7 @@ private void mark(final BlockHeader header) {
}

private void sweep() {
LOG.debug(
LOG.info(
"Begin sweeping unused nodes for pruning. Keeping full state for blocks {} to {}",
markBlockNumber,
markBlockNumber + blocksRetained);
@@ -163,14 +164,21 @@ private void sweep() {
private void execute(final Runnable action) {
try {
executorService.execute(action);
} catch (final Throwable t) {
LOG.error("Pruner failed", t);
pruningStrategy.cleanup();
} catch (final MerkleTrieException mte) {
LOG.fatal(
"An unrecoverable error occurred while pruning. The database directory must be deleted and resynced.",
mte);
System.exit(1);
} catch (final Exception e) {
LOG.error(
"An unexpected error ocurred in the {} pruning phase: {}. Reattempting.",
getPruningPhase(),
e.getMessage());
pruningStrategy.clearMarks();
pruningPhase.set(PruningPhase.IDLE);
}
}

@VisibleForTesting
PruningPhase getPruningPhase() {
return pruningPhase.get();
}