forked from fetchai/cosmos-consensus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
peer_set.go
147 lines (126 loc) · 3.35 KB
/
peer_set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package p2p
import (
"net"
"sync"
)
// IPeerSet is the read-only view of a peer set: it lists the lookup and
// enumeration methods of PeerSet and none of the methods that add or remove
// peers.
type IPeerSet interface {
// Has reports whether a peer with the given ID is present.
Has(key ID) bool
// HasIP reports whether any peer in the set has the given remote IP.
HasIP(ip net.IP) bool
// Get returns the peer with the given ID; PeerSet returns nil when there is
// no such peer.
Get(key ID) Peer
// List returns the peers currently in the set.
List() []Peer
// Size returns the number of peers currently in the set.
Size() int
}
//-----------------------------------------------------------------------------
// PeerSet is a mutex-guarded table of peers. It keeps a map keyed by peer ID
// for lookups and a slice of the same peers for enumeration through List.
type PeerSet struct {
// mtx is locked by every exported method before it touches lookup or list.
mtx sync.Mutex
// lookup maps a peer's ID to its entry, which records the peer and the
// peer's current position in list.
lookup map[ID]*peerSetItem
// list holds the peers in the order they were added, except that Remove
// moves the last peer into the slot of the peer it removes.
list []Peer
}
// peerSetItem is the value stored in PeerSet.lookup for one peer.
type peerSetItem struct {
// peer is the stored peer.
peer Peer
// index is the position of peer in PeerSet.list; Remove rewrites it when it
// relocates the peer.
index int
}
// NewPeerSet returns an empty PeerSet whose lookup map is initialised and
// whose peer list is pre-allocated with room for 256 peers.
func NewPeerSet() *PeerSet {
	// Initial capacity of the peer list; the list still grows past it on demand.
	const initialCapacity = 256

	ps := new(PeerSet)
	ps.lookup = make(map[ID]*peerSetItem)
	ps.list = make([]Peer, 0, initialCapacity)
	return ps
}
// Add inserts peer into the set.
// If a peer with the same ID is already present, the set is left unchanged
// and an ErrSwitchDuplicatePeerID carrying that ID is returned; otherwise the
// result is nil.
func (ps *PeerSet) Add(peer Peer) error {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	id := peer.ID()
	if existing := ps.lookup[id]; existing != nil {
		return ErrSwitchDuplicatePeerID{id}
	}

	// The new peer goes at the end of the list, so its index is the current
	// length.
	entry := &peerSetItem{peer: peer, index: len(ps.list)}

	// append either writes past the length of any slice previously handed out
	// by List or moves to a new backing array, so elements that other
	// goroutines may be iterating over are not overwritten.
	ps.list = append(ps.list, peer)
	ps.lookup[id] = entry
	return nil
}
// Has reports whether the set holds a peer registered under peerKey.
func (ps *PeerSet) Has(peerKey ID) bool {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	_, found := ps.lookup[peerKey]
	return found
}
// HasIP reports whether any peer in the set has a remote IP equal to peerIP.
// It takes the lock and delegates the scan to hasIP.
func (ps *PeerSet) HasIP(peerIP net.IP) bool {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	found := ps.hasIP(peerIP)
	return found
}
// hasIP scans every stored peer and reports whether one of them has a remote
// IP equal to peerIP. It does not take the lock itself, so exported methods
// that already hold it can call it.
func (ps *PeerSet) hasIP(peerIP net.IP) bool {
	for _, entry := range ps.lookup {
		if !entry.peer.RemoteIP().Equal(peerIP) {
			continue
		}
		return true
	}
	return false
}
// Get returns the peer registered under peerKey, or nil when the set has no
// such peer.
func (ps *PeerSet) Get(peerKey ID) Peer {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if entry, found := ps.lookup[peerKey]; found {
		return entry.peer
	}
	return nil
}
// Remove deletes the peer registered under peer's ID.
// It returns true if a peer was removed and false if no peer with that ID was
// in the set.
func (ps *PeerSet) Remove(peer Peer) bool {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	id := peer.ID()
	entry := ps.lookup[id]
	if entry == nil {
		return false
	}

	// Build a fresh list that is one element shorter instead of editing the
	// current one in place: the current slice may have been handed out by
	// List and must not be mutated.
	last := len(ps.list) - 1
	shrunk := make([]Peer, last)
	copy(shrunk, ps.list)

	// Unless the removed peer was already the final element, the final peer
	// (dropped by the shorter copy) takes over the vacated slot and its
	// recorded index is updated to match.
	if pos := entry.index; pos != last {
		moved := ps.list[last]
		shrunk[pos] = moved
		ps.lookup[moved.ID()].index = pos
	}

	ps.list = shrunk
	delete(ps.lookup, id)
	return true
}
// Size returns how many peers the set currently holds.
func (ps *PeerSet) Size() int {
	ps.mtx.Lock()
	count := len(ps.list)
	ps.mtx.Unlock()
	return count
}
// List returns the peers currently in the set.
// The result is the set's own slice, read under the lock, not a copy. Add
// only appends to it and Remove replaces it with a new slice, so the elements
// of a slice returned earlier are not overwritten by those methods.
func (ps *PeerSet) List() []Peer {
	ps.mtx.Lock()
	peers := ps.list
	ps.mtx.Unlock()
	return peers
}