Skip to content

Commit 049f4cd

Browse files
AntonitoSean-Der
authored andcommitted
Implement Naive PLI Generator
The Naive PLI Generator sends a PLI packet for each new track that supports PLI, and then keep sending packets at a constant interval.
1 parent 585f01c commit 049f4cd

File tree

6 files changed

+303
-2
lines changed

6 files changed

+303
-2
lines changed

AUTHORS.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Adam Kiss <masterada@gmail.com>
88
adamroach <adam@nostrum.com>
99
Aditya Kumar <k.aditya00@gmail.com>
1010
aler9 <46489434+aler9@users.noreply.github.com>
11+
Antoine <antoine@tenten.app>
1112
Antoine Baché <antoine@tenten.app>
1213
Atsushi Watanabe <atsushi.w@ieee.org>
1314
Bobby Peck <rpeck@mux.com>
@@ -18,8 +19,8 @@ Kevin Caffrey <kcaffrey@gmail.com>
1819
Maksim Nesterov <msnesterov@avito.ru>
1920
Mathis Engelbart <mathis.engelbart@gmail.com>
2021
Sean DuBois <sean@siobud.com>
21-
ziminghua <565209960@qq.com>
2222
Steffen Vogel <post@steffenvogel.de>
23+
ziminghua <565209960@qq.com>
2324

