@@ -3,16 +3,16 @@ package io.iohk.ethereum.blockchain.sync.regular
33import akka .actor .ActorRef
44import akka .util .ByteString
55import cats .data .NonEmptyList
6- import io .iohk .ethereum .consensus .validators .BlockValidator
76import cats .implicits ._
87import io .iohk .ethereum .blockchain .sync .regular .BlockFetcherState ._
8+ import io .iohk .ethereum .consensus .validators .BlockValidator
99import io .iohk .ethereum .domain .{Block , BlockBody , BlockHeader , HeadersSeq }
1010import io .iohk .ethereum .network .PeerId
1111import io .iohk .ethereum .network .p2p .messages .PV62 .BlockHash
12+ import io .iohk .ethereum .utils .ByteStringUtils
1213
13- import scala .collection .immutable .Queue
1414import scala .annotation .tailrec
15- import io . iohk . ethereum . consensus . validators . BlockValidator
15+ import scala . collection . immutable . Queue
1616
1717// scalastyle:off number.of.methods
1818/**
@@ -53,7 +53,7 @@ case class BlockFetcherState(
5353
5454 private def hasEmptyBuffer : Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty
5555
56- def hasFetchedTopHeader : Boolean = lastBlock == knownTop
56+ def hasFetchedTopHeader : Boolean = nextBlockToFetch == knownTop + 1
5757
5858 def isOnTop : Boolean = hasFetchedTopHeader && hasEmptyBuffer
5959
@@ -83,11 +83,31 @@ case class BlockFetcherState(
8383 val lastNumber = HeadersSeq .lastNumber(validHeaders)
8484 withPossibleNewTopAt(lastNumber)
8585 .copy(
86- waitingHeaders = waitingHeaders ++ validHeaders,
87- lastBlock = lastNumber.getOrElse(lastBlock)
86+ waitingHeaders = waitingHeaders ++ validHeaders
8887 )
8988 })
9089
90+ def tryInsertBlock (block : Block , peerId : PeerId ): Either [String , BlockFetcherState ] = {
91+ val blockHash = block.hash
92+ if (isExist(blockHash)) {
93+ Right (this )
94+ } else if (isExistInReadyBlocks(block.header.parentHash)) {
95+ val newState = clearQueues()
96+ .copy(
97+ readyBlocks = readyBlocks.takeWhile(_.number < block.number).enqueue(block)
98+ )
99+ .withPeerForBlocks(peerId, Seq (block.number))
100+ .withKnownTopAt(block.number)
101+ Right (newState)
102+ } else if (isExistInWaitingHeaders(block.header.parentHash)) {
103+ val newState = copy(
104+ waitingHeaders = waitingHeaders.takeWhile(_.number < block.number).enqueue(block.header)
105+ )
106+ .withKnownTopAt(block.number)
107+ Right (newState)
108+ } else Left (s " Cannot insert block [ ${ByteStringUtils .hash2string(blockHash)}] into the queues " )
109+ }
110+
91111 /**
92112 * Validates received headers consistency and their compatibilty with the state
93113 * TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
@@ -169,7 +189,6 @@ case class BlockFetcherState(
169189 if (waitingHeader.hash == block.hash)
170190 withPeerForBlocks(fromPeer, Seq (block.number))
171191 .withPossibleNewTopAt(block.number)
172- .withLastBlock(block.number)
173192 .copy(
174193 readyBlocks = readyBlocks.enqueue(block),
175194 waitingHeaders = waitingHeadersTail
@@ -182,7 +201,7 @@ case class BlockFetcherState(
182201 def pickBlocks (amount : Int ): Option [(NonEmptyList [Block ], BlockFetcherState )] =
183202 if (readyBlocks.nonEmpty) {
184203 val (picked, rest) = readyBlocks.splitAt(amount)
185- Some ((NonEmptyList (picked.head, picked.tail.toList), copy(readyBlocks = rest)))
204+ Some ((NonEmptyList (picked.head, picked.tail.toList), copy(readyBlocks = rest, lastBlock = picked.last.number )))
186205 } else {
187206 None
188207 }
@@ -203,28 +222,41 @@ case class BlockFetcherState(
203222 .map(blocks => (NonEmptyList (blocks.head, blocks.tail.toList), copy(readyBlocks = Queue ())))
204223 }
205224
206- def invalidateBlocksFrom (nr : BigInt ): (Option [PeerId ], BlockFetcherState ) = invalidateBlocksFrom(nr, Some (nr))
207-
208- def invalidateBlocksFrom (nr : BigInt , toBlacklist : Option [BigInt ]): (Option [PeerId ], BlockFetcherState ) = {
225+ def clearQueues (): BlockFetcherState = {
209226 // We can't start completely from scratch as requests could be in progress, we have to keep special track of them
210227 val newFetchingHeadersState =
211228 if (fetchingHeadersState == AwaitingHeaders ) AwaitingHeadersToBeIgnored else fetchingHeadersState
212229 val newFetchingBodiesState =
213230 if (fetchingBodiesState == AwaitingBodies ) AwaitingBodiesToBeIgnored else fetchingBodiesState
214231
232+ copy(
233+ readyBlocks = Queue (),
234+ waitingHeaders = Queue (),
235+ fetchingHeadersState = newFetchingHeadersState,
236+ fetchingBodiesState = newFetchingBodiesState
237+ )
238+ }
239+
240+ def invalidateBlocksFrom (nr : BigInt ): (Option [PeerId ], BlockFetcherState ) = invalidateBlocksFrom(nr, Some (nr))
241+
242+ def invalidateBlocksFrom (nr : BigInt , toBlacklist : Option [BigInt ]): (Option [PeerId ], BlockFetcherState ) = {
215243 (
216244 toBlacklist.flatMap(blockProviders.get),
217- copy(
218- readyBlocks = Queue (),
219- waitingHeaders = Queue (),
220- lastBlock = (nr - 2 ).max(0 ),
221- fetchingHeadersState = newFetchingHeadersState,
222- fetchingBodiesState = newFetchingBodiesState,
223- blockProviders = blockProviders - nr
224- )
245+ this
246+ .clearQueues()
247+ .copy(
248+ lastBlock = (nr - 2 ).max(0 ),
249+ blockProviders = blockProviders - nr
250+ )
225251 )
226252 }
227253
254+ def isExist (hash : ByteString ): Boolean = isExistInReadyBlocks(hash) || isExistInWaitingHeaders(hash)
255+
256+ def isExistInWaitingHeaders (hash : ByteString ): Boolean = waitingHeaders.exists(_.hash == hash)
257+
258+ def isExistInReadyBlocks (hash : ByteString ): Boolean = readyBlocks.exists(_.hash == hash)
259+
228260 def withLastBlock (nr : BigInt ): BlockFetcherState = copy(lastBlock = nr)
229261
230262 def withKnownTopAt (nr : BigInt ): BlockFetcherState = copy(knownTop = nr)
0 commit comments