Skip to content

Commit

Permalink
Prioritize with nonce distance (hyperledger#2505)
Browse files Browse the repository at this point in the history

Signed-off-by: Justin Florentine <justin.florentine@consensys.net>

Co-authored-by: matkt <karim.t2am@gmail.com>
  • Loading branch information
jflo and matkt authored Aug 12, 2021
1 parent b0e57a4 commit 1f4f131
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,7 @@ public void setUp() {
when(mockEthContext.getEthMessages()).thenReturn(mockEthMessages);
when(mockEthContext.getEthPeers()).thenReturn(mockEthPeers);
when(mockEthContext.getScheduler()).thenReturn(mockEthScheduler);
when(mockEthPeers.streamAvailablePeers())
.thenReturn(Stream.empty())
.thenReturn(Stream.empty())
.thenReturn(Stream.empty())
.thenReturn(Stream.empty());
when(mockEthPeers.streamAvailablePeers()).thenAnswer(__ -> Stream.empty());
when(mockProtocolContext.getBlockchain()).thenReturn(blockchain);
when(mockProtocolContext.getWorldStateArchive()).thenReturn(mockWorldStateArchive);
when(mockProtocolSchedule.getByBlockNumber(anyLong())).thenReturn(mockProtocolSpec);
Expand Down Expand Up @@ -436,7 +432,7 @@ public void transactionDroppedEventDoesNotFireAfterUnsubscribe() {
transactionPool.addLocalTransaction(TX2);

assertThat(result.get()).isNotNull();
serviceImpl.removeTransactionAddedListener(id);
serviceImpl.removeTransactionDroppedListener(id);
result.set(null);

transactionPool.addLocalTransaction(TX2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class PendingTransactions {
.get()
.getValue()
.longValue())
.thenComparing(this::distanceFromNextNonce)
.thenComparing(TransactionInfo::getSequence)
.reversed());

Expand All @@ -105,8 +106,23 @@ public class PendingTransactions {
.getMaxFeePerGas()
.map(maxFeePerGas -> maxFeePerGas.getValue().longValue())
.orElse(transactionInfo.getGasPrice().toLong()))
.thenComparing(this::distanceFromNextNonce)
.thenComparing(TransactionInfo::getSequence)
.reversed());

private Long distanceFromNextNonce(final TransactionInfo incomingTx) {
final TransactionsForSenderInfo inPool = transactionsBySender.get(incomingTx.getSender());
if ((inPool == null)
|| (inPool.streamTransactionInfos().count() < 1)) { // nothing in pool, you're next
return 0L;
}
long minNonceForAccount =
inPool.streamTransactionInfos().mapToLong(TransactionInfo::getNonce).min().getAsLong();
// despite this looking backwards, it produces the sort order we want.
// greater distances produce more negative results, which are then .reversed()
return minNonceForAccount - incomingTx.getNonce();
}

private Optional<Long> baseFee;
private final Map<Address, TransactionsForSenderInfo> transactionsBySender =
new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
@Value.Style(allParameters = true)
public interface TransactionPoolConfiguration {
int DEFAULT_TX_MSG_KEEP_ALIVE = 60;
int MAX_PENDING_TRANSACTIONS = 4096;
int MAX_PENDING_TRANSACTIONS = 4096 * 8;
int MAX_PENDING_TRANSACTIONS_HASHES = 4096;
int DEFAULT_TX_RETENTION_HOURS = 13;
Percentage DEFAULT_PRICE_BUMP = Percentage.fromInt(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.crypto.KeyPair;
Expand Down Expand Up @@ -122,15 +122,30 @@ public void shouldGetTransactionByHash() {

@Test
public void shouldDropOldestTransactionWhenLimitExceeded() {
final Transaction oldestTransaction = createTransaction(0);
final Transaction oldestTransaction =
new TransactionTestFixture()
.value(Wei.of(1L))
.nonce(0L)
.createTransaction(SIGNATURE_ALGORITHM.get().generateKeyPair());
transactions.addRemoteTransaction(oldestTransaction);
for (int i = 1; i < MAX_TRANSACTIONS; i++) {
transactions.addRemoteTransaction(createTransaction(i));
final Transaction newerTransaction =
new TransactionTestFixture()
.value(Wei.of(1L))
.nonce(0L)
.createTransaction(SIGNATURE_ALGORITHM.get().generateKeyPair());
transactions.addRemoteTransaction(newerTransaction);
}
assertThat(transactions.size()).isEqualTo(MAX_TRANSACTIONS);
assertThat(metricsSystem.getCounterValue(REMOVED_COUNTER, REMOTE, DROPPED)).isZero();

transactions.addRemoteTransaction(createTransaction(MAX_TRANSACTIONS + 1));
final Transaction lastTransaction =
new TransactionTestFixture()
.value(Wei.of(1L))
.nonce(MAX_TRANSACTIONS + 1)
.createTransaction(SIGNATURE_ALGORITHM.get().generateKeyPair());

transactions.addRemoteTransaction(lastTransaction);
assertThat(transactions.size()).isEqualTo(MAX_TRANSACTIONS);
assertTransactionNotPending(oldestTransaction);
assertThat(metricsSystem.getCounterValue(REMOVED_COUNTER, REMOTE, DROPPED)).isEqualTo(1);
Expand All @@ -153,6 +168,11 @@ public void shouldHandleMaximumTransactionLimitCorrectlyWhenSameTransactionAdded

@Test
public void shouldPrioritizeLocalTransaction() {

transactions.subscribeDroppedTransactions(
transaction ->
assertThat(transactions.getLocalTransactions().contains(transaction)).isFalse());

final Transaction localTransaction = createTransaction(0);
transactions.addLocalTransaction(localTransaction);

Expand All @@ -165,17 +185,23 @@ public void shouldPrioritizeLocalTransaction() {

@Test
public void shouldPrioritizeGasPriceThenTimeAddedToPool() {
transactions.subscribeDroppedTransactions(
transaction -> assertThat(transaction.getGasPrice().get().toLong()).isLessThan(100));
final List<Transaction> lowGasPriceTransactions =
IntStream.range(0, MAX_TRANSACTIONS)
.mapToObj(i -> transactionWithNonceSenderAndGasPrice(i + 1, KEYS1, 10))
.mapToObj(
i ->
transactionWithNonceSenderAndGasPrice(
i + 1, SIGNATURE_ALGORITHM.get().generateKeyPair(), 10 + i))
.collect(Collectors.toUnmodifiableList());

// Fill the pool
lowGasPriceTransactions.forEach(transactions::addRemoteTransaction);

// This should kick the oldest tx with the low gas price out, namely the first one we added
final Transaction highGasPriceTransaction =
transactionWithNonceSenderAndGasPrice(MAX_TRANSACTIONS + 1, KEYS1, 100);
transactionWithNonceSenderAndGasPrice(
MAX_TRANSACTIONS + 10, SIGNATURE_ALGORITHM.get().generateKeyPair(), 100);
transactions.addRemoteTransaction(highGasPriceTransaction);
assertThat(transactions.size()).isEqualTo(MAX_TRANSACTIONS);

Expand All @@ -186,14 +212,12 @@ public void shouldPrioritizeGasPriceThenTimeAddedToPool() {

@Test
public void shouldStartDroppingLocalTransactionsWhenPoolIsFullOfLocalTransactions() {
final Transaction firstLocalTransaction = createTransaction(0);
transactions.addLocalTransaction(firstLocalTransaction);
transactions.subscribeDroppedTransactions(this::assertTransactionNotPending);

for (int i = 1; i <= MAX_TRANSACTIONS; i++) {
for (int i = 0; i <= MAX_TRANSACTIONS; i++) {
transactions.addLocalTransaction(createTransaction(i));
}
assertThat(transactions.size()).isEqualTo(MAX_TRANSACTIONS);
assertTransactionNotPending(firstLocalTransaction);
}

@Test
Expand All @@ -207,17 +231,16 @@ public void shouldNotifyListenerWhenRemoteTransactionAdded() {

@Test
public void shouldNotNotifyListenerAfterUnsubscribe() {

final long id = transactions.subscribePendingTransactions(listener);

transactions.addRemoteTransaction(transaction1);

verify(listener).onTransactionAdded(transaction1);

transactions.unsubscribePendingTransactions(id);

verifyNoMoreInteractions(listener);
transactions.addRemoteTransaction(transaction2);

verifyZeroInteractions(listener);
}

@Test
Expand Down Expand Up @@ -277,7 +300,7 @@ public void shouldNotNotifyDroppedListenerWhenTransactionAddedToBlock() {

transactions.transactionAddedToBlock(transaction1);

verifyZeroInteractions(droppedListener);
verifyNoInteractions(droppedListener);
}

@Test
Expand Down Expand Up @@ -473,7 +496,7 @@ public void shouldNotReplaceTransactionWithSameSenderAndNonceWhenGasPriceIsLower
assertTransactionNotPending(transaction1b);
assertTransactionPending(transaction1);
assertThat(transactions.size()).isEqualTo(1);
verifyZeroInteractions(listener);
verifyNoInteractions(listener);
}

@Test
Expand Down Expand Up @@ -681,18 +704,18 @@ public void shouldNotIncrementAddedCounterWhenLocalTransactionAlreadyPresent() {
@Test
public void assertThatCorrectNonceIsReturned() {
assertThat(transactions.getNextNonceForSender(transaction1.getSender())).isEmpty();
addLocalTransactions(1, 2, 4, 5);
addLocalTransactions(1, 2, 4);
assertThat(transactions.getNextNonceForSender(transaction1.getSender()))
.isPresent()
.hasValue(3);
addLocalTransactions(3);
assertThat(transactions.getNextNonceForSender(transaction1.getSender()))
.isPresent()
.hasValue(6);
addLocalTransactions(6, 10);
.hasValue(5);
addLocalTransactions(5);
assertThat(transactions.getNextNonceForSender(transaction1.getSender()))
.isPresent()
.hasValue(7);
.hasValue(6);
}

@Test
Expand Down Expand Up @@ -726,4 +749,60 @@ private static BlockHeader mockBlockHeader() {
when(blockHeader.getBaseFee()).thenReturn(Optional.empty());
return blockHeader;
}

@Test
public void shouldIgnoreFutureNoncedTxs() {

// create maxtx transactions with valid addresses/nonces
// all addresses should be unique, chained txs will be checked in another test.
// TODO: how do we test around reorgs? do we?
List<Transaction> toValidate = new ArrayList<>((int) transactions.maxSize());
for (int entries = 1; entries <= transactions.maxSize(); entries++) {
KeyPair kp = SIGNATURE_ALGORITHM.get().generateKeyPair();
Address a = Util.publicKeyToAddress(kp.getPublicKey());
Transaction t =
new TransactionTestFixture()
.sender(a)
.value(Wei.of(2))
.maxPriorityFeePerGas(Optional.of(Wei.of(2L)))
.nonce(entries)
.createTransaction(kp);
transactions.addRemoteTransaction(t);
toValidate.add(t);
}

// create maxtx transaction with nonces in the future, could be any volume though since pool
// already full
List<Transaction> attackTxs = new ArrayList<>();
KeyPair attackerKp = SIGNATURE_ALGORITHM.get().generateKeyPair();
Address attackerA = Util.publicKeyToAddress(attackerKp.getPublicKey());

for (int entries = 10;
entries < transactions.maxSize() + 10;
entries++) { // badguy nonces are 2 digits
Transaction t =
new TransactionTestFixture()
.sender(attackerA)
.value(Wei.of(2))
.nonce(entries)
.maxPriorityFeePerGas(Optional.of(Wei.of(2L)))
.createTransaction(attackerKp);
attackTxs.add(t);
transactions.addRemoteTransaction(t); // all but the last one of these should be dropped
}

// assert txpool contains 1st attack

assertThat(transactions.getTransactionByHash(attackTxs.get(0).getHash())).isNotEmpty();
// assert txpool does not contain rest of attack
attackTxs.stream()
.skip(1L)
.forEach(t -> assertThat(transactions.getTransactionByHash(t.getHash())).isEmpty());
// assert that only 1 of the valid batch was purged
long droppedValidCount =
toValidate.stream()
.filter(t -> transactions.getTransactionByHash(t.getHash()).isEmpty())
.count();
assertThat(droppedValidCount).isEqualTo(1L);
}
}

0 comments on commit 1f4f131

Please sign in to comment.