Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-2195] Commit world state continuously #809

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage.Updater;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.util.bytes.BytesValue;
Expand Down Expand Up @@ -151,6 +152,29 @@ public void getNodeData_saveAndGetRegularValue() {
assertThat(storage.getNodeData(Hash.hash(bytes))).contains(bytes);
}

@Test
public void reconcilesNonConflictingUpdaters() {
BytesValue bytesA = BytesValue.fromHexString("0x12");
BytesValue bytesB = BytesValue.fromHexString("0x1234");
BytesValue bytesC = BytesValue.fromHexString("0x123456");

KeyValueStorageWorldStateStorage storage = emptyStorage();
Updater updaterA = storage.updater();
Updater updaterB = storage.updater();

updaterA.putCode(bytesA);
updaterB.putCode(bytesA);
updaterB.putCode(bytesB);
updaterA.putCode(bytesC);

updaterA.commit();
updaterB.commit();

assertThat(storage.getCode(Hash.hash(bytesA))).contains(bytesA);
assertThat(storage.getCode(Hash.hash(bytesB))).contains(bytesB);
assertThat(storage.getCode(Hash.hash(bytesC))).contains(bytesC);
}

private KeyValueStorageWorldStateStorage emptyStorage() {
return new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.BigQueue;
Expand Down Expand Up @@ -51,7 +52,6 @@ private enum Status {

private final EthContext ethContext;
private final BigQueue<NodeDataRequest> pendingRequests;
private final WorldStateStorage.Updater worldStateStorageUpdater;
private final int hashCountPerRequest;
private final int maxOutstandingRequests;
private final AtomicInteger outstandingRequests = new AtomicInteger(0);
Expand All @@ -74,7 +74,6 @@ public WorldStateDownloader(
this.hashCountPerRequest = hashCountPerRequest;
this.maxOutstandingRequests = maxOutstandingRequests;
this.ethTasksTimer = ethTasksTimer;
this.worldStateStorageUpdater = worldStateStorage.updater();
}

public CompletableFuture<Void> run(final BlockHeader header) {
Expand Down Expand Up @@ -135,7 +134,6 @@ private void requestNodeData(final BlockHeader header) {
(res, error) -> {
if (outstandingRequests.decrementAndGet() == 0 && pendingRequests.isEmpty()) {
// We're done
worldStateStorageUpdater.commit();
markDone();
} else {
// Send out additional requests
Expand Down Expand Up @@ -182,14 +180,15 @@ private CompletableFuture<?> sendAndProcessRequests(
.whenComplete(
(data, err) -> {
boolean requestFailed = err != null;
Updater storageUpdater = worldStateStorage.updater();
for (NodeDataRequest request : requests) {
BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
if (matchingData == null) {
pendingRequests.enqueue(request);
} else {
// Persist request data
request.setData(matchingData);
request.persist(worldStateStorageUpdater);
request.persist(storageUpdater);

// Queue child requests
request
Expand All @@ -198,6 +197,7 @@ private CompletableFuture<?> sendAndProcessRequests(
.forEach(pendingRequests::enqueue);
}
}
storageUpdater.commit();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
Expand Down Expand Up @@ -174,11 +175,26 @@ public void canRecoverFromTimeouts() {
assertAccountsMatch(localWorldState, accounts);
}

@Test
public void handlesPartialResponsesFromNetwork() {
downloadAvailableWorldStateFromPeers(5, 100, 10, 10, this::respondPartially);
}

private void downloadAvailableWorldStateFromPeers(
final int peerCount,
final int accountCount,
final int hashesPerRequest,
final int maxOutstandingRequests) {
downloadAvailableWorldStateFromPeers(
peerCount, accountCount, hashesPerRequest, maxOutstandingRequests, this::respondFully);
}

private void downloadAvailableWorldStateFromPeers(
final int peerCount,
final int accountCount,
final int hashesPerRequest,
final int maxOutstandingRequests,
final NetworkResponder networkResponder) {
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final int trailingPeerCount = 5;
BlockDataGenerator dataGen = new BlockDataGenerator(1);
Expand Down Expand Up @@ -237,12 +253,15 @@ private void downloadAvailableWorldStateFromPeers(
CompletableFuture<?> result = downloader.run(header);

// Respond to node data requests
Responder responder =
// Send one round of full responses, so that we can get multiple requests queued up
Responder fullResponder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
while (!result.isDone()) {
for (RespondingEthPeer peer : usefulPeers) {
peer.respond(responder);
}
for (RespondingEthPeer peer : usefulPeers) {
peer.respond(fullResponder);
}
// Respond to remaining queued requests in custom way
if (!result.isDone()) {
networkResponder.respond(usefulPeers, remoteWorldStateArchive, result);
}

// Check that trailing peers were not queried for data
Expand All @@ -262,6 +281,57 @@ private void downloadAvailableWorldStateFromPeers(
}
}

private void respondFully(
final List<RespondingEthPeer> peers,
final WorldStateArchive remoteWorldStateArchive,
final CompletableFuture<?> downloaderFuture) {
Responder responder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
while (!downloaderFuture.isDone()) {
for (RespondingEthPeer peer : peers) {
peer.respond(responder);
}
}
}

private void respondPartially(
final List<RespondingEthPeer> peers,
final WorldStateArchive remoteWorldStateArchive,
final CompletableFuture<?> downloaderFuture) {
Responder fullResponder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
Responder partialResponder =
RespondingEthPeer.partialResponder(
mock(Blockchain.class), remoteWorldStateArchive, MainnetProtocolSchedule.create(), .5f);
Responder emptyResponder = RespondingEthPeer.emptyResponder();

// Send a few partial responses
for (int i = 0; i < 5; i++) {
for (RespondingEthPeer peer : peers) {
peer.respond(partialResponder);
}
}

// Downloader should not complete with partial responses
assertThat(downloaderFuture).isNotDone();

// Send a few empty responses
for (int i = 0; i < 3; i++) {
for (RespondingEthPeer peer : peers) {
peer.respond(emptyResponder);
}
}

// Downloader should not complete with empty responses
assertThat(downloaderFuture).isNotDone();

while (!downloaderFuture.isDone()) {
for (RespondingEthPeer peer : peers) {
peer.respond(fullResponder);
}
}
}

private void assertAccountsMatch(
final WorldState worldState, final List<Account> expectedAccounts) {
for (Account expectedAccount : expectedAccounts) {
Expand All @@ -277,4 +347,12 @@ private void assertAccountsMatch(
assertThat(actualStorage).isEqualTo(expectedStorage);
}
}

@FunctionalInterface
private interface NetworkResponder {
void respond(
final List<RespondingEthPeer> peers,
final WorldStateArchive remoteWorldStateArchive,
final CompletableFuture<?> downloaderFuture);
}
}