Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Remove SyncState from SyncTargetManager #1188

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ private void executeDownload() {
// Find target, pull checkpoint headers, import, repeat
currentTask =
waitForPeers()
.thenCompose(r -> syncTargetManager.findSyncTarget())
.thenCompose(r -> syncTargetManager.findSyncTarget(syncState.syncTarget()))
.thenApply(this::updateSyncState)
.thenCompose(this::pullCheckpointHeaders)
.thenCompose(this::importBlocks)
.thenCompose(r -> checkSyncTarget())
Expand Down Expand Up @@ -128,6 +129,20 @@ private void executeDownload() {
});
}

private SyncTarget updateSyncState(final SyncTarget newTarget) {
if (isSameAsCurrentTarget(newTarget)) {
return syncState.syncTarget().get();
}
return syncState.setSyncTarget(newTarget.peer(), newTarget.commonAncestor());
}

private Boolean isSameAsCurrentTarget(final SyncTarget newTarget) {
return syncState
.syncTarget()
.map(currentTarget -> currentTarget.equals(newTarget))
.orElse(false);
}

private CompletableFuture<List<BlockHeader>> pullCheckpointHeaders(final SyncTarget syncTarget) {
return syncTargetManager.isSyncTargetDisconnected()
? CompletableFuture.completedFuture(emptyList())
Expand All @@ -151,7 +166,7 @@ private CompletableFuture<Void> checkSyncTarget() {
clearSyncTarget(syncTarget);
return CompletableFuture.completedFuture(null);
}
if (finishedSyncingToCurrentTarget()) {
if (finishedSyncingToCurrentTarget(syncTarget)) {
LOG.info("Finished syncing to target: {}.", syncTarget);
clearSyncTarget(syncTarget);
// Wait a bit before checking for a new sync target
Expand All @@ -164,8 +179,8 @@ private CompletableFuture<Void> checkSyncTarget() {
return CompletableFuture.completedFuture(null);
}

private boolean finishedSyncingToCurrentTarget() {
return !syncTargetManager.syncTargetCanProvideMoreBlocks()
private boolean finishedSyncingToCurrentTarget(final SyncTarget syncTarget) {
return !syncTargetManager.syncTargetCanProvideMoreBlocks(syncTarget)
|| checkpointHeaderManager.checkpointsHaveTimedOut()
|| chainSegmentsHaveTimedOut();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.task.WaitForPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.DetermineCommonAncestorTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
Expand All @@ -35,8 +34,6 @@ public abstract class SyncTargetManager<C> {

private static final Logger LOG = LogManager.getLogger();

protected final SyncState syncState;

private volatile long syncTargetDisconnectListenerId;
private volatile boolean syncTargetDisconnected = false;
private final SynchronizerConfiguration config;
Expand All @@ -50,19 +47,17 @@ public SyncTargetManager(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final MetricsSystem metricsSystem) {
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.syncState = syncState;
this.metricsSystem = metricsSystem;
}

public CompletableFuture<SyncTarget> findSyncTarget() {
return syncState
.syncTarget()
public CompletableFuture<SyncTarget> findSyncTarget(
final Optional<SyncTarget> currentSyncTarget) {
return currentSyncTarget
.map(CompletableFuture::completedFuture) // Return an existing sync target if present
.orElseGet(this::selectNewSyncTarget);
}
Expand Down Expand Up @@ -93,7 +88,7 @@ private CompletableFuture<SyncTarget> selectNewSyncTarget() {
if (target == null) {
return waitForPeerAndThenSetSyncTarget();
}
final SyncTarget syncTarget = syncState.setSyncTarget(bestPeer, target);
final SyncTarget syncTarget = new SyncTarget(bestPeer, target);
LOG.info(
"Found common ancestor with peer {} at block {}",
bestPeer,
Expand All @@ -120,7 +115,7 @@ protected Optional<SyncTarget> finalizeSelectedSyncTarget(final SyncTarget syncT
protected abstract CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget();

private CompletableFuture<SyncTarget> waitForPeerAndThenSetSyncTarget() {
return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> findSyncTarget());
return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> selectNewSyncTarget());
}

private CompletableFuture<?> waitForNewPeer() {
Expand Down Expand Up @@ -149,12 +144,8 @@ public void clearSyncTarget(final SyncTarget syncTarget) {

public abstract boolean isSyncTargetReached(final EthPeer peer);

public boolean syncTargetCanProvideMoreBlocks() {
if (!syncState.syncTarget().isPresent()) {
LOG.warn("SyncTarget should be set, but is not.");
return false;
}
final EthPeer currentSyncingPeer = syncState.syncTarget().get().peer();
public boolean syncTargetCanProvideMoreBlocks(final SyncTarget syncTarget) {
final EthPeer currentSyncingPeer = syncTarget.peer();
return !isSyncTargetDisconnected() && !isSyncTargetReached(currentSyncingPeer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class FastSyncChainDownloader<C> {
protocolSchedule,
protocolContext,
ethContext,
syncState,
metricsSystem,
pivotBlockHeader),
new FastSyncCheckpointHeaderManager<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.PivotBlockRetriever.MAX_PIVOT_BLOCK_RETRIES;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.SyncTargetManager;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByNumberTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
Expand Down Expand Up @@ -49,10 +49,9 @@ public FastSyncTargetManager(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final MetricsSystem metricsSystem,
final BlockHeader pivotBlockHeader) {
super(config, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
super(config, protocolSchedule, protocolContext, ethContext, metricsSystem);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
Expand Down Expand Up @@ -121,6 +120,7 @@ public boolean shouldContinueDownloading() {

@Override
public boolean isSyncTargetReached(final EthPeer peer) {
return syncState.chainHeadNumber() >= pivotBlockHeader.getNumber();
final MutableBlockchain blockchain = protocolContext.getBlockchain();
return blockchain.getChainHeadBlockNumber() >= pivotBlockHeader.getNumber();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public FullSyncDownloader(
ethContext,
syncState,
new FullSyncTargetManager<>(
config, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem),
config, protocolSchedule, protocolContext, ethContext, metricsSystem),
new CheckpointHeaderManager<>(
config, protocolContext, ethContext, syncState, protocolSchedule, metricsSystem),
this::importBlocksForCheckpoints,
Expand Down Expand Up @@ -111,6 +111,8 @@ private CompletableFuture<List<Block>> importBlocksForCheckpoints(
public TrailingPeerRequirements calculateTrailingPeerRequirements() {
return syncState.isInSync()
? TrailingPeerRequirements.UNRESTRICTED
: new TrailingPeerRequirements(syncState.chainHeadNumber(), config.getMaxTrailingPeers());
: new TrailingPeerRequirements(
protocolContext.getBlockchain().getChainHeadBlockNumber(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we're loosening the requirements for trailing peers during full sync? It does look like we were being too strict ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not this just removes the indirection when getting the current blockchain head number - previously it went through SyncState which then ran protocolContext.getBlockchain().getChainHeadBlockNumber() so it's just been inlined.

The trailing peer limit during a full sync is set up so that a trailing peer is anyone behind us and at most 25% of our peers are allowed to be trailing. The main aim is to be connected to as many peers who can give us the data we need to sync as possible as few peers who will be asking us for data (which slows us down).

config.getMaxTrailingPeers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
import static java.util.concurrent.CompletableFuture.completedFuture;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.ChainState;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.eth.sync.SyncTargetManager;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
Expand All @@ -47,9 +47,8 @@ class FullSyncTargetManager<C> extends SyncTargetManager<C> {
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final MetricsSystem metricsSystem) {
super(config, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
super(config, protocolSchedule, protocolContext, ethContext, metricsSystem);
this.config = config;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
Expand Down Expand Up @@ -93,9 +92,10 @@ protected CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget() {
public boolean isSyncTargetReached(final EthPeer peer) {
final long peerHeight = peer.chainState().getEstimatedHeight();
final UInt256 peerTd = peer.chainState().getBestBlock().getTotalDifficulty();
final MutableBlockchain blockchain = protocolContext.getBlockchain();

return peerTd.compareTo(syncState.chainHeadTotalDifficulty()) <= 0
&& peerHeight <= syncState.chainHeadNumber();
return peerTd.compareTo(blockchain.getChainHead().getTotalDifficulty()) <= 0
&& peerHeight <= blockchain.getChainHeadBlockNumber();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.util.Subscribers;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.util.Optional;

Expand All @@ -38,7 +37,7 @@ public class SyncState {
public SyncState(final Blockchain blockchain, final EthPeers ethPeers) {
this.blockchain = blockchain;
this.ethPeers = ethPeers;
this.startingBlock = chainHeadNumber();
this.startingBlock = this.blockchain.getChainHeadBlockNumber();
blockchain.observeBlockAdded(
(event, chain) -> {
if (event.isNewCanonicalHead()) {
Expand All @@ -62,21 +61,15 @@ public void addSyncStatusListener(final SyncStatusListener observer) {
}

public SyncStatus syncStatus() {
return new SyncStatus(startingBlock(), chainHeadNumber(), bestChainHeight());
final long chainHeadBlockNumber = blockchain.getChainHeadBlockNumber();
return new SyncStatus(
startingBlock(), chainHeadBlockNumber, bestChainHeight(chainHeadBlockNumber));
}

public long startingBlock() {
return startingBlock;
}

public long chainHeadNumber() {
return blockchain.getChainHeadBlockNumber();
}

public UInt256 chainHeadTotalDifficulty() {
return blockchain.getChainHead().getTotalDifficulty();
}

public Optional<SyncTarget> syncTarget() {
return syncTarget;
}
Expand Down Expand Up @@ -118,11 +111,6 @@ private void addEstimatedHeightListener(final SyncTarget target) {
target.addPeerChainEstimatedHeightListener(estimatedHeight -> checkInSync());
}

public long bestChainHeight() {
final long localChainHeight = blockchain.getChainHeadBlockNumber();
return bestChainHeight(localChainHeight);
}

public long bestChainHeight(final long localChainHeight) {
return Math.max(
localChainHeight,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ public void switchesSyncTarget_betterTd() {
peerB
.getEthPeer()
.chainState()
.updateForAnnouncedBlock(gen.header(), syncState.chainHeadTotalDifficulty().plus(300));
.updateForAnnouncedBlock(
gen.header(), localBlockchain.getChainHead().getTotalDifficulty().plus(300));

// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.getCurrentTask();
Expand Down Expand Up @@ -426,11 +427,13 @@ public void doesNotSwitchSyncTarget_betterTdUnderThreshold() {
bestPeer
.getEthPeer()
.chainState()
.updateForAnnouncedBlock(gen.header(1000), syncState.chainHeadTotalDifficulty().plus(201));
.updateForAnnouncedBlock(
gen.header(1000), localBlockchain.getChainHead().getTotalDifficulty().plus(201));
otherPeer
.getEthPeer()
.chainState()
.updateForAnnouncedBlock(gen.header(1000), syncState.chainHeadTotalDifficulty().plus(300));
.updateForAnnouncedBlock(
gen.header(1000), localBlockchain.getChainHead().getTotalDifficulty().plus(300));

// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.getCurrentTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.junit.Before;
Expand Down Expand Up @@ -64,8 +64,6 @@ public void setup() {
EthProtocolManagerTestUtil.create(
localBlockchain, localWorldState, new EthScheduler(1, 1, 1, new NoOpMetricsSystem()));
final EthContext ethContext = ethProtocolManager.ethContext();
final SyncState syncState =
new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers());
localBlockchainSetup.importFirstBlocks(5);
otherBlockchainSetup.importFirstBlocks(20);
syncTargetManager =
Expand All @@ -74,7 +72,6 @@ public void setup() {
protocolSchedule,
protocolContext,
ethContext,
syncState,
new NoOpMetricsSystem());
}

Expand All @@ -85,7 +82,7 @@ public void shouldDisconnectPeerIfWorldStateIsUnavailableForCommonAncestor() {
final RespondingEthPeer bestPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 20);

final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget();
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget(Optional.empty());

bestPeer.respond(responder);

Expand All @@ -100,7 +97,7 @@ public void shouldAllowSyncTargetWhenIfWorldStateIsAvailableForCommonAncestor()
final RespondingEthPeer bestPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 20);

final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget();
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget(Optional.empty());

bestPeer.respond(responder);

Expand Down