Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize with nonce distance #2505

Merged
merged 13 commits into from
Aug 12, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,7 @@ public void setUp() {
when(mockEthContext.getEthMessages()).thenReturn(mockEthMessages);
when(mockEthContext.getEthPeers()).thenReturn(mockEthPeers);
when(mockEthContext.getScheduler()).thenReturn(mockEthScheduler);
when(mockEthPeers.streamAvailablePeers())
.thenReturn(Stream.empty())
.thenReturn(Stream.empty())
.thenReturn(Stream.empty())
.thenReturn(Stream.empty());
when(mockEthPeers.streamAvailablePeers()).thenAnswer(__ -> Stream.empty());
when(mockProtocolContext.getBlockchain()).thenReturn(blockchain);
when(mockProtocolContext.getWorldStateArchive()).thenReturn(mockWorldStateArchive);
when(mockProtocolSchedule.getByBlockNumber(anyLong())).thenReturn(mockProtocolSpec);
Expand Down Expand Up @@ -436,7 +432,7 @@ public void transactionDroppedEventDoesNotFireAfterUnsubscribe() {
transactionPool.addLocalTransaction(TX2);

assertThat(result.get()).isNotNull();
serviceImpl.removeTransactionAddedListener(id);
serviceImpl.removeTransactionDroppedListener(id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch.

result.set(null);

transactionPool.addLocalTransaction(TX2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class PendingTransactions {
.get()
.getValue()
.longValue())
.thenComparing(this::distanceFromNextNonce)
.thenComparing(TransactionInfo::getSequence)
.reversed());

Expand All @@ -105,8 +106,23 @@ public class PendingTransactions {
.getMaxFeePerGas()
.map(maxFeePerGas -> maxFeePerGas.getValue().longValue())
.orElse(transactionInfo.getGasPrice().toLong()))
.thenComparing(this::distanceFromNextNonce)
.thenComparing(TransactionInfo::getSequence)
.reversed());

private Long distanceFromNextNonce(final TransactionInfo incomingTx) {
final TransactionsForSenderInfo inPool = transactionsBySender.get(incomingTx.getSender());
if ((inPool == null)
|| (inPool.streamTransactionInfos().count() < 1)) { // nothing in pool, you're next
return 0L;
}
long minNonceForAccount =
inPool.streamTransactionInfos().mapToLong(TransactionInfo::getNonce).min().getAsLong();
// despite this looking backwards, it produces the sort order we want.
// greater distances produce more negative results, which are then .reversed()
return minNonceForAccount - incomingTx.getNonce();
}

private Optional<Long> baseFee;
private final Map<Address, TransactionsForSenderInfo> transactionsBySender =
new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
@Value.Style(allParameters = true)
public interface TransactionPoolConfiguration {
int DEFAULT_TX_MSG_KEEP_ALIVE = 60;
int MAX_PENDING_TRANSACTIONS = 4096;
int MAX_PENDING_TRANSACTIONS = 4096 * 8;
int MAX_PENDING_TRANSACTIONS_HASHES = 4096;
int DEFAULT_TX_RETENTION_HOURS = 13;
Percentage DEFAULT_PRICE_BUMP = Percentage.fromInt(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.crypto.KeyPair;
Expand Down Expand Up @@ -122,15 +122,30 @@ public void shouldGetTransactionByHash() {

@Test
public void shouldDropOldestTransactionWhenLimitExceeded() {
final Transaction oldestTransaction = createTransaction(0);
final Transaction oldestTransaction =
new TransactionTestFixture()
.value(Wei.of(1L))
.nonce(0L)
.createTransaction(SIGNATURE_ALGORITHM.get().generateKeyPair());
transactions.addRemoteTransaction(oldestTransaction);
for (int i = 1; i < MAX_TRANSACTIONS; i++) {
transactions.addRemoteTransaction(createTransaction(i));
final Transaction newerTransaction =
new TransactionTestFixture()
.value(Wei.of(1L))
.nonce(0L)
.createTransaction(SIGNATURE_ALGORITHM.get().generateKeyPair());
transactions.addRemoteTransaction(newerTransaction);
}
assertThat(transactions.size()).isEqualTo(MAX_TRANSACTIONS);
assertThat(metricsSystem.getCounterValue(REMOVED_COUNTER, REMOTE, DROPPED)).isZero();

transactions.addRemoteTransaction(createTransaction(MAX_TRANSACTIONS + 1));
final Transaction lastTransaction =
new TransactionTestFixture()
.value(Wei.of(1L))
.nonce(MAX_TRANSACTIONS + 1)
.createTransaction(SIGNATURE_ALGORITHM.get().generateKeyPair());

transactions.addRemoteTransaction(lastTransaction);
assertThat(transactions.size()).isEqualTo(MAX_TRANSACTIONS);
assertTransactionNotPending(oldestTransaction);
assertThat(metricsSystem.getCounterValue(REMOVED_COUNTER, REMOTE, DROPPED)).isEqualTo(1);
Expand All @@ -153,6 +168,11 @@ public void shouldHandleMaximumTransactionLimitCorrectlyWhenSameTransactionAdded

@Test
public void shouldPrioritizeLocalTransaction() {

transactions.subscribeDroppedTransactions(
transaction ->
assertThat(transactions.getLocalTransactions().contains(transaction)).isFalse());

final Transaction localTransaction = createTransaction(0);
transactions.addLocalTransaction(localTransaction);

Expand All @@ -165,17 +185,23 @@ public void shouldPrioritizeLocalTransaction() {

@Test
public void shouldPrioritizeGasPriceThenTimeAddedToPool() {
transactions.subscribeDroppedTransactions(
transaction -> assertThat(transaction.getGasPrice().get().toLong()).isLessThan(100));
final List<Transaction> lowGasPriceTransactions =
IntStream.range(0, MAX_TRANSACTIONS)
.mapToObj(i -> transactionWithNonceSenderAndGasPrice(i + 1, KEYS1, 10))
.mapToObj(
i ->
transactionWithNonceSenderAndGasPrice(
i + 1, SIGNATURE_ALGORITHM.get().generateKeyPair(), 10 + i))
.collect(Collectors.toUnmodifiableList());

// Fill the pool
lowGasPriceTransactions.forEach(transactions::addRemoteTransaction);

// This should kick the oldest tx with the low gas price out, namely the first one we added
final Transaction highGasPriceTransaction =
transactionWithNonceSenderAndGasPrice(MAX_TRANSACTIONS + 1, KEYS1, 100);
transactionWithNonceSenderAndGasPrice(
MAX_TRANSACTIONS + 10, SIGNATURE_ALGORITHM.get().generateKeyPair(), 100);
transactions.addRemoteTransaction(highGasPriceTransaction);
assertThat(transactions.size()).isEqualTo(MAX_TRANSACTIONS);

Expand All @@ -186,14 +212,12 @@ public void shouldPrioritizeGasPriceThenTimeAddedToPool() {

@Test
public void shouldStartDroppingLocalTransactionsWhenPoolIsFullOfLocalTransactions() {
final Transaction firstLocalTransaction = createTransaction(0);
transactions.addLocalTransaction(firstLocalTransaction);
transactions.subscribeDroppedTransactions(this::assertTransactionNotPending);

for (int i = 1; i <= MAX_TRANSACTIONS; i++) {
for (int i = 0; i <= MAX_TRANSACTIONS; i++) {
transactions.addLocalTransaction(createTransaction(i));
}
assertThat(transactions.size()).isEqualTo(MAX_TRANSACTIONS);
assertTransactionNotPending(firstLocalTransaction);
}

@Test
Expand All @@ -207,17 +231,16 @@ public void shouldNotifyListenerWhenRemoteTransactionAdded() {

@Test
public void shouldNotNotifyListenerAfterUnsubscribe() {

final long id = transactions.subscribePendingTransactions(listener);

transactions.addRemoteTransaction(transaction1);

verify(listener).onTransactionAdded(transaction1);

transactions.unsubscribePendingTransactions(id);

verifyNoMoreInteractions(listener);
transactions.addRemoteTransaction(transaction2);

verifyZeroInteractions(listener);
}

@Test
Expand Down Expand Up @@ -277,7 +300,7 @@ public void shouldNotNotifyDroppedListenerWhenTransactionAddedToBlock() {

transactions.transactionAddedToBlock(transaction1);

verifyZeroInteractions(droppedListener);
verifyNoInteractions(droppedListener);
}

@Test
Expand Down Expand Up @@ -473,7 +496,7 @@ public void shouldNotReplaceTransactionWithSameSenderAndNonceWhenGasPriceIsLower
assertTransactionNotPending(transaction1b);
assertTransactionPending(transaction1);
assertThat(transactions.size()).isEqualTo(1);
verifyZeroInteractions(listener);
verifyNoInteractions(listener);
}

@Test
Expand Down Expand Up @@ -681,18 +704,18 @@ public void shouldNotIncrementAddedCounterWhenLocalTransactionAlreadyPresent() {
@Test
public void assertThatCorrectNonceIsReturned() {
assertThat(transactions.getNextNonceForSender(transaction1.getSender())).isEmpty();
addLocalTransactions(1, 2, 4, 5);
addLocalTransactions(1, 2, 4);
assertThat(transactions.getNextNonceForSender(transaction1.getSender()))
.isPresent()
.hasValue(3);
addLocalTransactions(3);
assertThat(transactions.getNextNonceForSender(transaction1.getSender()))
.isPresent()
.hasValue(6);
addLocalTransactions(6, 10);
.hasValue(5);
addLocalTransactions(5);
assertThat(transactions.getNextNonceForSender(transaction1.getSender()))
.isPresent()
.hasValue(7);
.hasValue(6);
}

@Test
Expand Down Expand Up @@ -726,4 +749,60 @@ private static BlockHeader mockBlockHeader() {
when(blockHeader.getBaseFee()).thenReturn(Optional.empty());
return blockHeader;
}

@Test
public void shouldIgnoreFutureNoncedTxs() {

// create maxtx transactions with valid addresses/nonces
// all addresses should be unique, chained txs will be checked in another test.
// TODO: how do we test around reorgs? do we?
List<Transaction> toValidate = new ArrayList<>((int) transactions.maxSize());
for (int entries = 1; entries <= transactions.maxSize(); entries++) {
KeyPair kp = SIGNATURE_ALGORITHM.get().generateKeyPair();
Address a = Util.publicKeyToAddress(kp.getPublicKey());
Transaction t =
new TransactionTestFixture()
.sender(a)
.value(Wei.of(2))
.maxPriorityFeePerGas(Optional.of(Wei.of(2L)))
.nonce(entries)
.createTransaction(kp);
transactions.addRemoteTransaction(t);
toValidate.add(t);
}

// create maxtx transaction with nonces in the future, could be any volume though since pool
// already full
List<Transaction> attackTxs = new ArrayList<>();
KeyPair attackerKp = SIGNATURE_ALGORITHM.get().generateKeyPair();
Address attackerA = Util.publicKeyToAddress(attackerKp.getPublicKey());

for (int entries = 10;
entries < transactions.maxSize() + 10;
entries++) { // badguy nonces are 2 digits
Transaction t =
new TransactionTestFixture()
.sender(attackerA)
.value(Wei.of(2))
.nonce(entries)
.maxPriorityFeePerGas(Optional.of(Wei.of(2L)))
.createTransaction(attackerKp);
attackTxs.add(t);
transactions.addRemoteTransaction(t); // all but the last one of these should be dropped
}

// assert txpool contains 1st attack

assertThat(transactions.getTransactionByHash(attackTxs.get(0).getHash())).isNotEmpty();
// assert txpool does not contain rest of attack
attackTxs.stream()
.skip(1L)
.forEach(t -> assertThat(transactions.getTransactionByHash(t.getHash())).isEmpty());
// assert that only 1 of the valid batch was purged
long droppedValidCount =
toValidate.stream()
.filter(t -> transactions.getTransactionByHash(t.getHash()).isEmpty())
.count();
assertThat(droppedValidCount).isEqualTo(1L);
}
}