Skip to content

Commit 4ad48cf

Browse files
committed
added processing checkpoint blocks to fetcher
1 parent b7db87e commit 4ad48cf

File tree

5 files changed

+397
-134
lines changed

5 files changed

+397
-134
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
1313
AwaitingBodiesToBeIgnored,
1414
AwaitingHeadersToBeIgnored
1515
}
16-
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
16+
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NewCheckpointBlock, NotOnTop, OnTop}
1717
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
1818
import io.iohk.ethereum.crypto.kec256
1919
import io.iohk.ethereum.domain._
@@ -215,16 +215,50 @@ class BlockFetcher(
215215

216216
if (state.isOnTop && newBlockNr == nextExpectedBlock) {
217217
log.debug("Passing block directly to importer")
218-
val newState = state.withPeerForBlocks(peerId, Seq(newBlockNr)).withKnownTopAt(newBlockNr)
218+
val newState = state
219+
.withPeerForBlocks(peerId, Seq(newBlockNr))
220+
.withLastBlock(newBlockNr)
221+
.withKnownTopAt(newBlockNr)
219222
state.importer ! OnTop
220223
state.importer ! ImportNewBlock(block, peerId)
221224
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
222225
context become started(newState)
223-
} else {
224-
log.debug("Ignoring received block as it doesn't match local state or fetch side is not on top")
225-
val newState = state.withPossibleNewTopAt(block.number)
226-
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
227-
fetchBlocks(newState)
226+
} else if (block.hasCheckpoint) {
227+
handleNewCheckpointBlockNotOnTop(block, peerId, state)
228+
} else handleFutureBlock(block, state)
229+
}
230+
231+
private def handleFutureBlock(block: Block, state: BlockFetcherState): Unit = {
232+
log.debug("Ignoring received block as it doesn't match local state or fetch side is not on top")
233+
val newState = state.withPossibleNewTopAt(block.number)
234+
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
235+
fetchBlocks(newState)
236+
}
237+
238+
private def handleNewCheckpointBlockNotOnTop(block: Block, peerId: PeerId, state: BlockFetcherState): Unit = {
239+
log.debug("Got checkpoint block")
240+
val blockHash = block.hash
241+
state.tryInsertBlock(block, peerId) match {
242+
case Left(_) if block.number <= state.lastBlock =>
243+
log.debug(
244+
s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} is older than current last block ${state.lastBlock}"
245+
)
246+
state.importer ! NewCheckpointBlock(block, peerId)
247+
case Left(_) if block.number <= state.knownTop =>
248+
log.debug(
249+
s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} not fit into queues - clearing the queues and setting new top"
250+
)
251+
val newState = state
252+
.clearQueues()
253+
.withPeerForBlocks(peerId, Seq(block.number))
254+
.withKnownTopAt(block.number)
255+
fetchBlocks(newState)
256+
case Left(error) =>
257+
log.debug(error)
258+
handleFutureBlock(block, state)
259+
case Right(state) =>
260+
log.debug(s"Checkpoint block [${ByteStringUtils.hash2string(blockHash)}] fit into queues")
261+
fetchBlocks(state)
228262
}
229263
}
230264

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,16 @@ package io.iohk.ethereum.blockchain.sync.regular
33
import akka.actor.ActorRef
44
import akka.util.ByteString
55
import cats.data.NonEmptyList
6-
import io.iohk.ethereum.consensus.validators.BlockValidator
76
import cats.implicits._
87
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState._
8+
import io.iohk.ethereum.consensus.validators.BlockValidator
99
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, HeadersSeq}
1010
import io.iohk.ethereum.network.PeerId
1111
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
12+
import io.iohk.ethereum.utils.ByteStringUtils
1213

13-
import scala.collection.immutable.Queue
1414
import scala.annotation.tailrec
15-
import io.iohk.ethereum.consensus.validators.BlockValidator
15+
import scala.collection.immutable.Queue
1616

