Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[PIE-7] Ignore transactions from the network while behind chain head (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
smatthewenglish authored Apr 15, 2019
1 parent 302d4f1 commit 42ecfb5
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,12 @@ public SyncTarget setSyncTarget(final EthPeer peer, final BlockHeader commonAnce
}

public boolean isInSync() {
return isInSync(SYNC_TOLERANCE);
}

public boolean isInSync(final long syncTolerance) {
return syncTarget
.map(
t -> t.estimatedTargetHeight() - blockchain.getChainHeadBlockNumber() <= SYNC_TOLERANCE)
.map(t -> t.estimatedTargetHeight() - blockchain.getChainHeadBlockNumber() <= syncTolerance)
.orElse(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import tech.pegasys.pantheon.ethereum.core.AccountFilter;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator;
import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator.TransactionInvalidReason;
Expand All @@ -49,21 +50,25 @@
*/
public class TransactionPool implements BlockAddedObserver {
private static final Logger LOG = getLogger();
private static final long SYNC_TOLERANCE = 100L;
private final PendingTransactions pendingTransactions;
private final ProtocolSchedule<?> protocolSchedule;
private final ProtocolContext<?> protocolContext;
private final TransactionBatchAddedListener transactionBatchAddedListener;
private final SyncState syncState;
private Optional<AccountFilter> accountFilter = Optional.empty();

public TransactionPool(
final PendingTransactions pendingTransactions,
final ProtocolSchedule<?> protocolSchedule,
final ProtocolContext<?> protocolContext,
final TransactionBatchAddedListener transactionBatchAddedListener) {
final TransactionBatchAddedListener transactionBatchAddedListener,
final SyncState syncState) {
this.pendingTransactions = pendingTransactions;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.transactionBatchAddedListener = transactionBatchAddedListener;
this.syncState = syncState;
}

public List<Transaction> getLocalTransactions() {
Expand All @@ -88,6 +93,9 @@ public ValidationResult<TransactionInvalidReason> addLocalTransaction(
public void addRemoteTransactions(final Collection<Transaction> transactions) {
final Set<Transaction> addedTransactions = new HashSet<>();
for (final Transaction transaction : sortByNonce(transactions)) {
if (!syncState.isInSync(SYNC_TOLERANCE)) {
return;
}
final ValidationResult<TransactionInvalidReason> validationResult =
validateTransaction(transaction);
if (validationResult.isValid()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;

Expand All @@ -28,7 +29,8 @@ public static TransactionPool createTransactionPool(
final EthContext ethContext,
final Clock clock,
final int maxPendingTransactions,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final SyncState syncState) {
final PendingTransactions pendingTransactions =
new PendingTransactions(maxPendingTransactions, clock, metricsSystem);

Expand All @@ -41,7 +43,8 @@ public static TransactionPool createTransactionPool(
pendingTransactions,
protocolSchedule,
protocolContext,
new TransactionSender(transactionTracker, transactionsMessageSender, ethContext));
new TransactionSender(transactionTracker, transactionsMessageSender, ethContext),
syncState);

final TransactionsMessageHandler transactionsMessageHandler =
new TransactionsMessageHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import tech.pegasys.pantheon.ethereum.eth.messages.ReceiptsMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.StatusMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.transactions.PendingTransactions;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPoolFactory;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
Expand Down Expand Up @@ -1054,7 +1055,8 @@ public void transactionMessagesGoToTheCorrectExecutor() {
ethManager.ethContext(),
TestClock.fixed(),
PendingTransactions.MAX_PENDING_TRANSACTIONS,
metricsSystem);
metricsSystem,
mock(SyncState.class));

// Send just a transaction message.
final PeerConnection peer = setupPeer(ethManager, (cap, msg, connection) -> {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

import static java.util.Collections.singletonList;
import static org.assertj.core.util.Preconditions.checkNotNull;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive;

Expand All @@ -29,6 +32,7 @@
import tech.pegasys.pantheon.ethereum.eth.EthereumWireProtocolConfiguration;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction;
import tech.pegasys.pantheon.ethereum.p2p.NetworkRunner;
Expand Down Expand Up @@ -139,14 +143,19 @@ public TestNode(
(connection, reason, initiatedByPeer) -> disconnections.put(connection, reason));

final EthContext ethContext = ethProtocolManager.ethContext();

SyncState syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(true);

transactionPool =
TransactionPoolFactory.createTransactionPool(
protocolSchedule,
protocolContext,
ethContext,
TestClock.fixed(),
PendingTransactions.MAX_PENDING_TRANSACTIONS,
metricsSystem);
metricsSystem,
syncState);
networkRunner.start();

selfPeer = new DefaultPeer(id(), endpoint());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.core.TransactionTestFixture;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
Expand Down Expand Up @@ -85,21 +86,24 @@ public class TransactionPoolTest {
new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem);
private final Transaction transaction1 = createTransaction(1);
private final Transaction transaction2 = createTransaction(2);
private final ExecutionContextTestFixture executionContext = ExecutionContextTestFixture.create();
private final ProtocolContext<Void> protocolContext = executionContext.getProtocolContext();
private TransactionPool transactionPool;
private long genesisBlockGasLimit;
private final AccountFilter accountFilter = mock(AccountFilter.class);

@Before
public void setUp() {
final ExecutionContextTestFixture executionContext = ExecutionContextTestFixture.create();
blockchain = executionContext.getBlockchain();
final ProtocolContext<Void> protocolContext = executionContext.getProtocolContext();
when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(protocolSpec);
when(protocolSpec.getTransactionValidator()).thenReturn(transactionValidator);
genesisBlockGasLimit = executionContext.getGenesis().getHeader().getGasLimit();
SyncState syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(true);

transactionPool =
new TransactionPool(transactions, protocolSchedule, protocolContext, batchAddedListener);
new TransactionPool(
transactions, protocolSchedule, protocolContext, batchAddedListener, syncState);
blockchain.observeBlockAdded(transactionPool);
}

Expand Down Expand Up @@ -179,7 +183,7 @@ public void shouldRemovePendingTransactionsFromAllBlocksOnAForkWhenItBecomesTheC
}

@Test
public void shouldReaddTransactionsFromThePreviousCanonicalHeadWhenAReorgOccurs() {
public void shouldReadTransactionsFromThePreviousCanonicalHeadWhenAReorgOccurs() {
givenTransactionIsValid(transaction1);
givenTransactionIsValid(transaction2);
transactions.addRemoteTransaction(transaction1);
Expand All @@ -205,7 +209,7 @@ public void shouldReaddTransactionsFromThePreviousCanonicalHeadWhenAReorgOccurs(
}

@Test
public void shouldNotReaddTransactionsThatAreInBothForksWhenReorgHappens() {
public void shouldNotReadTransactionsThatAreInBothForksWhenReorgHappens() {
givenTransactionIsValid(transaction1);
givenTransactionIsValid(transaction2);
transactions.addRemoteTransaction(transaction1);
Expand Down Expand Up @@ -424,6 +428,69 @@ public void shouldAllowTransactionWhenAccountWhitelistControllerIsNotPresent() {
assertTransactionPending(transaction1);
}

@Test
public void shouldRejectRemoteTransactionsWhenNotInSync() {
SyncState syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(false);
TransactionPool transactionPool =
new TransactionPool(
transactions, protocolSchedule, protocolContext, batchAddedListener, syncState);

final TransactionTestFixture builder = new TransactionTestFixture();
final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1);
final Transaction transaction2 = builder.nonce(2).createTransaction(KEY_PAIR1);
final Transaction transaction3 = builder.nonce(3).createTransaction(KEY_PAIR1);

when(transactionValidator.validate(any(Transaction.class))).thenReturn(valid());
when(transactionValidator.validateForSender(
eq(transaction1), nullable(Account.class), eq(true)))
.thenReturn(valid());
when(transactionValidator.validateForSender(
eq(transaction2), nullable(Account.class), eq(true)))
.thenReturn(valid());
when(transactionValidator.validateForSender(
eq(transaction3), nullable(Account.class), eq(true)))
.thenReturn(valid());

transactionPool.addRemoteTransactions(asList(transaction3, transaction1, transaction2));

assertTransactionNotPending(transaction1);
assertTransactionNotPending(transaction2);
assertTransactionNotPending(transaction3);
verifyZeroInteractions(batchAddedListener);
}

@Test
public void shouldAllowRemoteTransactionsWhenInSync() {
SyncState syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(true);
TransactionPool transactionPool =
new TransactionPool(
transactions, protocolSchedule, protocolContext, batchAddedListener, syncState);

final TransactionTestFixture builder = new TransactionTestFixture();
final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1);
final Transaction transaction2 = builder.nonce(2).createTransaction(KEY_PAIR1);
final Transaction transaction3 = builder.nonce(3).createTransaction(KEY_PAIR1);

when(transactionValidator.validate(any(Transaction.class))).thenReturn(valid());
when(transactionValidator.validateForSender(
eq(transaction1), nullable(Account.class), eq(true)))
.thenReturn(valid());
when(transactionValidator.validateForSender(
eq(transaction2), nullable(Account.class), eq(true)))
.thenReturn(valid());
when(transactionValidator.validateForSender(
eq(transaction3), nullable(Account.class), eq(true)))
.thenReturn(valid());

transactionPool.addRemoteTransactions(asList(transaction3, transaction1, transaction2));

assertTransactionPending(transaction1);
assertTransactionPending(transaction2);
assertTransactionPending(transaction3);
}

private void assertTransactionPending(final Transaction t) {
assertThat(transactions.getTransactionByHash(t.hash())).contains(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
Expand All @@ -29,6 +30,7 @@
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.transactions.PendingTransactions;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
Expand Down Expand Up @@ -76,6 +78,7 @@ public class EthGetFilterChangesIntegrationTest {
private final JsonRpcParameter parameters = new JsonRpcParameter();
private FilterManager filterManager;
private EthGetFilterChanges method;
private final SyncState syncState = mock(SyncState.class);

@Before
public void setUp() {
Expand All @@ -87,7 +90,8 @@ public void setUp() {
transactions,
executionContext.getProtocolSchedule(),
protocolContext,
batchAddedListener);
batchAddedListener,
syncState);
final BlockchainQueries blockchainQueries =
new BlockchainQueries(blockchain, protocolContext.getWorldStateArchive());
filterManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ static PantheonController<CliqueContext> init(
ethProtocolManager.ethContext(),
clock,
maxPendingTransactions,
metricsSystem);
metricsSystem,
syncState);

final ExecutorService minerThreadPool = Executors.newCachedThreadPool();
final CliqueMinerExecutor miningExecutor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ static PantheonController<IbftContext> init(
istanbul64ProtocolManager.ethContext(),
clock,
maxPendingTransactions,
metricsSystem);
metricsSystem,
syncState);

return new IbftLegacyPantheonController(
protocolSchedule,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ static PantheonController<IbftContext> init(
ethContext,
clock,
maxPendingTransactions,
metricsSystem);
metricsSystem,
syncState);

final IbftEventQueue ibftEventQueue = new IbftEventQueue(ibftConfig.getMessageQueueLimit());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ public static PantheonController<Void> init(
ethProtocolManager.ethContext(),
clock,
maxPendingTransactions,
metricsSystem);
metricsSystem,
syncState);

final ExecutorService minerThreadPool = Executors.newCachedThreadPool();
final EthHashMinerExecutor executor =
Expand Down

0 comments on commit 42ecfb5

Please sign in to comment.