Skip to content

Commit

Permalink
Use pipeline for world state download (PegaSysEng#1096)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored Mar 14, 2019
1 parent a9ab43a commit b5d025c
Show file tree
Hide file tree
Showing 22 changed files with 1,289 additions and 491 deletions.
1 change: 1 addition & 0 deletions ethereum/eth/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
implementation project(':ethereum:permissioning')
implementation project(':metrics')
implementation project(':services:kvstore')
implementation project(':services:pipeline')
implementation project(':services:tasks')

implementation 'io.vertx:vertx-core'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipeline;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
Expand Down Expand Up @@ -117,6 +118,13 @@ public <T> CompletableFuture<T> scheduleServiceTask(final EthTask<T> task) {
return serviceFuture;
}

public CompletableFuture<Void> startPipeline(final Pipeline pipeline) {
final CompletableFuture<Void> pipelineFuture = pipeline.start(servicesExecutor);
serviceFutures.add(pipelineFuture);
pipelineFuture.whenComplete((r, t) -> serviceFutures.remove(pipelineFuture));
return pipelineFuture;
}

public <T> CompletableFuture<T> scheduleComputationTask(final Supplier<T> computation) {
return CompletableFuture.supplyAsync(computation, computationExecutor);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.manager.task;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class RetryingGetNodeDataFromPeerTask
extends AbstractRetryingPeerTask<Map<Hash, BytesValue>> {

private final EthContext ethContext;
private final Set<Hash> hashes;
private final long pivotBlockNumber;
private final MetricsSystem metricsSystem;

private RetryingGetNodeDataFromPeerTask(
final EthContext ethContext,
final Collection<Hash> hashes,
final long pivotBlockNumber,
final MetricsSystem metricsSystem) {
super(ethContext, 3, data -> false, metricsSystem);
this.ethContext = ethContext;
this.hashes = new HashSet<>(hashes);
this.pivotBlockNumber = pivotBlockNumber;
this.metricsSystem = metricsSystem;
}

public static RetryingGetNodeDataFromPeerTask forHashes(
final EthContext ethContext,
final Collection<Hash> hashes,
final long pivotBlockNumber,
final MetricsSystem metricsSystem) {
return new RetryingGetNodeDataFromPeerTask(ethContext, hashes, pivotBlockNumber, metricsSystem);
}

@Override
protected CompletableFuture<Map<Hash, BytesValue>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
final GetNodeDataFromPeerTask task =
GetNodeDataFromPeerTask.forHashes(ethContext, hashes, pivotBlockNumber, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
result.get().complete(peerResult.getResult());
return peerResult.getResult();
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.tasks.Task;

public class CompleteTaskStep {

private final WorldStateStorage worldStateStorage;
private final Counter completedRequestsCounter;
private final Counter retriedRequestsCounter;

public CompleteTaskStep(
final WorldStateStorage worldStateStorage, final MetricsSystem metricsSystem) {
this.worldStateStorage = worldStateStorage;

completedRequestsCounter =
metricsSystem.createCounter(
MetricCategory.SYNCHRONIZER,
"world_state_completed_requests_total",
"Total number of node data requests completed as part of fast sync world state download");
retriedRequestsCounter =
metricsSystem.createCounter(
MetricCategory.SYNCHRONIZER,
"world_state_retried_requests_total",
"Total number of node data requests repeated as part of fast sync world state download");
}

public void markAsCompleteOrFailed(
final BlockHeader header,
final WorldDownloadState downloadState,
final Task<NodeDataRequest> task) {
if (task.getData().getData() != null) {
enqueueChildren(task, header, downloadState);
completedRequestsCounter.inc();
task.markCompleted();
downloadState.checkCompletion(worldStateStorage, header);
} else {
retriedRequestsCounter.inc();
task.markFailed();
// Marking the task as failed will add it back to the queue so make sure any threads
// waiting to read from the queue are notified.
downloadState.notifyTaskAvailable();
}
}

private void enqueueChildren(
final Task<NodeDataRequest> task,
final BlockHeader blockHeader,
final WorldDownloadState downloadState) {
final NodeDataRequest request = task.getData();
// Only queue rootnode children if we started from scratch
if (!downloadState.downloadWasResumed() || !isRootState(blockHeader, request)) {
downloadState.enqueueRequests(request.getChildRequests());
}
}

private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
return request.getHash().equals(blockHeader.getStateRoot());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipe;
import tech.pegasys.pantheon.services.tasks.Task;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Optional;
import java.util.stream.Stream;

public class LoadLocalDataStep {

private final WorldStateStorage worldStateStorage;
private final Counter existingNodeCounter;

public LoadLocalDataStep(
final WorldStateStorage worldStateStorage, final MetricsSystem metricsSystem) {
this.worldStateStorage = worldStateStorage;
existingNodeCounter =
metricsSystem.createCounter(
MetricCategory.SYNCHRONIZER,
"world_state_existing_nodes_total",
"Total number of node data requests completed using existing data");
}

public Stream<Task<NodeDataRequest>> loadLocalData(
final Task<NodeDataRequest> task, final Pipe<Task<NodeDataRequest>> completedTasks) {
final NodeDataRequest request = task.getData();
final Optional<BytesValue> existingData = worldStateStorage.getNodeData(request.getHash());
if (existingData.isPresent()) {
existingNodeCounter.inc();
request.setData(existingData.get());
request.setRequiresPersisting(false);
completedTasks.put(task);
return Stream.empty();
}
return Stream.of(task);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.services.tasks.Task;

import java.util.List;

public class PersistDataStep {
private final WorldStateStorage worldStateStorage;

public PersistDataStep(final WorldStateStorage worldStateStorage) {
this.worldStateStorage = worldStateStorage;
}

public List<Task<NodeDataRequest>> persist(
final List<Task<NodeDataRequest>> tasks,
final BlockHeader blockHeader,
final WorldDownloadState downloadState) {
final Updater updater = worldStateStorage.updater();
tasks.stream()
.map(Task::getData)
.filter(request -> request.getData() != null)
.forEach(
request -> {
if (isRootState(blockHeader, request)) {
downloadState.setRootNodeData(request.getData());
} else {
request.persist(updater);
}
});
updater.commit();
return tasks;
}

private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
return request.getHash().equals(blockHeader.getStateRoot());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException;
import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.RetryingGetNodeDataFromPeerTask;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.tasks.Task;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class RequestDataStep {
private static final Logger LOG = LogManager.getLogger();
private final BiFunction<List<Hash>, Long, EthTask<Map<Hash, BytesValue>>> getNodeDataTaskFactory;

public RequestDataStep(final EthContext ethContext, final MetricsSystem metricsSystem) {
this(
(hashes, pivotBlockNumber) ->
RetryingGetNodeDataFromPeerTask.forHashes(
ethContext, hashes, pivotBlockNumber, metricsSystem));
}

RequestDataStep(
final BiFunction<List<Hash>, Long, EthTask<Map<Hash, BytesValue>>> getNodeDataTaskFactory) {
this.getNodeDataTaskFactory = getNodeDataTaskFactory;
}

public CompletableFuture<List<Task<NodeDataRequest>>> requestData(
final List<Task<NodeDataRequest>> requestTasks,
final BlockHeader blockHeader,
final WorldDownloadState downloadState) {
final List<Hash> hashes =
requestTasks.stream()
.map(Task::getData)
.map(NodeDataRequest::getHash)
.distinct()
.collect(Collectors.toList());
return sendRequest(blockHeader, hashes, downloadState)
.thenApply(
data -> {
for (final Task<NodeDataRequest> task : requestTasks) {
final NodeDataRequest request = task.getData();
final BytesValue matchingData = data.get(request.getHash());
if (matchingData != null) {
request.setData(matchingData);
}
}
return requestTasks;
});
}

private CompletableFuture<Map<Hash, BytesValue>> sendRequest(
final BlockHeader blockHeader,
final List<Hash> hashes,
final WorldDownloadState downloadState) {
final EthTask<Map<Hash, BytesValue>> task =
getNodeDataTaskFactory.apply(hashes, blockHeader.getNumber());
downloadState.addOutstandingTask(task);
return task.run()
.handle(
(result, error) -> {
downloadState.removeOutstandingTask(task);
if (error != null) {
final Throwable rootCause = ExceptionUtils.rootCause(error);
if (!(rootCause instanceof TimeoutException
|| rootCause instanceof InterruptedException
|| rootCause instanceof CancellationException
|| rootCause instanceof EthTaskException)) {
LOG.debug("GetNodeDataRequest failed", error);
}
return Collections.emptyMap();
}
downloadState.requestComplete(!result.isEmpty());
return result;
});
}
}
Loading

0 comments on commit b5d025c

Please sign in to comment.