Full-Trie Mark in Parallel #1219

Merged: 56 commits into master from pruning-parallel, Oct 23, 2020. The diff below shows the changes from 46 of those commits.

Commits:
1102f12  commit marks only on block boundary (RatanRSur, Jun 2, 2020)
fc99761  test changes (RatanRSur, Jun 2, 2020)
655e571  truer in order verification for case where transaction size isn't 1 (RatanRSur, Jun 11, 2020)
8f6cfaa  flatmap (RatanRSur, Jun 11, 2020)
e3e749e  simplify test (RatanRSur, Jun 11, 2020)
1cf490a  switch back to old sweepBefore (RatanRSur, Jun 11, 2020)
6dae00c  remove mentions to tx (RatanRSur, Jun 11, 2020)
50247e0  logging and timing and exiting (RatanRSur, Jun 15, 2020)
6a43c1b  ignore failure and logging (RatanRSur, Jun 17, 2020)
3b866b2  retry on most errors, fail on a MerkleTrieException (RatanRSur, Jun 17, 2020)
785c287  add ops per transaction back to integration test (RatanRSur, Jun 17, 2020)
e24e384  spotless (RatanRSur, Jun 17, 2020)
29e409c  medium tx (RatanRSur, Jun 17, 2020)
27b5843  visit all nodes in parallel (RatanRSur, Jul 14, 2020)
ba521ff  moving some stuff around (RatanRSur, Jul 14, 2020)
a56881b  change to CompletableFuture and wait on them (RatanRSur, Jul 14, 2020)
c4dbca0  unlock pending marks once the marks have been added to the tx (RatanRSur, Jul 14, 2020)
7e5ab65  use fixed thread pool (RatanRSur, Jul 14, 2020)
6d41c2b  join instead of get (RatanRSur, Aug 6, 2020)
190afb3  make single threaded visitAll return void (RatanRSur, Aug 6, 2020)
9b3d8bc  report when mark work items finish (RatanRSur, Aug 6, 2020)
9900b92  debug -> info (RatanRSur, Aug 6, 2020)
c34ea60  markNodes directly to disk (RatanRSur, Aug 6, 2020)
6a49649  ReadWriteLock for adding nodes during full trie marking (RatanRSur, Aug 7, 2020)
d5cd282  make markNode and markNodes both use the same locking mechanism (RatanRSur, Aug 7, 2020)
cb228e0  typo (RatanRSur, Aug 10, 2020)
0a252f4  parallelize *storage* marking instead of whole account subtries (RatanRSur, Aug 21, 2020)
784aa23  mark storage tries while visiting state trie (RatanRSur, Aug 31, 2020)
0572790  Merge branch 'master' into pruning-parallel (RatanRSur, Sep 3, 2020)
224168f  caller runs if max capacity (RatanRSur, Sep 8, 2020)
68de7c0  bring back multiple task generators (RatanRSur, Sep 17, 2020)
182f6ab  Merge remote-tracking branch 'upstream/master' into pruning-parallel (RatanRSur, Oct 5, 2020)
26f6f8b  Merge remote-tracking branch 'upstream/master' into pruning-parallel (RatanRSur, Oct 21, 2020)
3b21749  rename (RatanRSur, Oct 21, 2020)
6dbcea7  documentation (RatanRSur, Oct 21, 2020)
f549a4c  static (RatanRSur, Oct 21, 2020)
457d469  move constants (RatanRSur, Oct 21, 2020)
51abfaa  undo some testing changes (RatanRSur, Oct 21, 2020)
e52d809  thread pool holds 16 tasks (RatanRSur, Oct 21, 2020)
0f8dba7  comment fixup (RatanRSur, Oct 21, 2020)
9aa005f  remove comment (RatanRSur, Oct 21, 2020)
6d22d3e  param (RatanRSur, Oct 21, 2020)
3b91245  log messages (RatanRSur, Oct 21, 2020)
08adbb3  rename lock (RatanRSur, Oct 21, 2020)
33d74dd  fix metrics bug (RatanRSur, Oct 21, 2020)
bd2b55b  rename locks (RatanRSur, Oct 21, 2020)
97110e9  typo (RatanRSur, Oct 22, 2020)
26ec138  switch to non-blocking code (RatanRSur, Oct 22, 2020)
5700395  comment (RatanRSur, Oct 22, 2020)
76367cc  Revert "switch to non-blocking code" (RatanRSur, Oct 23, 2020)
e1d15ad  Revert "comment" (RatanRSur, Oct 23, 2020)
40de52a  remove unused var (RatanRSur, Oct 23, 2020)
5fb2372  process root node in executor (RatanRSur, Oct 23, 2020)
e48fe7e  lower ops per tx (RatanRSur, Oct 23, 2020)
4d3bd8b  Merge remote-tracking branch 'upstream/master' into pruning-parallel (RatanRSur, Oct 23, 2020)
aa15d98  errata (RatanRSur, Oct 23, 2020)
ExperimentalCliOptionMustBeCorrectlyDisplayedNegativeCases.java
@@ -17,39 +17,34 @@
 
 import picocli.CommandLine;
 
-import java.util.Objects;
-
 public class ExperimentalCliOptionMustBeCorrectlyDisplayedNegativeCases {
 
   @CommandLine.Option(
-      hidden = true,
-      names = {"--Xexperimental"})
+    hidden = true,
+    names = {"--Xexperimental"})
   private String experimental = "";
 
   @CommandLine.Option(
-      hidden = false,
-      names = {"--notExperimental"})
+    hidden = false,
+    names = {"--notExperimental"})
   private String notExperimental = "";
 
-  @CommandLine.Option(
-      names = {"--notExperimental2"})
+  @CommandLine.Option(names = {"--notExperimental2"})
   private String notExperimental2 = "";
 
   private class AnotherClass {
-    @CommandLine.Option(
-        names = {"--notExperimentalInAnotherClass"})
+    @CommandLine.Option(names = {"--notExperimentalInAnotherClass"})
     private String notExperimentalInAnotherClass = "";
 
     @CommandLine.Option(
-        hidden = true,
-        names = {"--XexperimentalInAnotherClass"})
+      hidden = true,
+      names = {"--XexperimentalInAnotherClass"})
     private String experimentalInAnotherClass = "";
   }
 
   private class BesuCommand {
 
-    @CommandLine.Option(
-        names = {"--notExperimentalInBesuCommandClass"})
+    @CommandLine.Option(names = {"--notExperimentalInBesuCommandClass"})
    private String notExperimentalInBesuCommandClass = "";
   }
 }
ExperimentalCliOptionMustBeCorrectlyDisplayedPositiveCases.java
@@ -17,27 +17,25 @@
 
 import picocli.CommandLine;
 
-import java.util.Objects;
-
 public class ExperimentalCliOptionMustBeCorrectlyDisplayedPositiveCases {
 
-  // BUG: Diagnostic contains: Experimental options must be hidden and not present in the BesuCommand class.
+  // BUG: Diagnostic contains: Experimental options must be hidden and not present in the
+  // BesuCommand class.
   @CommandLine.Option(
-      hidden = false,
-      names = {"--Xexperimental"})
+    hidden = false,
+    names = {"--Xexperimental"})
   private String experimental = "";
 
-  // BUG: Diagnostic contains: Experimental options must be hidden and not present in the BesuCommand class.
-  @CommandLine.Option(
-      names = {"--Xexperimental2"})
+  // BUG: Diagnostic contains: Experimental options must be hidden and not present in the
+  // BesuCommand class.
+  @CommandLine.Option(names = {"--Xexperimental2"})
   private String experimental2 = "";
 
   private class BesuCommand {
 
-    // BUG: Diagnostic contains: Experimental options must be hidden and not present in the BesuCommand class.
-    @CommandLine.Option(
-        names = {"--XexperimentalInBesuCommandClass"})
+    // BUG: Diagnostic contains: Experimental options must be hidden and not present in the
+    // BesuCommand class.
+    @CommandLine.Option(names = {"--XexperimentalInBesuCommandClass"})
     private String experimentalInBesuCommandClass = "";
   }
 
 }
PrunerIntegrationTest.java
@@ -65,7 +65,7 @@ public class PrunerIntegrationTest {
 
   @Test
   public void pruner_smallState_manyOpsPerTx() {
-    testPruner(3, 1, 1, 4, 1000);
+    testPruner(3, 1, 1, 4, 100_000);
   }
 
   @Test
MarkSweepPruner.java
@@ -29,23 +29,31 @@
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.bytes.Bytes32;
 
 public class MarkSweepPruner {
 
-  private static final int DEFAULT_OPS_PER_TRANSACTION = 1000;
   private static final Logger LOG = LogManager.getLogger();
   private static final byte[] IN_USE = Bytes.of(1).toArrayUnsafe();
+
+  private static final int DEFAULT_OPS_PER_TRANSACTION = 50_000;
+  private static final int MAX_MARKING_THREAD_POOL_SIZE = 2;
 
   private final int operationsPerTransaction;
   private final WorldStateStorage worldStateStorage;
   private final MutableBlockchain blockchain;

Review comment (Contributor), on DEFAULT_OPS_PER_TRANSACTION = 50_000:
> Curious about this - is it really very advantageous to bundle up changes into large transactions? Versus just picking a reasonable size and committing more frequently? I also wonder, if the node were under heavy load, whether it's possible we might hit the timeout.

Reply (Contributor Author):
> Sounds reasonable to me, I'll lower it to 10k. Let me know if you think it should go lower.
@@ -56,7 +64,7 @@ public class MarkSweepPruner {
   private final Counter sweptNodesCounter;
   private final Stopwatch markStopwatch;
   private volatile long nodeAddedListenerId;
-  private final ReentrantLock markLock = new ReentrantLock(true);
+  private final ReadWriteLock pendingMarksLock = new ReentrantReadWriteLock();
   private final Set<Bytes32> pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
   public MarkSweepPruner(
@@ -104,6 +112,8 @@ public MarkSweepPruner(
"mark_time_duration",
"Cumulative number of seconds spent marking the state trie across all pruning cycles",
() -> markStopwatch.elapsed(TimeUnit.SECONDS));

LOG.debug("Using {} pruner threads", MAX_MARKING_THREAD_POOL_SIZE);
}

public void prepare() {
@@ -115,20 +125,64 @@ public void prepare() {
     nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes);
   }
 
+  /**
+   * This is a parallel mark implementation.
+   *
+   * <p>The parallel task production is by sub-trie, so calling `visitAll` on a root node will
+   * eventually spawn up to 16 tasks (for a hexary trie).
+   *
+   * <p>If we marked each sub-trie in its own thread, with no common queue of tasks, our mark speed
+   * would be limited by the sub-trie with the maximum number of nodes. In practice for the Ethereum
+   * mainnet, we see a large imbalance in sub-trie size, so without a common task pool the time in
+   * which there is only 1 thread left marking its big sub-trie would be substantial.
+   *
+   * <p>If we were to leave all threads to produce mark tasks before starting to mark, we would run
+   * out of memory quickly.
+   *
+   * <p>If we were to have a constant number of threads producing the mark tasks with the others
+   * consuming them, we would have to optimize the production/consumption balance.
+   *
+   * <p>To get the best of both worlds, the marking executor has a {@link
+   * ThreadPoolExecutor.CallerRunsPolicy} which causes the producing tasks to essentially consume
+   * their own mark task immediately when the thread pool is full. The resulting behavior is threads
+   * that mark their own sub-trie until they finish that sub-trie, at which point they switch to
+   * marking the sub-trie tasks produced by another thread.
+   *
+   * @param rootHash The root hash of the whole state trie. Roots of storage tries will be
+   *     discovered through traversal.
+   */
   public void mark(final Hash rootHash) {
     markOperationCounter.inc();
     markStopwatch.start();
+    final ExecutorService markingExecutorService =
+        new ThreadPoolExecutor(
+            0,
+            MAX_MARKING_THREAD_POOL_SIZE,
+            5L,
+            TimeUnit.SECONDS,
+            new LinkedBlockingDeque<>(16),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setPriority(Thread.MIN_PRIORITY)
+                .setNameFormat(this.getClass().getSimpleName() + "-%d")
+                .build(),
+            new ThreadPoolExecutor.CallerRunsPolicy());
     createStateTrie(rootHash)
         .visitAll(
             node -> {
               if (Thread.interrupted()) {
                 // Since we don't expect to abort marking ourselves,
                 // our abort process consists only of handling interrupts
                 throw new RuntimeException("Interrupted while marking");
               }
               markNode(node.getHash());
-              node.getValue().ifPresent(this::processAccountState);
-            });
+              node.getValue()
+                  .ifPresent(value -> processAccountState(value, markingExecutorService));
+            },
+            markingExecutorService)
+        .join(); /* This blocks until all the marking tasks have been _produced_, but doesn't
+                    guarantee that the marking tasks have been completed. */
+    markingExecutorService.shutdown();
+    try {
+      // This ensures that the marking tasks complete.
+      markingExecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.info("Interrupted while marking", e);
+    }
     markStopwatch.stop();
     LOG.debug("Completed marking used nodes for pruning");
   }
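To make the CallerRunsPolicy behavior described in the Javadoc concrete, here is a minimal, self-contained sketch — illustrative only, not Besu code — that reproduces the pool shape built in mark() (at most 2 workers, a 16-slot queue) and shows the producer thread absorbing work when the pool saturates:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
  public static void main(String[] args) throws InterruptedException {
    // Same shape as the marking executor: bounded queue + CallerRunsPolicy
    // instead of rejection.
    final ExecutorService pool =
        new ThreadPoolExecutor(
            0,
            2,
            5L,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(16),
            new ThreadPoolExecutor.CallerRunsPolicy());

    for (int i = 0; i < 100; i++) {
      final int task = i;
      // Once both workers are busy and the queue is full, execute() neither
      // blocks nor throws: the submitting thread runs the task itself,
      // which throttles task production.
      pool.execute(
          () ->
              System.out.println(
                  "task " + task + " ran on " + Thread.currentThread().getName()));
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}

When run, typically at least some tasks report main as their executing thread: the producer became a consumer, which is exactly the self-throttling effect the mark() Javadoc relies on.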
@@ -199,52 +253,57 @@ private MerklePatriciaTrie<Bytes32, Bytes> createStorageTrie(final Bytes32 rootHash)
         Function.identity());
   }
 
-  private void processAccountState(final Bytes value) {
+  private void processAccountState(final Bytes value, final ExecutorService executorService) {
     final StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(RLP.input(value));
     markNode(accountValue.getCodeHash());
 
     createStorageTrie(accountValue.getStorageRoot())
-        .visitAll(storageNode -> markNode(storageNode.getHash()));
+        .visitAll(storageNode -> markNode(storageNode.getHash()), executorService);
   }
 
   @VisibleForTesting
   void markNode(final Bytes32 hash) {
-    markedNodesCounter.inc();
-    markLock.lock();
-    try {
-      pendingMarks.add(hash);
-      maybeFlushPendingMarks();
-    } finally {
-      markLock.unlock();
-    }
+    markThenMaybeFlush(() -> pendingMarks.add(hash), 1);
   }
 
   private void markNodes(final Collection<Bytes32> nodeHashes) {
-    markedNodesCounter.inc(nodeHashes.size());
+    markThenMaybeFlush(() -> pendingMarks.addAll(nodeHashes), nodeHashes.size());
+  }
+
+  private void markThenMaybeFlush(final Runnable nodeMarker, final int numberOfNodes) {
+    // We use the read lock here because pendingMarks is threadsafe and we want to allow all the
+    // marking threads access simultaneously.
+    final Lock markLock = pendingMarksLock.readLock();
     markLock.lock();
     try {
-      pendingMarks.addAll(nodeHashes);
-      maybeFlushPendingMarks();
+      nodeMarker.run();
     } finally {
       markLock.unlock();
     }
-  }
+    markedNodesCounter.inc(numberOfNodes);
 
-  private void maybeFlushPendingMarks() {
-    if (pendingMarks.size() > operationsPerTransaction) {
-      flushPendingMarks();
+    // However, when the size of pendingMarks grows too large, we want all the threads to stop
+    // adding because we're going to clear the set.
+    // Therefore, we need to take out a write lock.
+    if (pendingMarks.size() >= operationsPerTransaction) {
+      final Lock flushLock = pendingMarksLock.writeLock();
+      flushLock.lock();
+      try {
+        // Check once again that the condition holds. If it doesn't, that means another thread
+        // already flushed them.
+        if (pendingMarks.size() >= operationsPerTransaction) {
+          flushPendingMarks();
+        }
+      } finally {
+        flushLock.unlock();
+      }
     }
   }
 
   private void flushPendingMarks() {
-    markLock.lock();
-    try {
-      final KeyValueStorageTransaction transaction = markStorage.startTransaction();
-      pendingMarks.forEach(node -> transaction.put(node.toArrayUnsafe(), IN_USE));
-      transaction.commit();
-      pendingMarks.clear();
-    } finally {
-      markLock.unlock();
-    }
+    final KeyValueStorageTransaction transaction = markStorage.startTransaction();
+    pendingMarks.forEach(node -> transaction.put(node.toArrayUnsafe(), IN_USE));
+    transaction.commit();
+    pendingMarks.clear();
   }
 }
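The locking discipline above — concurrent adds under the read lock, flush under the write lock with a second size check — generalizes beyond pruning. Here is a standalone sketch of the pattern (a hypothetical PendingBuffer class, not part of Besu; the flush stands in for the single KeyValueStorageTransaction commit):

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class PendingBuffer<T> {
  private final int flushThreshold;
  private final Set<T> pending = Collections.newSetFromMap(new ConcurrentHashMap<>());
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  public PendingBuffer(final int flushThreshold) {
    this.flushThreshold = flushThreshold;
  }

  public void add(final T item) {
    // The read lock is not protecting the set (it is already thread-safe);
    // it only prevents adds from racing with the clear() in flush(), while
    // still letting all adding threads proceed concurrently.
    final Lock readLock = lock.readLock();
    readLock.lock();
    try {
      pending.add(item);
    } finally {
      readLock.unlock();
    }
    maybeFlush();
  }

  private void maybeFlush() {
    if (pending.size() >= flushThreshold) {
      final Lock writeLock = lock.writeLock();
      writeLock.lock();
      try {
        // Re-check: another thread may have flushed while we waited.
        if (pending.size() >= flushThreshold) {
          flush();
        }
      } finally {
        writeLock.unlock();
      }
    }
  }

  private void flush() {
    // In MarkSweepPruner this is one KeyValueStorageTransaction commit.
    System.out.println("flushing " + pending.size() + " items");
    pending.clear();
  }

  public static void main(String[] args) {
    final PendingBuffer<Integer> buffer = new PendingBuffer<>(10);
    for (int i = 0; i < 25; i++) {
      buffer.add(i); // flushes when 10 and 20 items are pending
    }
  }
}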
Pruner.java
@@ -20,6 +20,7 @@
 import org.hyperledger.besu.ethereum.chain.Blockchain;
 import org.hyperledger.besu.ethereum.core.BlockHeader;
 import org.hyperledger.besu.ethereum.core.Hash;
+import org.hyperledger.besu.ethereum.trie.MerkleTrieException;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -137,7 +138,7 @@ private void handleNewBlock(final BlockAddedEvent event) {
 
   private void mark(final BlockHeader header) {
     final Hash stateRoot = header.getStateRoot();
-    LOG.debug(
+    LOG.info(
         "Begin marking used nodes for pruning. Block number: {} State root: {}",
         markBlockNumber,
         stateRoot);
@@ -149,7 +150,7 @@ private void mark(final BlockHeader header) {
   }
 
   private void sweep() {
-    LOG.debug(
+    LOG.info(
         "Begin sweeping unused nodes for pruning. Keeping full state for blocks {} to {}",
         markBlockNumber,
         markBlockNumber + blocksRetained);
@@ -163,14 +164,21 @@ private void sweep() {
   private void execute(final Runnable action) {
     try {
       executorService.execute(action);
-    } catch (final Throwable t) {
-      LOG.error("Pruner failed", t);
-      pruningStrategy.cleanup();
+    } catch (final MerkleTrieException mte) {
+      LOG.fatal(
+          "An unrecoverable error occurred while pruning. The database directory must be deleted and resynced.",
+          mte);
+      System.exit(1);
+    } catch (final Exception e) {
+      LOG.error(
+          "An unexpected error occurred in the {} pruning phase: {}. Reattempting.",
+          getPruningPhase(),
+          e.getMessage());
+      pruningStrategy.clearMarks();
+      pruningPhase.set(PruningPhase.IDLE);
     }
   }
 
   @VisibleForTesting
   PruningPhase getPruningPhase() {
     return pruningPhase.get();
   }