Skip to content

Commit 37ab171

Browse files
thatsnotrightSean-Der
authored andcommitted
Jitter Buffer: Enable setting a minimum length
Add additional helper method to set the minimum packet count before the jitterbuffer will emit packets
1 parent d6aad58 commit 37ab171

File tree

2 files changed

+96
-11
lines changed

2 files changed

+96
-11
lines changed

pkg/jitterbuffer/jitter_buffer.go

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,15 @@ type (
6565
// order, and allows removing in either sequence number order or via a
6666
// provided timestamp
6767
type JitterBuffer struct {
68-
packets *PriorityQueue
69-
lastSequence uint16
70-
playoutHead uint16
71-
playoutReady bool
72-
state State
73-
stats Stats
74-
listeners map[Event][]EventListener
75-
mutex sync.Mutex
68+
packets *PriorityQueue
69+
minStartCount uint16
70+
lastSequence uint16
71+
playoutHead uint16
72+
playoutReady bool
73+
state State
74+
stats Stats
75+
listeners map[Event][]EventListener
76+
mutex sync.Mutex
7677
}
7778

7879
// Stats Track interesting statistics for the life of this JitterBuffer
@@ -90,13 +91,21 @@ type Stats struct {
9091

9192
// New will initialize a jitter buffer and its associated statistics
9293
func New(opts ...Option) *JitterBuffer {
93-
jb := &JitterBuffer{state: Buffering, stats: Stats{0, 0, 0}, packets: NewQueue(), listeners: make(map[Event][]EventListener)}
94+
jb := &JitterBuffer{state: Buffering, stats: Stats{0, 0, 0}, minStartCount: 50, packets: NewQueue(), listeners: make(map[Event][]EventListener)}
9495
for _, o := range opts {
9596
o(jb)
9697
}
9798
return jb
9899
}
99100

101+
// WithMinimumPacketCount will set the required number of packets to be received before
102+
// any attempt to pop a packet can succeed
103+
func WithMinimumPacketCount(count uint16) Option {
104+
return func(jb *JitterBuffer) {
105+
jb.minStartCount = count
106+
}
107+
}
108+
100109
// Listen will register an event listener
101110
// The jitter buffer may emit events correspnding, interested listerns should
102111
// look at Event for available events
@@ -142,7 +151,7 @@ func (jb *JitterBuffer) emit(event Event) {
142151

143152
func (jb *JitterBuffer) updateState() {
144153
// For now, we only look at the number of packets captured in the play buffer
145-
if jb.packets.Length() >= 50 && jb.state == Buffering {
154+
if jb.packets.Length() >= jb.minStartCount && jb.state == Buffering {
146155
jb.state = Emitting
147156
jb.playoutReady = true
148157
jb.emit(BeginPlayback)
@@ -186,6 +195,36 @@ func (jb *JitterBuffer) Pop() (*rtp.Packet, error) {
186195
return packet, nil
187196
}
188197

198+
// PopAtSequence will pop an RTP packet from the jitter buffer at the specified Sequence
199+
func (jb *JitterBuffer) PopAtSequence(sq uint16) (*rtp.Packet, error) {
200+
jb.mutex.Lock()
201+
defer jb.mutex.Unlock()
202+
if jb.state != Emitting {
203+
return nil, ErrPopWhileBuffering
204+
}
205+
packet, err := jb.packets.PopAt(sq)
206+
if err != nil {
207+
jb.stats.underflowCount++
208+
jb.emit(BufferUnderflow)
209+
return (*rtp.Packet)(nil), err
210+
}
211+
jb.playoutHead = (jb.playoutHead + 1) % math.MaxUint16
212+
jb.updateState()
213+
return packet, nil
214+
}
215+
216+
// PeekAtSequence will return an RTP packet from the jitter buffer at the specified Sequence
217+
// without removing it from the buffer
218+
func (jb *JitterBuffer) PeekAtSequence(sq uint16) (*rtp.Packet, error) {
219+
jb.mutex.Lock()
220+
defer jb.mutex.Unlock()
221+
packet, err := jb.packets.Find(sq)
222+
if err != nil {
223+
return (*rtp.Packet)(nil), err
224+
}
225+
return packet, nil
226+
}
227+
189228
// PopAtTimestamp pops an RTP packet from the jitter buffer with the provided timestamp
190229
// Call this method repeatedly to drain the buffer at the timestamp
191230
func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) {

pkg/jitterbuffer/jitter_buffer_test.go

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ func TestJitterBuffer(t *testing.T) {
2929
assert.Equal(jb.packets.Length(), uint16(4))
3030
assert.Equal(jb.lastSequence, uint16(5012))
3131
})
32-
3332
t.Run("Appends packets and begins playout", func(*testing.T) {
3433
jb := New()
3534
for i := 0; i < 100; i++ {
@@ -42,6 +41,18 @@ func TestJitterBuffer(t *testing.T) {
4241
assert.Equal(head.SequenceNumber, uint16(5012))
4342
assert.Equal(err, nil)
4443
})
44+
t.Run("Appends packets and begins playout", func(*testing.T) {
45+
jb := New(WithMinimumPacketCount(1))
46+
for i := 0; i < 2; i++ {
47+
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
48+
}
49+
assert.Equal(jb.packets.Length(), uint16(2))
50+
assert.Equal(jb.state, Emitting)
51+
assert.Equal(jb.playoutHead, uint16(5012))
52+
head, err := jb.Pop()
53+
assert.Equal(head.SequenceNumber, uint16(5012))
54+
assert.Equal(err, nil)
55+
})
4556
t.Run("Wraps playout correctly", func(*testing.T) {
4657
jb := New()
4758
for i := 0; i < 100; i++ {
@@ -99,6 +110,20 @@ func TestJitterBuffer(t *testing.T) {
99110
assert.Equal(pkt.SequenceNumber, uint16(5000))
100111
assert.Equal(err, nil)
101112
})
113+
t.Run("Pops at sequence with an invalid sequence number", func(*testing.T) {
114+
jb := New()
115+
for i := 0; i < 50; i++ {
116+
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
117+
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
118+
}
119+
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1019, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
120+
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1020, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
121+
assert.Equal(jb.packets.Length(), uint16(52))
122+
assert.Equal(jb.state, Emitting)
123+
head, err := jb.PopAtSequence(uint16(9000))
124+
assert.Equal(head, (*rtp.Packet)(nil))
125+
assert.NotEqual(err, nil)
126+
})
102127
t.Run("Pops at timestamp with multiple packets", func(*testing.T) {
103128
jb := New()
104129
for i := 0; i < 50; i++ {
@@ -120,4 +145,25 @@ func TestJitterBuffer(t *testing.T) {
120145
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
121146
assert.Equal(err, nil)
122147
})
148+
t.Run("Peeks at timestamp with multiple packets", func(*testing.T) {
149+
jb := New()
150+
for i := 0; i < 50; i++ {
151+
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
152+
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
153+
}
154+
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1019, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
155+
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1020, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
156+
assert.Equal(jb.packets.Length(), uint16(52))
157+
assert.Equal(jb.state, Emitting)
158+
head, err := jb.PeekAtSequence(uint16(1019))
159+
assert.Equal(head.SequenceNumber, uint16(1019))
160+
assert.Equal(err, nil)
161+
head, err = jb.PeekAtSequence(uint16(1020))
162+
assert.Equal(head.SequenceNumber, uint16(1020))
163+
assert.Equal(err, nil)
164+
165+
head, err = jb.PopAtSequence(uint16(math.MaxUint16 - 32))
166+
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
167+
assert.Equal(err, nil)
168+
})
123169
}

0 commit comments

Comments
 (0)