Skip to content

Commit

Permalink
[skip ci] Introduce EthSchedule.OrderedProcessor
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
  • Loading branch information
fab-10 committed Dec 19, 2023
1 parent 27ca7f1 commit f63e65d
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
Expand All @@ -64,6 +63,7 @@
import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
import org.hyperledger.besu.testutil.TestClock;

import java.math.BigInteger;
Expand Down Expand Up @@ -100,7 +100,6 @@ public class BesuEventsImplTest {
@Mock private EthPeers mockEthPeers;
@Mock private EthContext mockEthContext;
@Mock private EthMessages mockEthMessages;
@Mock private EthScheduler mockEthScheduler;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private TransactionValidatorFactory mockTransactionValidatorFactory;
Expand Down Expand Up @@ -128,7 +127,7 @@ public void setUp() {

when(mockEthContext.getEthMessages()).thenReturn(mockEthMessages);
when(mockEthContext.getEthPeers()).thenReturn(mockEthPeers);
when(mockEthContext.getScheduler()).thenReturn(mockEthScheduler);
when(mockEthContext.getScheduler()).thenReturn(new DeterministicEthScheduler());
lenient().when(mockEthPeers.streamAvailablePeers()).thenAnswer(z -> Stream.empty());
when(mockProtocolContext.getBlockchain()).thenReturn(blockchain);
lenient().when(mockProtocolContext.getWorldStateArchive()).thenReturn(mockWorldStateArchive);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -107,7 +108,7 @@ public void setUp() {
blockchain::getChainHeadHeader);
final ProtocolContext protocolContext = executionContext.getProtocolContext();

EthContext ethContext = mock(EthContext.class);
EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -107,7 +108,7 @@ public void setUp() {
blockchain::getChainHeadHeader);
final ProtocolContext protocolContext = executionContext.getProtocolContext();

EthContext ethContext = mock(EthContext.class);
EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand All @@ -34,6 +36,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -295,4 +299,49 @@ public <T> void failAfterTimeout(final CompletableFuture<T> promise, final Durat
delay,
unit);
}

public <ITEM> OrderedProcessor<ITEM> createOrderedProcessor(final Consumer<ITEM> processor) {
return new OrderedProcessor<>(processor);
}

/**
* This class is a way to execute a set of tasks, one by one, in a strict order, without blocking
* the caller in case there are still previous tasks queued
*
* @param <ITEM> the class of item to be processed
*/
public class OrderedProcessor<ITEM> {
private final Queue<ITEM> blockAddedQueue = new ConcurrentLinkedQueue<>();
private final ReentrantLock blockAddedLock = new ReentrantLock();
private final Consumer<ITEM> processor;

private OrderedProcessor(final Consumer<ITEM> processor) {
this.processor = processor;
}

public void submit(final ITEM item) {
// add the item to the processing queue
blockAddedQueue.add(item);

if (blockAddedLock.hasQueuedThreads()) {
// another thread is already waiting to process the queue with our item, there is no need to
// schedule another thread
LOG.trace(
"Block added event queue is already being processed and an already queued thread is present, nothing to do");
} else {
servicesExecutor.submit(
() -> {
blockAddedLock.lock();
try {
// now that we have the lock, process as many items as possible
for (ITEM i = blockAddedQueue.poll(); i != null; i = blockAddedQueue.poll()) {
processor.accept(i);
}
} finally {
blockAddedLock.unlock();
}
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidationParams;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidator;
Expand All @@ -53,7 +54,6 @@
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
Expand All @@ -62,11 +62,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -108,9 +106,7 @@ public class TransactionPool implements BlockAddedObserver {
private volatile OptionalLong subscribeConnectId = OptionalLong.empty();
private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager();
private final Set<Address> localSenders = ConcurrentHashMap.newKeySet();
private final ReentrantLock blockAddedLock = new ReentrantLock();
private final AtomicReference<Thread> blockAddedQueuedThread = new AtomicReference<>();
private final Queue<BlockAddedEvent> blockAddedQueue = new ConcurrentLinkedQueue<>();
private final EthScheduler.OrderedProcessor<BlockAddedEvent> blockAddedEventOrderedProcessor;

public TransactionPool(
final Supplier<PendingTransactions> pendingTransactionsSupplier,
Expand All @@ -132,6 +128,8 @@ public TransactionPool(
pluginTransactionValidatorFactory == null
? null
: pluginTransactionValidatorFactory.create();
this.blockAddedEventOrderedProcessor =
ethContext.getScheduler().createOrderedProcessor(this::processBlockAddedEvent);
initLogForReplay();
}

Expand Down Expand Up @@ -327,76 +325,24 @@ public void onBlockAdded(final BlockAddedEvent event) {
if (event.getEventType().equals(BlockAddedEvent.EventType.HEAD_ADVANCED)
|| event.getEventType().equals(BlockAddedEvent.EventType.CHAIN_REORG)) {

// add the event to the processing queue
blockAddedQueue.add(event);
processBlockAddedQueue();
blockAddedEventOrderedProcessor.submit(event);
}
}
}

private void processBlockAddedQueue() {
// we want to process the added block asynchronously,
// but at the same time we must ensure that blocks are processed in order one at time
ethContext
.getScheduler()
.scheduleServiceTask(
() -> {
// if we were the thread waiting to be executed, then clear the queued thread
blockAddedQueuedThread.compareAndSet(Thread.currentThread(), null);

int blockProcessed = 0;
if (!blockAddedQueue.isEmpty()) {
if (blockAddedLock.tryLock()) {
// no other thread is processing the queue, so start processing it
try {
for (BlockAddedEvent e = blockAddedQueue.poll();
e != null;
e = blockAddedQueue.poll()) {
final long started = System.currentTimeMillis();
pendingTransactions.manageBlockAdded(
e.getBlock().getHeader(),
e.getAddedTransactions(),
e.getRemovedTransactions(),
protocolSchedule
.getByBlockHeader(e.getBlock().getHeader())
.getFeeMarket());
reAddTransactions(e.getRemovedTransactions());
LOG.atTrace()
.setMessage(
"Block added event {} processed in {}ms, block processed by this thread {}")
.addArgument(e)
.addArgument(() -> System.currentTimeMillis() - started)
.addArgument(++blockProcessed)
.log();
}
} finally {
blockAddedLock.unlock();
}
} else {
blockAddedQueuedThread.getAndUpdate(
qt -> {
if (qt == null) {
// if no queued thread, then try later
LOG.trace(
"Block added event queue already being processed, retry later, queue thread {}",
qt);
ethContext
.getScheduler()
.scheduleFutureTask(
this::processBlockAddedQueue, Duration.ofMillis(100));
} else {
LOG.trace(
"Block added event queue already being processed and an already queued thread present {}, nothing to do",
qt);
}
// record ourselves as queued thread
return Thread.currentThread();
});
return null;
}
}
return null;
});
private void processBlockAddedEvent(final BlockAddedEvent e) {
final long started = System.currentTimeMillis();
pendingTransactions.manageBlockAdded(
e.getBlock().getHeader(),
e.getAddedTransactions(),
e.getRemovedTransactions(),
protocolSchedule.getByBlockHeader(e.getBlock().getHeader()).getFeeMarket());
reAddTransactions(e.getRemovedTransactions());
LOG.atTrace()
.setMessage("Block added event {} processed in {}ms")
.addArgument(e)
.addArgument(() -> System.currentTimeMillis() - started)
.log();
}

private void reAddTransactions(final List<Transaction> reAddTransactions) {
Expand Down

0 comments on commit f63e65d

Please sign in to comment.