Skip to content

Commit fe728b2

Browse files
committed
add more comments and tests
1 parent 38cff0f commit fe728b2

File tree

2 files changed

+253
-135
lines changed

2 files changed

+253
-135
lines changed

p2p/net/swarm/dial_worker.go

Lines changed: 83 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,20 @@ type pendRequest struct {
3333
addrs map[ma.Multiaddr]struct{} // pending addr dials
3434
}
3535

36+
// addrDial tracks dials to a particular multiaddress.
3637
type addrDial struct {
37-
addr ma.Multiaddr
38-
ctx context.Context
39-
conn *Conn
40-
err error
38+
// addr is the address dialed
39+
addr ma.Multiaddr
40+
// ctx is the context used for dialing the address
41+
ctx context.Context
42+
// conn is the established connection on success
43+
conn *Conn
44+
// err is the err on dialing the address
45+
err error
46+
// requests is the list of dialRequests interested in this dial
4147
requests []int
42-
dialed bool
48+
// dialed indicates whether we have triggered the dial to the address
49+
dialed bool
4350
}
4451

4552
type dialWorker struct {
@@ -78,20 +85,22 @@ func (w *dialWorker) loop() {
7885
defer w.wg.Done()
7986
defer w.s.limiter.clearAllPeerDials(w.peer)
8087

81-
// used to signal readiness to dial and completion of the dial
82-
ready := make(chan struct{})
83-
close(ready)
88+
// dq is used to pace dials to different addresses of the peer
8489
dq := newDialQueue()
90+
// currDials is the number of dials in flight
8591
currDials := 0
8692
st := w.cl.Now()
93+
// timer is the timer used to trigger dials
8794
timer := w.cl.InstantTimer(st.Add(math.MaxInt64))
8895
timerRunning := true
89-
scheduleNext := func() {
96+
// scheduleNextDial updates timer for triggering the next dial
97+
scheduleNextDial := func() {
9098
if timerRunning && !timer.Stop() {
9199
<-timer.Ch()
92100
}
93101
timerRunning = false
94102
if dq.len() > 0 {
103+
// if there are no dials in flight, trigger the next dials immediately
95104
if currDials == 0 {
96105
timer.Reset(st)
97106
} else {
@@ -196,9 +205,11 @@ loop:
196205
dq.add(network.AddrDelay{Addr: a, Delay: addrDelay[a]})
197206
}
198207
}
199-
scheduleNext()
208+
scheduleNextDial()
200209

201210
case <-timer.Ch():
211+
// we dont check the delay here because an early trigger means all in flight
212+
// dials have completed
202213
for _, adelay := range dq.nextBatch() {
203214
// spawn the dial
204215
ad := w.pending[adelay.Addr]
@@ -211,7 +222,7 @@ loop:
211222
}
212223
}
213224
timerRunning = false
214-
scheduleNext()
225+
scheduleNextDial()
215226

216227
case res := <-w.resch:
217228
if res.Conn != nil {
@@ -255,7 +266,8 @@ loop:
255266
w.s.backf.AddBackoff(w.peer, res.Addr)
256267
}
257268
w.dispatchError(ad, res.Err)
258-
scheduleNext()
269+
// only schedule next dial on error
270+
scheduleNextDial()
259271
}
260272
}
261273
}
@@ -294,56 +306,67 @@ func (w *dialWorker) dispatchError(ad *addrDial, err error) {
294306
// this is necessary to support active listen scenarios, where a new dial comes in while
295307
// another dial is in progress, and needs to do a direct connection without inhibitions from
296308
// dial backoff.
297-
// it is also necessary to preserve consisent behaviour with the old dialer -- TestDialBackoff
298-
// regresses without this.
299309
if err == ErrDialBackoff {
300310
delete(w.pending, ad.addr)
301311
}
302312
}
303313

314+
// rankAddrs ranks addresses for dialing. if it's a simConnect request we
315+
// dial all addresses immediately without any delay
304316
func (w *dialWorker) rankAddrs(addrs []ma.Multiaddr, isSimConnect bool) []network.AddrDelay {
305317
if isSimConnect {
306318
return noDelayRanker(addrs)
307319
}
308320
return w.s.dialRanker(addrs)
309321
}
310322

323+
// dialQueue is a priority queue used to schedule dials
311324
type dialQueue struct {
312-
q []network.AddrDelay
325+
// q is the queue maintained as a heap
326+
q []network.AddrDelay
327+
// pos is the reverse map from address to its position in q
328+
// the reverse map is required to provide efficient updates
313329
pos map[ma.Multiaddr]int
314330
}
315331

316332
func newDialQueue() *dialQueue {
317333
return &dialQueue{pos: make(map[ma.Multiaddr]int)}
318334
}
319335

