Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[PIE-1707] Implement a timeout in TransactionMessageProcessor (#1604)
Browse files Browse the repository at this point in the history
* [PIE-1707] Implement a timeout in TransactionMessageProcessor

- `processTransactionsMessage` now takes a `keepAlive` parameter
- don't process the message if expired
- add unit tests
- use a default timeout for transactions (1 minute)

* Update ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java

Co-Authored-By: Nicolas MASSART <NicolasMassart@users.noreply.github.com>
  • Loading branch information
AbdelStark and NicolasMassart authored Jun 25, 2019
1 parent 333135b commit 45e35da
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;

import static java.time.Instant.now;

import tech.pegasys.pantheon.ethereum.eth.manager.EthMessage;
import tech.pegasys.pantheon.ethereum.eth.manager.EthMessages.MessageCallback;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage;

import java.time.Duration;
import java.time.Instant;

class TransactionsMessageHandler implements MessageCallback {

private static final Duration TX_KEEP_ALIVE = Duration.ofMinutes(1);
private final TransactionsMessageProcessor transactionsMessageProcessor;
private final EthScheduler scheduler;

Expand All @@ -32,9 +38,10 @@ public TransactionsMessageHandler(
@Override
public void exec(final EthMessage message) {
final TransactionsMessage transactionsMessage = TransactionsMessage.readFrom(message.getData());
final Instant startedAt = now();
scheduler.scheduleTxWorkerTask(
() ->
transactionsMessageProcessor.processTransactionsMessage(
message.getPeer(), transactionsMessage));
message.getPeer(), transactionsMessage, startedAt, TX_KEEP_ALIVE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;

import static java.time.Instant.now;
import static org.apache.logging.log4j.LogManager.getLogger;

import tech.pegasys.pantheon.ethereum.core.Transaction;
Expand All @@ -20,6 +21,8 @@
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;

import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Set;

Expand All @@ -39,6 +42,17 @@ public TransactionsMessageProcessor(
}

void processTransactionsMessage(
final EthPeer peer,
final TransactionsMessage transactionsMessage,
final Instant startedAt,
final Duration keepAlive) {
// Check if message not expired.
if (startedAt.plus(keepAlive).isAfter(now())) {
this.processTransactionsMessage(peer, transactionsMessage);
}
}

private void processTransactionsMessage(
final EthPeer peer, final TransactionsMessage transactionsMessage) {
try {
LOG.trace("Received transactions message from {}", peer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;

import static java.time.Duration.ofMillis;
import static java.time.Duration.ofMinutes;
import static java.time.Instant.now;
import static java.util.Arrays.asList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;

import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.Transaction;
Expand All @@ -41,7 +45,10 @@ public class TransactionsMessageProcessorTest {
@Test
public void shouldMarkAllReceivedTransactionsAsSeen() {
messageHandler.processTransactionsMessage(
peer1, TransactionsMessage.create(asList(transaction1, transaction2, transaction3)));
peer1,
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now(),
ofMinutes(1));

verify(transactionTracker)
.markTransactionsAsSeen(peer1, ImmutableSet.of(transaction1, transaction2, transaction3));
Expand All @@ -50,9 +57,31 @@ public void shouldMarkAllReceivedTransactionsAsSeen() {
@Test
public void shouldAddReceivedTransactionsToTransactionPool() {
messageHandler.processTransactionsMessage(
peer1, TransactionsMessage.create(asList(transaction1, transaction2, transaction3)));

peer1,
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now(),
ofMinutes(1));
verify(transactionPool)
.addRemoteTransactions(ImmutableSet.of(transaction1, transaction2, transaction3));
}

@Test
public void shouldNotMarkReceivedExpiredTransactionsAsSeen() {
messageHandler.processTransactionsMessage(
peer1,
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now().minus(ofMinutes(1)),
ofMillis(1));
verifyZeroInteractions(transactionTracker);
}

@Test
public void shouldNotAddReceivedTransactionsToTransactionPoolIfExpired() {
messageHandler.processTransactionsMessage(
peer1,
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now().minus(ofMinutes(1)),
ofMillis(1));
verifyZeroInteractions(transactionPool);
}
}

0 comments on commit 45e35da

Please sign in to comment.