-
Notifications
You must be signed in to change notification settings - Fork 1
/
linkfwddelay.go
103 lines (85 loc) · 2.26 KB
/
linkfwddelay.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package netem
//
// Link forwarding: delay-aware algorithm
//
import (
"fmt"
"time"
)
// LinkFwdWithDelay is an implementation of link forwarding that only
// delays packets without losses and deep packet inspection.
//
// Each frame read from cfg.Reader is shallow-copied, stamped with a deadline
// of "now plus cfg.OneWayDelay", and appended to an in-memory FIFO queue. A
// ticker wakes the loop when the frame at the front of the queue is due, at
// which point the frame is handed to cfg.Writer. The loop runs until
// cfg.Reader.StackClosed() fires; cfg.Wg.Done() is called on exit.
func LinkFwdWithDelay(cfg *LinkFwdConfig) {
	// informative logging
	linkName := fmt.Sprintf(
		"linkFwdWithDelay %s<->%s",
		cfg.Reader.InterfaceName(),
		cfg.Writer.InterfaceName(),
	)
	cfg.Logger.Debugf("netem: %s up", linkName)
	defer cfg.Logger.Debugf("netem: %s down", linkName)

	// synchronize with stop
	defer cfg.Wg.Done()

	// inflight contains the frames currently in flight, oldest first
	var inflight []*Frame

	// ticker to schedule sending frames; while the queue is empty it runs
	// at a slow idle pace to avoid wasting CPU
	const idleInterval = 100 * time.Millisecond
	ticker := time.NewTicker(idleInterval)
	defer ticker.Stop()

	// rearm makes the ticker fire when the given deadline expires. An
	// already-expired deadline is clamped to the smallest positive duration
	// because Ticker.Reset panics when given a non-positive one.
	rearm := func(deadline time.Time) {
		wait := time.Until(deadline)
		if wait <= 0 {
			wait = time.Nanosecond
		}
		ticker.Reset(wait)
	}

	for {
		select {
		case <-cfg.Reader.StackClosed():
			return

		case <-cfg.Reader.FrameAvailable():
			frame, err := cfg.Reader.ReadFrameNonblocking()
			if err != nil {
				cfg.Logger.Warnf("netem: ReadFrameNonblocking: %s", err.Error())
				continue
			}

			// avoid potential data races
			frame = frame.ShallowCopy()

			// create frame deadline
			frame.Deadline = time.Now().Add(cfg.OneWayDelay)

			// register as inflight; the timer only needs rearming when the
			// queue was empty, otherwise it is already tracking the front frame
			inflight = append(inflight, frame)
			if len(inflight) == 1 {
				rearm(frame.Deadline)
			}

		case <-ticker.C:
			// avoid wasting CPU with a fast timer if there's nothing to do
			if len(inflight) == 0 {
				ticker.Reset(idleInterval)
				continue
			}

			// if the front frame is still pending, rearm timer
			frame := inflight[0]
			if wait := time.Until(frame.Deadline); wait > 0 {
				ticker.Reset(wait)
				continue
			}

			// avoid leaking the frame deadline to the caller
			frame.Deadline = time.Time{}

			// pop the front frame, clearing its slot first so the backing
			// array does not keep the delivered frame reachable
			inflight[0] = nil
			inflight = inflight[1:]

			// deliver the frame; the error is intentionally discarded
			// because this function has no way to report it to a caller
			_ = cfg.Writer.WriteFrame(frame)

			// again, if the queue is empty, avoid wasting CPU
			if len(inflight) == 0 {
				ticker.Reset(idleInterval)
				continue
			}

			// rearm timer for the frame that is now at the front
			rearm(inflight[0].Deadline)
		}
	}
}
// Compile-time check that LinkFwdWithDelay can be used as a LinkFwdFunc.
var _ LinkFwdFunc = LinkFwdWithDelay