1717
// scalastyle:off number.of.methods
1818
/**
@@ -53,7 +53,7 @@ case class BlockFetcherState(
5353

5454
private def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty
5555

56-
def hasFetchedTopHeader: Boolean = lastBlock == knownTop
56+
def hasFetchedTopHeader: Boolean = nextBlockToFetch == knownTop + 1
5757

5858
def isOnTop: Boolean = hasFetchedTopHeader && hasEmptyBuffer
5959

@@ -83,11 +83,31 @@ case class BlockFetcherState(
8383
val lastNumber = HeadersSeq.lastNumber(validHeaders)
8484
withPossibleNewTopAt(lastNumber)
8585
.copy(
86-
waitingHeaders = waitingHeaders ++ validHeaders,
87-
lastBlock = lastNumber.getOrElse(lastBlock)
86+
waitingHeaders = waitingHeaders ++ validHeaders
8887
)
8988
})
9089

90+
def tryInsertBlock(block: Block, peerId: PeerId): Either[String, BlockFetcherState] = {
91+
val blockHash = block.hash
92+
if (isExist(blockHash)) {
93+
Right(this)
94+
} else if (isExistInReadyBlocks(block.header.parentHash)) {
95+
val newState = clearQueues()
96+
.copy(
97+
readyBlocks = readyBlocks.takeWhile(_.number < block.number).enqueue(block)
98+
)
99+
.withPeerForBlocks(peerId, Seq(block.number))
100+
.withKnownTopAt(block.number)
101+
Right(newState)
102+
} else if (isExistInWaitingHeaders(block.header.parentHash)) {
103+
val newState = copy(
104+
waitingHeaders = waitingHeaders.takeWhile(_.number < block.number).enqueue(block.header)
105+
)
106+
.withKnownTopAt(block.number)
107+
Right(newState)
108+
} else Left(s"Cannot insert block [${ByteStringUtils.hash2string(blockHash)}] into the queues")
109+
}
110+
91111
/**
92112
* Validates received headers consistency and their compatibilty with the state
93113
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
@@ -169,7 +189,6 @@ case class BlockFetcherState(
169189
if (waitingHeader.hash == block.hash)
170190
withPeerForBlocks(fromPeer, Seq(block.number))
171191
.withPossibleNewTopAt(block.number)
172-
.withLastBlock(block.number)
173192
.copy(
174193
readyBlocks = readyBlocks.enqueue(block),
175194
waitingHeaders = waitingHeadersTail
@@ -182,7 +201,7 @@ case class BlockFetcherState(
182201
def pickBlocks(amount: Int): Option[(NonEmptyList[Block], BlockFetcherState)] =
183202
if (readyBlocks.nonEmpty) {
184203
val (picked, rest) = readyBlocks.splitAt(amount)
185-
Some((NonEmptyList(picked.head, picked.tail.toList), copy(readyBlocks = rest)))
204+
Some((NonEmptyList(picked.head, picked.tail.toList), copy(readyBlocks = rest, lastBlock = picked.last.number)))
186205
} else {
187206
None
188207
}
@@ -203,28 +222,41 @@ case class BlockFetcherState(
203222
.map(blocks => (NonEmptyList(blocks.head, blocks.tail.toList), copy(readyBlocks = Queue())))
204223
}
205224

206-
def invalidateBlocksFrom(nr: BigInt): (Option[PeerId], BlockFetcherState) = invalidateBlocksFrom(nr, Some(nr))
207-
208-
def invalidateBlocksFrom(nr: BigInt, toBlacklist: Option[BigInt]): (Option[PeerId], BlockFetcherState) = {
225+
def clearQueues(): BlockFetcherState = {
209226
// We can't start completely from scratch as requests could be in progress, we have to keep special track of them
210227
val newFetchingHeadersState =
211228
if (fetchingHeadersState == AwaitingHeaders) AwaitingHeadersToBeIgnored else fetchingHeadersState
212229
val newFetchingBodiesState =
213230
if (fetchingBodiesState == AwaitingBodies) AwaitingBodiesToBeIgnored else fetchingBodiesState
214231

232+
copy(
233+
readyBlocks = Queue(),
234+
waitingHeaders = Queue(),
235+
fetchingHeadersState = newFetchingHeadersState,
236+
fetchingBodiesState = newFetchingBodiesState
237+
)
238+
}
239+
240+
def invalidateBlocksFrom(nr: BigInt): (Option[PeerId], BlockFetcherState) = invalidateBlocksFrom(nr, Some(nr))
241+
242+
def invalidateBlocksFrom(nr: BigInt, toBlacklist: Option[BigInt]): (Option[PeerId], BlockFetcherState) = {
215243
(
216244
toBlacklist.flatMap(blockProviders.get),
217-
copy(
218-
readyBlocks = Queue(),
219-
waitingHeaders = Queue(),
220-
lastBlock = (nr - 2).max(0),
221-
fetchingHeadersState = newFetchingHeadersState,
222-
fetchingBodiesState = newFetchingBodiesState,
223-
blockProviders = blockProviders - nr
224-
)
245+
this
246+
.clearQueues()
247+
.copy(
248+
lastBlock = (nr - 2).max(0),
249+
blockProviders = blockProviders - nr
250+
)
225251
)
226252
}
227253

254+
def isExist(hash: ByteString): Boolean = isExistInReadyBlocks(hash) || isExistInWaitingHeaders(hash)
255+
256+
def isExistInWaitingHeaders(hash: ByteString): Boolean = waitingHeaders.exists(_.hash == hash)
257+
258+
def isExistInReadyBlocks(hash: ByteString): Boolean = readyBlocks.exists(_.hash == hash)
259+
228260
def withLastBlock(nr: BigInt): BlockFetcherState = copy(lastBlock = nr)
229261

230262
def withKnownTopAt(nr: BigInt): BlockFetcherState = copy(knownTop = nr)

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ class BlockImporter(
9090
}
9191
}
9292

93+
case NewCheckpointBlock(block, peerId) => importNewBlock(block, peerId, state)
94+
9395
case ImportNewBlock(block, peerId) if state.isOnTop && !state.importing => importNewBlock(block, peerId, state)
9496

9597
case ImportDone(newBehavior) =>
@@ -376,6 +378,7 @@ object BlockImporter {
376378
case object NotOnTop extends ImporterMsg
377379
case class MinedBlock(block: Block) extends ImporterMsg
378380
case class NewCheckpoint(parentHash: ByteString, signatures: Seq[ECDSASignature]) extends ImporterMsg
381+
case class NewCheckpointBlock(block: Block, peerId: PeerId) extends ImporterMsg
379382
case class ImportNewBlock(block: Block, peerId: PeerId) extends ImporterMsg
380383
case class ImportDone(newBehavior: NewBehavior) extends ImporterMsg
381384
case object PickBlocks extends ImporterMsg

0 commit comments

Comments
 (0)