Skip to content

Commit

Permalink
Fix miner startup logic (hyperledger#104)
Browse files Browse the repository at this point in the history
Signed-off-by: Meredith Baxter <meredith.baxter@consensys.net>
  • Loading branch information
mbaxter authored Oct 28, 2019
1 parent 2ca95c0 commit 70b0fc2
Show file tree
Hide file tree
Showing 43 changed files with 722 additions and 283 deletions.
73 changes: 45 additions & 28 deletions besu/src/main/java/org/hyperledger/besu/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -43,6 +44,8 @@ public class Runner implements AutoCloseable {
private static final Logger LOG = LogManager.getLogger();

private final Vertx vertx;
private final CountDownLatch vertxShutdownLatch = new CountDownLatch(1);
private final CountDownLatch shutdown = new CountDownLatch(1);

private final NetworkRunner networkRunner;
private final Optional<UpnpNatManager> natManager;
Expand Down Expand Up @@ -78,13 +81,12 @@ public class Runner implements AutoCloseable {
public void start() {
try {
LOG.info("Starting Ethereum main loop ... ");
if (natManager.isPresent()) {
natManager.get().start();
}
natManager.ifPresent(UpnpNatManager::start);
networkRunner.start();
if (networkRunner.getNetwork().isP2pEnabled()) {
besuController.getSynchronizer().start();
}
besuController.getMiningCoordinator().start();
vertx.setPeriodic(
TimeUnit.MINUTES.toMillis(1),
time ->
Expand All @@ -101,40 +103,41 @@ public void start() {
}
}

public void stop() {
jsonRpc.ifPresent(service -> waitForServiceToStop("jsonRpc", service.stop()));
graphQLHttp.ifPresent(service -> waitForServiceToStop("graphQLHttp", service.stop()));
websocketRpc.ifPresent(service -> waitForServiceToStop("websocketRpc", service.stop()));
metrics.ifPresent(service -> waitForServiceToStop("metrics", service.stop()));

besuController.getMiningCoordinator().stop();
waitForServiceToStop("Mining Coordinator", besuController.getMiningCoordinator()::awaitStop);
if (networkRunner.getNetwork().isP2pEnabled()) {
besuController.getSynchronizer().stop();
}

networkRunner.stop();
waitForServiceToStop("Network", networkRunner::awaitStop);

natManager.ifPresent(UpnpNatManager::stop);
besuController.close();
vertx.close((res) -> vertxShutdownLatch.countDown());
waitForServiceToStop("Vertx", vertxShutdownLatch::await);
shutdown.countDown();
}

public void awaitStop() {
try {
networkRunner.awaitStop();
shutdown.await();
} catch (final InterruptedException e) {
LOG.debug("Interrupted, exiting", e);
Thread.currentThread().interrupt();
}
}

@Override
public void close() throws Exception {
try {
if (networkRunner.getNetwork().isP2pEnabled()) {
besuController.getSynchronizer().stop();
}

networkRunner.stop();
networkRunner.awaitStop();

jsonRpc.ifPresent(service -> waitForServiceToStop("jsonRpc", service.stop()));
graphQLHttp.ifPresent(service -> waitForServiceToStop("graphQLHttp", service.stop()));
websocketRpc.ifPresent(service -> waitForServiceToStop("websocketRpc", service.stop()));
metrics.ifPresent(service -> waitForServiceToStop("metrics", service.stop()));

if (natManager.isPresent()) {
natManager.get().stop();
}
} finally {
try {
vertx.close();
} finally {
besuController.close();
}
}
public void close() {
stop();
awaitStop();
}

private void waitForServiceToStop(
Expand All @@ -151,6 +154,15 @@ private void waitForServiceToStop(
}
}

private void waitForServiceToStop(final String serviceName, final SynchronousShutdown shutdown) {
try {
shutdown.await();
} catch (final InterruptedException e) {
LOG.debug("Interrupted while waiting for service " + serviceName + " to stop", e);
Thread.currentThread().interrupt();
}
}

private void waitForServiceToStart(
final String serviceName, final CompletableFuture<?> startFuture) {
while (!startFuture.isDone()) {
Expand Down Expand Up @@ -236,4 +248,9 @@ public Optional<Integer> getMetricsPort() {
Optional<EnodeURL> getLocalEnode() {
return networkRunner.getNetwork().getLocalEnode();
}

@FunctionalInterface
private interface SynchronousShutdown {
void await() throws InterruptedException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -82,11 +78,9 @@ protected MiningCoordinator createMiningCoordinator(
final MiningParameters miningParameters,
final SyncState syncState,
final EthProtocolManager ethProtocolManager) {
final ExecutorService minerThreadPool = Executors.newCachedThreadPool();
final CliqueMinerExecutor miningExecutor =
new CliqueMinerExecutor(
protocolContext,
minerThreadPool,
protocolSchedule,
transactionPool.getPendingTransactions(),
nodeKeys,
Expand All @@ -108,16 +102,6 @@ protected MiningCoordinator createMiningCoordinator(

// Clique mining is implicitly enabled.
miningCoordinator.enable();
addShutdownAction(
() -> {
miningCoordinator.disable();
minerThreadPool.shutdownNow();
try {
minerThreadPool.awaitTermination(5, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
LOG.error("Failed to shutdown miner executor");
}
});
return miningCoordinator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
*/
package org.hyperledger.besu.controller;

import static org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors.newScheduledThreadPool;

import org.hyperledger.besu.config.IbftConfigOptions;
import org.hyperledger.besu.consensus.common.BlockInterface;
import org.hyperledger.besu.consensus.common.EpochManager;
Expand All @@ -28,6 +26,7 @@
import org.hyperledger.besu.consensus.ibft.IbftBlockInterface;
import org.hyperledger.besu.consensus.ibft.IbftContext;
import org.hyperledger.besu.consensus.ibft.IbftEventQueue;
import org.hyperledger.besu.consensus.ibft.IbftExecutors;
import org.hyperledger.besu.consensus.ibft.IbftGossip;
import org.hyperledger.besu.consensus.ibft.IbftProcessor;
import org.hyperledger.besu.consensus.ibft.IbftProtocolSchedule;
Expand Down Expand Up @@ -66,11 +65,6 @@
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.util.Subscribers;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -110,6 +104,7 @@ protected MiningCoordinator createMiningCoordinator(
final SyncState syncState,
final EthProtocolManager ethProtocolManager) {
final MutableBlockchain blockchain = protocolContext.getBlockchain();
final IbftExecutors ibftExecutors = IbftExecutors.create(metricsSystem);

final IbftBlockCreatorFactory blockCreatorFactory =
new IbftBlockCreatorFactory(
Expand All @@ -133,19 +128,16 @@ protected MiningCoordinator createMiningCoordinator(

final IbftGossip gossiper = new IbftGossip(uniqueMessageMulticaster);

final ScheduledExecutorService timerExecutor =
newScheduledThreadPool("IbftTimerExecutor", 1, metricsSystem);

final IbftFinalState finalState =
new IbftFinalState(
voteTallyCache,
nodeKeys,
Util.publicKeyToAddress(nodeKeys.getPublicKey()),
proposerSelector,
uniqueMessageMulticaster,
new RoundTimer(ibftEventQueue, ibftConfig.getRequestTimeoutSeconds(), timerExecutor),
new RoundTimer(ibftEventQueue, ibftConfig.getRequestTimeoutSeconds(), ibftExecutors),
new BlockTimer(
ibftEventQueue, ibftConfig.getBlockPeriodSeconds(), timerExecutor, clock),
ibftEventQueue, ibftConfig.getBlockPeriodSeconds(), ibftExecutors, clock),
blockCreatorFactory,
new MessageFactory(nodeKeys),
clock);
Expand Down Expand Up @@ -184,29 +176,17 @@ protected MiningCoordinator createMiningCoordinator(

final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController);
final IbftProcessor ibftProcessor = new IbftProcessor(ibftEventQueue, eventMultiplexer);
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
processorExecutor.execute(ibftProcessor);

final MiningCoordinator ibftMiningCoordinator =
new IbftMiningCoordinator(ibftProcessor, blockCreatorFactory, blockchain, ibftEventQueue);
new IbftMiningCoordinator(
ibftExecutors,
ibftController,
ibftProcessor,
blockCreatorFactory,
blockchain,
ibftEventQueue);
ibftMiningCoordinator.enable();
addShutdownAction(
() -> {
ibftProcessor.stop();
ibftMiningCoordinator.disable();
processorExecutor.shutdownNow();
try {
processorExecutor.awaitTermination(5, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
LOG.error("Failed to shutdown ibft processor executor");
}
timerExecutor.shutdownNow();
try {
timerExecutor.awaitTermination(5, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
LOG.error("Failed to shutdown timer executor");
}
});

return ibftMiningCoordinator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hyperledger.besu.consensus.ibftlegacy.protocol.Istanbul64ProtocolManager;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator;
import org.hyperledger.besu.ethereum.blockcreation.NoopMiningCoordinator;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MiningParameters;
Expand Down Expand Up @@ -63,7 +64,7 @@ protected MiningCoordinator createMiningCoordinator(
final MiningParameters miningParameters,
final SyncState syncState,
final EthProtocolManager ethProtocolManager) {
return null;
return new NoopMiningCoordinator(miningParameters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,7 @@
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class MainnetBesuControllerBuilder extends BesuControllerBuilder<Void> {
private static final Logger LOG = LogManager.getLogger();

@Override
protected MiningCoordinator createMiningCoordinator(
Expand All @@ -47,11 +39,9 @@ protected MiningCoordinator createMiningCoordinator(
final MiningParameters miningParameters,
final SyncState syncState,
final EthProtocolManager ethProtocolManager) {
final ExecutorService minerThreadPool = Executors.newCachedThreadPool();
final EthHashMinerExecutor executor =
new EthHashMinerExecutor(
protocolContext,
minerThreadPool,
protocolSchedule,
transactionPool.getPendingTransactions(),
miningParameters,
Expand All @@ -67,16 +57,7 @@ protected MiningCoordinator createMiningCoordinator(
if (miningParameters.isMiningEnabled()) {
miningCoordinator.enable();
}
addShutdownAction(
() -> {
miningCoordinator.disable();
minerThreadPool.shutdownNow();
try {
minerThreadPool.awaitTermination(5, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
LOG.error("Failed to shutdown miner executor");
}
});

return miningCoordinator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -49,7 +48,6 @@ public class CliqueMinerExecutor extends AbstractMinerExecutor<CliqueContext, Cl

public CliqueMinerExecutor(
final ProtocolContext<CliqueContext> protocolContext,
final ExecutorService executorService,
final ProtocolSchedule<CliqueContext> protocolSchedule,
final PendingTransactions pendingTransactions,
final KeyPair nodeKeys,
Expand All @@ -59,7 +57,6 @@ public CliqueMinerExecutor(
final Function<Long, Long> gasLimitCalculator) {
super(
protocolContext,
executorService,
protocolSchedule,
pendingTransactions,
miningParams,
Expand All @@ -71,19 +68,7 @@ public CliqueMinerExecutor(
}

@Override
public CliqueBlockMiner startAsyncMining(
final Subscribers<MinedBlockObserver> observers, final BlockHeader parentHeader) {
final CliqueBlockMiner currentRunningMiner = createMiner(observers, parentHeader);
executorService.execute(currentRunningMiner);
return currentRunningMiner;
}

@Override
public CliqueBlockMiner createMiner(final BlockHeader parentHeader) {
return createMiner(Subscribers.none(), parentHeader);
}

private CliqueBlockMiner createMiner(
public CliqueBlockMiner createMiner(
final Subscribers<MinedBlockObserver> observers, final BlockHeader parentHeader) {
final Function<BlockHeader, CliqueBlockCreator> blockCreator =
(header) ->
Expand Down
Loading

0 comments on commit 70b0fc2

Please sign in to comment.