Skip to content

[FIX] Processing checkpoint blocks by fetcher #866

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
AwaitingBodiesToBeIgnored,
AwaitingHeadersToBeIgnored
}
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NewCheckpointBlock, NotOnTop, OnTop}
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
import io.iohk.ethereum.crypto.kec256
import io.iohk.ethereum.domain._
Expand Down Expand Up @@ -215,16 +215,50 @@ class BlockFetcher(

if (state.isOnTop && newBlockNr == nextExpectedBlock) {
log.debug("Passing block directly to importer")
val newState = state.withPeerForBlocks(peerId, Seq(newBlockNr)).withKnownTopAt(newBlockNr)
val newState = state
.withPeerForBlocks(peerId, Seq(newBlockNr))
.withLastBlock(newBlockNr)
.withKnownTopAt(newBlockNr)
state.importer ! OnTop
state.importer ! ImportNewBlock(block, peerId)
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
context become started(newState)
} else {
log.debug("Ignoring received block as it doesn't match local state or fetch side is not on top")
val newState = state.withPossibleNewTopAt(block.number)
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
fetchBlocks(newState)
} else if (block.hasCheckpoint) {
handleNewCheckpointBlockNotOnTop(block, peerId, state)
} else handleFutureBlock(block, state)
}

private def handleFutureBlock(block: Block, state: BlockFetcherState): Unit = {
log.debug("Ignoring received block as it doesn't match local state or fetch side is not on top")
val newState = state.withPossibleNewTopAt(block.number)
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
fetchBlocks(newState)
}

private def handleNewCheckpointBlockNotOnTop(block: Block, peerId: PeerId, state: BlockFetcherState): Unit = {
log.debug("Got checkpoint block")
val blockHash = block.hash
state.tryInsertBlock(block, peerId) match {
case Left(_) if block.number <= state.lastBlock =>
log.debug(
s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} is older than current last block ${state.lastBlock}"
)
state.importer ! NewCheckpointBlock(block, peerId)
case Left(_) if block.number <= state.knownTop =>
log.debug(
s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} not fit into queues - clearing the queues and setting new top"
)
val newState = state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't the block be saved somehow or passed to block importer in this case? This case means more or less "we're on different fork" and we don't do anything in order to switch into checkpointed branch

.clearQueues()
.withPeerForBlocks(peerId, Seq(block.number))
.withKnownTopAt(block.number)
fetchBlocks(newState)
case Left(error) =>
log.debug(error)
handleFutureBlock(block, state)
case Right(state) =>
log.debug(s"Checkpoint block [${ByteStringUtils.hash2string(blockHash)}] fit into queues")
fetchBlocks(state)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package io.iohk.ethereum.blockchain.sync.regular
import akka.actor.ActorRef
import akka.util.ByteString
import cats.data.NonEmptyList
import io.iohk.ethereum.consensus.validators.BlockValidator
import cats.implicits._
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState._
import io.iohk.ethereum.consensus.validators.BlockValidator
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, HeadersSeq}
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
import io.iohk.ethereum.utils.ByteStringUtils

import scala.collection.immutable.Queue
import scala.annotation.tailrec
import io.iohk.ethereum.consensus.validators.BlockValidator
import scala.collection.immutable.Queue

