Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize with nonce distance #2505

Merged
merged 13 commits into from
Aug 12, 2021
Merged
Prev Previous commit
Next Next commit
some tests need to consider if their nonces should be from unique sen…
…ders

Signed-off-by: Justin Florentine <justin.florentine@consensys.net>
  • Loading branch information
jflo committed Jul 7, 2021
commit a4eb2ce9732b7f476916b0134c6e8ec74e9c43dc
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,7 @@ public void setUp() {
when(mockEthContext.getEthMessages()).thenReturn(mockEthMessages);
when(mockEthContext.getEthPeers()).thenReturn(mockEthPeers);
when(mockEthContext.getScheduler()).thenReturn(mockEthScheduler);
when(mockEthPeers.streamAvailablePeers())
.thenReturn(Stream.empty())
.thenReturn(Stream.empty())
.thenReturn(Stream.empty())
.thenReturn(Stream.empty());
when(mockEthPeers.streamAvailablePeers()).thenAnswer(__ -> Stream.empty());
when(mockProtocolContext.getBlockchain()).thenReturn(blockchain);
when(mockProtocolContext.getWorldStateArchive()).thenReturn(mockWorldStateArchive);
when(mockProtocolSchedule.getByBlockNumber(anyLong())).thenReturn(mockProtocolSpec);
Expand Down Expand Up @@ -436,7 +432,7 @@ public void transactionDroppedEventDoesNotFireAfterUnsubscribe() {
transactionPool.addLocalTransaction(TX2);

assertThat(result.get()).isNotNull();
serviceImpl.removeTransactionAddedListener(id);
serviceImpl.removeTransactionDroppedListener(id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch.

result.set(null);

transactionPool.addLocalTransaction(TX2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class PendingTransactions {
*/
private final NavigableSet<TransactionInfo> prioritizedTransactionsStaticRange =
new TreeSet<>(
comparing(TransactionInfo::isReceivedFromLocalSource).reversed()
comparing(TransactionInfo::isReceivedFromLocalSource)
.thenComparing(
transactionInfo ->
transactionInfo
Expand All @@ -92,32 +92,34 @@ public class PendingTransactions {
.get()
.getValue()
.longValue())
.thenComparing(transactionInfo -> distanceFromNextNonce(transactionInfo)).reversed()
.thenComparing(transactionInfo -> distanceFromNextNonce(transactionInfo))
.thenComparing(TransactionInfo::getSequence)
.reversed());

private final NavigableSet<TransactionInfo> prioritizedTransactionsDynamicRange =
new TreeSet<>(
comparing(TransactionInfo::isReceivedFromLocalSource).reversed()
comparing(TransactionInfo::isReceivedFromLocalSource)
.thenComparing(
transactionInfo ->
transactionInfo
.getTransaction()
.getMaxFeePerGas()
.map(maxFeePerGas -> maxFeePerGas.getValue().longValue())
.orElse(transactionInfo.getGasPrice().toLong()))
.thenComparing(transactionInfo -> distanceFromNextNonce(transactionInfo)).reversed()
.thenComparing(transactionInfo -> distanceFromNextNonce(transactionInfo))
.thenComparing(TransactionInfo::getSequence)
.reversed());


private Long distanceFromNextNonce(final TransactionInfo incomingTx) {
TransactionsForSenderInfo inPool = transactionsBySender.get(incomingTx.getSender());
jflo marked this conversation as resolved.
Show resolved Hide resolved
if(inPool == null) { //nothing in pool, you're next
if (inPool == null) { // nothing in pool, you're next
jflo marked this conversation as resolved.
Show resolved Hide resolved
return 0L;
}
long minNonceForAccount = inPool.streamTransactionInfos().mapToLong(TransactionInfo::getNonce).min().getAsLong();
return incomingTx.getNonce() - minNonceForAccount;
long minNonceForAccount =
inPool.streamTransactionInfos().mapToLong(TransactionInfo::getNonce).min().getAsLong();
// despite this looking backwards, it produces the sort order we want.
// greater distances produce more negative results, which are then .reversed()
return minNonceForAccount - incomingTx.getNonce();
}

private Optional<Long> baseFee;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,15 @@ public void shouldGetTransactionByHash() {

@Test
public void shouldDropOldestTransactionWhenLimitExceeded() {
final Transaction oldestTransaction = new TransactionTestFixture()
.value(Wei.of(1L))
.nonce(0L)
.createTransaction(SIGNATURE_ALGORITHM.get().generateKeyPair());
final Transaction oldestTransaction =
new TransactionTestFixture()
.value(Wei.of(1L))
.nonce(0L)
.createTransaction(SIGNATURE_ALGORITHM.get().generateKeyPair());
transactions.addRemoteTransaction(oldestTransaction);
for (int i = 1; i < MAX_TRANSACTIONS; i++) {
final Transaction newerTransaction = new TransactionTestFixture()
final Transaction newerTransaction =
new TransactionTestFixture()
.value(Wei.of(1L))
.nonce(0L)
.createTransaction(SIGNATURE_ALGORITHM.get().generateKeyPair());
Expand All @@ -137,7 +139,8 @@ public void shouldDropOldestTransactionWhenLimitExceeded() {
assertThat(transactions.size()).isEqualTo(MAX_TRANSACTIONS);
assertThat(metricsSystem.getCounterValue(REMOVED_COUNTER, REMOTE, DROPPED)).isZero();

final Transaction lastTransaction = new TransactionTestFixture()
final Transaction lastTransaction =
new TransactionTestFixture()
.value(Wei.of(1L))
.nonce(MAX_TRANSACTIONS + 1)
.createTransaction(SIGNATURE_ALGORITHM.get().generateKeyPair());
Expand Down Expand Up @@ -166,12 +169,13 @@ public void shouldHandleMaximumTransactionLimitCorrectlyWhenSameTransactionAdded
@Test
public void shouldPrioritizeLocalTransaction() {

transactions.subscribeDroppedTransactions(new PendingTransactionDroppedListener() {
@Override
public void onTransactionDropped(final Transaction transaction) {
assertThat(transactions.getLocalTransactions().contains(transaction)).isFalse();
}
});
transactions.subscribeDroppedTransactions(
new PendingTransactionDroppedListener() {
@Override
public void onTransactionDropped(final Transaction transaction) {
assertThat(transactions.getLocalTransactions().contains(transaction)).isFalse();
}
});

final Transaction localTransaction = createTransaction(0);
transactions.addLocalTransaction(localTransaction);
Expand All @@ -186,27 +190,27 @@ public void onTransactionDropped(final Transaction transaction) {
@Test
public void shouldPrioritizeGasPriceThenTimeAddedToPool() {
transactions.subscribeDroppedTransactions(
new PendingTransactionDroppedListener() {
@Override
public void onTransactionDropped(final Transaction transaction) {
assertThat(transaction.getGasPrice().get().toLong()).isLessThan(100);
}
}
);
new PendingTransactionDroppedListener() {
@Override
public void onTransactionDropped(final Transaction transaction) {
assertThat(transaction.getGasPrice().get().toLong()).isLessThan(100);
}
});
final List<Transaction> lowGasPriceTransactions =
IntStream.range(0, MAX_TRANSACTIONS)
.mapToObj(
i ->
transactionWithNonceSenderAndGasPrice(
i + 1, SIGNATURE_ALGORITHM.get().generateKeyPair(), 10))
i + 1, SIGNATURE_ALGORITHM.get().generateKeyPair(), 10 + i))
.collect(Collectors.toUnmodifiableList());

// Fill the pool
lowGasPriceTransactions.forEach(transactions::addRemoteTransaction);

// This should kick the oldest tx with the low gas price out, namely the first one we added
final Transaction highGasPriceTransaction =
transactionWithNonceSenderAndGasPrice(MAX_TRANSACTIONS + 10, SIGNATURE_ALGORITHM.get().generateKeyPair(), 100);
transactionWithNonceSenderAndGasPrice(
MAX_TRANSACTIONS + 10, SIGNATURE_ALGORITHM.get().generateKeyPair(), 100);
transactions.addRemoteTransaction(highGasPriceTransaction);
assertThat(transactions.size()).isEqualTo(MAX_TRANSACTIONS);

Expand All @@ -217,12 +221,13 @@ public void onTransactionDropped(final Transaction transaction) {

@Test
public void shouldStartDroppingLocalTransactionsWhenPoolIsFullOfLocalTransactions() {
transactions.subscribeDroppedTransactions(new PendingTransactionDroppedListener() {
@Override
public void onTransactionDropped(final Transaction transaction) {
assertTransactionNotPending(transaction);
}
});
transactions.subscribeDroppedTransactions(
new PendingTransactionDroppedListener() {
@Override
public void onTransactionDropped(final Transaction transaction) {
assertTransactionNotPending(transaction);
}
});

for (int i = 0; i <= MAX_TRANSACTIONS; i++) {
transactions.addLocalTransaction(createTransaction(i));
Expand Down Expand Up @@ -715,7 +720,7 @@ public void shouldNotIncrementAddedCounterWhenLocalTransactionAlreadyPresent() {
@Test
public void assertThatCorrectNonceIsReturned() {
assertThat(transactions.getNextNonceForSender(transaction1.getSender())).isEmpty();
addLocalTransactions(1,2,4,5);
addLocalTransactions(1, 2, 4, 5);
assertThat(transactions.getNextNonceForSender(transaction1.getSender()))
.isPresent()
.hasValue(3);
Expand All @@ -724,12 +729,9 @@ public void assertThatCorrectNonceIsReturned() {
.isPresent()
.hasValue(6);
addLocalTransactions(6, 10);
//We have a logic change here. Previous implementation intended for oldest tx (1) regardless of nonce to be dropped.
//Since nonce distance is considered before sequence (and it must, because sequences are global) in the new logic,
//nonce 6 gets dropped. Next nonce is expected to be 6.
assertThat(transactions.getNextNonceForSender(transaction1.getSender()))
.isPresent()
.hasValue(7); //i do believe 6 is the correct next nonce to return, however in reality 6 is gone and never coming back
.hasValue(6); // Debateable
}

@Test
Expand Down Expand Up @@ -757,17 +759,6 @@ private void addLocalTransactions(final long... nonces) {
transactions.addLocalTransaction(createTransaction(nonce));
}
}
/*
private void addLocalTransactionFrom(final Address from, final KeyPair kp, final long... nonces) {
for (final long nonce : nonces) {
final Transaction ltf = new TransactionTestFixture()
.sender(from)
.value(Wei.of(nonce))
.nonce(nonce)
.createTransaction(kp);
transactions.addLocalTransaction(ltf);
}
}*/

private static BlockHeader mockBlockHeader() {
final BlockHeader blockHeader = mock(BlockHeader.class);
Expand All @@ -778,7 +769,6 @@ private static BlockHeader mockBlockHeader() {
@Test
public void shouldIgnoreFutureNoncedTxs() {


// create maxtx transaction with valid addresses/nonces
// all addresses should be unique, chained txs will be checked in another test.
// TODO: how do we test around reorgs? do we?
Expand All @@ -787,47 +777,53 @@ public void shouldIgnoreFutureNoncedTxs() {
KeyPair kp = SIGNATURE_ALGORITHM.get().generateKeyPair();
Address a = Util.publicKeyToAddress(kp.getPublicKey());
Transaction t =
new TransactionTestFixture()
.sender(a)
.value(Wei.of(2))
.maxPriorityFeePerGas(Optional.of(Wei.of(2L)))
.nonce(entries)
.createTransaction(kp);
new TransactionTestFixture()
.sender(a)
.value(Wei.of(2))
.maxPriorityFeePerGas(Optional.of(Wei.of(2L)))
.nonce(entries)
.createTransaction(kp);
transactions.addRemoteTransaction(t);
toValidate.add(t);
}

// create maxtx transaction with nonces in the future, could be any volume though since pool already full
// create maxtx transaction with nonces in the future, could be any volume though since pool
// already full
List<Transaction> attackTxs = new ArrayList<>();
KeyPair attackerKp = SIGNATURE_ALGORITHM.get().generateKeyPair();
Address attackerA = Util.publicKeyToAddress(attackerKp.getPublicKey());

transactions.subscribeDroppedTransactions(new PendingTransactionDroppedListener() {
@Override
public void onTransactionDropped(final Transaction transaction) {
System.out.println("dropping transaction from "+ transaction.getSender().toHexString());
}
});
transactions.subscribeDroppedTransactions(
new PendingTransactionDroppedListener() {
@Override
public void onTransactionDropped(final Transaction transaction) {
System.out.println(
"dropping transaction from " + transaction.getSender().toHexString());
}
});

for (int entries = 10; entries < transactions.maxSize() + 10; entries++) { //badguy nonces are 2 digits
for (int entries = 10;
entries < transactions.maxSize() + 10;
entries++) { // badguy nonces are 2 digits
Transaction t =
new TransactionTestFixture()
.sender(attackerA)
.value(Wei.of(2))
.nonce(entries)
.maxPriorityFeePerGas(Optional.of(Wei.of(2L)))
.createTransaction(attackerKp);
new TransactionTestFixture()
.sender(attackerA)
.value(Wei.of(2))
.nonce(entries)
.maxPriorityFeePerGas(Optional.of(Wei.of(2L)))
.createTransaction(attackerKp);
attackTxs.add(t);
transactions.addRemoteTransaction(t); //all but the last one of these should be dropped
transactions.addRemoteTransaction(t); // all but the last one of these should be dropped
}

//assert txpool contains 1st attack

// assert txpool contains 1st attack

assertThat(transactions.getTransactionByHash(attackTxs.get(0).getHash())).isNotEmpty();
//assert txpool does not contain rest of attack
attackTxs.stream().skip(1L).forEach(t -> assertThat(transactions.getTransactionByHash(t.getHash())).isEmpty());
//assert that only 1 of the valid batch was purged
// assert txpool does not contain rest of attack
attackTxs.stream()
.skip(1L)
.forEach(t -> assertThat(transactions.getTransactionByHash(t.getHash())).isEmpty());
// assert that only 1 of the valid batch was purged
assertThat(toValidate.isEmpty()).isFalse();
}

Expand Down