Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Tidy-up FastSyncState persistence #845

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncActions;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.PivotHeaderStorage;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncStateStorage;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.NodeDataRequest;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
Expand Down Expand Up @@ -51,16 +52,19 @@ class FastSynchronizer<C> {
private final Path fastSyncDataDirectory;
private final BigQueue<NodeDataRequest> stateQueue;
private final WorldStateDownloader worldStateDownloader;
private final FastSyncState initialSyncState;

private FastSynchronizer(
final FastSyncDownloader<C> fastSyncDownloader,
final Path fastSyncDataDirectory,
final BigQueue<NodeDataRequest> stateQueue,
final WorldStateDownloader worldStateDownloader) {
final WorldStateDownloader worldStateDownloader,
final FastSyncState initialSyncState) {
this.fastSyncDownloader = fastSyncDownloader;
this.fastSyncDataDirectory = fastSyncDataDirectory;
this.stateQueue = stateQueue;
this.worldStateDownloader = worldStateDownloader;
this.initialSyncState = initialSyncState;
}

public static <C> Optional<FastSynchronizer<C>> create(
Expand All @@ -78,8 +82,11 @@ public static <C> Optional<FastSynchronizer<C>> create(
}

final Path fastSyncDataDirectory = getFastSyncDataDirectory(dataDirectory);
final PivotHeaderStorage pivotHeaderStorage = new PivotHeaderStorage(fastSyncDataDirectory);
if (!pivotHeaderStorage.isFastSyncInProgress()
final FastSyncStateStorage fastSyncStateStorage =
new FastSyncStateStorage(fastSyncDataDirectory);
final FastSyncState fastSyncState =
fastSyncStateStorage.loadState(ScheduleBasedBlockHashFunction.create(protocolSchedule));
if (!fastSyncState.getPivotBlockHeader().isPresent()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()
!= BlockHeader.GENESIS_BLOCK_NUMBER) {
LOG.info(
Expand All @@ -106,21 +113,25 @@ public static <C> Optional<FastSynchronizer<C>> create(
protocolContext,
ethContext,
syncState,
pivotHeaderStorage,
ethTasksTimer,
metricsSystem.createLabelledCounter(
MetricCategory.SYNCHRONIZER,
"fast_sync_validation_mode",
"Number of blocks validated using light vs full validation during fast sync",
"validationMode")),
worldStateDownloader);
worldStateDownloader,
fastSyncStateStorage);
return Optional.of(
new FastSynchronizer<>(
fastSyncDownloader, fastSyncDataDirectory, stateQueue, worldStateDownloader));
fastSyncDownloader,
fastSyncDataDirectory,
stateQueue,
worldStateDownloader,
fastSyncState));
}

public CompletableFuture<FastSyncState> start() {
return fastSyncDownloader.start();
return fastSyncDownloader.start(initialSyncState);
}

public void deleteFastSyncState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeersTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

