From ac162853ab37292693efb28f424271f0fd32ea96 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Tue, 30 Nov 2021 23:17:48 +0530 Subject: [PATCH] fix(dot/network): move low reputation peer removal from network ConnManager to peer scoring logic (dot/peerstate) (#2068) - peerset service (doWorks goroutine) sends messages to network service about peers to connect, drop, reject, disconnect etc. Some of those message were being send without a peer id and set id. This was the reason why our excess peers were not getting removed. - To save ourselves from such a problem in future, processMessage now checks if peer id is empty or not. - Also change the resultMsgCh type to peerset.Message from interface{} Fixes #2039 --- dot/network/connmgr.go | 34 +--------------------------- dot/network/service.go | 12 +++++----- dot/network/state.go | 2 +- dot/peerset/handler.go | 6 +++-- dot/peerset/peerset.go | 45 +++++++++++++++++++++++++++---------- dot/peerset/peerset_test.go | 4 +--- 6 files changed, 45 insertions(+), 58 deletions(-) diff --git a/dot/network/connmgr.go b/dot/network/connmgr.go index d9cabfc9dc..81db9d9f1d 100644 --- a/dot/network/connmgr.go +++ b/dot/network/connmgr.go @@ -5,8 +5,6 @@ package network import ( "context" - "crypto/rand" - "math/big" "sync" "github.com/libp2p/go-libp2p-core/connmgr" @@ -130,40 +128,10 @@ func (cm *ConnManager) unprotectedPeers(peers []peer.ID) []peer.ID { func (cm *ConnManager) Connected(n network.Network, c network.Conn) { logger.Tracef( "Host %s connected to peer %s", n.LocalPeer(), c.RemotePeer()) + if cm.connectHandler != nil { cm.connectHandler(c.RemotePeer()) } - - cm.Lock() - defer cm.Unlock() - - over := len(n.Peers()) - cm.max - if over <= 0 { - return - } - - // TODO: peer scoring doesn't seem to prevent us from going over the max. - // if over the max peer count, disconnect from (total_peers - maximum) peers - // (#2039) - for i := 0; i < over; i++ { - unprotPeers := cm.unprotectedPeers(n.Peers()) - if len(unprotPeers) == 0 { - return - } - - i, err := rand.Int(rand.Reader, big.NewInt(int64(len(unprotPeers)))) - if err != nil { - logger.Errorf("error generating random number: %s", err) - return - } - - up := unprotPeers[i.Int64()] - logger.Tracef("Over max peer count, disconnecting from random unprotected peer %s", up) - err = n.ClosePeer(up) - if err != nil { - logger.Tracef("failed to close connection to peer %s", up) - } - } } // Disconnected is called when a connection closed diff --git a/dot/network/service.go b/dot/network/service.go index cdaac53a85..8c7faeb119 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -6,7 +6,6 @@ package network import ( "context" "errors" - "fmt" "math/big" "strings" "sync" @@ -670,6 +669,10 @@ func (s *Service) startPeerSetHandler() { func (s *Service) processMessage(msg peerset.Message) { peerID := msg.PeerID + if peerID == "" { + logger.Errorf("found empty peer id in peerset message") + return + } switch msg.Status { case peerset.Connect: addrInfo := s.host.h.Peerstore().PeerInfo(peerID) @@ -704,12 +707,7 @@ func (s *Service) startProcessingMsg() { select { case <-s.ctx.Done(): return - case m := <-msgCh: - msg, ok := m.(peerset.Message) - if !ok { - logger.Error(fmt.Sprintf("failed to get message from peerSet: type is %T instead of peerset.Message", m)) - continue - } + case msg := <-msgCh: s.processMessage(msg) } } diff --git a/dot/network/state.go b/dot/network/state.go index e776d192dc..b9467d02f3 100644 --- a/dot/network/state.go +++ b/dot/network/state.go @@ -79,5 +79,5 @@ type PeerRemove interface { type Peer interface { PeerReputation(peer.ID) (peerset.Reputation, error) SortedPeers(idx int) chan peer.IDSlice - Messages() chan interface{} + Messages() chan peerset.Message } diff --git a/dot/peerset/handler.go b/dot/peerset/handler.go index ac4b733ebb..0a807505df 100644 --- a/dot/peerset/handler.go +++ b/dot/peerset/handler.go @@ -3,7 +3,9 @@ package peerset -import "github.com/libp2p/go-libp2p-core/peer" +import ( + "github.com/libp2p/go-libp2p-core/peer" +) // Handler manages peerSet. type Handler struct { @@ -88,7 +90,7 @@ func (h *Handler) Incoming(setID int, peers ...peer.ID) { } // Messages return result message chan. -func (h *Handler) Messages() chan interface{} { +func (h *Handler) Messages() chan Message { return h.peerSet.resultMsgCh } diff --git a/dot/peerset/peerset.go b/dot/peerset/peerset.go index 72494e4279..8f1675897e 100644 --- a/dot/peerset/peerset.go +++ b/dot/peerset/peerset.go @@ -141,7 +141,7 @@ type PeerSet struct { // TODO: this will be useful for reserved only mode // this is for future purpose if reserved-only flag is enabled (#1888). isReservedOnly bool - resultMsgCh chan interface{} + resultMsgCh chan Message // time when the PeerSet was created. created time.Time // last time when we updated the reputations of connected nodes. @@ -183,6 +183,8 @@ func NewConfigSet(in, out uint32, reservedOnly bool, allocTime time.Duration) *C } return &ConfigSet{ + // Why are we using an array of config in the set, when we are + // using just one config Set: []*config{set}, } } @@ -228,8 +230,8 @@ func reputationTick(reput Reputation) Reputation { return reput.sub(diff) } -// updateTime updates the value of latestTimeUpdate and performs all the updates that happen -// over time, such as Reputation increases for staying connected. +// updateTime updates the value of latestTimeUpdate and performs all the updates that +// happen over time, such as Reputation increases for staying connected. func (ps *PeerSet) updateTime() error { currTime := time.Now() // identify the time difference between current time and last update time for peer reputation in seconds. @@ -282,8 +284,8 @@ func (ps *PeerSet) updateTime() error { } // reportPeer on report ReputationChange of the peer based on its behaviour, -// if the updated Reputation is below BannedThresholdValue then, this node need to be disconnected -// and a drop message for the peer is sent in order to disconnect. +// if the updated Reputation is below BannedThresholdValue then, this node need to +// be disconnected and a drop message for the peer is sent in order to disconnect. func (ps *PeerSet) reportPeer(change ReputationChange, peers ...peer.ID) error { // we want reputations to be up-to-date before adjusting them. if err := ps.updateTime(); err != nil { @@ -516,8 +518,9 @@ func (ps *PeerSet) removePeer(setID int, peers ...peer.ID) error { return nil } -// incoming indicates that we have received an incoming connection. Must be answered either with -// a corresponding `Accept` or `Reject`, except if we were already connected to this peer. +// incoming indicates that we have received an incoming connection. Must be answered +// either with a corresponding `Accept` or `Reject`, except if we were already +// connected to this peer. func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error { if err := ps.updateTime(); err != nil { return err @@ -527,7 +530,11 @@ func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error { for _, pid := range peers { if ps.isReservedOnly { if _, ok := ps.reservedNode[pid]; !ok { - ps.resultMsgCh <- Message{Status: Reject} + ps.resultMsgCh <- Message{ + Status: Reject, + setID: uint64(setID), + PeerID: pid, + } continue } } @@ -546,11 +553,24 @@ func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error { p := state.nodes[pid] switch { case p.getReputation() < BannedThresholdValue: - ps.resultMsgCh <- Message{Status: Reject} + ps.resultMsgCh <- Message{ + Status: Reject, + setID: uint64(setID), + PeerID: pid, + } case state.tryAcceptIncoming(setID, pid) != nil: - ps.resultMsgCh <- Message{Status: Reject} + ps.resultMsgCh <- Message{ + Status: Reject, + setID: uint64(setID), + PeerID: pid, + } default: - ps.resultMsgCh <- Message{Status: Accept} + logger.Debugf("incoming connection accepted from peer %s", pid) + ps.resultMsgCh <- Message{ + Status: Accept, + setID: uint64(setID), + PeerID: pid, + } } } @@ -593,6 +613,7 @@ func (ps *PeerSet) disconnect(setIdx int, reason DropReason, peers ...peer.ID) e } ps.resultMsgCh <- Message{ Status: Drop, + setID: uint64(setIdx), PeerID: pid, } @@ -610,7 +631,7 @@ func (ps *PeerSet) disconnect(setIdx int, reason DropReason, peers ...peer.ID) e // start handles all the action for the peerSet. func (ps *PeerSet) start(aq chan action) { ps.actionQueue = aq - ps.resultMsgCh = make(chan interface{}, msgChanSize) + ps.resultMsgCh = make(chan Message, msgChanSize) go ps.doWork() } diff --git a/dot/peerset/peerset_test.go b/dot/peerset/peerset_test.go index 5c4e37d3ab..3c094734c8 100644 --- a/dot/peerset/peerset_test.go +++ b/dot/peerset/peerset_test.go @@ -68,9 +68,7 @@ func TestAddReservedPeers(t *testing.T) { if len(ps.resultMsgCh) == 0 { break } - m := <-ps.resultMsgCh - msg, ok := m.(Message) - require.True(t, ok) + msg := <-ps.resultMsgCh require.Equal(t, expectedMsgs[i], msg) } }