From 275914b5a2c8d0c75f75096ce61c2186f7c8d554 Mon Sep 17 00:00:00 2001 From: Danno Ferrin Date: Wed, 27 Feb 2019 10:06:03 -0700 Subject: [PATCH] delete the legacy pipelined import chain segment task. (#1003) This has been replaced by the parallel import chain segment task. --- .../PipelinedImportChainSegmentTask.java | 300 ----------- .../PipelinedImportChainSegmentTaskTest.java | 490 ------------------ 2 files changed, 790 deletions(-) delete mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java delete mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java deleted file mode 100644 index 601af42714..0000000000 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Copyright 2018 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.ethereum.eth.sync.tasks; - -import tech.pegasys.pantheon.ethereum.ProtocolContext; -import tech.pegasys.pantheon.ethereum.core.BlockHeader; -import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; -import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractEthTask; -import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; -import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy; -import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; -import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; -import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; -import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec; -import tech.pegasys.pantheon.metrics.MetricsSystem; -import tech.pegasys.pantheon.util.ExceptionUtils; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Deque; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.function.Supplier; - -import com.google.common.collect.Lists; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class PipelinedImportChainSegmentTask extends AbstractEthTask> { - private static final Logger LOG = LogManager.getLogger(); - - private final EthContext ethContext; - private final ProtocolContext protocolContext; - private final ProtocolSchedule protocolSchedule; - private final List importedBlocks = new ArrayList<>(); - - // First header is assumed to already be imported - private final List checkpointHeaders; - private final int chunksInTotal; - private final BlockHandler blockHandler; - private final ValidationPolicy validationPolicy; - private final MetricsSystem metricsSystem; - private int chunksIssued; - private int chunksCompleted; - private final int maxActiveChunks; - - private final Deque>> downloadAndValidateHeadersTasks = - new ConcurrentLinkedDeque<>(); - private final Deque>> downloadBodiesTasks = - new ConcurrentLinkedDeque<>(); - private final Deque>> validateAndImportBlocksTasks = - new ConcurrentLinkedDeque<>(); - - protected PipelinedImportChainSegmentTask( - final ProtocolSchedule protocolSchedule, - final ProtocolContext protocolContext, - final EthContext ethContext, - final int maxActiveChunks, - final List checkpointHeaders, - final MetricsSystem metricsSystem, - final BlockHandler blockHandler, - final ValidationPolicy validationPolicy) { - super(metricsSystem); - this.protocolSchedule = protocolSchedule; - this.protocolContext = protocolContext; - this.ethContext = ethContext; - this.checkpointHeaders = checkpointHeaders; - this.chunksInTotal = checkpointHeaders.size() - 1; - this.blockHandler = blockHandler; - this.validationPolicy = validationPolicy; - this.metricsSystem = metricsSystem; - this.chunksIssued = 0; - this.chunksCompleted = 0; - this.maxActiveChunks = maxActiveChunks; - } - - public static PipelinedImportChainSegmentTask forCheckpoints( - final ProtocolSchedule protocolSchedule, - final ProtocolContext protocolContext, - final EthContext ethContext, - final int maxActiveChunks, - final MetricsSystem metricsSystem, - final BlockHandler blockHandler, - final ValidationPolicy validationPolicy, - final BlockHeader... checkpointHeaders) { - return forCheckpoints( - protocolSchedule, - protocolContext, - ethContext, - maxActiveChunks, - metricsSystem, - blockHandler, - validationPolicy, - Arrays.asList(checkpointHeaders)); - } - - public static PipelinedImportChainSegmentTask forCheckpoints( - final ProtocolSchedule protocolSchedule, - final ProtocolContext protocolContext, - final EthContext ethContext, - final int maxActiveChunks, - final MetricsSystem metricsSystem, - final BlockHandler blockHandler, - final ValidationPolicy validationPolicy, - final List checkpointHeaders) { - return new PipelinedImportChainSegmentTask<>( - protocolSchedule, - protocolContext, - ethContext, - maxActiveChunks, - checkpointHeaders, - metricsSystem, - blockHandler, - validationPolicy); - } - - @Override - protected void executeTask() { - LOG.debug( - "Importing chain segment from {} to {}.", - firstHeader().getNumber(), - lastHeader().getNumber()); - for (int i = 0; i < chunksInTotal && i < maxActiveChunks; i++) { - createNextChunkPipeline(); - } - } - - private void createNextChunkPipeline() { - final BlockHeader firstChunkHeader = checkpointHeaders.get(chunksIssued); - final BlockHeader lastChunkHeader = checkpointHeaders.get(chunksIssued + 1); - - final CompletableFuture> downloadAndValidateHeadersTask = - lastDownloadAndValidateHeadersTask() - .thenCompose((ignore) -> downloadNextHeaders(firstChunkHeader, lastChunkHeader)) - .thenCompose(this::validateHeaders); - final CompletableFuture> downloadBodiesTask = - downloadAndValidateHeadersTask - .thenCombine(lastDownloadBodiesTask(), (headers, ignored) -> headers) - .thenCompose(this::downloadBlocks); - final CompletableFuture> validateAndImportBlocksTask = - downloadBodiesTask - .thenCombine(lastValidateAndImportBlocksTasks(), (blocks, ignored) -> blocks) - .thenCompose(this::validateAndImportBlocks); - validateAndImportBlocksTask.whenComplete(this::completeChunkPipelineAndMaybeLaunchNextOne); - - downloadAndValidateHeadersTasks.addLast(downloadAndValidateHeadersTask); - downloadBodiesTasks.addLast(downloadBodiesTask); - validateAndImportBlocksTasks.addLast(validateAndImportBlocksTask); - chunksIssued++; - } - - private CompletableFuture> validateAndImportBlocks(final List blocks) { - final Supplier>> task = - () -> blockHandler.validateAndImportBlocks(blocks); - return executeWorkerSubTask(ethContext.getScheduler(), task); - } - - public void completeChunkPipelineAndMaybeLaunchNextOne( - final List blocks, final Throwable throwable) { - if (throwable != null) { - LOG.warn( - "Import of chain segment ({} to {}) failed: {}.", - firstHeader().getNumber(), - lastHeader().getNumber(), - ExceptionUtils.rootCause(throwable).getMessage()); - result.get().completeExceptionally(throwable); - } else { - importedBlocks.addAll(blocks); - chunksCompleted++; - LOG.debug("Import chain segment succeeded (chunk {}/{}).", chunksCompleted, chunksInTotal); - if (chunksCompleted == chunksInTotal) { - LOG.info( - "Completed importing chain segment {} to {}", - firstHeader().getNumber(), - lastHeader().getNumber()); - result.get().complete(importedBlocks); - } else { - downloadAndValidateHeadersTasks.removeFirst(); - downloadBodiesTasks.removeFirst(); - validateAndImportBlocksTasks.removeFirst(); - if (chunksIssued < chunksInTotal) { - createNextChunkPipeline(); - } - } - } - } - - private CompletableFuture> downloadNextHeaders( - final BlockHeader firstChunkHeader, final BlockHeader lastChunkHeader) { - // Download the headers we're missing (between first and last) - LOG.debug( - "Downloading headers {} to {}", - firstChunkHeader.getNumber() + 1, - lastChunkHeader.getNumber()); - final int segmentLength = - Math.toIntExact(lastChunkHeader.getNumber() - firstChunkHeader.getNumber() - 1); - if (segmentLength == 0) { - return CompletableFuture.completedFuture( - Lists.newArrayList(firstChunkHeader, lastChunkHeader)); - } - final DownloadHeaderSequenceTask task = - DownloadHeaderSequenceTask.endingAtHeader( - protocolSchedule, - protocolContext, - ethContext, - lastChunkHeader, - segmentLength, - metricsSystem); - return executeSubTask(task::run) - .thenApply( - headers -> { - final List finalHeaders = Lists.newArrayList(firstChunkHeader); - finalHeaders.addAll(headers); - finalHeaders.add(lastChunkHeader); - return finalHeaders; - }); - } - - private CompletableFuture> validateHeaders(final List headers) { - // First header needs to be validated - return executeWorkerSubTask( - ethContext.getScheduler(), - () -> { - final CompletableFuture> result = new CompletableFuture<>(); - final BlockHeader parentHeader = headers.get(0); - final BlockHeader childHeader = headers.get(1); - final ProtocolSpec protocolSpec = - protocolSchedule.getByBlockNumber(childHeader.getNumber()); - final BlockHeaderValidator blockHeaderValidator = - protocolSpec.getBlockHeaderValidator(); - if (blockHeaderValidator.validateHeader( - childHeader, - parentHeader, - protocolContext, - validationPolicy.getValidationModeForNextBlock())) { - // The first header will be imported by the previous request range. - result.complete(headers.subList(1, headers.size())); - } else { - result.completeExceptionally( - new InvalidBlockException( - "Provided first header does not connect to last header.", - parentHeader.getNumber(), - parentHeader.getHash())); - } - return result; - }); - } - - private CompletableFuture> downloadBlocks(final List headers) { - LOG.debug( - "Downloading bodies {} to {}", - headers.get(0).getNumber(), - headers.get(headers.size() - 1).getNumber()); - return executeSubTask(() -> blockHandler.downloadBlocks(headers)); - } - - private BlockHeader firstHeader() { - return checkpointHeaders.get(0); - } - - private BlockHeader lastHeader() { - return checkpointHeaders.get(checkpointHeaders.size() - 1); - } - - private CompletableFuture> lastDownloadAndValidateHeadersTask() { - if (downloadAndValidateHeadersTasks.isEmpty()) { - return CompletableFuture.completedFuture(Collections.emptyList()); - } else { - return downloadAndValidateHeadersTasks.getLast(); - } - } - - private CompletableFuture> lastDownloadBodiesTask() { - if (downloadBodiesTasks.isEmpty()) { - return CompletableFuture.completedFuture(Lists.newArrayList()); - } else { - return downloadBodiesTasks.getLast(); - } - } - - private CompletableFuture> lastValidateAndImportBlocksTasks() { - if (validateAndImportBlocksTasks.isEmpty()) { - return CompletableFuture.completedFuture(Lists.newArrayList()); - } else { - return validateAndImportBlocksTasks.getLast(); - } - } -} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java deleted file mode 100644 index b174ead9c1..0000000000 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Copyright 2018 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.ethereum.eth.sync.tasks; - -import static org.assertj.core.api.Assertions.assertThat; -import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain; - -import tech.pegasys.pantheon.ethereum.ProtocolContext; -import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; -import tech.pegasys.pantheon.ethereum.core.Block; -import tech.pegasys.pantheon.ethereum.core.BlockBody; -import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; -import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions; -import tech.pegasys.pantheon.ethereum.core.BlockHeader; -import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; -import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; -import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; -import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.AbstractMessageTaskTest; -import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask; -import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62; -import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; -import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy; -import tech.pegasys.pantheon.ethereum.eth.sync.fullsync.FullSyncBlockHandler; -import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; -import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; -import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; -import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.LongStream; - -import org.junit.Test; - -public class PipelinedImportChainSegmentTaskTest - extends AbstractMessageTaskTest, List> { - - private static final ValidationPolicy DETACHED_ONLY_VALIDATION_POLICY = - () -> HeaderValidationMode.DETACHED_ONLY; - - @Override - protected List generateDataToBeRequested() { - final long chainHead = blockchain.getChainHeadBlockNumber(); - final long importSize = 5; - final long startNumber = chainHead - importSize + 1; - final List blocksToImport = new ArrayList<>(); - for (long i = 0; i < importSize; i++) { - blocksToImport.add(getBlockAtNumber(startNumber + i)); - } - return blocksToImport; - } - - private Block getBlockAtNumber(final long number) { - final BlockHeader header = blockchain.getBlockHeader(number).get(); - final BlockBody body = blockchain.getBlockBody(header.getHash()).get(); - return new Block(header, body); - } - - private CompletableFuture> validateAndImportBlocks(final List blocks) { - return PersistBlockTask.forSequentialBlocks( - protocolSchedule, - protocolContext, - blocks, - HeaderValidationMode.SKIP_DETACHED, - metricsSystem) - .get(); - } - - @Override - protected EthTask> createTask(final List requestedData) { - final Block firstBlock = requestedData.get(0); - final Block lastBlock = requestedData.get(requestedData.size() - 1); - final Block previousBlock = getBlockAtNumber(firstBlock.getHeader().getNumber() - 1); - final MutableBlockchain shortBlockchain = createShortChain(firstBlock.getHeader().getNumber()); - final ProtocolContext modifiedContext = - new ProtocolContext<>( - shortBlockchain, - protocolContext.getWorldStateArchive(), - protocolContext.getConsensusState()); - return PipelinedImportChainSegmentTask.forCheckpoints( - protocolSchedule, - modifiedContext, - ethContext, - 1, - metricsSystem, - createBlockHandler(), - DETACHED_ONLY_VALIDATION_POLICY, - previousBlock.getHeader(), - lastBlock.getHeader()); - } - - @Override - protected void assertResultMatchesExpectation( - final List requestedData, final List response, final EthPeer respondingPeer) { - assertThat(response).isEqualTo(requestedData); - } - - @Test - public void betweenContiguousHeadersSucceeds() { - // Setup a responsive peer - final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); - final RespondingEthPeer respondingPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager); - - // Setup task and expectations - final Block firstBlock = getBlockAtNumber(5L); - final Block secondBlock = getBlockAtNumber(6L); - final List expectedResult = Collections.singletonList(secondBlock); - final MutableBlockchain shortBlockchain = createShortChain(firstBlock.getHeader().getNumber()); - final ProtocolContext modifiedContext = - new ProtocolContext<>( - shortBlockchain, - protocolContext.getWorldStateArchive(), - protocolContext.getConsensusState()); - final EthTask> task = - PipelinedImportChainSegmentTask.forCheckpoints( - protocolSchedule, - modifiedContext, - ethContext, - 1, - metricsSystem, - createBlockHandler(), - DETACHED_ONLY_VALIDATION_POLICY, - firstBlock.getHeader(), - secondBlock.getHeader()); - - // Sanity check - assertThat(shortBlockchain.contains(secondBlock.getHash())).isFalse(); - - // Execute task and wait for response - final AtomicReference> actualResult = new AtomicReference<>(); - final AtomicBoolean done = new AtomicBoolean(false); - - final CompletableFuture> future = task.run(); - respondingPeer.respond(responder); - future.whenComplete( - (result, error) -> { - actualResult.set(result); - done.compareAndSet(false, true); - }); - - assertThat(done).isTrue(); - assertResultMatchesExpectation(expectedResult, actualResult.get(), respondingPeer.getEthPeer()); - } - - @Test - public void betweenUnconnectedHeadersFails() { - final BlockDataGenerator gen = new BlockDataGenerator(); - // Setup a responsive peer - final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); - final RespondingEthPeer respondingPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager); - - // Setup data - final Block fakeFirstBlock = gen.block(BlockOptions.create().setBlockNumber(5L)); - final Block firstBlock = getBlockAtNumber(5L); - final Block secondBlock = getBlockAtNumber(6L); - final Block thirdBlock = getBlockAtNumber(7L); - - // Setup task - final MutableBlockchain shortBlockchain = createShortChain(firstBlock.getHeader().getNumber()); - final ProtocolContext modifiedContext = - new ProtocolContext<>( - shortBlockchain, - protocolContext.getWorldStateArchive(), - protocolContext.getConsensusState()); - final EthTask> task = - PipelinedImportChainSegmentTask.forCheckpoints( - protocolSchedule, - modifiedContext, - ethContext, - 1, - metricsSystem, - createBlockHandler(), - DETACHED_ONLY_VALIDATION_POLICY, - fakeFirstBlock.getHeader(), - thirdBlock.getHeader()); - - // Sanity check - assertThat(shortBlockchain.contains(secondBlock.getHash())).isFalse(); - - // Execute task and wait for response - final AtomicReference actualError = new AtomicReference<>(); - final AtomicReference> actualResult = new AtomicReference<>(); - final AtomicBoolean done = new AtomicBoolean(false); - - final CompletableFuture> future = task.run(); - respondingPeer.respond(responder); - future.whenComplete( - (result, error) -> { - actualResult.set(result); - actualError.set(error); - done.compareAndSet(false, true); - }); - - assertThat(done).isTrue(); - assertThat(actualResult.get()).isNull(); - assertThat(actualError.get()).hasCauseInstanceOf(InvalidBlockException.class); - } - - @Test - public void shouldSyncInSequencesOfChunksSequentially() { - // Setup a responsive peer - final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); - final RespondingEthPeer respondingPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager); - - // Setup task for three chunks - final List checkpointHeaders = - LongStream.range(0, 13) - .filter(n -> n % 4 == 0) - .mapToObj(this::getBlockAtNumber) - .map(Block::getHeader) - .collect(Collectors.toList()); - final List expectedResult = - LongStream.range(1, 13).mapToObj(this::getBlockAtNumber).collect(Collectors.toList()); - final MutableBlockchain shortBlockchain = createShortChain(0); - final ProtocolContext modifiedContext = - new ProtocolContext<>( - shortBlockchain, - protocolContext.getWorldStateArchive(), - protocolContext.getConsensusState()); - final EthTask> task = - PipelinedImportChainSegmentTask.forCheckpoints( - protocolSchedule, - modifiedContext, - ethContext, - 1, - metricsSystem, - createBlockHandler(), - DETACHED_ONLY_VALIDATION_POLICY, - checkpointHeaders); - - // Execute task and wait for response - final AtomicReference> actualResult = new AtomicReference<>(); - final AtomicBoolean done = new AtomicBoolean(false); - - final CompletableFuture> future = task.run(); - final CountingResponder countingResponder = CountingResponder.wrap(responder); - - // Import first segment's headers and bodies - respondingPeer.respondTimes(countingResponder, 2); - assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(1); - assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(1); - // Import second segment's headers and bodies - respondingPeer.respondTimes(countingResponder, 2); - assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(2); - assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(2); - // Import third segment's headers and bodies - respondingPeer.respondTimes(countingResponder, 2); - assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(3); - assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(3); - - future.whenComplete( - (result, error) -> { - actualResult.set(result); - done.compareAndSet(false, true); - }); - - assertThat(done).isTrue(); - assertResultMatchesExpectation(expectedResult, actualResult.get(), respondingPeer.getEthPeer()); - } - - @Test - public void shouldPipelineChainSegmentImportsUpToMaxActiveChunks() { - // Setup a responsive peer - final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); - final RespondingEthPeer respondingPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager); - - // Setup task and expectations - final List checkpointHeaders = - LongStream.range(0, 13) - .filter(n -> n % 4 == 0) - .mapToObj(this::getBlockAtNumber) - .map(Block::getHeader) - .collect(Collectors.toList()); - final List expectedResult = - LongStream.range(1, 13).mapToObj(this::getBlockAtNumber).collect(Collectors.toList()); - final MutableBlockchain shortBlockchain = createShortChain(0); - final ProtocolContext modifiedContext = - new ProtocolContext<>( - shortBlockchain, - protocolContext.getWorldStateArchive(), - protocolContext.getConsensusState()); - final EthTask> task = - PipelinedImportChainSegmentTask.forCheckpoints( - protocolSchedule, - modifiedContext, - ethContext, - 2, - metricsSystem, - createBlockHandler(), - DETACHED_ONLY_VALIDATION_POLICY, - checkpointHeaders); - - // Execute task and wait for response - final AtomicReference> actualResult = new AtomicReference<>(); - final AtomicBoolean done = new AtomicBoolean(false); - - final CompletableFuture> future = task.run(); - final CountingResponder countingResponder = CountingResponder.wrap(responder); - - // Import first segment's header - respondingPeer.respond(countingResponder); - assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(1); - assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(0); - // Import first segment's body and second segment's header - respondingPeer.respond(countingResponder); - assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(2); - assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(1); - // Import second segment's body and third segment's header - respondingPeer.respond(countingResponder); - assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(3); - assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(2); - // Import third segment's body - respondingPeer.respond(countingResponder); - assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(3); - assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(3); - - future.whenComplete( - (result, error) -> { - actualResult.set(result); - done.compareAndSet(false, true); - }); - - assertThat(done).isTrue(); - assertResultMatchesExpectation(expectedResult, actualResult.get(), respondingPeer.getEthPeer()); - } - - @Test - public void shouldPipelineChainSegmentImportsWithinMaxActiveChunks() { - // Setup a responsive peer - final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); - final RespondingEthPeer respondingPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager); - - // Setup task and expectations - final List checkpointHeaders = - LongStream.range(0, 13) - .filter(n -> n % 4 == 0) - .mapToObj(this::getBlockAtNumber) - .map(Block::getHeader) - .collect(Collectors.toList()); - final List expectedResult = - LongStream.range(1, 13).mapToObj(this::getBlockAtNumber).collect(Collectors.toList()); - final MutableBlockchain shortBlockchain = createShortChain(0); - final ProtocolContext modifiedContext = - new ProtocolContext<>( - shortBlockchain, - protocolContext.getWorldStateArchive(), - protocolContext.getConsensusState()); - final EthTask> task = - PipelinedImportChainSegmentTask.forCheckpoints( - protocolSchedule, - modifiedContext, - ethContext, - 3, - metricsSystem, - createBlockHandler(), - DETACHED_ONLY_VALIDATION_POLICY, - checkpointHeaders); - - // Execute task and wait for response - final AtomicReference> actualResult = new AtomicReference<>(); - final AtomicBoolean done = new AtomicBoolean(false); - - final CompletableFuture> future = task.run(); - final CountingResponder countingResponder = CountingResponder.wrap(responder); - - // Import first segment's header - respondingPeer.respond(countingResponder); - assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(1); - assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(0); - // Import first segment's body and second segment's header - respondingPeer.respond(countingResponder); - assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(2); - assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(1); - // Import second segment's body and third segment's header - respondingPeer.respond(countingResponder); - assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(3); - assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(2); - // Import third segment's body - respondingPeer.respond(countingResponder); - assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(3); - assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(3); - - future.whenComplete( - (result, error) -> { - actualResult.set(result); - done.compareAndSet(false, true); - }); - - assertThat(done).isTrue(); - assertResultMatchesExpectation(expectedResult, actualResult.get(), respondingPeer.getEthPeer()); - } - - private MutableBlockchain createShortChain(final long lastBlockToInclude) { - final BlockHeader genesisHeader = - blockchain.getBlockHeader(BlockHeader.GENESIS_BLOCK_NUMBER).get(); - final BlockBody genesisBody = blockchain.getBlockBody(genesisHeader.getHash()).get(); - final Block genesisBlock = new Block(genesisHeader, genesisBody); - final MutableBlockchain shortChain = createInMemoryBlockchain(genesisBlock); - long nextBlock = genesisHeader.getNumber() + 1; - while (nextBlock <= lastBlockToInclude) { - final BlockHeader header = blockchain.getBlockHeader(nextBlock).get(); - final BlockBody body = blockchain.getBlockBody(header.getHash()).get(); - final List receipts = blockchain.getTxReceipts(header.getHash()).get(); - final Block block = new Block(header, body); - shortChain.appendBlock(block, receipts); - nextBlock++; - } - return shortChain; - } - - private FullSyncBlockHandler createBlockHandler() { - return new FullSyncBlockHandler<>(protocolSchedule, protocolContext, ethContext, metricsSystem); - } - - private static class CountingResponder implements Responder { - - private final Responder delegate; - private int getBlockHeaderMessages = 0; - private int getBlockBodiesMessages = 0; - private int getReceiptsMessages = 0; - private int getNodeDataMessages = 0; - - private static CountingResponder wrap(final Responder delegate) { - return new CountingResponder(delegate); - } - - private CountingResponder(final Responder delegate) { - this.delegate = delegate; - } - - @Override - public Optional respond(final Capability cap, final MessageData msg) { - final MessageData response = null; - switch (msg.getCode()) { - case EthPV62.GET_BLOCK_HEADERS: - getBlockHeaderMessages++; - break; - case EthPV62.GET_BLOCK_BODIES: - getBlockBodiesMessages++; - break; - case EthPV63.GET_RECEIPTS: - getReceiptsMessages++; - break; - case EthPV63.GET_NODE_DATA: - getNodeDataMessages++; - break; - } - return delegate.respond(cap, msg); - } - - public int getBlockHeaderMessages() { - return getBlockHeaderMessages; - } - - public int getBlockBodiesMessages() { - return getBlockBodiesMessages; - } - - public int getReceiptsMessages() { - return getReceiptsMessages; - } - - public int getNodeDataMessages() { - return getNodeDataMessages; - } - } -}