diff --git a/consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java b/consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java index f83e57d63b..ecbd355d51 100644 --- a/consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java +++ b/consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java @@ -29,8 +29,9 @@ public Istanbul64ProtocolManager( final Blockchain blockchain, final int networkId, final boolean fastSyncEnabled, - final int workers) { - super(blockchain, networkId, fastSyncEnabled, workers); + final int syncWorkers, + final int txWorkers) { + super(blockchain, networkId, fastSyncEnabled, syncWorkers, txWorkers); } @Override diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java index 9f9564840a..e77bbc822f 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java @@ -100,7 +100,7 @@ protected final CompletableFuture executeSubTask( */ protected final CompletableFuture executeWorkerSubTask( final EthScheduler scheduler, final Supplier> subTask) { - return executeSubTask(() -> scheduler.scheduleWorkerTask(subTask)); + return executeSubTask(() -> scheduler.scheduleSyncWorkerTask(subTask)); } public final T result() { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java index a8a0a13b79..ff0d8044d8 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java @@ -90,17 +90,24 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { final Blockchain blockchain, final int networkId, final boolean fastSyncEnabled, - final int workers, + final int syncWorkers, + final int txWorkers, final int requestLimit) { - this(blockchain, networkId, fastSyncEnabled, requestLimit, new EthScheduler(workers)); + this( + blockchain, + networkId, + fastSyncEnabled, + requestLimit, + new EthScheduler(syncWorkers, txWorkers)); } public EthProtocolManager( final Blockchain blockchain, final int networkId, final boolean fastSyncEnabled, - final int workers) { - this(blockchain, networkId, fastSyncEnabled, workers, DEFAULT_REQUEST_LIMIT); + final int syncWorkers, + final int txWorkers) { + this(blockchain, networkId, fastSyncEnabled, syncWorkers, txWorkers, DEFAULT_REQUEST_LIMIT); } public EthContext ethContext() { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java index a62c0dedf0..33c05458f7 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java @@ -41,13 +41,14 @@ public class EthScheduler { private final AtomicBoolean stopped = new AtomicBoolean(false); private final CountDownLatch shutdown = new CountDownLatch(1); - protected final ExecutorService workerExecutor; + protected final ExecutorService syncWorkerExecutor; protected final ScheduledExecutorService scheduler; + protected final ExecutorService txWorkerExecutor; - EthScheduler(final int workerCount) { + EthScheduler(final int syncWorkerCount, final int txWorkerCount) { this( Executors.newFixedThreadPool( - workerCount, + syncWorkerCount, new ThreadFactoryBuilder() .setNameFormat(EthScheduler.class.getSimpleName() + "-Workers") .build()), @@ -56,19 +57,28 @@ public class EthScheduler { new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat(EthScheduler.class.getSimpleName() + "Timer") + .build()), + Executors.newFixedThreadPool( + txWorkerCount, + new ThreadFactoryBuilder() + .setNameFormat(EthScheduler.class.getSimpleName() + "-Transactions") .build())); } protected EthScheduler( - final ExecutorService workerExecutor, final ScheduledExecutorService scheduler) { - this.workerExecutor = workerExecutor; + final ExecutorService syncWorkerExecutor, + final ScheduledExecutorService scheduler, + final ExecutorService txWorkerExecutor) { + this.syncWorkerExecutor = syncWorkerExecutor; this.scheduler = scheduler; + this.txWorkerExecutor = txWorkerExecutor; } - public CompletableFuture scheduleWorkerTask(final Supplier> future) { + public CompletableFuture scheduleSyncWorkerTask( + final Supplier> future) { final CompletableFuture promise = new CompletableFuture<>(); final Future workerFuture = - workerExecutor.submit( + syncWorkerExecutor.submit( () -> { future .get() @@ -91,8 +101,12 @@ public CompletableFuture scheduleWorkerTask(final Supplier scheduleWorkerTask(final Runnable command) { - return workerExecutor.submit(command); + public Future scheduleSyncWorkerTask(final Runnable command) { + return syncWorkerExecutor.submit(command); + } + + public Future scheduleTxWorkerTask(final Runnable command) { + return txWorkerExecutor.submit(command); } public CompletableFuture scheduleFutureTask( @@ -179,7 +193,7 @@ private CompletableFuture timeout( public void stop() { if (stopped.compareAndSet(false, true)) { LOG.trace("Stopping " + getClass().getSimpleName()); - workerExecutor.shutdown(); + syncWorkerExecutor.shutdown(); scheduler.shutdown(); shutdown.countDown(); } else { @@ -189,10 +203,10 @@ public void stop() { public void awaitStop() throws InterruptedException { shutdown.await(); - if (!workerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) { + if (!syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) { LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName()); - workerExecutor.shutdownNow(); - workerExecutor.awaitTermination(2L, TimeUnit.MINUTES); + syncWorkerExecutor.shutdownNow(); + syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES); } if (!scheduler.awaitTermination(2L, TimeUnit.MINUTES)) { LOG.error("{} scheduler did not shutdown cleanly.", this.getClass().getSimpleName()); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 4af2ca3f62..82a1a6c515 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -128,7 +128,7 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchai protocolSchedule, protocolContext, readyForImport, HeaderValidationMode.FULL); ethContext .getScheduler() - .scheduleWorkerTask(importBlocksTask) + .scheduleSyncWorkerTask(importBlocksTask) .whenComplete( (r, t) -> { if (r != null) { @@ -255,7 +255,7 @@ CompletableFuture importOrSavePendingBlock(final Block block) { final OperationTimer.TimingContext blockTimer = announcedBlockIngestTimer.startTimer(); return ethContext .getScheduler() - .scheduleWorkerTask(importTask::run) + .scheduleSyncWorkerTask(importTask::run) .whenComplete( (r, t) -> { if (t != null) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java index 59f4b879ec..0a7fddf092 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java @@ -52,6 +52,7 @@ public class SynchronizerConfiguration { private final long trailingPeerBlocksBehindThreshold; private final int maxTrailingPeers; private final int downloaderParallelism; + private final int transactionsParallelism; private SynchronizerConfiguration( final SyncMode requestedSyncMode, @@ -67,7 +68,8 @@ private SynchronizerConfiguration( final int downloaderChainSegmentSize, final long trailingPeerBlocksBehindThreshold, final int maxTrailingPeers, - final int downloaderParallelism) { + final int downloaderParallelism, + final int transactionsParallelism) { this.requestedSyncMode = requestedSyncMode; this.fastSyncPivotDistance = fastSyncPivotDistance; this.fastSyncFullValidationRate = fastSyncFullValidationRate; @@ -82,6 +84,7 @@ private SynchronizerConfiguration( this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold; this.maxTrailingPeers = maxTrailingPeers; this.downloaderParallelism = downloaderParallelism; + this.transactionsParallelism = transactionsParallelism; } /** @@ -122,7 +125,8 @@ public SynchronizerConfiguration validated(final Blockchain blockchain) { downloaderChainSegmentSize, trailingPeerBlocksBehindThreshold, maxTrailingPeers, - downloaderParallelism); + downloaderParallelism, + transactionsParallelism); } public static Builder builder() { @@ -203,6 +207,10 @@ public int downloaderParallelism() { return downloaderParallelism; } + public int transactionsParallelism() { + return transactionsParallelism; + } + /** * The rate at which blocks should be fully validated during fast sync. At a rate of 1f, all * blocks are fully validated. At rates less than 1f, a subset of blocks will undergo light-weight @@ -228,6 +236,7 @@ public static class Builder { private long trailingPeerBlocksBehindThreshold; private int maxTrailingPeers = Integer.MAX_VALUE; private int downloaderParallelism = 2; + private int transactionsParallelism = 2; public Builder fastSyncPivotDistance(final int distance) { fastSyncPivotDistance = distance; @@ -299,6 +308,11 @@ public Builder downloaderParallelisim(final int downloaderParallelism) { return this; } + public Builder transactionsParallelism(final int transactionsParallelism) { + this.transactionsParallelism = transactionsParallelism; + return this; + } + public SynchronizerConfiguration build() { return new SynchronizerConfiguration( syncMode, @@ -314,7 +328,8 @@ public SynchronizerConfiguration build() { downloaderChainSegmentSize, trailingPeerBlocksBehindThreshold, maxTrailingPeers, - downloaderParallelism); + downloaderParallelism, + transactionsParallelism); } } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionSender.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionSender.java index b559f4c636..1e1be00acf 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionSender.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionSender.java @@ -42,6 +42,6 @@ public void onTransactionsAdded(final Iterable transactions) { transaction -> transactionTracker.addToPeerSendQueue(peer, transaction))); ethContext .getScheduler() - .scheduleWorkerTask(transactionsMessageSender::sendTransactionsToPeers); + .scheduleSyncWorkerTask(transactionsMessageSender::sendTransactionsToPeers); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java index 9b7de06cd1..e802ad5864 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java @@ -32,7 +32,7 @@ public TransactionsMessageHandler( @Override public void exec(final EthMessage message) { final TransactionsMessage transactionsMessage = TransactionsMessage.readFrom(message.getData()); - scheduler.scheduleWorkerTask( + scheduler.scheduleTxWorkerTask( () -> transactionsMessageProcessor.processTransactionsMessage( message.getPeer(), transactionsMessage)); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java index 9d08b934bb..dea698c3d8 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java @@ -27,18 +27,22 @@ public class DeterministicEthScheduler extends EthScheduler { } DeterministicEthScheduler(final TimeoutPolicy timeoutPolicy) { - super(new MockExecutorService(), new MockScheduledExecutor()); + super(new MockExecutorService(), new MockScheduledExecutor(), new MockExecutorService()); this.timeoutPolicy = timeoutPolicy; } - MockExecutorService mockWorkerExecutor() { - return (MockExecutorService) workerExecutor; + MockExecutorService mockSyncWorkerExecutor() { + return (MockExecutorService) syncWorkerExecutor; } MockScheduledExecutor mockScheduledExecutor() { return (MockScheduledExecutor) scheduler; } + MockScheduledExecutor mockTransactionsExecutor() { + return (MockScheduledExecutor) txWorkerExecutor; + } + @Override public void failAfterTimeout(final CompletableFuture promise, final Duration timeout) { if (timeoutPolicy.shouldTimeout()) { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java index c6f2cf1fcd..9a87eb882f 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java @@ -13,18 +13,22 @@ package tech.pegasys.pantheon.ethereum.eth.manager; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain; +import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockBody; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.core.Transaction; import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; import tech.pegasys.pantheon.ethereum.eth.EthProtocol; import tech.pegasys.pantheon.ethereum.eth.EthProtocol.EthVersion; @@ -40,12 +44,15 @@ import tech.pegasys.pantheon.ethereum.eth.messages.NewBlockMessage; import tech.pegasys.pantheon.ethereum.eth.messages.ReceiptsMessage; import tech.pegasys.pantheon.ethereum.eth.messages.StatusMessage; +import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage; +import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPoolFactory; import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage; +import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; import tech.pegasys.pantheon.util.uint.UInt256; @@ -57,6 +64,8 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -72,6 +81,7 @@ public final class EthProtocolManagerTest { private static Blockchain blockchain; private static ProtocolSchedule protocolSchedule; private static BlockDataGenerator gen; + private static ProtocolContext protocolContext; @BeforeClass public static void setup() { @@ -80,12 +90,13 @@ public static void setup() { blockchainSetupUtil.importAllBlocks(); blockchain = blockchainSetupUtil.getBlockchain(); protocolSchedule = blockchainSetupUtil.getProtocolSchedule(); + protocolContext = blockchainSetupUtil.getProtocolContext(); assert (blockchainSetupUtil.getMaxBlockNumber() >= 20L); } @Test public void disconnectOnUnsolicitedMessage() { - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { final MessageData messageData = BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {}); @@ -96,7 +107,7 @@ public void disconnectOnUnsolicitedMessage() { @Test public void disconnectOnFailureToSendStatusMessage() { - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { final MessageData messageData = BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); final MockPeerConnection peer = @@ -108,7 +119,7 @@ public void disconnectOnFailureToSendStatusMessage() { @Test public void disconnectOnWrongChainId() { - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { final MessageData messageData = BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); final MockPeerConnection peer = @@ -131,7 +142,7 @@ public void disconnectOnWrongChainId() { @Test public void disconnectOnWrongGenesisHash() { - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { final MessageData messageData = BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); final MockPeerConnection peer = @@ -154,7 +165,7 @@ public void disconnectOnWrongGenesisHash() { @Test(expected = ConditionTimeoutException.class) public void doNotDisconnectOnValidMessage() { - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { final MessageData messageData = GetBlockBodiesMessage.create(Collections.singletonList(gen.hash())); final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {}); @@ -169,7 +180,7 @@ public void doNotDisconnectOnValidMessage() { @Test public void respondToGetHeaders() throws ExecutionException, InterruptedException { final CompletableFuture done = new CompletableFuture<>(); - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { final long startBlock = 5L; final int blockCount = 5; final MessageData messageData = @@ -201,7 +212,7 @@ public void respondToGetHeadersWithinLimits() throws ExecutionException, Interru final CompletableFuture done = new CompletableFuture<>(); final int limit = 5; try (final EthProtocolManager ethManager = - new EthProtocolManager(blockchain, 1, true, 1, limit)) { + new EthProtocolManager(blockchain, 1, true, 1, 1, limit)) { final long startBlock = 5L; final int blockCount = 10; final MessageData messageData = @@ -231,7 +242,7 @@ public void respondToGetHeadersWithinLimits() throws ExecutionException, Interru @Test public void respondToGetHeadersReversed() throws ExecutionException, InterruptedException { final CompletableFuture done = new CompletableFuture<>(); - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { final long endBlock = 10L; final int blockCount = 5; final MessageData messageData = GetBlockHeadersMessage.create(endBlock, blockCount, 0, true); @@ -260,7 +271,7 @@ public void respondToGetHeadersReversed() throws ExecutionException, Interrupted @Test public void respondToGetHeadersWithSkip() throws ExecutionException, InterruptedException { final CompletableFuture done = new CompletableFuture<>(); - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { final long startBlock = 5L; final int blockCount = 5; final int skip = 1; @@ -292,7 +303,7 @@ public void respondToGetHeadersWithSkip() throws ExecutionException, Interrupted public void respondToGetHeadersReversedWithSkip() throws ExecutionException, InterruptedException { final CompletableFuture done = new CompletableFuture<>(); - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { final long endBlock = 10L; final int blockCount = 5; final int skip = 1; @@ -336,7 +347,7 @@ private MockPeerConnection setupPeer( private MockPeerConnection setupPeerWithoutStatusExchange( final EthProtocolManager ethManager, final PeerSendHandler onSend) { - final Set caps = new HashSet<>(Arrays.asList(EthProtocol.ETH63)); + final Set caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63)); final MockPeerConnection peer = new MockPeerConnection(caps, onSend); ethManager.handleNewConnection(peer); return peer; @@ -345,7 +356,7 @@ private MockPeerConnection setupPeerWithoutStatusExchange( @Test public void respondToGetHeadersPartial() throws ExecutionException, InterruptedException { final CompletableFuture done = new CompletableFuture<>(); - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { final long startBlock = blockchain.getChainHeadBlockNumber() - 1L; final int blockCount = 5; final MessageData messageData = @@ -375,7 +386,7 @@ public void respondToGetHeadersPartial() throws ExecutionException, InterruptedE @Test public void respondToGetHeadersEmpty() throws ExecutionException, InterruptedException { final CompletableFuture done = new CompletableFuture<>(); - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { final long startBlock = blockchain.getChainHeadBlockNumber() + 1; final int blockCount = 5; final MessageData messageData = @@ -402,7 +413,7 @@ public void respondToGetHeadersEmpty() throws ExecutionException, InterruptedExc @Test public void respondToGetBodies() throws ExecutionException, InterruptedException { final CompletableFuture done = new CompletableFuture<>(); - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { // Setup blocks query final long startBlock = blockchain.getChainHeadBlockNumber() - 5; final int blockCount = 2; @@ -446,7 +457,7 @@ public void respondToGetBodiesWithinLimits() throws ExecutionException, Interrup final CompletableFuture done = new CompletableFuture<>(); final int limit = 5; try (final EthProtocolManager ethManager = - new EthProtocolManager(blockchain, 1, true, 1, limit)) { + new EthProtocolManager(blockchain, 1, true, 1, 1, limit)) { // Setup blocks query final int blockCount = 10; final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount; @@ -488,7 +499,7 @@ public void respondToGetBodiesWithinLimits() throws ExecutionException, Interrup @Test public void respondToGetBodiesPartial() throws ExecutionException, InterruptedException { final CompletableFuture done = new CompletableFuture<>(); - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { // Setup blocks query final long expectedBlockNumber = blockchain.getChainHeadBlockNumber() - 1; final BlockHeader header = blockchain.getBlockHeader(expectedBlockNumber).get(); @@ -524,7 +535,7 @@ public void respondToGetBodiesPartial() throws ExecutionException, InterruptedEx @Test public void respondToGetReceipts() throws ExecutionException, InterruptedException { final CompletableFuture done = new CompletableFuture<>(); - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { // Setup blocks query final long startBlock = blockchain.getChainHeadBlockNumber() - 5; final int blockCount = 2; @@ -567,7 +578,7 @@ public void respondToGetReceiptsWithinLimits() throws ExecutionException, Interr final CompletableFuture done = new CompletableFuture<>(); final int limit = 5; try (final EthProtocolManager ethManager = - new EthProtocolManager(blockchain, 1, true, 1, limit)) { + new EthProtocolManager(blockchain, 1, true, 1, 1, limit)) { // Setup blocks query final int blockCount = 10; final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount; @@ -608,7 +619,7 @@ public void respondToGetReceiptsWithinLimits() throws ExecutionException, Interr @Test public void respondToGetReceiptsPartial() throws ExecutionException, InterruptedException { final CompletableFuture done = new CompletableFuture<>(); - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { // Setup blocks query final long blockNumber = blockchain.getChainHeadBlockNumber() - 5; final BlockHeader header = blockchain.getBlockHeader(blockNumber).get(); @@ -643,7 +654,7 @@ public void respondToGetReceiptsPartial() throws ExecutionException, Interrupted @Test public void newBlockMinedSendsNewBlockMessageToAllPeers() { - final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1); + final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1); // Define handler to validate response final PeerSendHandler onSend = mock(PeerSendHandler.class); @@ -705,7 +716,7 @@ public void shouldSuccessfullyRespondToGetHeadersRequestLessThanZero() blockchain.appendBlock(block, receipts); final CompletableFuture done = new CompletableFuture<>(); - try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { + try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) { final long startBlock = 1L; final int requestedBlockCount = 13; final int receivedBlockCount = 2; @@ -728,7 +739,7 @@ public void shouldSuccessfullyRespondToGetHeadersRequestLessThanZero() done.complete(null); }; - final Set caps = new HashSet<>(Arrays.asList(EthProtocol.ETH63)); + final Set caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63)); final MockPeerConnection peer = new MockPeerConnection(caps, onSend); ethManager.handleNewConnection(peer); final StatusMessage statusMessage = @@ -744,4 +755,38 @@ public void shouldSuccessfullyRespondToGetHeadersRequestLessThanZero() done.get(); } } + + @Test + public void transactionMessagesGoToTheCorrectExecutor() { + // Create a mock ethScheduler to hold our mock executors. + final ExecutorService worker = mock(ExecutorService.class); + final ScheduledExecutorService scheduled = mock(ScheduledExecutorService.class); + final ExecutorService transactions = mock(ExecutorService.class); + final EthScheduler ethScheduler = new EthScheduler(worker, scheduled, transactions); + + // Create the fake TransactionMessage to feed to the EthManager. + final BlockDataGenerator gen = new BlockDataGenerator(1); + final List txes = Collections.singletonList(gen.transaction()); + final MessageData initialMessage = TransactionsMessage.create(txes); + final MessageData raw = new RawMessage(EthPV62.TRANSACTIONS, initialMessage.getData()); + final TransactionsMessage transactionMessage = TransactionsMessage.readFrom(raw); + + try (final EthProtocolManager ethManager = + new EthProtocolManager(blockchain, 1, true, 1, ethScheduler)) { + + // Create a transaction pool. This has a side effect of registring a listener for the + // transactions message. + TransactionPoolFactory.createTransactionPool( + protocolSchedule, protocolContext, ethManager.ethContext()); + + // Send just a transaction message. + final PeerConnection peer = setupPeer(ethManager, (cap, msg, connection) -> {}); + ethManager.processMessage(EthProtocol.ETH63, new DefaultMessage(peer, transactionMessage)); + + // Verify the regular message executor and scheduled executor got nothing to execute. + verifyZeroInteractions(worker, scheduled); + // Verify our transactions executor got something to execute. + verify(transactions).submit((Runnable) any()); + } + } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java index 30ea9bbf70..2ed3a8726e 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java @@ -31,7 +31,7 @@ public class EthSchedulerTest { private DeterministicEthScheduler ethScheduler; - private MockExecutorService workerExecutor; + private MockExecutorService syncWorkerExecutor; private MockScheduledExecutor scheduledExecutor; private AtomicBoolean shouldTimeout; @@ -39,14 +39,14 @@ public class EthSchedulerTest { public void setup() { shouldTimeout = new AtomicBoolean(false); ethScheduler = new DeterministicEthScheduler(shouldTimeout::get); - workerExecutor = ethScheduler.mockWorkerExecutor(); + syncWorkerExecutor = ethScheduler.mockSyncWorkerExecutor(); scheduledExecutor = ethScheduler.mockScheduledExecutor(); } @Test public void scheduleWorkerTask_completesWhenScheduledTaskCompletes() { final CompletableFuture future = new CompletableFuture<>(); - final CompletableFuture result = ethScheduler.scheduleWorkerTask(() -> future); + final CompletableFuture result = ethScheduler.scheduleSyncWorkerTask(() -> future); assertThat(result.isDone()).isFalse(); future.complete("bla"); @@ -58,7 +58,7 @@ public void scheduleWorkerTask_completesWhenScheduledTaskCompletes() { @Test public void scheduleWorkerTask_completesWhenScheduledTaskFails() { final CompletableFuture future = new CompletableFuture<>(); - final CompletableFuture result = ethScheduler.scheduleWorkerTask(() -> future); + final CompletableFuture result = ethScheduler.scheduleSyncWorkerTask(() -> future); assertThat(result.isDone()).isFalse(); future.completeExceptionally(new RuntimeException("whoops")); @@ -70,7 +70,7 @@ public void scheduleWorkerTask_completesWhenScheduledTaskFails() { @Test public void scheduleWorkerTask_completesWhenScheduledTaskIsCancelled() { final CompletableFuture future = new CompletableFuture<>(); - final CompletableFuture result = ethScheduler.scheduleWorkerTask(() -> future); + final CompletableFuture result = ethScheduler.scheduleSyncWorkerTask(() -> future); assertThat(result.isDone()).isFalse(); future.cancel(false); @@ -82,10 +82,10 @@ public void scheduleWorkerTask_completesWhenScheduledTaskIsCancelled() { @Test public void scheduleWorkerTask_cancelsScheduledFutureWhenResultIsCancelled() { final CompletableFuture result = - ethScheduler.scheduleWorkerTask(() -> new CompletableFuture<>()); + ethScheduler.scheduleSyncWorkerTask(() -> new CompletableFuture<>()); - assertThat(workerExecutor.getScheduledFutures().size()).isEqualTo(1); - final Future future = workerExecutor.getScheduledFutures().get(0); + assertThat(syncWorkerExecutor.getScheduledFutures().size()).isEqualTo(1); + final Future future = syncWorkerExecutor.getScheduledFutures().get(0); verify(future, times(0)).cancel(anyBoolean()); result.cancel(true); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java index 6b77b98ca9..9f424e570f 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java @@ -99,7 +99,8 @@ public TestNode( genesisState.writeStateTo(worldStateArchive.getMutable()); final ProtocolContext protocolContext = new ProtocolContext<>(blockchain, worldStateArchive, null); - final EthProtocolManager ethProtocolManager = new EthProtocolManager(blockchain, 1, false, 1); + final EthProtocolManager ethProtocolManager = + new EthProtocolManager(blockchain, 1, false, 1, 1); final NetworkRunner networkRunner = NetworkRunner.builder() diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java index 28c44cbf9e..dd5959b64c 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java @@ -144,7 +144,8 @@ public static PantheonController init( protocolContext.getBlockchain(), networkId, fastSyncEnabled, - syncConfig.downloaderParallelism()); + syncConfig.downloaderParallelism(), + syncConfig.transactionsParallelism()); final SyncState syncState = new SyncState( protocolContext.getBlockchain(), ethProtocolManager.ethContext().getEthPeers()); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java index 37a8893517..c600d5f591 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java @@ -144,7 +144,8 @@ public static PantheonController init( protocolContext.getBlockchain(), networkId, fastSyncEnabled, - syncConfig.downloaderParallelism()); + syncConfig.downloaderParallelism(), + syncConfig.transactionsParallelism()); } else { ethSubProtocol = EthProtocol.get(); ethProtocolManager = @@ -152,7 +153,8 @@ public static PantheonController init( protocolContext.getBlockchain(), networkId, fastSyncEnabled, - syncConfig.downloaderParallelism()); + syncConfig.downloaderParallelism(), + syncConfig.transactionsParallelism()); } final SyncState syncState = diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java index c3b12e78b7..5a679e74cf 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java @@ -150,7 +150,8 @@ public static PantheonController init( protocolContext.getBlockchain(), networkId, fastSyncEnabled, - syncConfig.downloaderParallelism()); + syncConfig.downloaderParallelism(), + syncConfig.transactionsParallelism()); final SubProtocol ethSubProtocol = EthProtocol.get(); final SyncState syncState = diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java index 90e135620d..190bc62c8c 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java @@ -117,7 +117,8 @@ public static PantheonController init( .getChainId() .orElse(MainnetProtocolSchedule.DEFAULT_CHAIN_ID), fastSyncEnabled, - syncConfig.downloaderParallelism()); + syncConfig.downloaderParallelism(), + syncConfig.transactionsParallelism()); final SyncState syncState = new SyncState( protocolContext.getBlockchain(), ethProtocolManager.ethContext().getEthPeers());