Skip to content

Commit 3135e5a

Browse files
rtkaczykpslaski
authored andcommitted
[ETCM-280] use peer capabilities (protocol version) when sending checkpoint-related messages
1 parent 33a24dc commit 3135e5a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+895
-651
lines changed

src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import java.util.concurrent.atomic.AtomicReference
66
import akka.actor.{ActorRef, ActorSystem}
77
import akka.testkit.TestProbe
88
import akka.util.{ByteString, Timeout}
9+
import io.iohk.ethereum.blockchain.sync.BlockBroadcast.BlockToBroadcast
910
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor
1011
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
1112
import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, TestSyncConfig}
@@ -18,21 +19,13 @@ import io.iohk.ethereum.ledger.InMemoryWorldStateProxy
1819
import io.iohk.ethereum.mpt.MerklePatriciaTrie
1920
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
2021
import io.iohk.ethereum.network.PeerManagerActor.{FastSyncHostConfiguration, PeerConfiguration}
21-
import io.iohk.ethereum.network.discovery.{DiscoveryConfig, Node}
2222
import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveredNodesInfo, DiscoveryNodeInfo}
23+
import io.iohk.ethereum.network.discovery.{DiscoveryConfig, Node}
2324
import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker}
2425
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
25-
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
2626
import io.iohk.ethereum.network.rlpx.AuthHandshaker
2727
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
28-
import io.iohk.ethereum.network.{
29-
EtcPeerManagerActor,
30-
ForkResolver,
31-
KnownNodesManager,
32-
PeerEventBusActor,
33-
PeerManagerActor,
34-
ServerActor
35-
}
28+
import io.iohk.ethereum.network.{EtcPeerManagerActor, ForkResolver, KnownNodesManager, PeerEventBusActor, PeerManagerActor, ServerActor}
3629
import io.iohk.ethereum.nodebuilder.{PruningConfigBuilder, SecureRandomBuilder}
3730
import io.iohk.ethereum.sync.util.SyncCommonItSpec._
3831
import io.iohk.ethereum.sync.util.SyncCommonItSpecUtils._
@@ -159,7 +152,7 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
159152
override val peerConfiguration: PeerConfiguration = peerConf
160153
override val blockchain: Blockchain = bl
161154
override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage
162-
override val blockchainConfig = CommonFakePeer.this.blockchainConfig // FIXME: remove in ETCM-280
155+
override val protocolVersion: Int = Config.Network.protocolVersion
163156
}
164157

165158
lazy val handshaker: Handshaker[PeerInfo] = EtcHandshaker(handshakerConfiguration)
@@ -230,7 +223,7 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
230223
}
231224

232225
private def broadcastBlock(block: Block, weight: ChainWeight) = {
233-
broadcasterActor ! BroadcastBlock(NewBlock(block, weight))
226+
broadcasterActor ! BroadcastBlock(BlockToBroadcast(block, weight))
234227
}
235228

