Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
Browse files Browse the repository at this point in the history
…-download-fetch-data
  • Loading branch information
ajsutton committed Apr 4, 2019
2 parents bd32bb8 + b458a94 commit 10a6560
Show file tree
Hide file tree
Showing 13 changed files with 165 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.CompletableFuture.completedFuture;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
Expand All @@ -21,48 +25,72 @@
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;

public class CheckpointHeaderFetcher {

private final SynchronizerConfiguration syncConfig;
private final ProtocolSchedule<?> protocolSchedule;
private final EthContext ethContext;
private final UnaryOperator<List<BlockHeader>> checkpointFilter;
private final Optional<BlockHeader> lastCheckpointHeader;
private final MetricsSystem metricsSystem;

public CheckpointHeaderFetcher(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext,
final UnaryOperator<List<BlockHeader>> checkpointFilter,
final Optional<BlockHeader> lastCheckpointHeader,
final MetricsSystem metricsSystem) {
this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.checkpointFilter = checkpointFilter;
this.lastCheckpointHeader = lastCheckpointHeader;
this.metricsSystem = metricsSystem;
}

public CompletableFuture<List<BlockHeader>> getNextCheckpointHeaders(
final EthPeer peer, final BlockHeader lastHeader) {
final int skip = syncConfig.downloaderChainSegmentSize() - 1;
final int additionalHeaderCount = syncConfig.downloaderHeaderRequestSize();
final int maximumHeaderRequestSize = syncConfig.downloaderHeaderRequestSize();

final int additionalHeaderCount;
if (lastCheckpointHeader.isPresent()) {
final BlockHeader targetHeader = lastCheckpointHeader.get();
final long blocksUntilTarget = targetHeader.getNumber() - lastHeader.getNumber();
if (blocksUntilTarget <= 0) {
return completedFuture(emptyList());
}
final long maxHeadersToRequest = blocksUntilTarget / (skip + 1);
additionalHeaderCount = (int) Math.min(maxHeadersToRequest, maximumHeaderRequestSize);
if (additionalHeaderCount == 0) {
return completedFuture(singletonList(targetHeader));
}
} else {
additionalHeaderCount = maximumHeaderRequestSize;
}

return requestHeaders(peer, lastHeader, additionalHeaderCount, skip);
}

private CompletableFuture<List<BlockHeader>> requestHeaders(
final EthPeer peer,
final BlockHeader referenceHeader,
final int headerCount,
final int skip) {
return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule,
ethContext,
lastHeader.getHash(),
lastHeader.getNumber(),
referenceHeader.getHash(),
referenceHeader.getNumber(),
// + 1 because lastHeader will be returned as well.
additionalHeaderCount + 1,
headerCount + 1,
skip,
metricsSystem)
.assignPeer(peer)
.run()
.thenApply(PeerTaskResult::getResult)
.thenApply(
headers -> checkpointFilter.apply(stripExistingCheckpointHeader(lastHeader, headers)));
.thenApply(headers -> stripExistingCheckpointHeader(referenceHeader, headers));
}

private List<BlockHeader> stripExistingCheckpointHeader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import tech.pegasys.pantheon.services.pipeline.PipelineBuilder;

import java.time.Duration;
import java.util.Optional;

