generated from ipfs/ipfs-repository-template
-
Notifications
You must be signed in to change notification settings - Fork 97
/
peering.go
321 lines (273 loc) · 8.36 KB
/
peering.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
package peering
import (
"context"
"errors"
"math/rand"
"strconv"
"sync"
"time"
"github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
// Tuning knobs for the reconnect schedule and the connection-manager tag.
const (
	// initialDelay is the delay every peer handler starts from. It has to be
	// long enough that both sides of a peering do not dial each other at the
	// same moment.
	initialDelay = 5 * time.Second

	// maxBackoff caps the time between two reconnect attempts.
	maxBackoff = 10 * time.Minute

	// maxBackoffJitter is the width, in percent of maxBackoff, of the random
	// band applied once the backoff would exceed the cap: the delay is then
	// pulled down to a random value between 90% and 100% of maxBackoff.
	maxBackoffJitter = 10 // %

	// connmgrTag is the tag under which peered peers are protected in the
	// connection manager.
	connmgrTag = "ipfs-peering"
)
// logger is the package-level logger, registered under the name "peering".
var logger = log.Logger("peering")
// State is the lifecycle phase of a PeeringService.
type State uint

// The possible states, in the order a service moves through them. StateInit
// is the zero value.
const (
	// StateInit is the state of a service that has not been started yet.
	StateInit State = iota
	// StateRunning is the state of a started service.
	StateRunning
	// StateStopped is the state of a service that has been stopped.
	StateStopped
)

// String returns a short lowercase name for the state. Values outside the
// declared constants are rendered as "unknown peering state: N", with N the
// decimal value.
func (s State) String() string {
	if s == StateInit {
		return "init"
	}
	if s == StateRunning {
		return "running"
	}
	if s == StateStopped {
		return "stopped"
	}
	numeric := strconv.FormatUint(uint64(s), 10)
	return "unknown peering state: " + numeric
}
// peerHandler keeps track of all state related to a specific "peering" peer.
type peerHandler struct {
	// peer is the ID of the remote peer this handler keeps connected.
	peer peer.ID
	// host is used to dial the peer and to query its connectedness.
	host host.Host
	// ctx is passed to every dial attempt made by reconnect.
	ctx context.Context
	// cancel cancels ctx; stop calls it.
	cancel context.CancelFunc
	// mu is held by the handler's methods while they read or write addrs,
	// reconnectTimer and nextDelay.
	mu sync.Mutex
	// addrs are the addresses handed to the host when dialing the peer.
	addrs []multiaddr.Multiaddr
	// reconnectTimer is set by startIfDisconnected and cleared (set to nil)
	// by stop and stopIfConnected; nil means no reconnect cycle is active.
	reconnectTimer *time.Timer
	// nextDelay is the current backoff delay; nextBackoff advances it and
	// stopIfConnected resets it to initialDelay.
	nextDelay time.Duration
}
// setAddrs replaces the peer's address list with a private copy of addrs, so
// later changes the caller makes to its slice do not reach the handler.
func (ph *peerHandler) setAddrs(addrs []multiaddr.Multiaddr) {
	// Build the copy before taking the lock; only the swap needs it.
	snapshot := make([]multiaddr.Multiaddr, len(addrs))
	copy(snapshot, addrs)

	ph.mu.Lock()
	ph.addrs = snapshot
	ph.mu.Unlock()
}
// getAddrs returns the handler's current address slice. The slice is shared
// with the handler, not copied: callers must treat it as read-only.
func (ph *peerHandler) getAddrs() []multiaddr.Multiaddr {
	ph.mu.Lock()
	shared := ph.addrs
	ph.mu.Unlock()
	return shared
}
// stop permanently stops the peer handler: it cancels the handler's context
// and then, under the lock, stops and discards any pending reconnect timer.
func (ph *peerHandler) stop() {
	ph.cancel()

	ph.mu.Lock()
	defer ph.mu.Unlock()

	timer := ph.reconnectTimer
	if timer == nil {
		return
	}
	timer.Stop()
	ph.reconnectTimer = nil
}
// nextBackoff advances the handler's reconnect delay and returns the new
// value.
//
// While the current delay is below maxBackoff it grows by half of itself plus
// a random amount smaller than itself. If the result exceeds maxBackoff it is
// replaced by maxBackoff minus a random amount of less than maxBackoffJitter
// percent of maxBackoff.
//
// A non-positive delay (for example on a zero-value handler) is first reset to
// initialDelay, because rand.Int63n panics when its argument is not positive.
//
// The callers in this file invoke nextBackoff with ph.mu held.
func (ph *peerHandler) nextBackoff() time.Duration {
	if ph.nextDelay <= 0 {
		ph.nextDelay = initialDelay
	}

	if ph.nextDelay < maxBackoff {
		ph.nextDelay += ph.nextDelay/2 + time.Duration(rand.Int63n(int64(ph.nextDelay)))
	}

	// If we've gone over the max backoff, reduce it under the max.
	if ph.nextDelay > maxBackoff {
		ph.nextDelay = maxBackoff
		// Randomize the capped backoff a bit (maxBackoffJitter percent).
		ph.nextDelay -= time.Duration(rand.Int63n(int64(maxBackoff) * maxBackoffJitter / 100))
	}

	return ph.nextDelay
}
// reconnect performs one dial attempt to the peer using the handler's current
// addresses. On failure it re-arms the reconnect timer with the next backoff
// delay, provided the timer still exists. In every case it finishes by calling
// stopIfConnected.
func (ph *peerHandler) reconnect() {
	target := peer.AddrInfo{ID: ph.peer, Addrs: ph.getAddrs()}
	logger.Debugw("reconnecting", "peer", ph.peer, "addrs", target.Addrs)

	if dialErr := ph.host.Connect(ph.ctx, target); dialErr != nil {
		logger.Debugw("failed to reconnect", "peer", ph.peer, "error", dialErr)

		ph.mu.Lock()
		// The failure only counts while the timer is still around. If it
		// has been cleared, someone else stopped us: either a connection was
		// established some other way, or whoever stopped us will restart us.
		if timer := ph.reconnectTimer; timer != nil {
			timer.Reset(ph.nextBackoff())
		}
		ph.mu.Unlock()
	}

	// Run this unconditionally: the connection may have come up after the
	// error above was handled.
	ph.stopIfConnected()
}
// stopIfConnected ends the reconnect cycle when the peer is connected: if a
// reconnect timer exists and the host reports the peer as connected, the timer
// is stopped and discarded and the backoff delay goes back to initialDelay.
func (ph *peerHandler) stopIfConnected() {
	ph.mu.Lock()
	defer ph.mu.Unlock()

	// No timer means there is no cycle to end; connectedness is not queried.
	if ph.reconnectTimer == nil {
		return
	}
	if ph.host.Network().Connectedness(ph.peer) != network.Connected {
		return
	}

	logger.Debugw("successfully reconnected", "peer", ph.peer)
	ph.reconnectTimer.Stop()
	ph.reconnectTimer = nil
	ph.nextDelay = initialDelay
}
// startIfDisconnected is the inverse of stopIfConnected: when no reconnect
// timer exists and the host does not report the peer as connected, it
// schedules a reconnect attempt after the next backoff delay.
//
// It does nothing once the handler has been stopped. This method is launched
// on separate goroutines, so a call may only get to run after stop has
// returned; without this check such a late call would install a fresh timer
// and reconnect would keep re-arming it while dialing with a cancelled context.
func (ph *peerHandler) startIfDisconnected() {
	ph.mu.Lock()
	defer ph.mu.Unlock()

	// stop cancels ctx before it clears the timer under mu, so a cancelled
	// context seen here means no timer may be created any more. The nil check
	// leaves handlers that were built without a context behaving as before.
	if ph.ctx != nil && ph.ctx.Err() != nil {
		return
	}

	if ph.reconnectTimer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected {
		logger.Debugw("disconnected from peer", "peer", ph.peer)
		// Always start with a short timeout so we can stagger things a bit.
		ph.reconnectTimer = time.AfterFunc(ph.nextBackoff(), ph.reconnect)
	}
}
// PeeringService maintains connections to specified peers, reconnecting on
// disconnect with a back-off.
type PeeringService struct {
	// host is the host whose connections to the registered peers are
	// maintained; it is also handed to every peer handler.
	host host.Host
	// mu guards peers and state.
	mu sync.RWMutex
	// peers maps each registered peer ID to its handler.
	peers map[peer.ID]*peerHandler
	// state is the lifecycle state. Its zero value is StateInit; Start moves
	// it to StateRunning and Stop to StateStopped.
	state State
}
// NewPeeringService builds a peering service bound to host, in StateInit and
// with no peers registered. Peers may be added and removed right away, but no
// connections are formed until Start is called.
func NewPeeringService(host host.Host) *PeeringService {
	ps := new(PeeringService)
	ps.host = host
	ps.peers = make(map[peer.ID]*peerHandler)
	return ps
}
// Start starts the peering service: it registers for network notifications,
// marks the service as running and kicks off a connection check for every
// registered peer. Calling Start on a running service is a no-op that returns
// nil. It returns an error if the service has already been stopped.
func (ps *PeeringService) Start() error {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	if ps.state == StateRunning {
		return nil
	}
	if ps.state == StateStopped {
		return errors.New("already stopped")
	}
	if ps.state == StateInit {
		logger.Infow("starting")
	}

	ps.host.Network().Notify((*netNotifee)(ps))
	ps.state = StateRunning

	// Each handler is checked on its own goroutine so Start does not wait on
	// any of them while holding the lock.
	for _, h := range ps.peers {
		go h.startIfDisconnected()
	}
	return nil
}
// GetState returns the current State of the PeeringService.
func (ps *PeeringService) GetState() State {
	ps.mu.RLock()
	current := ps.state
	ps.mu.RUnlock()
	return current
}
// Stop stops the peering service. It unregisters the network notifee and, if
// the service was in StateInit or StateRunning, stops every peer handler and
// moves the service to StateStopped. On an already stopped service nothing
// beyond the unregistration happens.
func (ps *PeeringService) Stop() {
	ps.host.Network().StopNotify((*netNotifee)(ps))

	ps.mu.Lock()
	defer ps.mu.Unlock()

	if ps.state != StateInit && ps.state != StateRunning {
		return
	}

	logger.Infow("stopping")
	for _, h := range ps.peers {
		h.stop()
	}
	ps.state = StateStopped
}
// AddPeer adds a peer to the peering service. This function may be safely
// called at any time: before the service is started, while running, or after it
// stops.
//
// AddPeer may also be called multiple times for the same peer. The new
// addresses will replace the old.
//
// In both cases the handler keeps its own copy of info.Addrs, so the caller
// may reuse or modify the slice afterwards.
func (ps *PeeringService) AddPeer(info peer.AddrInfo) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	if handler, ok := ps.peers[info.ID]; ok {
		logger.Infow("updating addresses", "peer", info.ID, "addrs", info.Addrs)
		handler.setAddrs(info.Addrs)
		return
	}

	logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs)
	ps.host.ConnManager().Protect(info.ID, connmgrTag)

	handler := &peerHandler{
		host:      ps.host,
		peer:      info.ID,
		nextDelay: initialDelay,
	}
	// Go through setAddrs, like the update path above, so the handler never
	// aliases the caller's slice.
	handler.setAddrs(info.Addrs)
	handler.ctx, handler.cancel = context.WithCancel(context.Background())
	ps.peers[info.ID] = handler

	switch ps.state {
	case StateRunning:
		// Check connectedness off the caller's goroutine; ps.mu is held here.
		go handler.startIfDisconnected()
	case StateStopped:
		// We still construct everything in this state because it's easier
		// to reason about. But we should still free resources.
		handler.cancel()
	}
}
// ListPeers returns the ID and addresses of every peer registered with the
// peering service. Each entry carries its own copy of the address slice. The
// order follows map iteration and is therefore unspecified.
func (ps *PeeringService) ListPeers() []peer.AddrInfo {
	ps.mu.RLock()
	defer ps.mu.RUnlock()

	infos := make([]peer.AddrInfo, 0, len(ps.peers))
	for id, handler := range ps.peers {
		infos = append(infos, peer.AddrInfo{
			ID:    id,
			Addrs: append([]multiaddr.Multiaddr(nil), handler.addrs...),
		})
	}
	return infos
}
// RemovePeer removes a peer from the peering service: its connection-manager
// protection is lifted, its handler is stopped and it is dropped from the peer
// set. Unknown peers are ignored. This function may be safely called at any
// time: before the service is started, while running, or after it stops.
func (ps *PeeringService) RemovePeer(id peer.ID) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	handler, known := ps.peers[id]
	if !known {
		return
	}

	logger.Infow("peer removed", "peer", id)
	ps.host.ConnManager().Unprotect(id, connmgrTag)
	handler.stop()
	delete(ps.peers, id)
}
// netNotifee is PeeringService under a separate named type. Its methods
// receive network events; Start registers it with the host's network and Stop
// unregisters it.
type netNotifee PeeringService
// Connected is called when a connection is opened. If the remote peer is one
// of the registered peers, its handler is asked to end its reconnect cycle.
func (nn *netNotifee) Connected(_ network.Network, c network.Conn) {
	ps := (*PeeringService)(nn)
	remote := c.RemotePeer()

	ps.mu.RLock()
	defer ps.mu.RUnlock()

	handler, ok := ps.peers[remote]
	if !ok {
		return
	}
	// Hand off to a goroutine so event delivery is not blocked.
	go handler.stopIfConnected()
}
// Disconnected is called when a connection is closed. If the remote peer is
// one of the registered peers, its handler is asked to begin reconnecting.
func (nn *netNotifee) Disconnected(_ network.Network, c network.Conn) {
	ps := (*PeeringService)(nn)
	remote := c.RemotePeer()

	ps.mu.RLock()
	defer ps.mu.RUnlock()

	handler, ok := ps.peers[remote]
	if !ok {
		return
	}
	// Hand off to a goroutine so event delivery is not blocked.
	go handler.startIfDisconnected()
}
// OpenedStream is a no-op; stream events are not used by the peering service.
func (nn *netNotifee) OpenedStream(network.Network, network.Stream) {}
// ClosedStream is a no-op; stream events are not used by the peering service.
func (nn *netNotifee) ClosedStream(network.Network, network.Stream) {}
// Listen is a no-op; listen events are not used by the peering service.
func (nn *netNotifee) Listen(network.Network, multiaddr.Multiaddr) {}
// ListenClose is a no-op; listen events are not used by the peering service.
func (nn *netNotifee) ListenClose(network.Network, multiaddr.Multiaddr) {}