Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Time all tasks #361

Merged
merged 7 commits into from
Dec 5, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager;

import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.Collection;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand All @@ -22,14 +25,21 @@

public abstract class AbstractEthTask<T> implements EthTask<T> {

protected double taskTimeInSec = -1.0D;
protected OperationTimer taskTimer;
protected final AtomicReference<CompletableFuture<T>> result = new AtomicReference<>();
protected volatile Collection<CompletableFuture<?>> subTaskFutures =
new ConcurrentLinkedDeque<>();

protected AbstractEthTask(final LabelledMetric<OperationTimer> ethTasksTimer) {

taskTimer = ethTasksTimer.labels(getClass().getSimpleName());
}

@Override
public final CompletableFuture<T> run() {
if (result.compareAndSet(null, new CompletableFuture<>())) {
executeTask();
executeTaskTimed();
result
.get()
.whenComplete(
Expand Down Expand Up @@ -117,6 +127,17 @@ public final T result() {
/** Execute core task logic. */
protected abstract void executeTask();

/** Executes the task while timed by a timer. */
public void executeTaskTimed() {
final OperationTimer.TimingContext timingContext = taskTimer.startTimer();
executeTask();
taskTimeInSec = timingContext.stopTimer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably use the try (final OperationTimer.TimingContext timingContext = taskTimer.startTimer()) or an explicit final block here to ensure the timer completes even if the task fails.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the final block because close doesn't return a time and we want to keep that for logging at higher levels.

}

public double getTaskTimeInSec() {
return taskTimeInSec;
}

/** Cleanup any resources when task completes. */
protected void cleanup() {
for (final CompletableFuture<?> subTaskFuture : subTaskFutures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.util.Optional;
Expand All @@ -29,8 +31,11 @@ public abstract class AbstractPeerRequestTask<R> extends AbstractPeerTask<R> {
private final int requestCode;
private volatile ResponseStream responseStream;

protected AbstractPeerRequestTask(final EthContext ethContext, final int requestCode) {
super(ethContext);
protected AbstractPeerRequestTask(
final EthContext ethContext,
final int requestCode,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, ethTasksTimer);
this.requestCode = requestCode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.Optional;

public abstract class AbstractPeerTask<R> extends AbstractEthTask<PeerTaskResult<R>> {
protected Optional<EthPeer> assignedPeer = Optional.empty();
protected final EthContext ethContext;

protected AbstractPeerTask(final EthContext ethContext) {
protected AbstractPeerTask(
final EthContext ethContext, final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethTasksTimer);
this.ethContext = ethContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
Expand All @@ -37,13 +39,19 @@ public abstract class AbstractRetryingPeerTask<T extends Collection<?>> extends
private final EthContext ethContext;
private final int maxRetries;
private int retryCount = 0;
private final LabelledMetric<OperationTimer> ethTasksTimer;

/**
* @param ethContext The context of the current Eth network we are attached to.
* @param maxRetries Maximum number of retries to accept before completing exceptionally.
*/
public AbstractRetryingPeerTask(final EthContext ethContext, final int maxRetries) {
public AbstractRetryingPeerTask(
final EthContext ethContext,
final int maxRetries,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethTasksTimer);
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
this.maxRetries = maxRetries;
}

Expand All @@ -69,7 +77,7 @@ protected void executeTask() {
if (peerResult.size() > 0) {
retryCount = 0;
}
executeTask();
executeTaskTimed();
}
});
}
Expand All @@ -87,13 +95,13 @@ private void handleTaskError(final Throwable error) {
if (cause instanceof NoAvailablePeersException) {
LOG.info("No peers available, wait for peer.");
// Wait for new peer to connect
final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext);
final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext, ethTasksTimer);
executeSubTask(
() ->
ethContext
.getScheduler()
.timeout(waitTask, Duration.ofSeconds(5))
.whenComplete((r, t) -> executeTask()));
.whenComplete((r, t) -> executeTaskTimed()));
return;
}

Expand All @@ -104,7 +112,9 @@ private void handleTaskError(final Throwable error) {
// Wait before retrying on failure
executeSubTask(
() ->
ethContext.getScheduler().scheduleFutureTask(this::executeTask, Duration.ofSeconds(1)));
ethContext
.getScheduler()
.scheduleFutureTask(this::executeTaskTimed, Duration.ofSeconds(1)));
}

protected abstract boolean isRetryableError(Throwable error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.uint.UInt256;

Expand All @@ -62,12 +61,12 @@ public class BlockPropagationManager<C> {
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final SyncState syncState;
private final LabelledMetric<OperationTimer> ethTasksTimer;

private final AtomicBoolean started = new AtomicBoolean(false);

private final Set<Hash> requestedBlocks = new ConcurrentSet<>();
private final PendingBlocks pendingBlocks;
private final OperationTimer announcedBlockIngestTimer;

BlockPropagationManager(
final SynchronizerConfiguration config,
Expand All @@ -76,20 +75,15 @@ public class BlockPropagationManager<C> {
final EthContext ethContext,
final SyncState syncState,
final PendingBlocks pendingBlocks,
final MetricsSystem metricsSystem) {
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;

this.syncState = syncState;
this.pendingBlocks = pendingBlocks;

this.announcedBlockIngestTimer =
metricsSystem.createTimer(
MetricCategory.BLOCKCHAIN,
"pantheon_blockchain_announcedBlock_ingest",
"Time to ingest a single announced block");
}

public void start() {
Expand Down Expand Up @@ -125,7 +119,11 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchai
if (!readyForImport.isEmpty()) {
final Supplier<CompletableFuture<List<Block>>> importBlocksTask =
PersistBlockTask.forUnorderedBlocks(
protocolSchedule, protocolContext, readyForImport, HeaderValidationMode.FULL);
protocolSchedule,
protocolContext,
readyForImport,
HeaderValidationMode.FULL,
ethTasksTimer);
ethContext
.getScheduler()
.scheduleSyncWorkerTask(importBlocksTask)
Expand Down Expand Up @@ -225,7 +223,8 @@ private void handleNewBlockHashesFromNetwork(final EthMessage message) {
private CompletableFuture<Block> processAnnouncedBlock(
final EthPeer peer, final NewBlockHash newBlock) {
final AbstractPeerTask<Block> getBlockTask =
GetBlockFromPeerTask.create(protocolSchedule, ethContext, newBlock.hash()).assignPeer(peer);
GetBlockFromPeerTask.create(protocolSchedule, ethContext, newBlock.hash(), ethTasksTimer)
.assignPeer(peer);

return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult()));
}
Expand All @@ -251,8 +250,7 @@ CompletableFuture<Block> importOrSavePendingBlock(final Block block) {
// Import block
final PersistBlockTask<C> importTask =
PersistBlockTask.create(
protocolSchedule, protocolContext, block, HeaderValidationMode.FULL);
final OperationTimer.TimingContext blockTimer = announcedBlockIngestTimer.startTimer();
protocolSchedule, protocolContext, block, HeaderValidationMode.FULL, ethTasksTimer);
return ethContext
.getScheduler()
.scheduleSyncWorkerTask(importTask::run)
Expand All @@ -265,7 +263,7 @@ CompletableFuture<Block> importOrSavePendingBlock(final Block block) {
block.getHeader().getNumber(),
block.getHash());
} else {
final double timeInMs = blockTimer.stopTimer() * 1000;
final double timeInMs = importTask.getTaskTimeInSec() * 1000;
LOG.info(
String.format(
"Successfully imported announced block %d (%s) in %01.3fms.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetHeadersFromPeerByHashTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import org.apache.logging.log4j.Logger;

Expand All @@ -33,29 +35,33 @@ public class ChainHeadTracker implements ConnectCallback {
private final EthContext ethContext;
private final ProtocolSchedule<?> protocolSchedule;
private final TrailingPeerLimiter trailingPeerLimiter;
private final LabelledMetric<OperationTimer> ethTasksTimer;

public ChainHeadTracker(
final EthContext ethContext,
final ProtocolSchedule<?> protocolSchedule,
final TrailingPeerLimiter trailingPeerLimiter) {
final TrailingPeerLimiter trailingPeerLimiter,
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.ethContext = ethContext;
this.protocolSchedule = protocolSchedule;
this.trailingPeerLimiter = trailingPeerLimiter;
this.ethTasksTimer = ethTasksTimer;
}

public static void trackChainHeadForPeers(
final EthContext ethContext,
final ProtocolSchedule<?> protocolSchedule,
final Blockchain blockchain,
final SynchronizerConfiguration syncConfiguration) {
final SynchronizerConfiguration syncConfiguration,
final LabelledMetric<OperationTimer> ethTasksTimer) {
final TrailingPeerLimiter trailingPeerLimiter =
new TrailingPeerLimiter(
ethContext.getEthPeers(),
blockchain,
syncConfiguration.trailingPeerBlocksBehindThreshold(),
syncConfiguration.maxTrailingPeers());
final ChainHeadTracker tracker =
new ChainHeadTracker(ethContext, protocolSchedule, trailingPeerLimiter);
new ChainHeadTracker(ethContext, protocolSchedule, trailingPeerLimiter, ethTasksTimer);
ethContext.getEthPeers().subscribeConnect(tracker);
blockchain.observeBlockAdded(trailingPeerLimiter);
}
Expand All @@ -64,7 +70,10 @@ public static void trackChainHeadForPeers(
public void onPeerConnected(final EthPeer peer) {
LOG.debug("Requesting chain head info for {}", peer);
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule, ethContext, Hash.wrap(peer.chainState().getBestBlock().getHash()))
protocolSchedule,
ethContext,
Hash.wrap(peer.chainState().getBestBlock().getHash()),
ethTasksTimer)
.assignPeer(peer)
.run()
.whenComplete(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -42,7 +43,7 @@ public DefaultSynchronizer(
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final MetricsSystem metricsSystem) {
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.syncState = syncState;
this.blockPropagationManager =
new BlockPropagationManager<>(
Expand All @@ -52,12 +53,13 @@ public DefaultSynchronizer(
ethContext,
syncState,
new PendingBlocks(),
metricsSystem);
ethTasksTimer);
this.downloader =
new Downloader<>(syncConfig, protocolSchedule, protocolContext, ethContext, syncState);
new Downloader<>(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer);

ChainHeadTracker.trackChainHeadForPeers(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig);
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer);
if (syncConfig.syncMode().equals(SyncMode.FAST)) {
LOG.info("Fast sync enabled.");
}
Expand Down
Loading