public class FastSyncDownloadPipelineFactory<C> implements DownloadPipelineFactory {
private final SynchronizerConfiguration syncConfig;
Expand Down Expand Up @@ -93,7 +94,7 @@ public Pipeline<?> createDownloadPipelineForSyncTarget(final SyncTarget target)
syncConfig,
protocolSchedule,
ethContext,
new FastSyncCheckpointFilter(pivotBlockHeader),
Optional.of(pivotBlockHeader),
metricsSystem),
this::shouldContinueDownloadingFromPeer,
ethContext.getScheduler(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
package tech.pegasys.pantheon.ethereum.eth.sync;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
Expand All @@ -31,14 +31,13 @@
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;

import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -47,9 +46,9 @@ public class CheckpointHeaderFetcherTest {
private static ProtocolSchedule<Void> protocolSchedule;
private static ProtocolContext<Void> protocolContext;
private static final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Mock private UnaryOperator<List<BlockHeader>> filter;
private EthProtocolManager ethProtocolManager;
private CheckpointHeaderFetcher checkpointHeaderFetcher;
private Responder responder;
private RespondingEthPeer respondingPeer;

@BeforeClass
public static void setUpClass() {
Expand All @@ -65,26 +64,15 @@ public void setUpTest() {
ethProtocolManager =
EthProtocolManagerTestUtil.create(
blockchain, protocolContext.getWorldStateArchive(), () -> false);
final EthContext ethContext = ethProtocolManager.ethContext();
checkpointHeaderFetcher =
new CheckpointHeaderFetcher(
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderHeadersRequestSize(3)
.build(),
protocolSchedule,
ethContext,
filter,
metricsSystem);
responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
respondingPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
}

@Test
public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() {
when(filter.apply(any())).thenAnswer(invocation -> invocation.getArgument(0));
final Responder responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.empty());

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));
Expand All @@ -97,23 +85,74 @@ public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() {
}

@Test
public void shouldApplyFilterToDownloadedCheckpoints() {
final List<BlockHeader> filteredResult = asList(header(7), header(9));
final List<BlockHeader> unfilteredResult = asList(header(6), header(11), header(16));
when(filter.apply(unfilteredResult)).thenReturn(filteredResult);
final Responder responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
public void shouldNotRequestHeadersBeyondTargetWhenTargetIsMultipleOfSegmentSize() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(11)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));

assertThat(result).isNotDone();
respondingPeer.respond(responder);

assertThat(result).isCompletedWithValue(asList(header(6), header(11)));
}

@Test
public void shouldNotRequestHeadersBeyondTargetWhenTargetIsNotAMultipleOfSegmentSize() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));

respondingPeer.respond(responder);

assertThat(result).isCompletedWithValue(filteredResult);
assertThat(result).isCompletedWithValue(asList(header(6), header(11)));
}

@Test
public void shouldReturnOnlyTargetHeaderWhenLastHeaderIsTheCheckpointBeforeTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(11));

assertThat(result).isCompletedWithValue(singletonList(header(15)));
}

@Test
public void shouldReturnEmptyListWhenLastHeaderIsTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(15));
assertThat(result).isCompletedWithValue(emptyList());
}

@Test
public void shouldReturnEmptyListWhenLastHeaderIsAfterTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(16));
assertThat(result).isCompletedWithValue(emptyList());
}

private CheckpointHeaderFetcher createCheckpointHeaderFetcher(
final Optional<BlockHeader> targetHeader) {
final EthContext ethContext = ethProtocolManager.ethContext();
return new CheckpointHeaderFetcher(
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderHeadersRequestSize(3)
.build(),
protocolSchedule,
ethContext,
targetHeader,
metricsSystem);
}

private BlockHeader header(final long blockNumber) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public void finalize(final WritePipe<O> outputPipe) {
}
}

@Override
public void abort() {
inProgress.forEach(future -> future.cancel(true));
}

private void outputNextCompletedTask(final WritePipe<O> outputPipe) {
try {
waitForAnyFutureToComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public boolean hasMore() {
return input.hasMore();
}

@Override
public boolean isAborted() {
return input.isAborted();
}

@Override
public List<T> get() {
final T firstItem = input.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public boolean isOpen() {
return !closed.get() && !aborted.get();
}

@Override
public boolean isAborted() {
return aborted.get();
}

@Override
public boolean hasRemainingCapacity() {
return queue.remainingCapacity() > 0 && isOpen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public void run() {
while (inputPipe.hasMore()) {
processor.processNextInput(inputPipe, outputPipe);
}
if (inputPipe.isAborted()) {
processor.abort();
}
processor.finalize(outputPipe);
outputPipe.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ interface Processor<I, O> {
void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe);

default void finalize(final WritePipe<O> outputPipe) {}

default void abort() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ public interface ReadPipe<T> {
*/
boolean hasMore();

/**
* Determines if this pipeline this pipe is a part of has been aborted.
*
* @return true if the pipeline has been aborted, otherwise false.
*/
boolean isAborted();

/**
* Get and remove the next item from this pipe. This method will block until the next item is
* available but may still return <code>null</code> if the pipe is closed or the thread
Expand Down
Loading

0 comments on commit 10a6560

Please sign in to comment.