diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/PeerTransactionTracker.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/PeerTransactionTracker.java index 92f010619f..887c4b30e4 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/PeerTransactionTracker.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/PeerTransactionTracker.java @@ -26,7 +26,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -class PeerTransactionTracker implements DisconnectCallback { +public class PeerTransactionTracker implements DisconnectCallback { private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 10_000; private final Map> seenTransactions = new ConcurrentHashMap<>(); private final Map> transactionsToSend = new ConcurrentHashMap<>(); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java index 7cdb3efae7..07fdf82d4d 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java @@ -25,6 +25,8 @@ import tech.pegasys.pantheon.ethereum.core.AccountFilter; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.Transaction; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator; @@ -57,18 +59,31 @@ public class TransactionPool implements BlockAddedObserver { private final TransactionBatchAddedListener transactionBatchAddedListener; private final SyncState syncState; private Optional accountFilter = Optional.empty(); + private final PeerTransactionTracker peerTransactionTracker; public TransactionPool( final PendingTransactions pendingTransactions, final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final TransactionBatchAddedListener transactionBatchAddedListener, - final SyncState syncState) { + final SyncState syncState, + final EthContext ethContext, + final PeerTransactionTracker peerTransactionTracker) { this.pendingTransactions = pendingTransactions; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.transactionBatchAddedListener = transactionBatchAddedListener; this.syncState = syncState; + this.peerTransactionTracker = peerTransactionTracker; + + ethContext.getEthPeers().subscribeConnect(this::handleConnect); + } + + private void handleConnect(final EthPeer peer) { + List localTransactions = getLocalTransactions(); + for (Transaction transaction : localTransactions) { + peerTransactionTracker.addToPeerSendQueue(peer, transaction); + } } public List getLocalTransactions() { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java index 876b3926d3..aa86b63432 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java @@ -44,7 +44,9 @@ public static TransactionPool createTransactionPool( protocolSchedule, protocolContext, new TransactionSender(transactionTracker, transactionsMessageSender, ethContext), - syncState); + syncState, + ethContext, + transactionTracker); final TransactionsMessageHandler transactionsMessageHandler = new TransactionsMessageHandler( diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java index 057edbb9fa..e33a4a1ca3 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java @@ -48,6 +48,11 @@ import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; import tech.pegasys.pantheon.ethereum.core.TransactionTestFixture; import tech.pegasys.pantheon.ethereum.core.Wei; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; +import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; @@ -59,7 +64,9 @@ import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.uint.UInt256; +import java.util.Collections; import java.util.List; +import java.util.Set; import org.junit.Before; import org.junit.Test; @@ -91,6 +98,9 @@ public class TransactionPoolTest { private TransactionPool transactionPool; private long genesisBlockGasLimit; private final AccountFilter accountFilter = mock(AccountFilter.class); + private SyncState syncState; + private EthContext ethContext; + private PeerTransactionTracker peerTransactionTracker; @Before public void setUp() { @@ -98,12 +108,21 @@ public void setUp() { when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(protocolSpec); when(protocolSpec.getTransactionValidator()).thenReturn(transactionValidator); genesisBlockGasLimit = executionContext.getGenesis().getHeader().getGasLimit(); - SyncState syncState = mock(SyncState.class); + syncState = mock(SyncState.class); when(syncState.isInSync(anyLong())).thenReturn(true); - + ethContext = mock(EthContext.class); + EthPeers ethPeers = mock(EthPeers.class); + when(ethContext.getEthPeers()).thenReturn(ethPeers); + peerTransactionTracker = mock(PeerTransactionTracker.class); transactionPool = new TransactionPool( - transactions, protocolSchedule, protocolContext, batchAddedListener, syncState); + transactions, + protocolSchedule, + protocolContext, + batchAddedListener, + syncState, + ethContext, + peerTransactionTracker); blockchain.observeBlockAdded(transactionPool); } @@ -434,7 +453,13 @@ public void shouldRejectRemoteTransactionsWhenNotInSync() { when(syncState.isInSync(anyLong())).thenReturn(false); TransactionPool transactionPool = new TransactionPool( - transactions, protocolSchedule, protocolContext, batchAddedListener, syncState); + transactions, + protocolSchedule, + protocolContext, + batchAddedListener, + syncState, + ethContext, + peerTransactionTracker); final TransactionTestFixture builder = new TransactionTestFixture(); final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1); @@ -462,12 +487,6 @@ public void shouldRejectRemoteTransactionsWhenNotInSync() { @Test public void shouldAllowRemoteTransactionsWhenInSync() { - SyncState syncState = mock(SyncState.class); - when(syncState.isInSync(anyLong())).thenReturn(true); - TransactionPool transactionPool = - new TransactionPool( - transactions, protocolSchedule, protocolContext, batchAddedListener, syncState); - final TransactionTestFixture builder = new TransactionTestFixture(); final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1); final Transaction transaction2 = builder.nonce(2).createTransaction(KEY_PAIR1); @@ -491,6 +510,40 @@ public void shouldAllowRemoteTransactionsWhenInSync() { assertTransactionPending(transaction3); } + @Test + public void shouldSendOnlyLocalTransactionToNewlyConnectedPeer() { + EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + EthContext ethContext = ethProtocolManager.ethContext(); + PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker(); + TransactionPool transactionPool = + new TransactionPool( + transactions, + protocolSchedule, + protocolContext, + batchAddedListener, + syncState, + ethContext, + peerTransactionTracker); + + final TransactionTestFixture builder = new TransactionTestFixture(); + final Transaction transactionLocal = builder.nonce(1).createTransaction(KEY_PAIR1); + final Transaction transactionRemote = builder.nonce(2).createTransaction(KEY_PAIR1); + when(transactionValidator.validate(any(Transaction.class))).thenReturn(valid()); + when(transactionValidator.validateForSender( + any(Transaction.class), nullable(Account.class), eq(true))) + .thenReturn(valid()); + + transactionPool.addLocalTransaction(transactionLocal); + transactionPool.addRemoteTransactions(Collections.singletonList(transactionRemote)); + + RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager); + + Set transactionsToSendToPeer = + peerTransactionTracker.claimTransactionsToSendToPeer(peer.getEthPeer()); + + assertThat(transactionsToSendToPeer).containsExactly(transactionLocal); + } + private void assertTransactionPending(final Transaction t) { assertThat(transactions.getTransactionByHash(t.hash())).contains(t); } diff --git a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java index 430d824b48..7cccfeb43c 100644 --- a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java +++ b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java @@ -17,6 +17,7 @@ import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; import tech.pegasys.pantheon.ethereum.ProtocolContext; @@ -30,7 +31,10 @@ import tech.pegasys.pantheon.ethereum.core.Transaction; import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; import tech.pegasys.pantheon.ethereum.core.Wei; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; +import tech.pegasys.pantheon.ethereum.eth.transactions.PeerTransactionTracker; import tech.pegasys.pantheon.ethereum.eth.transactions.PendingTransactions; import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool; import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener; @@ -85,13 +89,21 @@ public void setUp() { final ExecutionContextTestFixture executionContext = ExecutionContextTestFixture.create(); blockchain = executionContext.getBlockchain(); final ProtocolContext protocolContext = executionContext.getProtocolContext(); + + PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class); + EthContext ethContext = mock(EthContext.class); + EthPeers ethPeers = mock(EthPeers.class); + when(ethContext.getEthPeers()).thenReturn(ethPeers); + transactionPool = new TransactionPool( transactions, executionContext.getProtocolSchedule(), protocolContext, batchAddedListener, - syncState); + syncState, + ethContext, + peerTransactionTracker); final BlockchainQueries blockchainQueries = new BlockchainQueries(blockchain, protocolContext.getWorldStateArchive()); filterManager =