236229
def getCurrentState(): BlockchainState = {

src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@ import akka.actor.ActorRef
44
import akka.util.ByteString
55
import cats.effect.Resource
66
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
7-
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
7+
import io.iohk.ethereum.blockchain.sync.BlockBroadcast.BlockToBroadcast
88
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
99
import io.iohk.ethereum.blockchain.sync.regular.RegularSync
10+
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
1011
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
1112
import io.iohk.ethereum.consensus.ethash.{EthashConfig, EthashConsensus}
1213
import io.iohk.ethereum.consensus.{ConsensusConfig, FullConsensusConfig, ethash}
1314
import io.iohk.ethereum.domain._
1415
import io.iohk.ethereum.ledger._
15-
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
1616
import io.iohk.ethereum.nodebuilder.VmSetup
1717
import io.iohk.ethereum.ommers.OmmersPool
1818
import io.iohk.ethereum.sync.util.SyncCommonItSpecUtils.FakePeerCustomConfig.defaultConfig
@@ -70,7 +70,6 @@ object RegularSyncItSpecUtils {
7070
peerEventBus,
7171
ledger,
7272
bl,
73-
blockchainConfig, // FIXME: remove in ETCM-280
7473
testSyncConfig,
7574
ommersPool,
7675
pendingTransactionsManager,
@@ -140,7 +139,7 @@ object RegularSyncItSpecUtils {
140139
}
141140

142141
private def broadcastBlock(block: Block, weight: ChainWeight) = {
143-
broadcasterActor ! BroadcastBlock(NewBlock(block, weight))
142+
broadcasterActor ! BroadcastBlock(BlockToBroadcast(block, weight))
144143
}
145144

146145
private def createChildBlock(

src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,34 @@
11
package io.iohk.ethereum.txExecTest.util
22

3+
import java.util.concurrent.atomic.AtomicReference
4+
35
import akka.actor.ActorSystem
46
import akka.util.ByteString
57
import com.typesafe.config.ConfigFactory
68
import io.iohk.ethereum.db.components.Storages.PruningModeComponent
79
import io.iohk.ethereum.db.components.{RocksDbDataSourceComponent, Storages}
8-
import io.iohk.ethereum.db.storage.{AppStateStorage, StateStorage}
10+
import io.iohk.ethereum.db.dataSource.{DataSourceBatchUpdate, RocksDbDataSource}
911
import io.iohk.ethereum.db.storage.NodeStorage.{NodeEncoded, NodeHash}
1012
import io.iohk.ethereum.db.storage.TransactionMappingStorage.TransactionLocation
1113
import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode}
14+
import io.iohk.ethereum.db.storage.{AppStateStorage, StateStorage}
15+
import io.iohk.ethereum.domain.BlockHeader.HeaderExtraFields.HefEmpty
1216
import io.iohk.ethereum.domain.{Blockchain, UInt256, _}
1317
import io.iohk.ethereum.ledger.{InMemoryWorldStateProxy, InMemoryWorldStateProxyStorage}
1418
import io.iohk.ethereum.mpt.MptNode
1519
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
1620
import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration
21+
import io.iohk.ethereum.network.discovery.DiscoveryConfig
1722
import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker}
1823
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
1924
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
2025
import io.iohk.ethereum.network.{ForkResolver, PeerEventBusActor, PeerManagerActor}
2126
import io.iohk.ethereum.nodebuilder.{AuthHandshakerBuilder, NodeKeyBuilder, SecureRandomBuilder}
22-
import io.iohk.ethereum.utils.{BlockchainConfig, Config, NodeStatus, ServerStatus}
23-
import java.util.concurrent.atomic.AtomicReference
24-
25-
import io.iohk.ethereum.db.dataSource.{DataSourceBatchUpdate, RocksDbDataSource}
27+
import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus}
28+
import monix.reactive.Observable
2629
import org.bouncycastle.util.encoders.Hex
2730

2831
import scala.concurrent.duration._
29-
import io.iohk.ethereum.domain.BlockHeader.HeaderExtraFields.HefEmpty
30-
import io.iohk.ethereum.network.discovery.DiscoveryConfig
31-
import monix.reactive.Observable
3232

3333
object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder with AuthHandshakerBuilder {
3434
val conf = ConfigFactory.load("txExecTest/chainDump.conf")
@@ -82,8 +82,8 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
8282
override val nodeStatusHolder: AtomicReference[NodeStatus] = DumpChainApp.nodeStatusHolder
8383
override val peerConfiguration: PeerConfiguration = peerConfig
8484
override val blockchain: Blockchain = DumpChainApp.blockchain
85-
override val blockchainConfig: BlockchainConfig = DumpChainApp.blockchainConfig
8685
override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage
86+
override val protocolVersion: Int = Config.Network.protocolVersion
8787
}
8888

8989
lazy val handshaker: Handshaker[PeerInfo] = EtcHandshaker(handshakerConfiguration)

src/main/scala/io/iohk/ethereum/blockchain/sync/BlockBroadcast.scala

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package io.iohk.ethereum.blockchain.sync
22

