diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java index 93ab606d67..6ef8fa1ece 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java @@ -81,9 +81,12 @@ public SyncTarget setSyncTarget(final EthPeer peer, final BlockHeader commonAnce } public boolean isInSync() { + return isInSync(SYNC_TOLERANCE); + } + + public boolean isInSync(final long syncTolerance) { return syncTarget - .map( - t -> t.estimatedTargetHeight() - blockchain.getChainHeadBlockNumber() <= SYNC_TOLERANCE) + .map(t -> t.estimatedTargetHeight() - blockchain.getChainHeadBlockNumber() <= syncTolerance) .orElse(true); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java index 8c7d6f8ccd..7cdb3efae7 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java @@ -25,6 +25,7 @@ import tech.pegasys.pantheon.ethereum.core.AccountFilter; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.Transaction; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator; import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator.TransactionInvalidReason; @@ -49,21 +50,25 @@ */ public class TransactionPool implements BlockAddedObserver { private static final Logger LOG = getLogger(); + private static final long SYNC_TOLERANCE = 100L; private final PendingTransactions pendingTransactions; private final ProtocolSchedule protocolSchedule; private final ProtocolContext protocolContext; private final TransactionBatchAddedListener transactionBatchAddedListener; + private final SyncState syncState; private Optional accountFilter = Optional.empty(); public TransactionPool( final PendingTransactions pendingTransactions, final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, - final TransactionBatchAddedListener transactionBatchAddedListener) { + final TransactionBatchAddedListener transactionBatchAddedListener, + final SyncState syncState) { this.pendingTransactions = pendingTransactions; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.transactionBatchAddedListener = transactionBatchAddedListener; + this.syncState = syncState; } public List getLocalTransactions() { @@ -88,6 +93,9 @@ public ValidationResult addLocalTransaction( public void addRemoteTransactions(final Collection transactions) { final Set addedTransactions = new HashSet<>(); for (final Transaction transaction : sortByNonce(transactions)) { + if (!syncState.isInSync(SYNC_TOLERANCE)) { + return; + } final ValidationResult validationResult = validateTransaction(transaction); if (validationResult.isValid()) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java index 65539d0ed7..876b3926d3 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java @@ -15,6 +15,7 @@ import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.metrics.MetricsSystem; @@ -28,7 +29,8 @@ public static TransactionPool createTransactionPool( final EthContext ethContext, final Clock clock, final int maxPendingTransactions, - final MetricsSystem metricsSystem) { + final MetricsSystem metricsSystem, + final SyncState syncState) { final PendingTransactions pendingTransactions = new PendingTransactions(maxPendingTransactions, clock, metricsSystem); @@ -41,7 +43,8 @@ public static TransactionPool createTransactionPool( pendingTransactions, protocolSchedule, protocolContext, - new TransactionSender(transactionTracker, transactionsMessageSender, ethContext)); + new TransactionSender(transactionTracker, transactionsMessageSender, ethContext), + syncState); final TransactionsMessageHandler transactionsMessageHandler = new TransactionsMessageHandler( diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java index 9740789cbd..fe477cfe84 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java @@ -49,6 +49,7 @@ import tech.pegasys.pantheon.ethereum.eth.messages.ReceiptsMessage; import tech.pegasys.pantheon.ethereum.eth.messages.StatusMessage; import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.eth.transactions.PendingTransactions; import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPoolFactory; import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule; @@ -1054,7 +1055,8 @@ public void transactionMessagesGoToTheCorrectExecutor() { ethManager.ethContext(), TestClock.fixed(), PendingTransactions.MAX_PENDING_TRANSACTIONS, - metricsSystem); + metricsSystem, + mock(SyncState.class)); // Send just a transaction message. final PeerConnection peer = setupPeer(ethManager, (cap, msg, connection) -> {}); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java index d6acd931ca..3ebad80906 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java @@ -14,6 +14,9 @@ import static java.util.Collections.singletonList; import static org.assertj.core.util.Preconditions.checkNotNull; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain; import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive; @@ -29,6 +32,7 @@ import tech.pegasys.pantheon.ethereum.eth.EthereumWireProtocolConfiguration; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction; import tech.pegasys.pantheon.ethereum.p2p.NetworkRunner; @@ -139,6 +143,10 @@ public TestNode( (connection, reason, initiatedByPeer) -> disconnections.put(connection, reason)); final EthContext ethContext = ethProtocolManager.ethContext(); + + SyncState syncState = mock(SyncState.class); + when(syncState.isInSync(anyLong())).thenReturn(true); + transactionPool = TransactionPoolFactory.createTransactionPool( protocolSchedule, @@ -146,7 +154,8 @@ public TestNode( ethContext, TestClock.fixed(), PendingTransactions.MAX_PENDING_TRANSACTIONS, - metricsSystem); + metricsSystem, + syncState); networkRunner.start(); selfPeer = new DefaultPeer(id(), endpoint()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java index 7640a631ba..057edbb9fa 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java @@ -48,6 +48,7 @@ import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; import tech.pegasys.pantheon.ethereum.core.TransactionTestFixture; import tech.pegasys.pantheon.ethereum.core.Wei; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec; @@ -85,21 +86,24 @@ public class TransactionPoolTest { new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem); private final Transaction transaction1 = createTransaction(1); private final Transaction transaction2 = createTransaction(2); + private final ExecutionContextTestFixture executionContext = ExecutionContextTestFixture.create(); + private final ProtocolContext protocolContext = executionContext.getProtocolContext(); private TransactionPool transactionPool; private long genesisBlockGasLimit; private final AccountFilter accountFilter = mock(AccountFilter.class); @Before public void setUp() { - final ExecutionContextTestFixture executionContext = ExecutionContextTestFixture.create(); blockchain = executionContext.getBlockchain(); - final ProtocolContext protocolContext = executionContext.getProtocolContext(); when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(protocolSpec); when(protocolSpec.getTransactionValidator()).thenReturn(transactionValidator); genesisBlockGasLimit = executionContext.getGenesis().getHeader().getGasLimit(); + SyncState syncState = mock(SyncState.class); + when(syncState.isInSync(anyLong())).thenReturn(true); transactionPool = - new TransactionPool(transactions, protocolSchedule, protocolContext, batchAddedListener); + new TransactionPool( + transactions, protocolSchedule, protocolContext, batchAddedListener, syncState); blockchain.observeBlockAdded(transactionPool); } @@ -179,7 +183,7 @@ public void shouldRemovePendingTransactionsFromAllBlocksOnAForkWhenItBecomesTheC } @Test - public void shouldReaddTransactionsFromThePreviousCanonicalHeadWhenAReorgOccurs() { + public void shouldReadTransactionsFromThePreviousCanonicalHeadWhenAReorgOccurs() { givenTransactionIsValid(transaction1); givenTransactionIsValid(transaction2); transactions.addRemoteTransaction(transaction1); @@ -205,7 +209,7 @@ public void shouldReaddTransactionsFromThePreviousCanonicalHeadWhenAReorgOccurs( } @Test - public void shouldNotReaddTransactionsThatAreInBothForksWhenReorgHappens() { + public void shouldNotReadTransactionsThatAreInBothForksWhenReorgHappens() { givenTransactionIsValid(transaction1); givenTransactionIsValid(transaction2); transactions.addRemoteTransaction(transaction1); @@ -424,6 +428,69 @@ public void shouldAllowTransactionWhenAccountWhitelistControllerIsNotPresent() { assertTransactionPending(transaction1); } + @Test + public void shouldRejectRemoteTransactionsWhenNotInSync() { + SyncState syncState = mock(SyncState.class); + when(syncState.isInSync(anyLong())).thenReturn(false); + TransactionPool transactionPool = + new TransactionPool( + transactions, protocolSchedule, protocolContext, batchAddedListener, syncState); + + final TransactionTestFixture builder = new TransactionTestFixture(); + final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1); + final Transaction transaction2 = builder.nonce(2).createTransaction(KEY_PAIR1); + final Transaction transaction3 = builder.nonce(3).createTransaction(KEY_PAIR1); + + when(transactionValidator.validate(any(Transaction.class))).thenReturn(valid()); + when(transactionValidator.validateForSender( + eq(transaction1), nullable(Account.class), eq(true))) + .thenReturn(valid()); + when(transactionValidator.validateForSender( + eq(transaction2), nullable(Account.class), eq(true))) + .thenReturn(valid()); + when(transactionValidator.validateForSender( + eq(transaction3), nullable(Account.class), eq(true))) + .thenReturn(valid()); + + transactionPool.addRemoteTransactions(asList(transaction3, transaction1, transaction2)); + + assertTransactionNotPending(transaction1); + assertTransactionNotPending(transaction2); + assertTransactionNotPending(transaction3); + verifyZeroInteractions(batchAddedListener); + } + + @Test + public void shouldAllowRemoteTransactionsWhenInSync() { + SyncState syncState = mock(SyncState.class); + when(syncState.isInSync(anyLong())).thenReturn(true); + TransactionPool transactionPool = + new TransactionPool( + transactions, protocolSchedule, protocolContext, batchAddedListener, syncState); + + final TransactionTestFixture builder = new TransactionTestFixture(); + final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1); + final Transaction transaction2 = builder.nonce(2).createTransaction(KEY_PAIR1); + final Transaction transaction3 = builder.nonce(3).createTransaction(KEY_PAIR1); + + when(transactionValidator.validate(any(Transaction.class))).thenReturn(valid()); + when(transactionValidator.validateForSender( + eq(transaction1), nullable(Account.class), eq(true))) + .thenReturn(valid()); + when(transactionValidator.validateForSender( + eq(transaction2), nullable(Account.class), eq(true))) + .thenReturn(valid()); + when(transactionValidator.validateForSender( + eq(transaction3), nullable(Account.class), eq(true))) + .thenReturn(valid()); + + transactionPool.addRemoteTransactions(asList(transaction3, transaction1, transaction2)); + + assertTransactionPending(transaction1); + assertTransactionPending(transaction2); + assertTransactionPending(transaction3); + } + private void assertTransactionPending(final Transaction t) { assertThat(transactions.getTransactionByHash(t.hash())).contains(t); } diff --git a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java index 2589619bec..430d824b48 100644 --- a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java +++ b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java @@ -16,6 +16,7 @@ import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; import tech.pegasys.pantheon.ethereum.ProtocolContext; @@ -29,6 +30,7 @@ import tech.pegasys.pantheon.ethereum.core.Transaction; import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; import tech.pegasys.pantheon.ethereum.core.Wei; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.eth.transactions.PendingTransactions; import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool; import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener; @@ -76,6 +78,7 @@ public class EthGetFilterChangesIntegrationTest { private final JsonRpcParameter parameters = new JsonRpcParameter(); private FilterManager filterManager; private EthGetFilterChanges method; + private final SyncState syncState = mock(SyncState.class); @Before public void setUp() { @@ -87,7 +90,8 @@ public void setUp() { transactions, executionContext.getProtocolSchedule(), protocolContext, - batchAddedListener); + batchAddedListener, + syncState); final BlockchainQueries blockchainQueries = new BlockchainQueries(blockchain, protocolContext.getWorldStateArchive()); filterManager = diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java index 1243d6b3b5..5c7ff97f65 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java @@ -181,7 +181,8 @@ static PantheonController init( ethProtocolManager.ethContext(), clock, maxPendingTransactions, - metricsSystem); + metricsSystem, + syncState); final ExecutorService minerThreadPool = Executors.newCachedThreadPool(); final CliqueMinerExecutor miningExecutor = diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java index 92b5525b09..63a984857e 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java @@ -185,7 +185,8 @@ static PantheonController init( istanbul64ProtocolManager.ethContext(), clock, maxPendingTransactions, - metricsSystem); + metricsSystem, + syncState); return new IbftLegacyPantheonController( protocolSchedule, diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java index 556bf21a5e..82a1742625 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java @@ -210,7 +210,8 @@ static PantheonController init( ethContext, clock, maxPendingTransactions, - metricsSystem); + metricsSystem, + syncState); final IbftEventQueue ibftEventQueue = new IbftEventQueue(ibftConfig.getMessageQueueLimit()); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java index 2bdd769a20..a17031fc3e 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java @@ -158,7 +158,8 @@ public static PantheonController init( ethProtocolManager.ethContext(), clock, maxPendingTransactions, - metricsSystem); + metricsSystem, + syncState); final ExecutorService minerThreadPool = Executors.newCachedThreadPool(); final EthHashMinerExecutor executor =