Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

EthScheduler additions #767

Merged
merged 5 commits into from
Feb 5, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,16 @@ public Istanbul64ProtocolManager(
final int networkId,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers) {
super(blockchain, worldStateArchive, networkId, fastSyncEnabled, syncWorkers, txWorkers);
final int txWorkers,
final int computationWorkers) {
super(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
syncWorkers,
txWorkers,
computationWorkers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,15 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
final int computationWorkers,
final int requestLimit) {
this(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
requestLimit,
new EthScheduler(syncWorkers, txWorkers));
new EthScheduler(syncWorkers, txWorkers, computationWorkers));
}

public EthProtocolManager(
Expand All @@ -111,14 +112,16 @@ public EthProtocolManager(
final int networkId,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers) {
final int txWorkers,
final int computationWorkers) {
this(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
syncWorkers,
txWorkers,
computationWorkers,
DEFAULT_REQUEST_LIMIT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ public class EthScheduler {
protected final ExecutorService syncWorkerExecutor;
protected final ScheduledExecutorService scheduler;
protected final ExecutorService txWorkerExecutor;
private final ExecutorService servicesExecutor;
private final ExecutorService computationExecutor;

EthScheduler(final int syncWorkerCount, final int txWorkerCount) {
EthScheduler(
final int syncWorkerCount, final int txWorkerCount, final int computationWorkerCount) {
this(
Executors.newFixedThreadPool(
syncWorkerCount,
Expand All @@ -62,16 +65,29 @@ public class EthScheduler {
txWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Transactions-%d")
.build()),
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Services-%d")
.build()),
Executors.newFixedThreadPool(
computationWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Computation-%d")
.build()));
}

protected EthScheduler(
final ExecutorService syncWorkerExecutor,
final ScheduledExecutorService scheduler,
final ExecutorService txWorkerExecutor) {
final ExecutorService txWorkerExecutor,
final ExecutorService servicesExecutor,
final ExecutorService computationExecutor) {
this.syncWorkerExecutor = syncWorkerExecutor;
this.scheduler = scheduler;
this.txWorkerExecutor = txWorkerExecutor;
this.servicesExecutor = servicesExecutor;
this.computationExecutor = computationExecutor;
}

public <T> CompletableFuture<T> scheduleSyncWorkerTask(
Expand Down Expand Up @@ -101,12 +117,20 @@ public <T> CompletableFuture<T> scheduleSyncWorkerTask(
return promise;
}

public Future<?> scheduleSyncWorkerTask(final Runnable command) {
return syncWorkerExecutor.submit(command);
public void scheduleSyncWorkerTask(final Runnable command) {
syncWorkerExecutor.submit(command);
}

public void scheduleTxWorkerTask(final Runnable command) {
txWorkerExecutor.submit(command);
}

CompletableFuture<Void> scheduleServiceTask(final Runnable service) {
return CompletableFuture.runAsync(service, servicesExecutor);
}

public Future<?> scheduleTxWorkerTask(final Runnable command) {
return txWorkerExecutor.submit(command);
<T> CompletableFuture<T> scheduleComputationTask(final Supplier<T> computation) {
return CompletableFuture.supplyAsync(computation, computationExecutor);
}

public CompletableFuture<Void> scheduleFutureTask(
Expand Down Expand Up @@ -194,25 +218,46 @@ public void stop() {
if (stopped.compareAndSet(false, true)) {
LOG.trace("Stopping " + getClass().getSimpleName());
syncWorkerExecutor.shutdown();
txWorkerExecutor.shutdown();
scheduler.shutdown();
servicesExecutor.shutdown();
computationExecutor.shutdown();
shutdown.countDown();
} else {
LOG.trace("Attempted to stop already stopped " + getClass().getSimpleName());
}
}

public void awaitStop() throws InterruptedException {
void awaitStop() throws InterruptedException {
shutdown.await();
if (!syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName());
syncWorkerExecutor.shutdownNow();
syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!txWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error(
"{} transaction worker executor did not shutdown cleanly.",
this.getClass().getSimpleName());
txWorkerExecutor.shutdownNow();
txWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!scheduler.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} scheduler did not shutdown cleanly.", this.getClass().getSimpleName());
scheduler.shutdownNow();
scheduler.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!servicesExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} services executor did not shutdown cleanly.", this.getClass().getSimpleName());
servicesExecutor.shutdownNow();
servicesExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!computationExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error(
"{} computation executor did not shutdown cleanly.", this.getClass().getSimpleName());
computationExecutor.shutdownNow();
computationExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is starting to add up to a lot of waiting. Maybe we should set it up so that we call shutdown on all of them, wait then shutdownNow on any not shutdown, then wait and log any that still failed to shutdown?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds like a good first issue - https://github.com/PegaSysEng/pantheon/issues/768

LOG.trace("{} stopped.", this.getClass().getSimpleName());
}

Expand All @@ -222,7 +267,7 @@ private <T> CompletableFuture<T> failAfterTimeout(final Duration timeout) {
return promise;
}

public <T> void failAfterTimeout(final CompletableFuture<T> promise) {
<T> void failAfterTimeout(final CompletableFuture<T> promise) {
failAfterTimeout(promise, defaultTimeout);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public class SynchronizerConfiguration {
private static final Logger LOG = LogManager.getLogger();

// TODO: Determine reasonable defaults here
public static int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 500;
public static float DEFAULT_FULL_VALIDATION_RATE = .1f;
public static int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5;
public static final int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 500;
public static final float DEFAULT_FULL_VALIDATION_RATE = .1f;
public static final int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5;
private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5);
private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 200;
private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10;
Expand Down Expand Up @@ -62,6 +62,7 @@ public class SynchronizerConfiguration {
private final int maxTrailingPeers;
private final int downloaderParallelism;
private final int transactionsParallelism;
private final int computationParallelism;

private SynchronizerConfiguration(
final SyncMode requestedSyncMode,
Expand All @@ -82,7 +83,8 @@ private SynchronizerConfiguration(
final long trailingPeerBlocksBehindThreshold,
final int maxTrailingPeers,
final int downloaderParallelism,
final int transactionsParallelism) {
final int transactionsParallelism,
final int computationParallelism) {
this.requestedSyncMode = requestedSyncMode;
this.fastSyncPivotDistance = fastSyncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate;
Expand All @@ -102,6 +104,7 @@ private SynchronizerConfiguration(
this.maxTrailingPeers = maxTrailingPeers;
this.downloaderParallelism = downloaderParallelism;
this.transactionsParallelism = transactionsParallelism;
this.computationParallelism = computationParallelism;
}

/**
Expand Down Expand Up @@ -147,7 +150,8 @@ public SynchronizerConfiguration validated(final Blockchain blockchain) {
trailingPeerBlocksBehindThreshold,
maxTrailingPeers,
downloaderParallelism,
transactionsParallelism);
transactionsParallelism,
computationParallelism);
}

public static Builder builder() {
Expand Down Expand Up @@ -232,6 +236,10 @@ public int transactionsParallelism() {
return transactionsParallelism;
}

public int computationParallelism() {
return computationParallelism;
}

/**
* The rate at which blocks should be fully validated during fast sync. At a rate of 1f, all
* blocks are fully validated. At rates less than 1f, a subset of blocks will undergo light-weight
Expand Down Expand Up @@ -274,6 +282,7 @@ public static class Builder {
private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 2;
private int transactionsParallelism = 2;
private int computationParallelism = Runtime.getRuntime().availableProcessors();

public Builder fastSyncPivotDistance(final int distance) {
fastSyncPivotDistance = distance;
Expand Down Expand Up @@ -350,6 +359,11 @@ public Builder transactionsParallelism(final int transactionsParallelism) {
return this;
}

public Builder computationParallelism(final int computationParallelism) {
this.computationParallelism = computationParallelism;
return this;
}

public SynchronizerConfiguration build() {
return new SynchronizerConfiguration(
syncMode,
Expand All @@ -370,7 +384,8 @@ public SynchronizerConfiguration build() {
trailingPeerBlocksBehindThreshold,
maxTrailingPeers,
downloaderParallelism,
transactionsParallelism);
transactionsParallelism,
computationParallelism);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ public DeterministicEthScheduler() {
}

public DeterministicEthScheduler(final TimeoutPolicy timeoutPolicy) {
super(new MockExecutorService(), new MockScheduledExecutor(), new MockExecutorService());
super(
new MockExecutorService(),
new MockScheduledExecutor(),
new MockExecutorService(),
new MockExecutorService(),
new MockExecutorService());
this.timeoutPolicy = timeoutPolicy;
}

Expand Down
Loading