Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

NC-1880 High TX volume swamps block processing #337

Merged
merged 5 commits into from
Dec 3, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
* name change
* Allow txWorkerExecutor threads to be independently configured
  * push the above to the config and across the tests
  • Loading branch information
shemnon committed Nov 30, 2018
commit 7b858dad2e5a06abc0037d4d9bf80dffd8ac4fe6
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ public Istanbul64ProtocolManager(
final Blockchain blockchain,
final int networkId,
final boolean fastSyncEnabled,
final int workers) {
super(blockchain, networkId, fastSyncEnabled, workers);
final int syncWorkers,
final int txWorkers) {
super(blockchain, networkId, fastSyncEnabled, syncWorkers, txWorkers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,24 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final Blockchain blockchain,
final int networkId,
final boolean fastSyncEnabled,
final int workers,
final int syncWorkers,
final int txWorkers,
final int requestLimit) {
this(blockchain, networkId, fastSyncEnabled, requestLimit, new EthScheduler(workers));
this(
blockchain,
networkId,
fastSyncEnabled,
requestLimit,
new EthScheduler(syncWorkers, txWorkers));
}

public EthProtocolManager(
final Blockchain blockchain,
final int networkId,
final boolean fastSyncEnabled,
final int workers) {
this(blockchain, networkId, fastSyncEnabled, workers, DEFAULT_REQUEST_LIMIT);
final int syncWorkers,
final int txWorkers) {
this(blockchain, networkId, fastSyncEnabled, syncWorkers, txWorkers, DEFAULT_REQUEST_LIMIT);
}

public EthContext ethContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ public class EthScheduler {
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final CountDownLatch shutdown = new CountDownLatch(1);

protected final ExecutorService workerExecutor;
protected final ExecutorService syncWorkerExecutor;
protected final ScheduledExecutorService scheduler;
protected final ExecutorService transactionsExecutor;
protected final ExecutorService txWorkerExecutor;

EthScheduler(final int workerCount) {
EthScheduler(final int syncWorkerCount, final int txWorkerCount) {
this(
Executors.newFixedThreadPool(
workerCount,
syncWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Workers")
.build()),
Expand All @@ -59,25 +59,25 @@ public class EthScheduler {
.setNameFormat(EthScheduler.class.getSimpleName() + "Timer")
.build()),
Executors.newFixedThreadPool(
workerCount,
txWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Transactions")
.build()));
}

protected EthScheduler(
final ExecutorService workerExecutor,
final ExecutorService syncWorkerExecutor,
final ScheduledExecutorService scheduler,
final ExecutorService transactionExecutor) {
this.workerExecutor = workerExecutor;
final ExecutorService txWorkerExecutor) {
this.syncWorkerExecutor = syncWorkerExecutor;
this.scheduler = scheduler;
this.transactionsExecutor = transactionExecutor;
this.txWorkerExecutor = txWorkerExecutor;
}

public <T> CompletableFuture<T> scheduleWorkerTask(final Supplier<CompletableFuture<T>> future) {
final CompletableFuture<T> promise = new CompletableFuture<>();
final Future<?> workerFuture =
workerExecutor.submit(
syncWorkerExecutor.submit(
() -> {
future
.get()
Expand All @@ -101,11 +101,11 @@ public <T> CompletableFuture<T> scheduleWorkerTask(final Supplier<CompletableFut
}

public Future<?> scheduleWorkerTask(final Runnable command) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scheduleSyncWorkerTask?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Also scheduleTxWorkerTask, so both align with the field name.

return workerExecutor.submit(command);
return syncWorkerExecutor.submit(command);
}

public Future<?> scheduleTransactionTask(final Runnable command) {
return transactionsExecutor.submit(command);
return txWorkerExecutor.submit(command);
}

public CompletableFuture<Void> scheduleFutureTask(
Expand Down Expand Up @@ -192,7 +192,7 @@ private <T> CompletableFuture<T> timeout(
public void stop() {
if (stopped.compareAndSet(false, true)) {
LOG.trace("Stopping " + getClass().getSimpleName());
workerExecutor.shutdown();
syncWorkerExecutor.shutdown();
scheduler.shutdown();
shutdown.countDown();
} else {
Expand All @@ -202,10 +202,10 @@ public void stop() {

public void awaitStop() throws InterruptedException {
shutdown.await();
if (!workerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
if (!syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName());
workerExecutor.shutdownNow();
workerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
syncWorkerExecutor.shutdownNow();
syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!scheduler.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} scheduler did not shutdown cleanly.", this.getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class SynchronizerConfiguration {
private final long trailingPeerBlocksBehindThreshold;
private final int maxTrailingPeers;
private final int downloaderParallelism;
private final int transactionsParallelism;

private SynchronizerConfiguration(
final SyncMode requestedSyncMode,
Expand All @@ -67,7 +68,8 @@ private SynchronizerConfiguration(
final int downloaderChainSegmentSize,
final long trailingPeerBlocksBehindThreshold,
final int maxTrailingPeers,
final int downloaderParallelism) {
final int downloaderParallelism,
final int transactionsParallelism) {
this.requestedSyncMode = requestedSyncMode;
this.fastSyncPivotDistance = fastSyncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate;
Expand All @@ -82,6 +84,7 @@ private SynchronizerConfiguration(
this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold;
this.maxTrailingPeers = maxTrailingPeers;
this.downloaderParallelism = downloaderParallelism;
this.transactionsParallelism = transactionsParallelism;
}

/**
Expand Down Expand Up @@ -122,7 +125,8 @@ public SynchronizerConfiguration validated(final Blockchain blockchain) {
downloaderChainSegmentSize,
trailingPeerBlocksBehindThreshold,
maxTrailingPeers,
downloaderParallelism);
downloaderParallelism,
transactionsParallelism);
}

public static Builder builder() {
Expand Down Expand Up @@ -203,6 +207,10 @@ public int downloaderParallelism() {
return downloaderParallelism;
}

public int transactionsParallelism() {
return transactionsParallelism;
}

/**
* The rate at which blocks should be fully validated during fast sync. At a rate of 1f, all
* blocks are fully validated. At rates less than 1f, a subset of blocks will undergo light-weight
Expand All @@ -228,6 +236,7 @@ public static class Builder {
private long trailingPeerBlocksBehindThreshold;
private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 2;
private int transactionsParallelism = 2;

public Builder fastSyncPivotDistance(final int distance) {
fastSyncPivotDistance = distance;
Expand Down Expand Up @@ -299,6 +308,11 @@ public Builder downloaderParallelisim(final int downloaderParallelism) {
return this;
}

public Builder transactionsParallelism(final int transactionsParallelism) {
this.transactionsParallelism = transactionsParallelism;
return this;
}

public SynchronizerConfiguration build() {
return new SynchronizerConfiguration(
syncMode,
Expand All @@ -314,7 +328,8 @@ public SynchronizerConfiguration build() {
downloaderChainSegmentSize,
trailingPeerBlocksBehindThreshold,
maxTrailingPeers,
downloaderParallelism);
downloaderParallelism,
transactionsParallelism);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ public class DeterministicEthScheduler extends EthScheduler {
this.timeoutPolicy = timeoutPolicy;
}

MockExecutorService mockWorkerExecutor() {
return (MockExecutorService) workerExecutor;
MockExecutorService mockSyncWorkerExecutor() {
return (MockExecutorService) syncWorkerExecutor;
}

MockScheduledExecutor mockScheduledExecutor() {
return (MockScheduledExecutor) scheduler;
}

MockScheduledExecutor mockTransactionsExecutor() {
return (MockScheduledExecutor) transactionsExecutor;
return (MockScheduledExecutor) txWorkerExecutor;
}

@Override
Expand Down
Loading