Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Use a single thread to persist downloaded world state nodes (#950)
Browse files Browse the repository at this point in the history
* Use a single thread to persist received world state nodes to avoid RocksDB timeouts due to contention on the write lock.

* Fix RocksDbTaskQueue so it doesn't read stale data when resuming a transfer.

* Skip downloading empty trie nodes.

* Log unhandled errors from world state download requests
  • Loading branch information
ajsutton authored Feb 26, 2019
1 parent 4e8b387 commit 700896a
Show file tree
Hide file tree
Showing 15 changed files with 393 additions and 236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
Expand All @@ -33,8 +31,7 @@ class AccountTrieNodeDataRequest extends TrieNodeDataRequest {
}

@Override
public void persist(final Updater updater) {
checkNotNull(getData(), "Must set data before node can be persisted.");
protected void doPersist(final Updater updater) {
updater.putAccountStateTrieNode(getHash(), getData());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
Expand All @@ -29,8 +27,7 @@ class CodeNodeDataRequest extends NodeDataRequest {
}

@Override
public void persist(final Updater updater) {
checkNotNull(getData(), "Must set data before node can be persisted.");
protected void doPersist(final Updater updater) {
updater.putCode(getHash(), getData());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.rlp.RLPInput;
Expand All @@ -28,6 +30,7 @@ public abstract class NodeDataRequest {
private final RequestType requestType;
private final Hash hash;
private BytesValue data;
private boolean requiresPersisting = true;
private final AtomicInteger failedRequestCount = new AtomicInteger(0);

protected NodeDataRequest(final RequestType requestType, final Hash hash) {
Expand All @@ -52,14 +55,14 @@ public static BytesValue serialize(final NodeDataRequest request) {
}

public static NodeDataRequest deserialize(final BytesValue encoded) {
RLPInput in = RLP.input(encoded);
final RLPInput in = RLP.input(encoded);
in.enterList();
RequestType requestType = RequestType.fromValue(in.readByte());
Hash hash = Hash.wrap(in.readBytes32());
int failureCount = in.readIntScalar();
final RequestType requestType = RequestType.fromValue(in.readByte());
final Hash hash = Hash.wrap(in.readBytes32());
final int failureCount = in.readIntScalar();
in.leaveList();

NodeDataRequest deserialized;
final NodeDataRequest deserialized;
switch (requestType) {
case ACCOUNT_TRIE_NODE:
deserialized = createAccountDataRequest(hash);
Expand Down Expand Up @@ -105,6 +108,11 @@ public NodeDataRequest setData(final BytesValue data) {
return this;
}

public NodeDataRequest setRequiresPersisting(final boolean requiresPersisting) {
this.requiresPersisting = requiresPersisting;
return this;
}

public int trackFailure() {
return failedRequestCount.incrementAndGet();
}
Expand All @@ -113,7 +121,14 @@ private void setFailureCount(final int failures) {
failedRequestCount.set(failures);
}

public abstract void persist(final WorldStateStorage.Updater updater);
public final void persist(final WorldStateStorage.Updater updater) {
if (requiresPersisting) {
checkNotNull(getData(), "Must set data before node can be persisted.");
doPersist(updater);
}
}

protected abstract void doPersist(final WorldStateStorage.Updater updater);

public abstract Stream<NodeDataRequest> getChildRequests();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
Expand All @@ -30,8 +28,7 @@ class StorageTrieNodeDataRequest extends TrieNodeDataRequest {
}

@Override
public void persist(final Updater updater) {
checkNotNull(getData(), "Must set data before node can be persisted.");
protected void doPersist(final Updater updater) {
updater.putAccountStorageTrieNode(getHash(), getData());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.trie.Node;
import tech.pegasys.pantheon.ethereum.trie.TrieNodeDecoder;
import tech.pegasys.pantheon.util.bytes.BytesValue;
Expand Down Expand Up @@ -58,7 +59,11 @@ private Stream<NodeDataRequest> getRequestsFromLoadedTrieNode(final Node<BytesVa
private Stream<NodeDataRequest> getRequestsFromChildTrieNode(final Node<BytesValue> trieNode) {
if (trieNode.isReferencedByHash()) {
// If child nodes are reference by hash, we need to download them
NodeDataRequest req = createChildNodeDataRequest(Hash.wrap(trieNode.getHash()));
final Hash hash = Hash.wrap(trieNode.getHash());
if (MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH.equals(hash) || BytesValue.EMPTY.equals(hash)) {
return Stream.empty();
}
final NodeDataRequest req = createChildNodeDataRequest(hash);
return Stream.of(req);
}
// Otherwise if the child's value has been inlined we can go ahead and process it
Expand Down
Loading

0 comments on commit 700896a

Please sign in to comment.