33
import akka.actor.ActorRef
4-
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer}
4+
import io.iohk.ethereum.blockchain.sync.BlockBroadcast.BlockToBroadcast
5+
import io.iohk.ethereum.domain.{Block, ChainWeight}
56
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
6-
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
7-
import io.iohk.ethereum.network.p2p.messages.PV62
7+
import io.iohk.ethereum.network.p2p.MessageSerializable
88
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
9+
import io.iohk.ethereum.network.p2p.messages.{CommonMessages, PV62, PV64, Versions}
10+
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer}
911
import io.iohk.ethereum.utils.Config.SyncConfig
1012

1113
import scala.util.Random
@@ -21,29 +23,31 @@ class BlockBroadcast(val etcPeerManager: ActorRef, syncConfig: SyncConfig) {
2123
* @param newBlock, block to broadcast
2224
* @param handshakedPeers, to which the blocks will be broadcasted to
2325
*/
24-
def broadcastBlock(newBlock: NewBlock, handshakedPeers: Map[Peer, PeerInfo]): Unit = {
25-
val peersWithoutBlock = handshakedPeers.collect {
26-
case (peer, peerInfo) if shouldSendNewBlock(newBlock, peerInfo) => peer
27-
}.toSet
26+
def broadcastBlock(newBlock: BlockToBroadcast, handshakedPeers: Map[Peer, PeerInfo]): Unit = {
27+
val peersWithoutBlock = handshakedPeers.filter { case (_, peerInfo) =>
28+
shouldSendNewBlock(newBlock, peerInfo)
29+
}
2830

2931
broadcastNewBlock(newBlock, peersWithoutBlock)
3032

3133
if (syncConfig.broadcastNewBlockHashes) {
3234
// NOTE: the usefulness of this message is debatable, especially in private networks
33-
broadcastNewBlockHash(newBlock, peersWithoutBlock)
35+
broadcastNewBlockHash(newBlock, peersWithoutBlock.keySet)
3436
}
3537
}
3638

37-
private def shouldSendNewBlock(newBlock: NewBlock, peerInfo: PeerInfo): Boolean =
39+
private def shouldSendNewBlock(newBlock: BlockToBroadcast, peerInfo: PeerInfo): Boolean =
3840
newBlock.block.header.number > peerInfo.maxBlockNumber ||
3941
newBlock.chainWeight > peerInfo.chainWeight
4042

41-
private def broadcastNewBlock(newBlock: NewBlock, peers: Set[Peer]): Unit =
42-
obtainRandomPeerSubset(peers).foreach { peer =>
43-
etcPeerManager ! EtcPeerManagerActor.SendMessage(newBlock, peer.id)
43+
private def broadcastNewBlock(newBlock: BlockToBroadcast, peers: Map[Peer, PeerInfo]): Unit =
44+
obtainRandomPeerSubset(peers.keySet).foreach { peer =>
45+
val message: MessageSerializable =
46+
if (peers(peer).remoteStatus.protocolVersion == Versions.PV64) newBlock.as64 else newBlock.as63
47+
etcPeerManager ! EtcPeerManagerActor.SendMessage(message, peer.id)
4448
}
4549

46-
private def broadcastNewBlockHash(newBlock: NewBlock, peers: Set[Peer]): Unit = peers.foreach { peer =>
50+
private def broadcastNewBlockHash(newBlock: BlockToBroadcast, peers: Set[Peer]): Unit = peers.foreach { peer =>
4751
val newBlockHeader = newBlock.block.header
4852
val newBlockHashMsg = PV62.NewBlockHashes(Seq(BlockHash(newBlockHeader.hash, newBlockHeader.number)))
4953
etcPeerManager ! EtcPeerManagerActor.SendMessage(newBlockHashMsg, peer.id)
@@ -61,3 +65,22 @@ class BlockBroadcast(val etcPeerManager: ActorRef, syncConfig: SyncConfig) {
6165
Random.shuffle(peers).take(numberOfPeersToSend)
6266
}
6367
}
68+
69+
object BlockBroadcast {
70+
71+
/**
72+
* BlockToBroadcast was created to decouple block information from protocol new block messages
73+
* (they are different versions of NewBlock msg)
74+
*/
75+
case class BlockToBroadcast(block: Block, chainWeight: ChainWeight) {
76+
def as63: CommonMessages.NewBlock = CommonMessages.NewBlock(block, chainWeight.totalDifficulty)
77+
def as64: PV64.NewBlock = PV64.NewBlock(block, chainWeight)
78+
}
79+
object BlockToBroadcast {
80+
def apply(block: CommonMessages.NewBlock): BlockToBroadcast =
81+
BlockToBroadcast(block.block, ChainWeight.totalDifficultyOnly(block.totalDifficulty))
82+
83+
def apply(block: PV64.NewBlock): BlockToBroadcast =
84+
BlockToBroadcast(block.block, block.chainWeight)
85+
}
86+
}

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,11 @@ import io.iohk.ethereum.consensus.validators.Validators
77
import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage}
88
import io.iohk.ethereum.domain.Blockchain
99
import io.iohk.ethereum.ledger.Ledger
10-
import io.iohk.ethereum.utils.BlockchainConfig
1110
import io.iohk.ethereum.utils.Config.SyncConfig
1211

1312
class SyncController(
1413
appStateStorage: AppStateStorage,
1514
blockchain: Blockchain,
16-
blockchainConfig: BlockchainConfig,
1715
fastSyncStateStorage: FastSyncStateStorage,
1816
ledger: Ledger,
1917
validators: Validators,
@@ -101,7 +99,6 @@ class SyncController(
10199
peerEventBus,
102100
ledger,
103101
blockchain,
104-
blockchainConfig,
105102
syncConfig,
106103
ommersPool,
107104
pendingTransactionsManager,
@@ -121,7 +118,6 @@ object SyncController {
121118
def props(
122119
appStateStorage: AppStateStorage,
123120
blockchain: Blockchain,
124-
blockchainConfig: BlockchainConfig,
125121
syncStateStorage: FastSyncStateStorage,
126122
ledger: Ledger,
127123
validators: Validators,
@@ -136,7 +132,6 @@ object SyncController {
136132
new SyncController(
137133
appStateStorage,
138134
blockchain,
139-
blockchainConfig,
140135
syncStateStorage,
141136
ledger,
142137
validators,

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockBroadcasterActor.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package io.iohk.ethereum.blockchain.sync.regular
22

33
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Scheduler}
4+
import io.iohk.ethereum.blockchain.sync.BlockBroadcast.BlockToBroadcast
45
import io.iohk.ethereum.blockchain.sync.{BlacklistSupport, BlockBroadcast, PeerListSupport}
5-
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
66
import io.iohk.ethereum.utils.Config.SyncConfig
77

88
class BlockBroadcasterActor(
@@ -26,8 +26,8 @@ class BlockBroadcasterActor(
2626
}
2727
object BlockBroadcasterActor {
2828
sealed trait BroadcasterMsg
29-
case class BroadcastBlock(block: NewBlock) extends BroadcasterMsg
30-
case class BroadcastBlocks(blocks: List[NewBlock]) extends BroadcasterMsg
29+
case class BroadcastBlock(block: BlockToBroadcast) extends BroadcasterMsg
30+
case class BroadcastBlocks(blocks: List[BlockToBroadcast]) extends BroadcasterMsg
3131

3232
def props(
3333
broadcast: BlockBroadcast,

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import io.iohk.ethereum.domain._
2020
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
2121
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.MessageClassifier
2222
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe}
23+
import io.iohk.ethereum.network.PeerId
24+
import io.iohk.ethereum.network.p2p.messages.{CommonMessages, PV64}
2325
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
2426
import io.iohk.ethereum.network.p2p.messages.PV62._
2527
import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, NodeData}
@@ -57,7 +59,7 @@ class BlockFetcher(
5759
BlockFetcherState.initial(importer, blockNr) |> fetchBlocks
5860
peerEventBus ! Subscribe(
5961
MessageClassifier(
60-
Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code, BlockHeaders.code),
62+
Set(NewBlock.code, NewBlockHashes.code, BlockHeaders.code),
6163
PeerSelector.AllPeers
6264
)
6365
)
@@ -192,45 +194,51 @@ class BlockFetcher(
192194
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
193195

194196
fetchBlocks(newState)
195-
case MessageFromPeer(NewBlock(_, block, _), peerId) =>
196-
val newBlockNr = block.number
197-
val nextExpectedBlock = state.lastFullBlockNumber + 1
198-
199-
log.debug("Received NewBlock nr {}", newBlockNr)
200-
201-
// we're on top, so we can pass block directly to importer
202-
if (newBlockNr == nextExpectedBlock && state.isOnTop) {
203-
log.debug("Pass block directly to importer")
204-
val newState = state.withPeerForBlocks(peerId, Seq(newBlockNr)).withKnownTopAt(newBlockNr)
205-
state.importer ! OnTop
206-
state.importer ! ImportNewBlock(block, peerId)
207-
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
208-
context become started(newState)
209-
// there are some blocks waiting for import but it seems that we reached top on fetch side so we can enqueue new block for import
210-
} else if (newBlockNr == nextExpectedBlock && !state.isFetching && state.waitingHeaders.isEmpty) {
211-
log.debug("Enqueue new block for import")
212-
val newState = state.appendNewBlock(block, peerId)
213-
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
214-
context become started(newState)
215-
// waiting for some bodies but we don't have this header yet - at least we can use new block header
216-
} else if (newBlockNr == state.nextToLastBlock && !state.isFetchingHeaders) {
217-
log.debug("Waiting for bodies. Add only headers")
218-
val newState = state.appendHeaders(List(block.header))
219-
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
220-
fetchBlocks(newState)
221-
// we're far from top
222-
} else if (newBlockNr > nextExpectedBlock) {
223-
log.debug("Far from top")
224-
val newState = state.withKnownTopAt(newBlockNr)
225-
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
226-
fetchBlocks(newState)
227-
}
197+
case MessageFromPeer(CommonMessages.NewBlock(block, _), peerId) =>
198+
handleNewBlock(block, peerId, state)
199+
case MessageFromPeer(PV64.NewBlock(block, _), peerId) =>
200+
handleNewBlock(block, peerId, state)
228201
case BlockImportFailed(blockNr, reason) =>
229202
val (peerId, newState) = state.invalidateBlocksFrom(blockNr)
230203
peerId.foreach(id => peersClient ! BlacklistPeer(id, reason))
231204
fetchBlocks(newState)
232205
}
233206

207+
private def handleNewBlock(block: Block, peerId: PeerId, state: BlockFetcherState): Unit = {
208+
val newBlockNr = block.number
209+
val nextExpectedBlock = state.lastFullBlockNumber + 1
210+
211+
log.debug("Received NewBlock nr {}", newBlockNr)
212+
213+
// we're on top, so we can pass block directly to importer
214+
if (newBlockNr == nextExpectedBlock && state.isOnTop) {
215+
log.debug("Pass block directly to importer")
216+
val newState = state.withPeerForBlocks(peerId, Seq(newBlockNr)).withKnownTopAt(newBlockNr)
217+
state.importer ! OnTop
218+
state.importer ! ImportNewBlock(block, peerId)
219+
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
220+
context become started(newState)
221+
// there are some blocks waiting for import but it seems that we reached top on fetch side so we can enqueue new block for import
222+
} else if (newBlockNr == nextExpectedBlock && !state.isFetching && state.waitingHeaders.isEmpty) {
223+
log.debug("Enqueue new block for import")
224+
val newState = state.appendNewBlock(block, peerId)
225+
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
226+
context become started(newState)
227+
// waiting for some bodies but we don't have this header yet - at least we can use new block header
228+
} else if (newBlockNr == state.nextToLastBlock && !state.isFetchingHeaders) {
229+
log.debug("Waiting for bodies. Add only headers")
230+
val newState = state.appendHeaders(List(block.header))
231+
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
232+
fetchBlocks(newState)
233+
// we're far from top
234+
} else if (newBlockNr > nextExpectedBlock) {
235+
log.debug("Far from top")
236+
val newState = state.withKnownTopAt(newBlockNr)
237+
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
238+
fetchBlocks(newState)
239+
}
240+
}
241+
234242
private def handlePossibleTopUpdate(state: BlockFetcherState): Receive = {
235243
//by handling these type of messages, fetcher can received from network, fresh info about blocks on top
236244
//ex. After a successful handshake, fetcher will receive the info about the header of the peer best block

0 commit comments

Comments
 (0)