Skip to content

Commit

Permalink
[PIE-1784] Select the pivot block from a minimal peer set (PegaSysEng…
Browse files Browse the repository at this point in the history
  • Loading branch information
mbaxter authored Jul 17, 2019
1 parent 7c24b9e commit dff81a5
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public Optional<EthPeer> bestPeer() {
return streamAvailablePeers().max(BEST_CHAIN);
}

public Optional<EthPeer> bestPeerWithHeightEstimate() {
return streamAvailablePeers().filter(p -> p.chainState().hasEstimatedHeight()).max(BEST_CHAIN);
}

@FunctionalInterface
public interface ConnectCallback {
void onPeerConnected(EthPeer newPeer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,8 @@ public CompletableFuture<FastSyncState> waitForSuitablePeers(final FastSyncState
return waitForAnyPeer().thenApply(ignore -> fastSyncState);
}

final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(
ethContext, syncConfig.getFastSyncMinimumPeerCount(), metricsSystem);

LOG.debug("Waiting for at least {} peers.", syncConfig.getFastSyncMinimumPeerCount());
return ethContext
.getScheduler()
.scheduleServiceTask(waitForPeersTask)
return waitForPeers(syncConfig.getFastSyncMinimumPeerCount())
.thenApply(successfulWaitResult -> fastSyncState);
}

Expand All @@ -104,6 +98,12 @@ private CompletableFuture<Void> waitForAnyPeer() {
});
}

private CompletableFuture<Void> waitForPeers(final int count) {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(ethContext, count, metricsSystem);
return waitForPeersTask.run();
}

public CompletableFuture<FastSyncState> selectPivotBlock(final FastSyncState fastSyncState) {
return fastSyncState.hasPivotBlockHeader()
? completedFuture(fastSyncState)
Expand All @@ -113,14 +113,28 @@ public CompletableFuture<FastSyncState> selectPivotBlock(final FastSyncState fas
private CompletableFuture<FastSyncState> selectPivotBlockFromPeers() {
return ethContext
.getEthPeers()
.bestPeer()
.filter(peer -> peer.chainState().hasEstimatedHeight())
.bestPeerWithHeightEstimate()
// Only select a pivot block number when we have a minimum number of height estimates
.filter(
peer -> {
final long peerCount = countPeersWithEstimatedHeight();
final int minPeerCount = syncConfig.getFastSyncMinimumPeerCount();
if (peerCount < minPeerCount) {
LOG.info(
"Waiting for peers with chain height information. {} / {} required peers currently available.",
peerCount,
minPeerCount);
return false;
}
return true;
})
.map(
peer -> {
final long pivotBlockNumber =
peer.chainState().getEstimatedHeight() - syncConfig.getFastSyncPivotDistance();
if (pivotBlockNumber <= BlockHeader.GENESIS_BLOCK_NUMBER) {
// Peer's chain isn't long enough, return an empty value so we can try again.
LOG.info("Waiting for peer with sufficient chain height");
return null;
}
LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber);
Expand All @@ -131,12 +145,21 @@ private CompletableFuture<FastSyncState> selectPivotBlockFromPeers() {
.orElseGet(this::retrySelectPivotBlockAfterDelay);
}

private long countPeersWithEstimatedHeight() {
return ethContext
.getEthPeers()
.streamAvailablePeers()
.filter(peer -> peer.chainState().hasEstimatedHeight())
.count();
}

private CompletableFuture<FastSyncState> retrySelectPivotBlockAfterDelay() {
LOG.info("Waiting for peer with sufficient chain height");
return ethContext
.getScheduler()
.scheduleFutureTask(
() -> waitForAnyPeer().thenCompose(ignore -> selectPivotBlockFromPeers()),
() ->
waitForPeers(syncConfig.getFastSyncMinimumPeerCount())
.thenCompose(ignore -> selectPivotBlockFromPeers()),
Duration.ofSeconds(1));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import tech.pegasys.pantheon.util.uint.UInt256;

import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CancellationException;
import java.util.function.Consumer;

Expand Down Expand Up @@ -67,14 +68,22 @@ public void comparesPeersWithHeightAndTd() {
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerA)).isGreaterThan(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);

assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeer()).contains(peerB);
assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeerWithHeightEstimate())
.contains(peerB);
}

@Test
public void comparesPeersWithTdAndNoHeight() {
final EthPeer peerA =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, UInt256.of(100), 0).getEthPeer();
EthProtocolManagerTestUtil.createPeer(
ethProtocolManager, UInt256.of(100), OptionalLong.empty())
.getEthPeer();
final EthPeer peerB =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, UInt256.of(50), 0).getEthPeer();
EthProtocolManagerTestUtil.createPeer(
ethProtocolManager, UInt256.of(50), OptionalLong.empty())
.getEthPeer();

// Sanity check
assertThat(peerA.chainState().getEstimatedHeight()).isEqualTo(0);
Expand All @@ -87,6 +96,10 @@ public void comparesPeersWithTdAndNoHeight() {
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerA)).isLessThan(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);

assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeer()).contains(peerA);
assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeerWithHeightEstimate())
.isEmpty();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.util.OptionalLong;

public class EthProtocolManagerTestUtil {

public static EthProtocolManager create(
Expand Down Expand Up @@ -137,6 +139,13 @@ public static RespondingEthPeer createPeer(
return RespondingEthPeer.create(ethProtocolManager, td, estimatedHeight);
}

public static RespondingEthPeer createPeer(
final EthProtocolManager ethProtocolManager,
final UInt256 td,
final OptionalLong estimatedHeight) {
return RespondingEthPeer.create(ethProtocolManager, td, estimatedHeight);
}

public static RespondingEthPeer createPeer(final EthProtocolManager ethProtocolManager) {
return RespondingEthPeer.create(ethProtocolManager, UInt256.of(1000L));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -110,6 +111,14 @@ public static RespondingEthPeer create(
return create(ethProtocolManager, chainHeadHash, totalDifficulty, estimatedHeight);
}

public static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager,
final UInt256 totalDifficulty,
final OptionalLong estimatedHeight) {
final Hash chainHeadHash = gen.hash();
return create(ethProtocolManager, chainHeadHash, totalDifficulty, estimatedHeight);
}

public static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager,
final Hash chainHeadHash,
Expand All @@ -122,6 +131,15 @@ public static RespondingEthPeer create(
final Hash chainHeadHash,
final UInt256 totalDifficulty,
final long estimatedHeight) {
return create(
ethProtocolManager, chainHeadHash, totalDifficulty, OptionalLong.of(estimatedHeight));
}

public static RespondingEthPeer create(
final EthProtocolManager ethProtocolManager,
final Hash chainHeadHash,
final UInt256 totalDifficulty,
final OptionalLong estimatedHeight) {
final EthPeers ethPeers = ethProtocolManager.ethContext().getEthPeers();

final Set<Capability> caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63));
Expand All @@ -132,7 +150,7 @@ public static RespondingEthPeer create(
ethPeers.registerConnection(peerConnection);
final EthPeer peer = ethPeers.peer(peerConnection);
peer.registerStatusReceived(chainHeadHash, totalDifficulty);
peer.chainState().update(chainHeadHash, estimatedHeight);
estimatedHeight.ifPresent(height -> peer.chainState().update(chainHeadHash, height));
peer.registerStatusSent();

return new RespondingEthPeer(ethProtocolManager, peerConnection, peer, outgoingMessages);
Expand Down
Loading

0 comments on commit dff81a5

Please sign in to comment.