This repository has been archived by the owner on Sep 26, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 130
Pipeline chain download - fetch and import data #1207
Merged
ajsutton
merged 18 commits into
PegaSysEng:master
from
ajsutton:pipeline-download-fetch-data
Apr 4, 2019
Merged
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
89c41ec
Implement downloading of checkpoint headers.
ajsutton 86cd5f2
Add place holders for remaining download steps.
ajsutton 3d76da1
Ensure cancellation is propagated up to the actual eth task.
ajsutton a6c4db5
Add join-up validation step.
ajsutton 764e92d
Download bodies.
ajsutton 87f21eb
Download receipts.
ajsutton 737ee21
Import blocks.
ajsutton 75ed490
Make import block step a consumer instead of function.
ajsutton 8a4c20c
Add counters for number of times chain download completes or errors out.
ajsutton 76e6e01
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton 3af385d
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton 4190fe3
Use sensible buffer sizes.
ajsutton d3ac761
Fix comment.
ajsutton 0658ffc
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton bd32bb8
Review feedback
ajsutton 10a6560
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton f11ff69
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton 12df702
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
66 changes: 66 additions & 0 deletions
66
...src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderValidationStep.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
/* | ||
* Copyright 2019 ConsenSys AG. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
package tech.pegasys.pantheon.ethereum.eth.sync; | ||
|
||
import tech.pegasys.pantheon.ethereum.ProtocolContext; | ||
import tech.pegasys.pantheon.ethereum.core.BlockHeader; | ||
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; | ||
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; | ||
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; | ||
|
||
import java.util.function.Function; | ||
import java.util.stream.Stream; | ||
|
||
public class CheckpointHeaderValidationStep<C> | ||
implements Function<CheckpointRangeHeaders, Stream<BlockHeader>> { | ||
|
||
private final ProtocolSchedule<C> protocolSchedule; | ||
private final ProtocolContext<C> protocolContext; | ||
private final ValidationPolicy validationPolicy; | ||
|
||
public CheckpointHeaderValidationStep( | ||
final ProtocolSchedule<C> protocolSchedule, | ||
final ProtocolContext<C> protocolContext, | ||
final ValidationPolicy validationPolicy) { | ||
this.protocolSchedule = protocolSchedule; | ||
this.protocolContext = protocolContext; | ||
this.validationPolicy = validationPolicy; | ||
} | ||
|
||
@Override | ||
public Stream<BlockHeader> apply(final CheckpointRangeHeaders checkpointRangeHeaders) { | ||
final BlockHeader expectedParent = checkpointRangeHeaders.getCheckpointRange().getStart(); | ||
final BlockHeader firstHeaderToImport = checkpointRangeHeaders.getFirstHeaderToImport(); | ||
|
||
if (isValid(expectedParent, firstHeaderToImport)) { | ||
return checkpointRangeHeaders.getHeadersToImport().stream(); | ||
} else { | ||
throw new InvalidBlockException( | ||
"Provided first header does not connect to last header.", | ||
expectedParent.getNumber(), | ||
expectedParent.getHash()); | ||
} | ||
} | ||
|
||
private boolean isValid(final BlockHeader expectedParent, final BlockHeader firstHeaderToImport) { | ||
final BlockHeaderValidator<C> validator = | ||
protocolSchedule | ||
.getByBlockNumber(firstHeaderToImport.getNumber()) | ||
.getBlockHeaderValidator(); | ||
return validator.validateHeader( | ||
firstHeaderToImport, | ||
expectedParent, | ||
protocolContext, | ||
validationPolicy.getValidationModeForNextBlock()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
72 changes: 72 additions & 0 deletions
72
...eum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointRangeHeaders.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
/* | ||
* Copyright 2019 ConsenSys AG. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
package tech.pegasys.pantheon.ethereum.eth.sync; | ||
|
||
import static com.google.common.base.Preconditions.checkArgument; | ||
|
||
import tech.pegasys.pantheon.ethereum.core.BlockHeader; | ||
|
||
import java.util.List; | ||
import java.util.Objects; | ||
|
||
import com.google.common.base.MoreObjects; | ||
|
||
public class CheckpointRangeHeaders { | ||
private final CheckpointRange checkpointRange; | ||
private final List<BlockHeader> headersToImport; | ||
|
||
public CheckpointRangeHeaders( | ||
final CheckpointRange checkpointRange, final List<BlockHeader> headersToImport) { | ||
checkArgument(!headersToImport.isEmpty(), "Must have at least one header to import"); | ||
this.checkpointRange = checkpointRange; | ||
this.headersToImport = headersToImport; | ||
} | ||
|
||
public CheckpointRange getCheckpointRange() { | ||
return checkpointRange; | ||
} | ||
|
||
public List<BlockHeader> getHeadersToImport() { | ||
return headersToImport; | ||
} | ||
|
||
public BlockHeader getFirstHeaderToImport() { | ||
return headersToImport.get(0); | ||
} | ||
|
||
@Override | ||
public boolean equals(final Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
final CheckpointRangeHeaders that = (CheckpointRangeHeaders) o; | ||
return Objects.equals(checkpointRange, that.checkpointRange) | ||
&& Objects.equals(headersToImport, that.headersToImport); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(checkpointRange, headersToImport); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return MoreObjects.toStringHelper(this) | ||
.add("checkpointRange", checkpointRange) | ||
.add("headersToImport", headersToImport) | ||
.toString(); | ||
} | ||
} |
47 changes: 47 additions & 0 deletions
47
ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloadBodiesStep.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* Copyright 2019 ConsenSys AG. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
package tech.pegasys.pantheon.ethereum.eth.sync; | ||
|
||
import tech.pegasys.pantheon.ethereum.core.Block; | ||
import tech.pegasys.pantheon.ethereum.core.BlockHeader; | ||
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; | ||
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask; | ||
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; | ||
import tech.pegasys.pantheon.metrics.MetricsSystem; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.function.Function; | ||
|
||
public class DownloadBodiesStep<C> | ||
implements Function<List<BlockHeader>, CompletableFuture<List<Block>>> { | ||
|
||
private final ProtocolSchedule<C> protocolSchedule; | ||
private final EthContext ethContext; | ||
private final MetricsSystem metricsSystem; | ||
|
||
public DownloadBodiesStep( | ||
final ProtocolSchedule<C> protocolSchedule, | ||
final EthContext ethContext, | ||
final MetricsSystem metricsSystem) { | ||
this.protocolSchedule = protocolSchedule; | ||
this.ethContext = ethContext; | ||
this.metricsSystem = metricsSystem; | ||
} | ||
|
||
@Override | ||
public CompletableFuture<List<Block>> apply(final List<BlockHeader> blockHeaders) { | ||
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, blockHeaders, metricsSystem) | ||
.run(); | ||
} | ||
} |
79 changes: 79 additions & 0 deletions
79
ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloadHeadersStep.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright 2019 ConsenSys AG. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
package tech.pegasys.pantheon.ethereum.eth.sync; | ||
|
||
import tech.pegasys.pantheon.ethereum.ProtocolContext; | ||
import tech.pegasys.pantheon.ethereum.core.BlockHeader; | ||
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; | ||
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.DownloadHeaderSequenceTask; | ||
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; | ||
import tech.pegasys.pantheon.metrics.MetricsSystem; | ||
import tech.pegasys.pantheon.util.FutureUtils; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.function.Function; | ||
|
||
public class DownloadHeadersStep<C> | ||
implements Function<CheckpointRange, CompletableFuture<CheckpointRangeHeaders>> { | ||
|
||
private final ProtocolSchedule<C> protocolSchedule; | ||
private final ProtocolContext<C> protocolContext; | ||
private final EthContext ethContext; | ||
private final ValidationPolicy validationPolicy; | ||
private final MetricsSystem metricsSystem; | ||
|
||
public DownloadHeadersStep( | ||
final ProtocolSchedule<C> protocolSchedule, | ||
final ProtocolContext<C> protocolContext, | ||
final EthContext ethContext, | ||
final ValidationPolicy validationPolicy, | ||
final MetricsSystem metricsSystem) { | ||
this.protocolSchedule = protocolSchedule; | ||
this.protocolContext = protocolContext; | ||
this.ethContext = ethContext; | ||
this.validationPolicy = validationPolicy; | ||
this.metricsSystem = metricsSystem; | ||
} | ||
|
||
@Override | ||
public CompletableFuture<CheckpointRangeHeaders> apply(final CheckpointRange checkpointRange) { | ||
final CompletableFuture<List<BlockHeader>> taskFuture = downloadHeaders(checkpointRange); | ||
final CompletableFuture<CheckpointRangeHeaders> processedFuture = | ||
taskFuture.thenApply(headers -> processHeaders(checkpointRange, headers)); | ||
FutureUtils.propagateCancellation(processedFuture, taskFuture); | ||
return processedFuture; | ||
} | ||
|
||
private CompletableFuture<List<BlockHeader>> downloadHeaders( | ||
final CheckpointRange checkpointRange) { | ||
return DownloadHeaderSequenceTask.endingAtHeader( | ||
protocolSchedule, | ||
protocolContext, | ||
ethContext, | ||
checkpointRange.getEnd(), | ||
// -1 because we don't want to request the range starting header | ||
checkpointRange.getSegmentLength() - 1, | ||
validationPolicy, | ||
metricsSystem) | ||
.run(); | ||
} | ||
|
||
private CheckpointRangeHeaders processHeaders( | ||
final CheckpointRange checkpointRange, final List<BlockHeader> headers) { | ||
final List<BlockHeader> headersToImport = new ArrayList<>(headers); | ||
headersToImport.add(checkpointRange.getEnd()); | ||
return new CheckpointRangeHeaders(checkpointRange, headersToImport); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(optional) If you change this to
getSegmentLengthExclusive
you can avoid the- 1
below: https://github.com/PegaSysEng/pantheon/pull/1207/files#diff-e42d52272bdde3b38192060ce222bebbR66There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tempting but it's only exclusive of one end, not both so could wind up being confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you -1, its exclusive of both ends :) [0,1] : 1 - 0 - 1 = 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤦♂️ I shouldn't try to do math so early in the morning...