2425
# List of contributors not appearing in Git history
2526

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,14 @@ by anyone. With the following tenets in mind.
3434
* [Transport Wide Congestion Control Feedback](https://github.com/pion/interceptor/tree/master/pkg/twcc)
3535
* [Packet Dump](https://github.com/pion/interceptor/tree/master/pkg/packetdump)
3636
* [Google Congestion Control](https://github.com/pion/interceptor/tree/master/pkg/gcc)
37+
* [Stats](https://github.com/pion/interceptor/tree/master/pkg/stats) A [webrtc-stats](https://www.w3.org/TR/webrtc-stats/) compliant statistics generation
38+
* [Interval PLI](https://github.com/pion/interceptor/tree/master/pkg/intervalpli) Generate PLI on a interval. Useful when no decoder is available.
3739

3840
### Planned Interceptors
3941
* Bandwidth Estimation
4042
- [NADA](https://tools.ietf.org/html/rfc8698)
4143
* JitterBuffer, re-order packets and wait for arrival
4244
* [FlexFec](https://tools.ietf.org/html/draft-ietf-payload-flexible-fec-scheme-20)
43-
* [webrtc-stats](https://www.w3.org/TR/webrtc-stats/) compliant statistics generation
4445
* [RTCP Feedback for Congestion Control](https://datatracker.ietf.org/doc/html/rfc8888) the standardized alternative to TWCC.
4546

4647
### Interceptor Public API
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package intervalpli
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/pion/interceptor"
8+
"github.com/pion/logging"
9+
"github.com/pion/rtcp"
10+
)
11+
12+
// GeneratorInterceptor interceptor sends PLI packets.
13+
// Implements PLI in a naive way: sends a PLI for each new track that support PLI, periodically.
14+
type GeneratorInterceptor struct {
15+
interceptor.NoOp
16+
17+
interval time.Duration
18+
streams sync.Map
19+
immediatePLINeeded chan []uint32
20+
21+
log logging.LeveledLogger
22+
m sync.Mutex
23+
wg sync.WaitGroup
24+
25+
close chan struct{}
26+
}
27+
28+
// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor.
29+
func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) {
30+
r := &GeneratorInterceptor{
31+
interval: 3 * time.Second,
32+
log: logging.NewDefaultLoggerFactory().NewLogger("pli_generator"),
33+
immediatePLINeeded: make(chan []uint32, 1),
34+
close: make(chan struct{}),
35+
}
36+
37+
for _, opt := range opts {
38+
if err := opt(r); err != nil {
39+
return nil, err
40+
}
41+
}
42+
43+
return r, nil
44+
}
45+
46+
func (r *GeneratorInterceptor) isClosed() bool {
47+
select {
48+
case <-r.close:
49+
return true
50+
default:
51+
return false
52+
}
53+
}
54+
55+
// Close closes the interceptor.
56+
func (r *GeneratorInterceptor) Close() error {
57+
defer r.wg.Wait()
58+
r.m.Lock()
59+
defer r.m.Unlock()
60+
61+
if !r.isClosed() {
62+
close(r.close)
63+
}
64+
65+
return nil
66+
}
67+
68+
// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
69+
// will be called once per packet batch.
70+
func (r *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
71+
r.m.Lock()
72+
defer r.m.Unlock()
73+
74+
if r.isClosed() {
75+
return writer
76+
}
77+
78+
r.wg.Add(1)
79+
80+
go r.loop(writer)
81+
82+
return writer
83+
}
84+
85+
func (r *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
86+
defer r.wg.Done()
87+
88+
ticker, tickerChan := r.createLoopTicker()
89+
90+
defer func() {
91+
if ticker != nil {
92+
ticker.Stop()
93+
}
94+
}()
95+
96+
for {
97+
select {
98+
case ssrcs := <-r.immediatePLINeeded:
99+
r.writePLIs(rtcpWriter, ssrcs)
100+
101+
case <-tickerChan:
102+
ssrcs := make([]uint32, 0)
103+
104+
r.streams.Range(func(k, value interface{}) bool {
105+
key, ok := k.(uint32)
106+
if !ok {
107+
return false
108+
}
109+
110+
ssrcs = append(ssrcs, key)
111+
return true
112+
})
113+
114+
r.writePLIs(rtcpWriter, ssrcs)
115+
116+
case <-r.close:
117+
return
118+
}
119+
}
120+
}
121+
122+
func (r *GeneratorInterceptor) createLoopTicker() (*time.Ticker, <-chan time.Time) {
123+
if r.interval > 0 {
124+
ticker := time.NewTicker(r.interval)
125+
return ticker, ticker.C
126+
}
127+
128+
return nil, make(chan time.Time)
129+
}
130+
131+
func (r *GeneratorInterceptor) writePLIs(rtcpWriter interceptor.RTCPWriter, ssrcs []uint32) {
132+
if len(ssrcs) == 0 {
133+
return
134+
}
135+
136+
pkts := []rtcp.Packet{}
137+
138+
for _, ssrc := range ssrcs {
139+
pkts = append(pkts, &rtcp.PictureLossIndication{MediaSSRC: ssrc})
140+
}
141+
142+
if _, err := rtcpWriter.Write(pkts, interceptor.Attributes{}); err != nil {
143+
r.log.Warnf("failed sending: %+v", err)
144+
}
145+
}
146+
147+
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
148+
// will be called once per rtp packet.
149+
func (r *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
150+
if !streamSupportPli(info) {
151+
return reader
152+
}
153+
154+
r.streams.Store(info.SSRC, nil)
155+
// New streams need to receive a PLI as soon as possible.
156+
r.ForcePLI(info.SSRC)
157+
158+
return reader
159+
}
160+
161+
// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
162+
func (r *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
163+
r.streams.Delete(info.SSRC)
164+
}
165+
166+
// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
167+
// change in the future. The returned method will be called once per packet batch.
168+
func (r *GeneratorInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
169+
return reader
170+
}
171+
172+
// ForcePLI sends a PLI request to the tracks matching the provided SSRCs.
173+
func (r *GeneratorInterceptor) ForcePLI(ssrc ...uint32) {
174+
r.immediatePLINeeded <- ssrc
175+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package intervalpli
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/pion/interceptor"
8+
"github.com/pion/interceptor/internal/test"
9+
"github.com/pion/logging"
10+
"github.com/pion/rtcp"
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestPLIGeneratorInterceptor_Unsupported(t *testing.T) {
15+
i, err := NewGeneratorInterceptor(
16+
GeneratorInterval(time.Millisecond*10),
17+
GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
18+
)
19+
assert.Nil(t, err)
20+
21+
streamSSRC := uint32(123456)
22+
stream := test.NewMockStream(&interceptor.StreamInfo{
23+
SSRC: streamSSRC,
24+
MimeType: "video/h264",
25+
}, i)
26+
defer func() {
27+
assert.NoError(t, stream.Close())
28+
}()
29+
30+
timeout := time.NewTimer(100 * time.Millisecond)
31+
defer timeout.Stop()
32+
select {
33+
case <-timeout.C:
34+
return
35+
case <-stream.WrittenRTCP():
36+
assert.FailNow(t, "should not receive any PIL")
37+
}
38+
}
39+
40+
func TestPLIGeneratorInterceptor(t *testing.T) {
41+
i, err := NewGeneratorInterceptor(
42+
GeneratorInterval(time.Second*1),
43+
GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
44+
)
45+
assert.Nil(t, err)
46+
47+
streamSSRC := uint32(123456)
48+
stream := test.NewMockStream(&interceptor.StreamInfo{
49+
SSRC: streamSSRC,
50+
ClockRate: 90000,
51+
MimeType: "video/h264",
52+
RTCPFeedback: []interceptor.RTCPFeedback{
53+
{Type: "nack", Parameter: "pli"},
54+
},
55+
}, i)
56+
defer func() {
57+
assert.NoError(t, stream.Close())
58+
}()
59+
60+
pkts := <-stream.WrittenRTCP()
61+
assert.Equal(t, len(pkts), 1)
62+
sr, ok := pkts[0].(*rtcp.PictureLossIndication)
63+
assert.True(t, ok)
64+
assert.Equal(t, &rtcp.PictureLossIndication{MediaSSRC: streamSSRC}, sr)
65+
66+
// Should not have another packet immediately...
67+
func() {
68+
timeout := time.NewTimer(100 * time.Millisecond)
69+
defer timeout.Stop()
70+
select {
71+
case <-timeout.C:
72+
return
73+
case <-stream.WrittenRTCP():
74+
assert.FailNow(t, "should not receive any PIL")
75+
}
76+
}()
77+
78+
// ... but should receive one 1sec later.
79+
pkts = <-stream.WrittenRTCP()
80+
assert.Equal(t, len(pkts), 1)
81+
sr, ok = pkts[0].(*rtcp.PictureLossIndication)
82+
assert.True(t, ok)
83+
assert.Equal(t, &rtcp.PictureLossIndication{MediaSSRC: streamSSRC}, sr)
84+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package intervalpli
2+
3+
import (
4+
"time"
5+
6+
"github.com/pion/logging"
7+
)
8+
9+
// GeneratorOption can be used to configure GeneratorInterceptor.
10+
type GeneratorOption func(r *GeneratorInterceptor) error
11+
12+
// GeneratorLog sets a logger for the interceptor.
13+
func GeneratorLog(log logging.LeveledLogger) GeneratorOption {
14+
return func(r *GeneratorInterceptor) error {
15+
r.log = log
16+
return nil
17+
}
18+
}
19+
20+
// GeneratorInterval sets send interval for the interceptor.
21+
func GeneratorInterval(interval time.Duration) GeneratorOption {
22+
return func(r *GeneratorInterceptor) error {
23+
r.interval = interval
24+
return nil
25+
}
26+
}

pkg/intervalpli/pli.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Package intervalpli is an interceptor that requests PLI on a static interval. Useful when bridging protocols that don't have receiver feedback
2+
package intervalpli
3+
4+
import "github.com/pion/interceptor"
5+
6+
func streamSupportPli(info *interceptor.StreamInfo) bool {
7+
for _, fb := range info.RTCPFeedback {
8+
if fb.Type == "nack" && fb.Parameter == "pli" {
9+
return true
10+
}
11+
}
12+
13+
return false
14+
}

0 commit comments

Comments
 (0)