Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Do parallel extract signatures in the parallel block importer. (#844)
Browse files Browse the repository at this point in the history
* Do parallel extract signatures in the parallel block importer.

* remove the extraction from FullSyncBlockHandler
  • Loading branch information
shemnon authored Feb 20, 2019
1 parent aad2d17 commit 0cf066d
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ public interface BlockHandler<B> {
CompletableFuture<List<B>> validateAndImportBlocks(final List<B> blocks);

long extractBlockNumber(final B block);

CompletableFuture<Void> executeParallelCalculations(List<B> blocks);
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,9 @@ private BlockImporter<C> getBlockImporter(final BlockWithReceipts blockWithRecei
public long extractBlockNumber(final BlockWithReceipts blockWithReceipt) {
return blockWithReceipt.getHeader().getNumber();
}

@Override
public CompletableFuture<Void> executeParallelCalculations(final List<BlockWithReceipts> blocks) {
return CompletableFuture.completedFuture(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PersistBlockTask;
Expand All @@ -25,6 +26,7 @@
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -68,26 +70,23 @@ public CompletableFuture<List<Block>> validateAndImportBlocks(final List<Block>
@Override
public CompletableFuture<List<Block>> downloadBlocks(final List<BlockHeader> headers) {
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers, ethTasksTimer)
.run()
.thenCompose(this::extractTransactionSenders);
.run();
}

@Override
public long extractBlockNumber(final Block block) {
return block.getHeader().getNumber();
}

private CompletableFuture<List<Block>> extractTransactionSenders(final List<Block> blocks) {
LOG.debug(
"Extracting sender {} to {}",
blocks.get(0).getHeader().getNumber(),
blocks.get(blocks.size() - 1).getHeader().getNumber());
@Override
public CompletableFuture<Void> executeParallelCalculations(final List<Block> blocks) {
final EthScheduler ethScheduler = ethContext.getScheduler();
final List<CompletableFuture<?>> calculations = new ArrayList<>();
for (final Block block : blocks) {
for (final Transaction transaction : block.getBody().getTransactions()) {
// This method internally performs the transaction sender extraction.
transaction.getSender();
for (final Transaction tx : block.getBody().getTransactions()) {
calculations.add(ethScheduler.scheduleComputationTask(tx::getSender));
}
}
return CompletableFuture.completedFuture(blocks);
return CompletableFuture.allOf(calculations.toArray(new CompletableFuture<?>[0]));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class ParallelExtractTxSignaturesTask<B> extends AbstractPipelinedTask<List<B>, List<B>> {
private static final Logger LOG = LogManager.getLogger();

private final BlockHandler<B> blockHandler;

ParallelExtractTxSignaturesTask(
final BlockHandler<B> blockHandler,
final BlockingQueue<List<B>> inboundQueue,
final int outboundBacklogSize,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(inboundQueue, outboundBacklogSize, ethTasksTimer);
this.blockHandler = blockHandler;
}

@Override
protected Optional<List<B>> processStep(
final List<B> bodies, final Optional<List<B>> previousBodies) {
LOG.trace(
"Calculating fields for transactions between {} to {}",
blockHandler.extractBlockNumber(bodies.get(0)),
blockHandler.extractBlockNumber(bodies.get(bodies.size() - 1)));

try {
blockHandler.executeParallelCalculations(bodies).get();
} catch (final InterruptedException | ExecutionException e) {
result.get().completeExceptionally(e);
return Optional.empty();
}
LOG.debug(
"Calculated fields for transactions between {} to {}",
blockHandler.extractBlockNumber(bodies.get(0)),
blockHandler.extractBlockNumber(bodies.get(bodies.size() - 1)));
return Optional.of(bodies);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,13 @@ protected void executeTask() {
final ParallelDownloadBodiesTask<B> downloadBodiesTask =
new ParallelDownloadBodiesTask<>(
blockHandler, validateHeadersTask.getOutboundQueue(), maxActiveChunks, ethTasksTimer);
final ParallelExtractTxSignaturesTask<B> extractTxSignaturesTask =
new ParallelExtractTxSignaturesTask<>(
blockHandler, downloadBodiesTask.getOutboundQueue(), maxActiveChunks, ethTasksTimer);
final ParallelValidateAndImportBodiesTask<B> validateAndImportBodiesTask =
new ParallelValidateAndImportBodiesTask<>(
blockHandler,
downloadBodiesTask.getOutboundQueue(),
extractTxSignaturesTask.getOutboundQueue(),
Integer.MAX_VALUE,
ethTasksTimer);

Expand All @@ -141,6 +144,9 @@ protected void executeTask() {
final CompletableFuture<?> downloadBodiesFuture =
scheduler.scheduleServiceTask(downloadBodiesTask);
registerSubTask(downloadBodiesFuture);
final CompletableFuture<?> extractTxSignaturesFuture =
scheduler.scheduleServiceTask(extractTxSignaturesTask);
registerSubTask(extractTxSignaturesFuture);
final CompletableFuture<List<List<B>>> validateBodiesFuture =
scheduler.scheduleServiceTask(validateAndImportBodiesTask);
registerSubTask(validateBodiesFuture);
Expand All @@ -149,14 +155,16 @@ protected void executeTask() {
downloadHeadersTask.shutdown();
downloadHeaderFuture.thenRun(validateHeadersTask::shutdown);
validateHeaderFuture.thenRun(downloadBodiesTask::shutdown);
downloadBodiesFuture.thenRun(validateAndImportBodiesTask::shutdown);
downloadBodiesFuture.thenRun(extractTxSignaturesTask::shutdown);
extractTxSignaturesFuture.thenRun(validateAndImportBodiesTask::shutdown);

final BiConsumer<? super Object, ? super Throwable> cancelOnException =
(s, e) -> {
if (e != null && !(e instanceof CancellationException)) {
downloadHeadersTask.cancel();
validateHeadersTask.cancel();
downloadBodiesTask.cancel();
extractTxSignaturesTask.cancel();
validateAndImportBodiesTask.cancel();
result.get().completeExceptionally(e);
}
Expand All @@ -165,6 +173,7 @@ protected void executeTask() {
downloadHeaderFuture.whenComplete(cancelOnException);
validateHeaderFuture.whenComplete(cancelOnException);
downloadBodiesFuture.whenComplete(cancelOnException);
extractTxSignaturesFuture.whenComplete(cancelOnException);
validateBodiesFuture.whenComplete(
(r, e) -> {
if (e != null) {
Expand Down

0 comments on commit 0cf066d

Please sign in to comment.