Expand All @@ -46,7 +43,6 @@ public class FastSyncActions<C> {
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final SyncState syncState;
private final PivotHeaderStorage pivotHeaderStorage;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final LabelledMetric<Counter> fastSyncValidationCounter;

Expand All @@ -56,26 +52,28 @@ public FastSyncActions(
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final PivotHeaderStorage pivotHeaderStorage,
final LabelledMetric<OperationTimer> ethTasksTimer,
final LabelledMetric<Counter> fastSyncValidationCounter) {
this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.syncState = syncState;
this.pivotHeaderStorage = pivotHeaderStorage;
this.ethTasksTimer = ethTasksTimer;
this.fastSyncValidationCounter = fastSyncValidationCounter;
}

public CompletableFuture<Void> waitForSuitablePeers() {
public CompletableFuture<FastSyncState> waitForSuitablePeers(final FastSyncState fastSyncState) {
if (fastSyncState.hasPivotBlockHeader()) {
return waitForAnyPeer().thenApply(ignore -> fastSyncState);
}

final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(
ethContext, syncConfig.getFastSyncMinimumPeerCount(), ethTasksTimer);

final EthScheduler scheduler = ethContext.getScheduler();
final CompletableFuture<Void> result = new CompletableFuture<>();
final CompletableFuture<FastSyncState> result = new CompletableFuture<>();
scheduler
.timeout(waitForPeersTask, syncConfig.getFastSyncMaximumPeerWaitTime())
.handle(
Expand All @@ -89,7 +87,7 @@ public CompletableFuture<Void> waitForSuitablePeers() {
LOG.warn(
"Maximum wait time for fast sync reached but no peers available. Continuing to wait for any available peer.");
waitForAnyPeer()
.thenAccept(result::complete)
.thenAccept(value -> result.complete(fastSyncState))
.exceptionally(
taskError -> {
result.completeExceptionally(error);
Expand Down Expand Up @@ -130,17 +128,10 @@ private void waitForAnyPeer(final CompletableFuture<Void> result) {
});
}

public CompletableFuture<FastSyncState> selectPivotBlock() {
return loadPivotBlockFromStorage().orElseGet(this::selectPivotBlockFromPeers);
}

private Optional<CompletableFuture<FastSyncState>> loadPivotBlockFromStorage() {
return pivotHeaderStorage
.loadPivotBlockHeader(ScheduleBasedBlockHashFunction.create(protocolSchedule))
.map(
header ->
completedFuture(
new FastSyncState(OptionalLong.of(header.getNumber()), Optional.of(header))));
public CompletableFuture<FastSyncState> selectPivotBlock(final FastSyncState fastSyncState) {
return fastSyncState.hasPivotBlockHeader()
? completedFuture(fastSyncState)
: selectPivotBlockFromPeers();
}

private CompletableFuture<FastSyncState> selectPivotBlockFromPeers() {
Expand All @@ -156,7 +147,7 @@ private CompletableFuture<FastSyncState> selectPivotBlockFromPeers() {
throw new FastSyncException(CHAIN_TOO_SHORT);
} else {
LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber);
return completedFuture(new FastSyncState(OptionalLong.of(pivotBlockNumber)));
return completedFuture(new FastSyncState(pivotBlockNumber));
}
})
.orElseGet(this::retrySelectPivotBlockAfterDelay);
Expand All @@ -167,7 +158,7 @@ private CompletableFuture<FastSyncState> retrySelectPivotBlockAfterDelay() {
return ethContext
.getScheduler()
.scheduleFutureTask(
() -> waitForAnyPeer().thenCompose(ignore -> selectPivotBlock()),
() -> waitForAnyPeer().thenCompose(ignore -> selectPivotBlockFromPeers()),
Duration.ofSeconds(1));
}

Expand All @@ -181,16 +172,10 @@ public CompletableFuture<FastSyncState> downloadPivotBlockHeader(
ethContext,
ethTasksTimer,
currentState.getPivotBlockNumber().getAsLong())
.downloadPivotBlockHeader()
.thenApply(this::storePivotBlockHeader);
}

private FastSyncState storePivotBlockHeader(final FastSyncState fastSyncState) {
pivotHeaderStorage.storePivotBlockHeader(fastSyncState.getPivotBlockHeader().get());
return fastSyncState;
.downloadPivotBlockHeader();
}

public CompletableFuture<Void> downloadChain(final FastSyncState currentState) {
public CompletableFuture<FastSyncState> downloadChain(final FastSyncState currentState) {
final FastSyncChainDownloader<C> downloader =
new FastSyncChainDownloader<>(
syncConfig,
Expand All @@ -201,6 +186,6 @@ public CompletableFuture<Void> downloadChain(final FastSyncState currentState) {
ethTasksTimer,
fastSyncValidationCounter,
currentState.getPivotBlockHeader().get());
return downloader.start();
return downloader.start().thenApply(ignore -> currentState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,38 @@ public class FastSyncDownloader<C> {
private static final Logger LOG = LogManager.getLogger();
private final FastSyncActions<C> fastSyncActions;
private final WorldStateDownloader worldStateDownloader;
private final FastSyncStateStorage fastSyncStateStorage;

public FastSyncDownloader(
final FastSyncActions<C> fastSyncActions, final WorldStateDownloader worldStateDownloader) {
final FastSyncActions<C> fastSyncActions,
final WorldStateDownloader worldStateDownloader,
final FastSyncStateStorage fastSyncStateStorage) {
this.fastSyncActions = fastSyncActions;
this.worldStateDownloader = worldStateDownloader;
this.fastSyncStateStorage = fastSyncStateStorage;
}

public CompletableFuture<FastSyncState> start() {
public CompletableFuture<FastSyncState> start(final FastSyncState fastSyncState) {
LOG.info("Fast sync enabled");
return fastSyncActions
.waitForSuitablePeers()
.thenCompose(state -> fastSyncActions.selectPivotBlock())
.waitForSuitablePeers(fastSyncState)
.thenCompose(fastSyncActions::selectPivotBlock)
.thenCompose(fastSyncActions::downloadPivotBlockHeader)
.thenApply(this::storeState)
.thenCompose(this::downloadChainAndWorldState);
}

private FastSyncState storeState(final FastSyncState state) {
fastSyncStateStorage.storeState(state);
return state;
}

private CompletableFuture<FastSyncState> downloadChainAndWorldState(
final FastSyncState currentState) {
final CompletableFuture<Void> worldStateFuture =
worldStateDownloader.run(currentState.getPivotBlockHeader().get());
final CompletableFuture<Void> chainFuture = fastSyncActions.downloadChain(currentState);
final CompletableFuture<FastSyncState> chainFuture =
fastSyncActions.downloadChain(currentState);

// If either download fails, cancel the other one.
chainFuture.exceptionally(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,21 @@

public class FastSyncState {

public static FastSyncState EMPTY_SYNC_STATE =
new FastSyncState(OptionalLong.empty(), Optional.empty());

private final OptionalLong pivotBlockNumber;
private final Optional<BlockHeader> pivotBlockHeader;

public FastSyncState() {
this(OptionalLong.empty(), Optional.empty());
public FastSyncState(final long pivotBlockNumber) {
this(OptionalLong.of(pivotBlockNumber), Optional.empty());
}

public FastSyncState(final OptionalLong pivotBlockNumber) {
this(pivotBlockNumber, Optional.empty());
public FastSyncState(final BlockHeader pivotBlockHeader) {
this(OptionalLong.of(pivotBlockHeader.getNumber()), Optional.of(pivotBlockHeader));
}

public FastSyncState(
private FastSyncState(
final OptionalLong pivotBlockNumber, final Optional<BlockHeader> pivotBlockHeader) {
this.pivotBlockNumber = pivotBlockNumber;
this.pivotBlockHeader = pivotBlockHeader;
Expand All @@ -47,6 +50,10 @@ public Optional<BlockHeader> getPivotBlockHeader() {
return pivotBlockHeader;
}

public boolean hasPivotBlockHeader() {
return pivotBlockHeader.isPresent();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,57 @@
import java.util.Optional;

import com.google.common.io.Files;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class PivotHeaderStorage {

/**
* Supports persisting fast sync state to disk to enable resuming after a restart.
*
* <p>Note that a {@link FastSyncState} with a block number selected but no pivot block header is
* not stored. If we haven't yet retrieved and confirmed the actual block header we can't have
* started downloading data so should pick a new pivot block when resuming. Once we have the pivot
* block header we want to continue with that pivot block so the world state downloaded matches up.
*/
public class FastSyncStateStorage {
private static final Logger LOG = LogManager.getLogger();
private static final String PIVOT_BLOCK_HEADER_FILENAME = "pivotBlockHeader.rlp";
private final File pivotBlockHeaderFile;

public PivotHeaderStorage(final Path fastSyncDataDir) {
public FastSyncStateStorage(final Path fastSyncDataDir) {
pivotBlockHeaderFile = fastSyncDataDir.resolve(PIVOT_BLOCK_HEADER_FILENAME).toFile();
}

public boolean isFastSyncInProgress() {
return pivotBlockHeaderFile.isFile();
}

public Optional<BlockHeader> loadPivotBlockHeader(final BlockHashFunction blockHashFunction) {
public FastSyncState loadState(final BlockHashFunction blockHashFunction) {
try {
if (!isFastSyncInProgress()) {
return Optional.empty();
return FastSyncState.EMPTY_SYNC_STATE;
}
final BytesValue rlp = BytesValue.wrap(Files.toByteArray(pivotBlockHeaderFile));
return Optional.of(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't look like this needs to be wrapped in an Optional now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

BlockHeader.readFrom(new BytesValueRLPInput(rlp, false), blockHashFunction));
BlockHeader.readFrom(new BytesValueRLPInput(rlp, false), blockHashFunction))
.map(FastSyncState::new)
.orElse(FastSyncState.EMPTY_SYNC_STATE);
} catch (final IOException e) {
throw new IllegalStateException(
"Unable to read fast sync status file: " + pivotBlockHeaderFile.getAbsolutePath());
}
}

public void storePivotBlockHeader(final BlockHeader pivotBlockHeader) {
public void storeState(final FastSyncState state) {
if (!state.hasPivotBlockHeader()) {
if (!pivotBlockHeaderFile.delete() && pivotBlockHeaderFile.exists()) {
LOG.error(
"Unable to delete fast sync status file: " + pivotBlockHeaderFile.getAbsolutePath());
}
return;
}
try {
final BytesValueRLPOutput output = new BytesValueRLPOutput();
pivotBlockHeader.writeTo(output);
state.getPivotBlockHeader().get().writeTo(output);
Files.write(output.encoded().getArrayUnsafe(), pivotBlockHeaderFile);
} catch (final IOException e) {
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -117,7 +115,7 @@ private void countHeader(final BlockHeader header, final int confirmationsRequir
if (confirmations >= confirmationsRequired) {
LOG.info(
"Confirmed pivot block hash {} with {} confirmations", header.getHash(), confirmations);
result.complete(new FastSyncState(OptionalLong.of(header.getNumber()), Optional.of(header)));
result.complete(new FastSyncState(header));
getHeaderTasks.forEach(RetryingGetHeaderFromPeerByNumberTask::cancel);
}
}
Expand Down
Loading