// scalastyle:off number.of.methods
/**
Expand Down Expand Up @@ -53,7 +53,7 @@ case class BlockFetcherState(

private def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty

def hasFetchedTopHeader: Boolean = lastBlock == knownTop
def hasFetchedTopHeader: Boolean = nextBlockToFetch == knownTop + 1

def isOnTop: Boolean = hasFetchedTopHeader && hasEmptyBuffer

Expand Down Expand Up @@ -83,11 +83,31 @@ case class BlockFetcherState(
val lastNumber = HeadersSeq.lastNumber(validHeaders)
withPossibleNewTopAt(lastNumber)
.copy(
waitingHeaders = waitingHeaders ++ validHeaders,
lastBlock = lastNumber.getOrElse(lastBlock)
waitingHeaders = waitingHeaders ++ validHeaders
)
})

def tryInsertBlock(block: Block, peerId: PeerId): Either[String, BlockFetcherState] = {
val blockHash = block.hash
if (isExist(blockHash)) {
Right(this)
} else if (isExistInReadyBlocks(block.header.parentHash)) {
val newState = clearQueues()
.copy(
readyBlocks = readyBlocks.takeWhile(_.number < block.number).enqueue(block)
)
.withPeerForBlocks(peerId, Seq(block.number))
.withKnownTopAt(block.number)
Right(newState)
} else if (isExistInWaitingHeaders(block.header.parentHash)) {
val newState = copy(
waitingHeaders = waitingHeaders.takeWhile(_.number < block.number).enqueue(block.header)
)
.withKnownTopAt(block.number)
Right(newState)
} else Left(s"Cannot insert block [${ByteStringUtils.hash2string(blockHash)}] into the queues")
}

/**
* Validates received headers consistency and their compatibilty with the state
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
Expand Down Expand Up @@ -169,7 +189,6 @@ case class BlockFetcherState(
if (waitingHeader.hash == block.hash)
withPeerForBlocks(fromPeer, Seq(block.number))
.withPossibleNewTopAt(block.number)
.withLastBlock(block.number)
.copy(
readyBlocks = readyBlocks.enqueue(block),
waitingHeaders = waitingHeadersTail
Expand All @@ -182,7 +201,7 @@ case class BlockFetcherState(
def pickBlocks(amount: Int): Option[(NonEmptyList[Block], BlockFetcherState)] =
if (readyBlocks.nonEmpty) {
val (picked, rest) = readyBlocks.splitAt(amount)
Some((NonEmptyList(picked.head, picked.tail.toList), copy(readyBlocks = rest)))
Some((NonEmptyList(picked.head, picked.tail.toList), copy(readyBlocks = rest, lastBlock = picked.last.number)))
} else {
None
}
Expand All @@ -203,28 +222,41 @@ case class BlockFetcherState(
.map(blocks => (NonEmptyList(blocks.head, blocks.tail.toList), copy(readyBlocks = Queue())))
}

def invalidateBlocksFrom(nr: BigInt): (Option[PeerId], BlockFetcherState) = invalidateBlocksFrom(nr, Some(nr))

def invalidateBlocksFrom(nr: BigInt, toBlacklist: Option[BigInt]): (Option[PeerId], BlockFetcherState) = {
def clearQueues(): BlockFetcherState = {
// We can't start completely from scratch as requests could be in progress, we have to keep special track of them
val newFetchingHeadersState =
if (fetchingHeadersState == AwaitingHeaders) AwaitingHeadersToBeIgnored else fetchingHeadersState
val newFetchingBodiesState =
if (fetchingBodiesState == AwaitingBodies) AwaitingBodiesToBeIgnored else fetchingBodiesState

copy(
readyBlocks = Queue(),
waitingHeaders = Queue(),
fetchingHeadersState = newFetchingHeadersState,
fetchingBodiesState = newFetchingBodiesState
)
}

def invalidateBlocksFrom(nr: BigInt): (Option[PeerId], BlockFetcherState) = invalidateBlocksFrom(nr, Some(nr))

def invalidateBlocksFrom(nr: BigInt, toBlacklist: Option[BigInt]): (Option[PeerId], BlockFetcherState) = {
(
toBlacklist.flatMap(blockProviders.get),
copy(
readyBlocks = Queue(),
waitingHeaders = Queue(),
lastBlock = (nr - 2).max(0),
fetchingHeadersState = newFetchingHeadersState,
fetchingBodiesState = newFetchingBodiesState,
blockProviders = blockProviders - nr
)
this
.clearQueues()
.copy(
lastBlock = (nr - 2).max(0),
blockProviders = blockProviders - nr
)
)
}

def isExist(hash: ByteString): Boolean = isExistInReadyBlocks(hash) || isExistInWaitingHeaders(hash)

def isExistInWaitingHeaders(hash: ByteString): Boolean = waitingHeaders.exists(_.hash == hash)

def isExistInReadyBlocks(hash: ByteString): Boolean = readyBlocks.exists(_.hash == hash)

def withLastBlock(nr: BigInt): BlockFetcherState = copy(lastBlock = nr)

def withKnownTopAt(nr: BigInt): BlockFetcherState = copy(knownTop = nr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class BlockImporter(
}
}

case NewCheckpointBlock(block, peerId) => importNewBlock(block, peerId, state)

case ImportNewBlock(block, peerId) if state.isOnTop && !state.importing => importNewBlock(block, peerId, state)

case ImportDone(newBehavior) =>
Expand Down Expand Up @@ -376,6 +378,7 @@ object BlockImporter {
case object NotOnTop extends ImporterMsg
case class MinedBlock(block: Block) extends ImporterMsg
case class NewCheckpoint(parentHash: ByteString, signatures: Seq[ECDSASignature]) extends ImporterMsg
case class NewCheckpointBlock(block: Block, peerId: PeerId) extends ImporterMsg
case class ImportNewBlock(block: Block, peerId: PeerId) extends ImporterMsg
case class ImportDone(newBehavior: NewBehavior) extends ImporterMsg
case object PickBlocks extends ImporterMsg
Expand Down
Loading