@@ -20,8 +20,13 @@ import io.iohk.ethereum.network.p2p.messages.WireProtocol.Disconnect
2020 * In order to do so it receives events for peer creation, disconnection and new messages being sent and
2121 * received by each peer.
2222 */
23- class EtcPeerManagerActor (peerManagerActor : ActorRef , peerEventBusActor : ActorRef , appStateStorage : AppStateStorage ,
24- forkResolverOpt : Option [ForkResolver ]) extends Actor with ActorLogging {
23+ class EtcPeerManagerActor (
24+ peerManagerActor : ActorRef ,
25+ peerEventBusActor : ActorRef ,
26+ appStateStorage : AppStateStorage ,
27+ forkResolverOpt : Option [ForkResolver ]
28+ ) extends Actor
29+ with ActorLogging {
2530
2631 private type PeersWithInfo = Map [PeerId , PeerWithInfo ]
2732
@@ -42,6 +47,7 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe
4247 val peerBestBlockIsItsGenesisBlock = peerInfo.bestBlockHash == peerInfo.remoteStatus.genesisHash
4348 peerBestBlockIsItsGenesisBlock || (! peerBestBlockIsItsGenesisBlock && peerInfo.maxBlockNumber > 0 )
4449 }
50+
4551 /**
4652 * Processes both messages for sending messages and for requesting peer information
4753 *
@@ -56,10 +62,11 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe
5662 })
5763
5864 case PeerInfoRequest (peerId) =>
59- val peerInfoOpt = peersWithInfo.get(peerId).map{ case PeerWithInfo (_, peerInfo) => peerInfo}
65+ val peerInfoOpt = peersWithInfo.get(peerId).map { case PeerWithInfo (_, peerInfo) => peerInfo }
6066 sender() ! PeerInfoResponse (peerInfoOpt)
6167
6268 case EtcPeerManagerActor .SendMessage (message, peerId) =>
69+ NetworkMetrics .SentMessagesCounter .increment()
6370 val newPeersWithInfo = updatePeersWithInfo(peersWithInfo, peerId, message.underlyingMsg, handleSentMessage)
6471 peerManagerActor ! PeerManagerActor .SendMessage (message, peerId)
6572 context become handleMessages(newPeersWithInfo)
@@ -74,6 +81,7 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe
7481
7582 case MessageFromPeer (message, peerId) if peersWithInfo.contains(peerId) =>
7683 val newPeersWithInfo = updatePeersWithInfo(peersWithInfo, peerId, message, handleReceivedMessage)
84+ NetworkMetrics .ReceivedMessagesCounter .increment()
7785 context become handleMessages(newPeersWithInfo)
7886
7987 case PeerHandshakeSuccessful (peer, peerInfo : PeerInfo ) =>
@@ -82,11 +90,13 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe
8290
8391 // Ask for the highest block from the peer
8492 peer.ref ! SendMessage (GetBlockHeaders (Right (peerInfo.remoteStatus.bestHash), 1 , 0 , false ))
93+ NetworkMetrics .registerAddHandshakedPeer(peer)
8594 context become handleMessages(peersWithInfo + (peer.id -> PeerWithInfo (peer, peerInfo)))
8695
8796 case PeerDisconnected (peerId) if peersWithInfo.contains(peerId) =>
8897 peerEventBusActor ! Unsubscribe (PeerDisconnectedClassifier (PeerSelector .WithId (peerId)))
8998 peerEventBusActor ! Unsubscribe (MessageClassifier (msgCodesWithInfo, PeerSelector .WithId (peerId)))
99+ NetworkMetrics .registerRemoveHandshakedPeer(peersWithInfo(peerId).peer)
90100 context become handleMessages(peersWithInfo - peerId)
91101
92102 }
@@ -100,9 +110,13 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe
100110 * @param messageHandler for processing the message and obtaining the new peerInfo
101111 * @return new information for each peer
102112 */
103- private def updatePeersWithInfo (peers : PeersWithInfo , peerId : PeerId , message : Message ,
104- messageHandler : (Message , PeerWithInfo ) => PeerInfo ): PeersWithInfo = {
105- if (peers.contains(peerId)){
113+ private def updatePeersWithInfo (
114+ peers : PeersWithInfo ,
115+ peerId : PeerId ,
116+ message : Message ,
117+ messageHandler : (Message , PeerWithInfo ) => PeerInfo
118+ ): PeersWithInfo = {
119+ if (peers.contains(peerId)) {
106120 val peerWithInfo = peers(peerId)
107121 val newPeerInfo = messageHandler(message, peerWithInfo)
108122 peers + (peerId -> peerWithInfo.copy(peerInfo = newPeerInfo))
@@ -130,11 +144,9 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe
130144 private def handleReceivedMessage (message : Message , initialPeerWithInfo : PeerWithInfo ): PeerInfo = {
131145 (updateTotalDifficulty(message) _
132146 andThen updateForkAccepted(message, initialPeerWithInfo.peer)
133- andThen updateMaxBlock(message)
134- )(initialPeerWithInfo.peerInfo)
147+ andThen updateMaxBlock(message))(initialPeerWithInfo.peerInfo)
135148 }
136149
137-
138150 /**
139151 * Processes the message and updates the total difficulty of the peer
140152 *
@@ -217,11 +229,13 @@ object EtcPeerManagerActor {
217229
218230 val msgCodesWithInfo : Set [Int ] = Set (BlockHeaders .code, NewBlock .code, NewBlockHashes .code)
219231
220- case class PeerInfo (remoteStatus : Status ,
221- totalDifficulty : BigInt ,
222- forkAccepted : Boolean ,
223- maxBlockNumber : BigInt ,
224- bestBlockHash : ByteString ) extends HandshakeResult {
232+ case class PeerInfo (
233+ remoteStatus : Status ,
234+ totalDifficulty : BigInt ,
235+ forkAccepted : Boolean ,
236+ maxBlockNumber : BigInt ,
237+ bestBlockHash : ByteString
238+ ) extends HandshakeResult {
225239
226240 def withTotalDifficulty (totalDifficulty : BigInt ): PeerInfo = copy(totalDifficulty = totalDifficulty)
227241
@@ -253,8 +267,12 @@ object EtcPeerManagerActor {
253267
254268 case class SendMessage (message : MessageSerializable , peerId : PeerId )
255269
256- def props (peerManagerActor : ActorRef , peerEventBusActor : ActorRef ,
257- appStateStorage : AppStateStorage , forkResolverOpt : Option [ForkResolver ]): Props =
270+ def props (
271+ peerManagerActor : ActorRef ,
272+ peerEventBusActor : ActorRef ,
273+ appStateStorage : AppStateStorage ,
274+ forkResolverOpt : Option [ForkResolver ]
275+ ): Props =
258276 Props (new EtcPeerManagerActor (peerManagerActor, peerEventBusActor, appStateStorage, forkResolverOpt))
259277
260278}
0 commit comments