Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Parallel downloader should stop on puts if requested. #927

Merged
merged 2 commits into from
Feb 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager.task;

import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

Expand All @@ -28,61 +26,69 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public abstract class AbstractPipelinedPeerTask<I, O> extends AbstractPeerTask<List<O>> {
public abstract class AbstractPipelinedTask<I, O> extends AbstractEthTask<List<O>> {
private static final Logger LOG = LogManager.getLogger();

static final int TIMEOUT_MS = 1000;

private BlockingQueue<I> inboundQueue;
private BlockingQueue<O> outboundQueue;
private List<O> results;
private final BlockingQueue<I> inboundQueue;
private final BlockingQueue<O> outboundQueue;
private final List<O> results;

private boolean shuttingDown = false;
private AtomicReference<Throwable> processingException = new AtomicReference<>(null);
private final AtomicReference<Throwable> processingException = new AtomicReference<>(null);

protected AbstractPipelinedPeerTask(
protected AbstractPipelinedTask(
final BlockingQueue<I> inboundQueue,
final int outboundBacklogSize,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, ethTasksTimer);
super(ethTasksTimer);
this.inboundQueue = inboundQueue;
outboundQueue = new LinkedBlockingQueue<>(outboundBacklogSize);
results = new ArrayList<>();
}

@Override
protected void executeTaskWithPeer(final EthPeer peer) {
protected void executeTask() {
Optional<I> previousInput = Optional.empty();
while (!isDone() && processingException.get() == null) {
if (shuttingDown && inboundQueue.isEmpty()) {
break;
}
final I input;
try {
input = inboundQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (input == null) {
// timed out waiting for a result
try {
while (!isDone() && processingException.get() == null) {
if (shuttingDown && inboundQueue.isEmpty()) {
break;
}
final I input;
try {
input = inboundQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (input == null) {
// timed out waiting for a result
continue;
}
} catch (final InterruptedException e) {
// this is expected
continue;
}
} catch (final InterruptedException e) {
// this is expected
continue;
final Optional<O> output = processStep(input, previousInput);
output.ifPresent(
o -> {
while (!isDone()) {
try {
if (outboundQueue.offer(o, 1, TimeUnit.SECONDS)) {
results.add(o);
break;
}
} catch (final InterruptedException e) {
processingException.compareAndSet(null, e);
break;
}
}
});
previousInput = Optional.of(input);
}
final Optional<O> output = processStep(input, previousInput, peer);
output.ifPresent(
o -> {
try {
outboundQueue.put(o);
} catch (final InterruptedException e) {
processingException.compareAndSet(null, e);
}
results.add(o);
});
previousInput = Optional.of(input);
} catch (final RuntimeException e) {
processingException.compareAndSet(null, e);
}
if (processingException.get() == null) {
result.get().complete(new PeerTaskResult<>(peer, results));
result.get().complete(results);
} else {
result.get().completeExceptionally(processingException.get());
}
Expand All @@ -105,5 +111,5 @@ protected void failExceptionally(final Throwable t) {
cancel();
}

protected abstract Optional<O> processStep(I input, Optional<I> previousInput, EthPeer peer);
protected abstract Optional<O> processStep(I input, Optional<I> previousInput);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
Expand All @@ -29,7 +27,7 @@
import org.apache.logging.log4j.Logger;

public class ParallelDownloadBodiesTask<B>
extends AbstractPipelinedPeerTask<List<BlockHeader>, List<B>> {
extends AbstractPipelinedTask<List<BlockHeader>, List<B>> {
private static final Logger LOG = LogManager.getLogger();

private final BlockHandler<B> blockHandler;
Expand All @@ -38,18 +36,15 @@ public class ParallelDownloadBodiesTask<B>
final BlockHandler<B> blockHandler,
final BlockingQueue<List<BlockHeader>> inboundQueue,
final int outboundBacklogSize,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer);
super(inboundQueue, outboundBacklogSize, ethTasksTimer);

this.blockHandler = blockHandler;
}

@Override
protected Optional<List<B>> processStep(
final List<BlockHeader> headers,
final Optional<List<BlockHeader>> previousHeaders,
final EthPeer peer) {
final List<BlockHeader> headers, final Optional<List<BlockHeader>> previousHeaders) {
LOG.trace(
"Downloading bodies {} to {}",
headers.get(0).getNumber(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
Expand All @@ -32,11 +31,12 @@
import org.apache.logging.log4j.Logger;

public class ParallelDownloadHeadersTask<C>
extends AbstractPipelinedPeerTask<BlockHeader, List<BlockHeader>> {
extends AbstractPipelinedTask<BlockHeader, List<BlockHeader>> {
private static final Logger LOG = LogManager.getLogger();

private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;

ParallelDownloadHeadersTask(
final BlockingQueue<BlockHeader> inboundQueue,
Expand All @@ -45,17 +45,17 @@ public class ParallelDownloadHeadersTask<C>
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer);
super(inboundQueue, outboundBacklogSize, ethTasksTimer);

this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
}

@Override
protected Optional<List<BlockHeader>> processStep(
final BlockHeader nextCheckpointHeader,
final Optional<BlockHeader> previousCheckpointHeader,
final EthPeer peer) {
final Optional<BlockHeader> previousCheckpointHeader) {
if (!previousCheckpointHeader.isPresent()) {
return Optional.empty();
}
Expand All @@ -73,7 +73,6 @@ protected Optional<List<BlockHeader>> processStep(
nextCheckpointHeader,
segmentLength,
ethTasksTimer);
downloadTask.assignPeer(peer);
final CompletableFuture<List<BlockHeader>> headerFuture = executeSubTask(downloadTask::run);

final List<BlockHeader> headers = Lists.newArrayList(previousCheckpointHeader.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractEthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
Expand Down Expand Up @@ -120,21 +119,15 @@ protected void executeTask() {
maxActiveChunks,
protocolSchedule,
protocolContext,
ethContext,
ethTasksTimer);
final ParallelDownloadBodiesTask<B> downloadBodiesTask =
new ParallelDownloadBodiesTask<>(
blockHandler,
validateHeadersTask.getOutboundQueue(),
maxActiveChunks,
ethContext,
ethTasksTimer);
blockHandler, validateHeadersTask.getOutboundQueue(), maxActiveChunks, ethTasksTimer);
final ParallelValidateAndImportBodiesTask<B> validateAndImportBodiesTask =
new ParallelValidateAndImportBodiesTask<>(
blockHandler,
downloadBodiesTask.getOutboundQueue(),
Integer.MAX_VALUE,
ethContext,
ethTasksTimer);

// Start the pipeline.
Expand All @@ -148,15 +141,15 @@ protected void executeTask() {
final CompletableFuture<?> downloadBodiesFuture =
scheduler.scheduleServiceTask(downloadBodiesTask);
registerSubTask(downloadBodiesFuture);
final CompletableFuture<AbstractPeerTask.PeerTaskResult<List<List<B>>>> validateBodiesFuture =
final CompletableFuture<List<List<B>>> validateBodiesFuture =
scheduler.scheduleServiceTask(validateAndImportBodiesTask);
registerSubTask(validateBodiesFuture);

// Hook in pipeline completion signaling.
downloadHeadersTask.shutdown();
downloadHeaderFuture.thenRun(() -> validateHeadersTask.shutdown());
validateHeaderFuture.thenRun(() -> downloadBodiesTask.shutdown());
downloadBodiesFuture.thenRun(() -> validateAndImportBodiesTask.shutdown());
downloadHeaderFuture.thenRun(validateHeadersTask::shutdown);
validateHeaderFuture.thenRun(downloadBodiesTask::shutdown);
downloadBodiesFuture.thenRun(validateAndImportBodiesTask::shutdown);

final BiConsumer<? super Object, ? super Throwable> cancelOnException =
(s, e) -> {
Expand All @@ -179,7 +172,7 @@ protected void executeTask() {
} else if (r != null) {
try {
final List<B> importedBlocks =
validateBodiesFuture.get().getResult().stream()
validateBodiesFuture.get().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
result.get().complete(importedBlocks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
Expand All @@ -29,7 +27,7 @@
import org.apache.logging.log4j.Logger;

public class ParallelValidateAndImportBodiesTask<B>
extends AbstractPipelinedPeerTask<List<B>, List<B>> {
extends AbstractPipelinedTask<List<B>, List<B>> {
private static final Logger LOG = LogManager.getLogger();

private final BlockHandler<B> blockHandler;
Expand All @@ -38,16 +36,15 @@ public class ParallelValidateAndImportBodiesTask<B>
final BlockHandler<B> blockHandler,
final BlockingQueue<List<B>> inboundQueue,
final int outboundBacklogSize,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer);
super(inboundQueue, outboundBacklogSize, ethTasksTimer);

this.blockHandler = blockHandler;
}

@Override
protected Optional<List<B>> processStep(
final List<B> blocks, final Optional<List<B>> previousBlocks, final EthPeer peer) {
final List<B> blocks, final Optional<List<B>> previousBlocks) {
final long firstBlock = blockHandler.extractBlockNumber(blocks.get(0));
final long lastBlock = blockHandler.extractBlockNumber(blocks.get(blocks.size() - 1));
LOG.debug("Starting import of chain segment {} to {}", firstBlock, lastBlock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask;
import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
Expand All @@ -33,7 +31,7 @@
import org.apache.logging.log4j.Logger;

public class ParallelValidateHeadersTask<C>
extends AbstractPipelinedPeerTask<List<BlockHeader>, List<BlockHeader>> {
extends AbstractPipelinedTask<List<BlockHeader>, List<BlockHeader>> {
private static final Logger LOG = LogManager.getLogger();

private final ProtocolSchedule<C> protocolSchedule;
Expand All @@ -46,9 +44,8 @@ public class ParallelValidateHeadersTask<C>
final int outboundBacklogSize,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer);
super(inboundQueue, outboundBacklogSize, ethTasksTimer);

this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
Expand All @@ -57,9 +54,7 @@ public class ParallelValidateHeadersTask<C>

@Override
protected Optional<List<BlockHeader>> processStep(
final List<BlockHeader> headers,
final Optional<List<BlockHeader>> previousHeaders,
final EthPeer peer) {
final List<BlockHeader> headers, final Optional<List<BlockHeader>> previousHeaders) {
LOG.debug(
"Validating Headers {} to {}",
headers.get(0).getNumber(),
Expand Down