diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTask.java index c9582af02e..a4087c1d10 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTask.java @@ -12,7 +12,7 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager.task; -import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; @@ -27,15 +27,17 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class GetNodeDataFromPeerTask extends AbstractPeerRequestTask> { +public class GetNodeDataFromPeerTask extends AbstractPeerRequestTask> { private static final Logger LOG = LogManager.getLogger(); @@ -63,18 +65,20 @@ protected ResponseStream sendRequest(final EthPeer peer) throws PeerNotConnected } @Override - protected Optional> processResponse( + protected Optional> processResponse( final boolean streamClosed, final MessageData message, final EthPeer peer) { if (streamClosed) { // We don't record this as a useless response because it's impossible to know if a peer has // the data we're requesting. - return Optional.of(emptyList()); + return Optional.of(emptyMap()); } final NodeDataMessage nodeDataMessage = NodeDataMessage.readFrom(message); final List nodeData = nodeDataMessage.nodeData(); - if (nodeData.isEmpty()) { + final Map nodeDataByHash = mapNodeDataByHash(nodeData); + + if (nodeDataByHash.isEmpty()) { return Optional.empty(); - } else if (nodeData.size() > hashes.size()) { + } else if (nodeDataByHash.entrySet().size() > hashes.size()) { // Can't be the response to our request return Optional.empty(); } @@ -83,6 +87,13 @@ protected Optional> processResponse( // Message contains unrequested data, must not be the response to our request. return Optional.empty(); } - return Optional.of(nodeData); + return Optional.of(nodeDataByHash); + } + + private Map mapNodeDataByHash(final List data) { + // Map data by hash + final Map dataByHash = new HashMap<>(); + data.forEach(d -> dataByHash.put(Hash.hash(d), d)); + return dataByHash; } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index e7786c3140..4c31e7fdd4 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -240,7 +240,7 @@ private CompletableFuture waitForNewPeer() { .timeout(WaitForPeerTask.create(ethContext, ethTasksTimer), Duration.ofSeconds(5)); } - private CompletableFuture>> sendAndProcessRequests( + private CompletableFuture>> sendAndProcessRequests( final EthPeer peer, final List> requestTasks, final BlockHeader blockHeader) { @@ -250,13 +250,12 @@ private CompletableFuture>> sendAndProcessRequ .map(NodeDataRequest::getHash) .distinct() .collect(Collectors.toList()); - final AbstractPeerTask> ethTask = + final AbstractPeerTask> ethTask = GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer).assignPeer(peer); outstandingRequests.add(ethTask); return ethTask .run() .thenApply(PeerTaskResult::getResult) - .thenApply(this::mapNodeDataByHash) .exceptionally( error -> { final Throwable rootCause = ExceptionUtils.rootCause(error); @@ -276,10 +275,10 @@ private CompletableFuture>> sendAndProcessRequ () -> storeData(requestTasks, blockHeader, ethTask, data))); } - private CompletableFuture>> storeData( + private CompletableFuture>> storeData( final List> requestTasks, final BlockHeader blockHeader, - final AbstractPeerTask> ethTask, + final AbstractPeerTask> ethTask, final Map data) { final Updater storageUpdater = worldStateStorage.updater(); for (final Task task : requestTasks) { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/DeconstructedGetNodeDataFromPeerTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/DeconstructedGetNodeDataFromPeerTaskTest.java index 2bae199055..0cfd136f93 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/DeconstructedGetNodeDataFromPeerTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/DeconstructedGetNodeDataFromPeerTaskTest.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -73,15 +74,16 @@ public void getNodeDataFromPeerTaskTest() { } // Execute task and wait for response - final AtomicReference>> actualResult = + final AtomicReference>> actualResult = new AtomicReference<>(); final AtomicBoolean done = new AtomicBoolean(false); final List hashes = requestedData.stream().map(Hash::hash).collect(toList()); - final EthTask>> task = + final EthTask>> task = GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer); - final CompletableFuture>> future = task.run(); + final CompletableFuture>> future = + task.run(); respondingPeer.respondWhile(responder, () -> !future.isDone()); future.whenComplete( (result, error) -> { @@ -90,7 +92,10 @@ public void getNodeDataFromPeerTaskTest() { }); assertThat(done).isTrue(); - assertThat(actualResult.get().getResult()).containsExactlyInAnyOrderElementsOf(requestedData); + + final List resultData = new ArrayList<>(actualResult.get().getResult().values()); + + assertThat(resultData).containsExactlyInAnyOrderElementsOf(requestedData); assertThat(actualResult.get().getPeer()).isEqualTo(respondingPeer.getEthPeer()); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTaskTest.java index c22988d8a6..438049ad4f 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTaskTest.java @@ -23,42 +23,49 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; -public class GetNodeDataFromPeerTaskTest extends PeerMessageTaskTest> { +public class GetNodeDataFromPeerTaskTest extends PeerMessageTaskTest> { @Override - protected List generateDataToBeRequested() { - final List requestedData = new ArrayList<>(); + protected Map generateDataToBeRequested() { + final Map requestedData = new HashMap<>(); for (int i = 0; i < 3; i++) { final BlockHeader blockHeader = blockchain.getBlockHeader(10 + i).get(); - requestedData.add( + requestedData.put( + Hash.fromHexStringLenient(Integer.toHexString(i)), protocolContext.getWorldStateArchive().getNodeData(blockHeader.getStateRoot()).get()); } return requestedData; } @Override - protected EthTask>> createTask( - final List requestedData) { - final List hashes = requestedData.stream().map(Hash::hash).collect(toList()); + protected EthTask>> createTask( + final Map requestedData) { + final List hashes = requestedData.values().stream().map(Hash::hash).collect(toList()); return GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer); } @Override protected void assertPartialResultMatchesExpectation( - final List requestedData, final List partialResponse) { + final Map requestedData, final Map partialResponse) { assertThat(partialResponse.size()).isLessThanOrEqualTo(requestedData.size()); assertThat(partialResponse.size()).isGreaterThan(0); - assertThat(requestedData).containsAll(partialResponse); + final List requestData = new ArrayList<>(requestedData.values()); + final List resultData = new ArrayList<>(partialResponse.values()); + assertThat(requestData).containsAll(resultData); } @Override protected void assertResultMatchesExpectation( - final List requestedData, - final PeerTaskResult> response, + final Map requestedData, + final PeerTaskResult> response, final EthPeer respondingPeer) { - assertThat(response.getResult()).containsExactlyInAnyOrderElementsOf(requestedData); + final List requestData = new ArrayList<>(requestedData.values()); + final List resultData = new ArrayList<>(response.getResult().values()); + assertThat(resultData).containsExactlyInAnyOrderElementsOf(requestData); assertThat(response.getPeer()).isEqualTo(respondingPeer); } }