Skip to content

Commit

Permalink
Revert "make mark/sweep prepare async with main thread (hyperledger#947
Browse files Browse the repository at this point in the history
…)" (hyperledger#1001)

Suspected of making PrunerTest intermittent.

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
  • Loading branch information
ajsutton authored May 29, 2020
1 parent 4adfb37 commit f781dce
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private void testPruner(
markSweepPruner,
blockchain,
new PrunerConfiguration(blockConfirmations, numBlocksToKeep),
new MockExecutorService());
MockExecutorService::new);

pruner.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.hyperledger.besu.ethereum.core.Hash;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand All @@ -47,17 +47,18 @@ public class Pruner {
private final long blockConfirmations;

private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
private final ExecutorService executorService;
private final Supplier<ExecutorService> executorServiceSupplier;
private ExecutorService executorService;

@VisibleForTesting
Pruner(
final MarkSweepPruner pruningStrategy,
final Blockchain blockchain,
final PrunerConfiguration prunerConfiguration,
final ExecutorService executorService) {
final Supplier<ExecutorService> executorServiceSupplier) {
this.pruningStrategy = pruningStrategy;
this.blockchain = blockchain;
this.executorService = executorService;
this.executorServiceSupplier = executorServiceSupplier;
this.blocksRetained = prunerConfiguration.getBlocksRetained();
this.blockConfirmations = prunerConfiguration.getBlockConfirmations();
checkArgument(
Expand All @@ -69,34 +70,27 @@ public Pruner(
final MarkSweepPruner pruningStrategy,
final Blockchain blockchain,
final PrunerConfiguration prunerConfiguration) {
this(
pruningStrategy,
blockchain,
prunerConfiguration,
// This is basically the out-of-the-box `Executors.newSingleThreadExecutor` except we want
// the `corePoolSize` to be 0
new ThreadPoolExecutor(
0,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
this(pruningStrategy, blockchain, prunerConfiguration, getDefaultExecutorSupplier());
}

private static Supplier<ExecutorService> getDefaultExecutorSupplier() {
return () ->
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)
.setNameFormat("StatePruning-%d")
.build()));
.build());
}

public void start() {
execute(
() -> {
if (state.compareAndSet(State.IDLE, State.RUNNING)) {
LOG.info("Starting Pruner.");
pruningStrategy.prepare();
blockAddedObserverId = blockchain.observeBlockAdded(this::handleNewBlock);
}
});

if (state.compareAndSet(State.IDLE, State.RUNNING)) {
LOG.info("Starting Pruner.");
executorService = executorServiceSupplier.get();
pruningStrategy.prepare();
blockAddedObserverId = blockchain.observeBlockAdded(this::handleNewBlock);
}
}

public void stop() {
Expand Down Expand Up @@ -164,7 +158,7 @@ private void execute(final Runnable action) {
try {
executorService.execute(action);
} catch (final Throwable t) {
LOG.error("Pruner failed", t);
LOG.error("Pruning failed", t);
pruningStrategy.cleanup();
pruningPhase.set(PruningPhase.IDLE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueStoragePrefixedKeyBlockchainStorage;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.MockExecutorService;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -52,20 +52,25 @@ public class PrunerTest {
private final BlockDataGenerator gen = new BlockDataGenerator();

@Mock private MarkSweepPruner markSweepPruner;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ExecutorService mockExecutorService = new MockExecutorService();
private final Supplier<ExecutorService> mockExecutorServiceSupplier = () -> mockExecutorService;

private final Block genesisBlock = gen.genesisBlock();

@Test
public void shouldMarkCorrectBlockAndSweep() throws ExecutionException, InterruptedException {
public void shouldMarkCorrectBlockAndSweep() {
final BlockchainStorage blockchainStorage =
new KeyValueStoragePrefixedKeyBlockchainStorage(
new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions());
final MutableBlockchain blockchain =
DefaultBlockchain.createMutable(genesisBlock, blockchainStorage, metricsSystem);

final Pruner pruner =
new Pruner(markSweepPruner, blockchain, new PrunerConfiguration(0, 1), executorService);
new Pruner(
markSweepPruner,
blockchain,
new PrunerConfiguration(0, 1),
mockExecutorServiceSupplier);
pruner.start();

final Block block1 = appendBlockWithParent(blockchain, genesisBlock);
Expand All @@ -86,7 +91,11 @@ public void shouldOnlySweepAfterBlockConfirmationPeriodAndRetentionPeriodEnds()
DefaultBlockchain.createMutable(genesisBlock, blockchainStorage, metricsSystem);

final Pruner pruner =
new Pruner(markSweepPruner, blockchain, new PrunerConfiguration(1, 2), executorService);
new Pruner(
markSweepPruner,
blockchain,
new PrunerConfiguration(1, 2),
mockExecutorServiceSupplier);
pruner.start();

final Hash markBlockStateRootHash =
Expand All @@ -113,7 +122,11 @@ public void abortsPruningWhenFullyMarkedBlockNoLongerOnCanonicalChain() {

// start pruner so it can start handling block added events
final Pruner pruner =
new Pruner(markSweepPruner, blockchain, new PrunerConfiguration(0, 1), executorService);
new Pruner(
markSweepPruner,
blockchain,
new PrunerConfiguration(0, 1),
mockExecutorServiceSupplier);
pruner.start();

/*
Expand Down Expand Up @@ -150,17 +163,26 @@ public void shouldRejectInvalidArguments() {
assertThatThrownBy(
() ->
new Pruner(
markSweepPruner, mockchain, new PrunerConfiguration(-1, -2), executorService))
markSweepPruner,
mockchain,
new PrunerConfiguration(-1, -2),
mockExecutorServiceSupplier))
.isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(
() ->
new Pruner(
markSweepPruner, mockchain, new PrunerConfiguration(10, 8), executorService))
markSweepPruner,
mockchain,
new PrunerConfiguration(10, 8),
mockExecutorServiceSupplier))
.isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(
() ->
new Pruner(
markSweepPruner, mockchain, new PrunerConfiguration(10, 10), executorService))
markSweepPruner,
mockchain,
new PrunerConfiguration(10, 10),
mockExecutorServiceSupplier))
.isInstanceOf(IllegalArgumentException.class);
}

Expand All @@ -173,9 +195,12 @@ public void shouldCleanUpPruningStrategyOnShutdown() throws InterruptedException
DefaultBlockchain.createMutable(genesisBlock, blockchainStorage, metricsSystem);

final Pruner pruner =
new Pruner(markSweepPruner, blockchain, new PrunerConfiguration(0, 1), executorService);
new Pruner(
markSweepPruner,
blockchain,
new PrunerConfiguration(0, 1),
mockExecutorServiceSupplier);
pruner.start();
Thread.sleep(1);
pruner.stop();
verify(markSweepPruner).cleanup();
}
Expand Down

0 comments on commit f781dce

Please sign in to comment.