336+
// add adds adelay to the queue. if another elements exists in the queue with
337+
// the same address, it replaces that element.
320338
func (dq *dialQueue) add(adelay network.AddrDelay) {
321339
dq.remove(adelay.Addr)
322340
dq.q = append(dq.q, adelay)
323341
dq.pos[adelay.Addr] = len(dq.q) - 1
324342
dq.heapify(len(dq.q) - 1)
325343
}
326344

345+
// swap swaps elements at i and j maintaining the reverse map pos.
327346
func (dq *dialQueue) swap(i, j int) {
328347
dq.pos[dq.q[i].Addr] = j
329348
dq.pos[dq.q[j].Addr] = i
330349
dq.q[i], dq.q[j] = dq.q[j], dq.q[i]
331350
}
332351

352+
// len is the length of the queue. Calling top on an empty queue panics.
333353
func (dq *dialQueue) len() int {
334354
return len(dq.q)
335355
}
336356

357+
// top returns the top element of the queue
337358
func (dq *dialQueue) top() network.AddrDelay {
338359
return dq.q[0]
339360
}
340361

362+
// pop removes the top element from the queue and returns it
341363
func (dq *dialQueue) pop() network.AddrDelay {
342364
v := dq.q[0]
343365
dq.remove(v.Addr)
344366
return v
345367
}
346368

369+
// remove removes the element in the queue with address a
347370
func (dq *dialQueue) remove(a ma.Multiaddr) {
348371
pos, ok := dq.pos[a]
349372
if !ok {
@@ -352,66 +375,69 @@ func (dq *dialQueue) remove(a ma.Multiaddr) {
352375
dq.swap(pos, len(dq.q)-1)
353376
dq.q = dq.q[:len(dq.q)-1]
354377
delete(dq.pos, a)
355-
dq.heapify(pos)
378+
if pos < len(dq.q) {
379+
dq.heapify(pos)
380+
}
356381
}
357382

383+
// heapify fixes the heap property for element at position i
358384
func (dq *dialQueue) heapify(i int) {
359385
if dq.len() == 0 {
360386
return
361387
}
388+
dq.fixdown(i)
389+
dq.fixup(i)
390+
}
391+
392+
func (dq *dialQueue) fixup(i int) {
393+
if dq.len() == 0 || i == 0 {
394+
return
395+
}
396+
for i != 0 {
397+
p := (i - 1) / 2
398+
if dq.q[i].Delay < dq.q[p].Delay {
399+
dq.swap(i, p)
400+
i = p
401+
continue
402+
}
403+
break
404+
}
405+
}
406+
407+
func (dq *dialQueue) fixdown(i int) {
408+
if i >= dq.len() {
409+
return
410+
}
362411
for {
363-
v := dq.q[i].Delay
364412
l, r := 2*i+1, 2*i+2
365413
if l >= dq.len() && r >= dq.len() {
366-
if i == 0 {
367-
return
368-
}
369-
i = (i - 1) / 2
370-
continue
414+
break
371415
}
372-
lv := dq.q[l].Delay
373-
if v <= lv {
374-
if r < dq.len() {
375-
rv := dq.q[r].Delay
376-
if v <= rv {
377-
if i == 0 {
378-
return
379-
}
380-
i = (i - 1) / 2
381-
continue
382-
} else {
383-
dq.swap(i, r)
384-
i = r
385-
continue
386-
}
387-
} else {
388-
if i == 0 {
389-
return
390-
}
391-
i = (i - 1) / 2
392-
continue
393-
}
394-
} else {
395-
if r < dq.len() {
396-
rv := dq.q[r].Delay
397-
if lv <= rv {
398-
dq.swap(i, l)
399-
i = l
400-
continue
401-
} else {
402-
dq.swap(i, r)
403-
i = r
404-
continue
405-
}
406-
} else {
416+
if r >= dq.len() {
417+
if dq.q[i].Delay > dq.q[l].Delay {
407418
dq.swap(i, l)
408419
i = l
409420
continue
410421
}
422+
break
423+
}
424+
v, lv, rv := dq.q[i].Delay, dq.q[l].Delay, dq.q[r].Delay
425+
if lv < v && lv <= rv {
426+
dq.swap(i, l)
427+
i = l
428+
continue
429+
}
430+
if rv < v && rv <= lv {
431+
dq.swap(i, r)
432+
i = r
433+
continue
411434
}
435+
break
412436
}
413437
}
414438

439+
// nextBatch returns all the elements in the queue with delay equal to the top element
440+
// of the queue
415441
func (dq *dialQueue) nextBatch() []network.AddrDelay {
416442
if dq.len() == 0 {
417443
return nil

0 commit comments

Comments
 (0)