Skip to content

Commit 0191d54

Browse files
committed
Option to limit max NACKs per lost packet
1 parent 9ef04d4 commit 0191d54

File tree

2 files changed

+57
-14
lines changed

2 files changed

+57
-14
lines changed

pkg/nack/generator_interceptor.go

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ type GeneratorInterceptorFactory struct {
2121
// NewInterceptor constructs a new ReceiverInterceptor
2222
func (g *GeneratorInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
2323
i := &GeneratorInterceptor{
24-
size: 512,
25-
skipLastN: 0,
26-
interval: time.Millisecond * 100,
27-
receiveLogs: map[uint32]*receiveLog{},
28-
close: make(chan struct{}),
29-
log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
24+
size: 512,
25+
skipLastN: 0,
26+
maxNacksPerPacket: 0,
27+
interval: time.Millisecond * 100,
28+
receiveLogs: map[uint32]*receiveLog{},
29+
nackCountLogs: map[uint32]map[uint16]uint16{},
30+
close: make(chan struct{}),
31+
log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
3032
}
3133

3234
for _, opt := range g.opts {
@@ -45,13 +47,15 @@ func (g *GeneratorInterceptorFactory) NewInterceptor(_ string) (interceptor.Inte
4547
// GeneratorInterceptor interceptor generates nack feedback messages.
4648
type GeneratorInterceptor struct {
4749
interceptor.NoOp
48-
size uint16
49-
skipLastN uint16
50-
interval time.Duration
51-
m sync.Mutex
52-
wg sync.WaitGroup
53-
close chan struct{}
54-
log logging.LeveledLogger
50+
size uint16
51+
skipLastN uint16
52+
maxNacksPerPacket uint16
53+
interval time.Duration
54+
m sync.Mutex
55+
wg sync.WaitGroup
56+
close chan struct{}
57+
log logging.LeveledLogger
58+
nackCountLogs map[uint32]map[uint16]uint16
5559

5660
receiveLogs map[uint32]*receiveLog
5761
receiveLogsMu sync.Mutex
@@ -131,6 +135,7 @@ func (n *GeneratorInterceptor) Close() error {
131135
return nil
132136
}
133137

138+
// nolint:gocognit
134139
func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
135140
defer n.wg.Done()
136141

@@ -147,14 +152,43 @@ func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
147152

148153
for ssrc, receiveLog := range n.receiveLogs {
149154
missing := receiveLog.missingSeqNumbers(n.skipLastN)
155+
156+
if len(missing) == 0 || n.nackCountLogs[ssrc] == nil {
157+
n.nackCountLogs[ssrc] = map[uint16]uint16{}
158+
}
150159
if len(missing) == 0 {
151160
continue
152161
}
153162

163+
filteredMissing := []uint16{}
164+
if n.maxNacksPerPacket > 0 {
165+
for _, missingSeq := range missing {
166+
if n.nackCountLogs[ssrc][missingSeq] < n.maxNacksPerPacket {
167+
filteredMissing = append(filteredMissing, missingSeq)
168+
}
169+
n.nackCountLogs[ssrc][missingSeq]++
170+
}
171+
} else {
172+
filteredMissing = missing
173+
}
174+
154175
nack := &rtcp.TransportLayerNack{
155176
SenderSSRC: senderSSRC,
156177
MediaSSRC: ssrc,
157-
Nacks: rtcp.NackPairsFromSequenceNumbers(missing),
178+
Nacks: rtcp.NackPairsFromSequenceNumbers(filteredMissing),
179+
}
180+
181+
for nackSeq := range n.nackCountLogs[ssrc] {
182+
isMissing := false
183+
for _, missingSeq := range missing {
184+
if missingSeq == nackSeq {
185+
isMissing = true
186+
break
187+
}
188+
}
189+
if !isMissing {
190+
delete(n.nackCountLogs[ssrc], nackSeq)
191+
}
158192
}
159193

160194
if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil {

pkg/nack/generator_option.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ func GeneratorSkipLastN(skipLastN uint16) GeneratorOption {
3030
}
3131
}
3232

33+
// GeneratorMaxNacksPerPacket sets the maximum number of NACKs sent per missing packet, e.g. if set to 2, a missing
34+
// packet will only be NACKed at most twice. If set to 0 (default), max number of NACKs is unlimited
35+
func GeneratorMaxNacksPerPacket(maxNacks uint16) GeneratorOption {
36+
return func(r *GeneratorInterceptor) error {
37+
r.maxNacksPerPacket = maxNacks
38+
return nil
39+
}
40+
}
41+
3342
// GeneratorLog sets a logger for the interceptor
3443
func GeneratorLog(log logging.LeveledLogger) GeneratorOption {
3544
return func(r *GeneratorInterceptor) error {

0 commit comments

Comments
 (0)