Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Ibft pantheon controller (#461)
Browse files Browse the repository at this point in the history
  • Loading branch information
jframe authored Dec 19, 2018
1 parent 2fc38e7 commit 7f00580
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
*/
package tech.pegasys.pantheon.consensus.ibft;

import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController;

import java.util.Optional;
import java.util.concurrent.Executors;
Expand All @@ -25,45 +30,37 @@

/** Execution context for draining queued ibft events and applying them to a maintained state */
public class IbftProcessor implements Runnable {

private static final Logger LOG = LogManager.getLogger();

private final IbftEventQueue incomingQueue;
private final ScheduledExecutorService roundTimerExecutor;
private final RoundTimer roundTimer;
private final IbftStateMachine stateMachine;
private volatile boolean shutdown = false;
private final IbftController ibftController;

/**
* Construct a new IbftProcessor
*
* @param incomingQueue The event queue from which to drain new events
* @param baseRoundExpirySeconds The expiry time in milliseconds of round 0
* @param stateMachine an IbftStateMachine ready to process events and maintain state
* @param ibftController an object capable of handling any/all IBFT events
*/
public IbftProcessor(
final IbftEventQueue incomingQueue,
final int baseRoundExpirySeconds,
final IbftStateMachine stateMachine) {
public IbftProcessor(final IbftEventQueue incomingQueue, final IbftController ibftController) {
// Spawning the round timer with a single thread as we should never have more than 1 timer in
// flight at a time
this(
incomingQueue,
baseRoundExpirySeconds,
stateMachine,
Executors.newSingleThreadScheduledExecutor());
this(incomingQueue, ibftController, Executors.newSingleThreadScheduledExecutor());
}

@VisibleForTesting
IbftProcessor(
final IbftEventQueue incomingQueue,
final int baseRoundExpirySeconds,
final IbftStateMachine stateMachine,
final IbftController ibftController,
final ScheduledExecutorService roundTimerExecutor) {
this.incomingQueue = incomingQueue;
this.ibftController = ibftController;
this.roundTimerExecutor = roundTimerExecutor;
this.roundTimer = new RoundTimer(incomingQueue, baseRoundExpirySeconds, roundTimerExecutor);
this.stateMachine = stateMachine;
}

public void start() {
ibftController.start();
}

/** Indicate to the processor that it should gracefully stop at its next opportunity */
Expand All @@ -74,25 +71,46 @@ public void stop() {
@Override
public void run() {
while (!shutdown) {
Optional<IbftEvent> newEvent = Optional.empty();
try {
newEvent = Optional.ofNullable(incomingQueue.poll(2, TimeUnit.SECONDS));
} catch (final InterruptedException interrupt) {
// If the queue was interrupted propagate it and spin to check our shutdown status
Thread.currentThread().interrupt();
}

newEvent.ifPresent(
ibftEvent -> {
try {
stateMachine.processEvent(ibftEvent, roundTimer);
} catch (final Exception e) {
LOG.error(
"State machine threw exception while processing event {" + ibftEvent + "}", e);
}
});
nextIbftEvent().ifPresent(this::handleIbftEvent);
}
// Clean up the executor service the round timer has been utilising
roundTimerExecutor.shutdownNow();
}

private void handleIbftEvent(final IbftEvent ibftEvent) {
try {
switch (ibftEvent.getType()) {
case MESSAGE:
final IbftReceivedMessageEvent rxEvent = (IbftReceivedMessageEvent) ibftEvent;
ibftController.handleMessageEvent(rxEvent);
break;
case ROUND_EXPIRY:
final RoundExpiry roundExpiryEvent = (RoundExpiry) ibftEvent;
ibftController.handleRoundExpiry(roundExpiryEvent);
break;
case NEW_CHAIN_HEAD:
final NewChainHead newChainHead = (NewChainHead) ibftEvent;
ibftController.handleNewBlockEvent(newChainHead);
break;
case BLOCK_TIMER_EXPIRY:
final BlockTimerExpiry blockTimerExpiry = (BlockTimerExpiry) ibftEvent;
ibftController.handleBlockTimerExpiry(blockTimerExpiry);
break;
default:
throw new RuntimeException("Illegal event in queue.");
}
} catch (final Exception e) {
LOG.error("State machine threw exception while processing event {" + ibftEvent + "}", e);
}
}

private Optional<IbftEvent> nextIbftEvent() {
try {
return Optional.ofNullable(incomingQueue.poll(500, TimeUnit.MILLISECONDS));
} catch (final InterruptedException interrupt) {
// If the queue was interrupted propagate it and spin to check our shutdown status
Thread.currentThread().interrupt();
return Optional.empty();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,77 @@
*/
package tech.pegasys.pantheon.consensus.ibft.blockcreation;

import static org.apache.logging.log4j.LogManager.getLogger;

import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue;
import tech.pegasys.pantheon.consensus.ibft.IbftProcessor;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
import tech.pegasys.pantheon.ethereum.blockcreation.MiningCoordinator;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedObserver;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.util.bytes.BytesValue;

public class IbftMiningCoordinator implements MiningCoordinator {
import org.apache.logging.log4j.Logger;

public class IbftMiningCoordinator implements MiningCoordinator, BlockAddedObserver {

private final IbftBlockCreatorFactory blockCreatorFactory;
private static final Logger LOG = getLogger();
protected final Blockchain blockchain;
private final IbftEventQueue eventQueue;
private final IbftProcessor ibftProcessor;

public IbftMiningCoordinator(final IbftBlockCreatorFactory blockCreatorFactory) {
public IbftMiningCoordinator(
final IbftProcessor ibftProcessor,
final IbftBlockCreatorFactory blockCreatorFactory,
final Blockchain blockchain,
final IbftEventQueue eventQueue) {
this.ibftProcessor = ibftProcessor;
this.blockCreatorFactory = blockCreatorFactory;
this.eventQueue = eventQueue;

this.blockchain = blockchain;
this.blockchain.observeBlockAdded(this);
}

@Override
public void enable() {}
public void enable() {
ibftProcessor.start();
// IbftProcessor is implicitly running (but maybe should have a discard" all)
}

@Override
public void disable() {}
public void disable() {
ibftProcessor.stop();
}

@Override
public boolean isRunning() {
return true;
}

@Override
public void setMinTransactionGasPrice(final Wei minGasPrice) {}
public void setMinTransactionGasPrice(final Wei minGasPrice) {
blockCreatorFactory.setMinTransactionGasPrice(minGasPrice);
}

@Override
public Wei getMinTransactionGasPrice() {
return null;
return blockCreatorFactory.getMinTransactionGasPrice();
}

@Override
public void setExtraData(final BytesValue extraData) {
blockCreatorFactory.setExtraData(extraData);
}

@Override
public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) {
LOG.info("New canonical head detected. {} ", event.isNewCanonicalHead());
if (event.isNewCanonicalHead()) {
eventQueue.add(new NewChainHead(event.getBlock().getHeader()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.mockito.Mockito.verify;

import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -39,20 +40,20 @@
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class IbftProcessorTest {
private ScheduledExecutorService mockExecutorService;
private IbftStateMachine mockStateMachine;
private IbftController mockIbftController;

@Before
public void initialise() {
mockExecutorService = mock(ScheduledExecutorService.class);
mockStateMachine = mock(IbftStateMachine.class);
mockIbftController = mock(IbftController.class);
}

@Test
public void handlesStopGracefully() throws InterruptedException {
final IbftEventQueue mockQueue = mock(IbftEventQueue.class);
Mockito.when(mockQueue.poll(anyLong(), any())).thenReturn(null);
final IbftProcessor processor =
new IbftProcessor(mockQueue, 1, mockStateMachine, mockExecutorService);
new IbftProcessor(mockQueue, mockIbftController, mockExecutorService);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -81,7 +82,7 @@ public void handlesStopGracefully() throws InterruptedException {
@Test
public void cleanupExecutorsAfterShutdownNow() throws InterruptedException {
final IbftProcessor processor =
new IbftProcessor(new IbftEventQueue(), 1, mockStateMachine, mockExecutorService);
new IbftProcessor(new IbftEventQueue(), mockIbftController, mockExecutorService);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -111,7 +112,7 @@ public void handlesQueueInterruptGracefully() throws InterruptedException {
Mockito.when(mockQueue.poll(anyLong(), any())).thenThrow(new InterruptedException());

final IbftProcessor processor =
new IbftProcessor(mockQueue, 1, mockStateMachine, mockExecutorService);
new IbftProcessor(mockQueue, mockIbftController, mockExecutorService);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -144,7 +145,7 @@ public void handlesQueueInterruptGracefully() throws InterruptedException {
public void drainEventsIntoStateMachine() throws InterruptedException {
final IbftEventQueue queue = new IbftEventQueue();
final IbftProcessor processor =
new IbftProcessor(queue, 1, mockStateMachine, mockExecutorService);
new IbftProcessor(queue, mockIbftController, mockExecutorService);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand All @@ -160,6 +161,6 @@ public void drainEventsIntoStateMachine() throws InterruptedException {
processor.stop();
processorExecutor.shutdown();

verify(mockStateMachine, times(2)).processEvent(eq(roundExpiryEvent), any());
verify(mockIbftController, times(2)).handleRoundExpiry(eq(roundExpiryEvent));
}
}
Loading

0 comments on commit 7f00580

Please sign in to comment.