Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Pipeline chain download - fetch and import data #1207

Merged
merged 18 commits into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;

import java.util.function.Function;
import java.util.stream.Stream;

public class CheckpointHeaderValidationStep<C>
implements Function<CheckpointRangeHeaders, Stream<BlockHeader>> {

private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final ValidationPolicy validationPolicy;

public CheckpointHeaderValidationStep(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final ValidationPolicy validationPolicy) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.validationPolicy = validationPolicy;
}

@Override
public Stream<BlockHeader> apply(final CheckpointRangeHeaders checkpointRangeHeaders) {
final BlockHeader expectedParent = checkpointRangeHeaders.getCheckpointRange().getStart();
final BlockHeader firstHeaderToImport = checkpointRangeHeaders.getFirstHeaderToImport();

if (isValid(expectedParent, firstHeaderToImport)) {
return checkpointRangeHeaders.getHeadersToImport().stream();
} else {
throw new InvalidBlockException(
"Provided first header does not connect to last header.",
expectedParent.getNumber(),
expectedParent.getHash());
}
}

private boolean isValid(final BlockHeader expectedParent, final BlockHeader firstHeaderToImport) {
final BlockHeaderValidator<C> validator =
protocolSchedule
.getByBlockNumber(firstHeaderToImport.getNumber())
.getBlockHeaderValidator();
return validator.validateHeader(
firstHeaderToImport,
expectedParent,
protocolContext,
validationPolicy.getValidationModeForNextBlock());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import static java.lang.Math.toIntExact;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;

import java.util.Objects;
Expand Down Expand Up @@ -59,4 +61,8 @@ public String toString() {
.add("end", end.getNumber())
.toString();
}

public int getSegmentLength() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(optional) If you change this to getSegmentLengthExclusive you can avoid the - 1 below: https://github.com/PegaSysEng/pantheon/pull/1207/files#diff-e42d52272bdde3b38192060ce222bebbR66

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tempting but it's only exclusive of one end, not both so could wind up being confusing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you -1, its exclusive of both ends :) [0,1] : 1 - 0 - 1 = 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦‍♂️ I shouldn't try to do math so early in the morning...

return toIntExact(end.getNumber() - start.getNumber());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import static com.google.common.base.Preconditions.checkArgument;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;

import java.util.List;
import java.util.Objects;

import com.google.common.base.MoreObjects;

public class CheckpointRangeHeaders {
private final CheckpointRange checkpointRange;
private final List<BlockHeader> headersToImport;

public CheckpointRangeHeaders(
final CheckpointRange checkpointRange, final List<BlockHeader> headersToImport) {
checkArgument(!headersToImport.isEmpty(), "Must have at least one header to import");
this.checkpointRange = checkpointRange;
this.headersToImport = headersToImport;
}

public CheckpointRange getCheckpointRange() {
return checkpointRange;
}

public List<BlockHeader> getHeadersToImport() {
return headersToImport;
}

public BlockHeader getFirstHeaderToImport() {
return headersToImport.get(0);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final CheckpointRangeHeaders that = (CheckpointRangeHeaders) o;
return Objects.equals(checkpointRange, that.checkpointRange)
&& Objects.equals(headersToImport, that.headersToImport);
}

@Override
public int hashCode() {
return Objects.hash(checkpointRange, headersToImport);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("checkpointRange", checkpointRange)
.add("headersToImport", headersToImport)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class DownloadBodiesStep<C>
implements Function<List<BlockHeader>, CompletableFuture<List<Block>>> {

private final ProtocolSchedule<C> protocolSchedule;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;

public DownloadBodiesStep(
final ProtocolSchedule<C> protocolSchedule,
final EthContext ethContext,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
}

@Override
public CompletableFuture<List<Block>> apply(final List<BlockHeader> blockHeaders) {
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, blockHeaders, metricsSystem)
.run();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.DownloadHeaderSequenceTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.FutureUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class DownloadHeadersStep<C>
implements Function<CheckpointRange, CompletableFuture<CheckpointRangeHeaders>> {

private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final ValidationPolicy validationPolicy;
private final MetricsSystem metricsSystem;

public DownloadHeadersStep(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final ValidationPolicy validationPolicy,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.validationPolicy = validationPolicy;
this.metricsSystem = metricsSystem;
}

@Override
public CompletableFuture<CheckpointRangeHeaders> apply(final CheckpointRange checkpointRange) {
final CompletableFuture<List<BlockHeader>> taskFuture = downloadHeaders(checkpointRange);
final CompletableFuture<CheckpointRangeHeaders> processedFuture =
taskFuture.thenApply(headers -> processHeaders(checkpointRange, headers));
FutureUtils.propagateCancellation(processedFuture, taskFuture);
return processedFuture;
}

private CompletableFuture<List<BlockHeader>> downloadHeaders(
final CheckpointRange checkpointRange) {
return DownloadHeaderSequenceTask.endingAtHeader(
protocolSchedule,
protocolContext,
ethContext,
checkpointRange.getEnd(),
// -1 because we don't want to request the range starting header
checkpointRange.getSegmentLength() - 1,
validationPolicy,
metricsSystem)
.run();
}

private CheckpointRangeHeaders processHeaders(
final CheckpointRange checkpointRange, final List<BlockHeader> headers) {
final List<BlockHeader> headersToImport = new ArrayList<>(headers);
headersToImport.add(checkpointRange.getEnd());
return new CheckpointRangeHeaders(checkpointRange, headersToImport);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipeline;

import java.util.Optional;
Expand All @@ -37,15 +41,27 @@ public class PipelineChainDownloader<C> implements ChainDownloader {

private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final Counter pipelineCompleteCounter;
private final Counter pipelineErrorCounter;
private Pipeline<?> currentDownloadPipeline;

public PipelineChainDownloader(
final SyncTargetManager<C> syncTargetManager,
final DownloadPipelineFactory downloadPipelineFactory,
final EthScheduler scheduler) {
final EthScheduler scheduler,
final MetricsSystem metricsSystem) {
this.syncTargetManager = syncTargetManager;
this.downloadPipelineFactory = downloadPipelineFactory;
this.scheduler = scheduler;

final LabelledMetric<Counter> labelledCounter =
metricsSystem.createLabelledCounter(
MetricCategory.SYNCHRONIZER,
"chain_download_pipeline_restarts",
"Number of times the chain download pipeline has been restarted",
"reason");
pipelineCompleteCounter = labelledCounter.labels("complete");
pipelineErrorCounter = labelledCounter.labels("error");
}

@Override
Expand All @@ -72,7 +88,8 @@ private CompletableFuture<Void> performDownload() {
private CompletableFuture<Void> selectSyncTargetAndDownload() {
return syncTargetManager
.findSyncTarget(Optional.empty())
.thenCompose(this::startDownloadForSyncTarget);
.thenCompose(this::startDownloadForSyncTarget)
.thenRun(pipelineCompleteCounter::inc);
}

private CompletionStage<Void> repeatUnlessDownloadComplete(
Expand All @@ -87,6 +104,7 @@ private CompletionStage<Void> repeatUnlessDownloadComplete(

private CompletionStage<Void> handleFailedDownload(final Throwable error) {
LOG.debug("Chain download failed. Will restart if required.", error);
pipelineErrorCounter.inc();
if (!cancelled.get() && syncTargetManager.shouldContinueDownloading()) {
// Drop the error, allowing the normal looping logic to retry.
return completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;

import java.util.List;
import java.util.Objects;

import com.google.common.base.MoreObjects;

class BlockWithReceipts {
private final Block block;
Expand All @@ -38,4 +42,37 @@ public Block getBlock() {
public List<TransactionReceipt> getReceipts() {
return receipts;
}

public long getNumber() {
return block.getHeader().getNumber();
}

public Hash getHash() {
return block.getHash();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final BlockWithReceipts that = (BlockWithReceipts) o;
return Objects.equals(block, that.block) && Objects.equals(receipts, that.receipts);
}

@Override
public int hashCode() {
return Objects.hash(block, receipts);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("block", block)
.add("receipts", receipts)
.toString();
}
}
Loading