improve packet buffer dynamic minimum wait latency
Yohan Totting committed Mar 19, 2024
1 parent 413e251 commit 67b383f
Showing 3 changed files with 93 additions and 53 deletions.
8 changes: 4 additions & 4 deletions bitratecontroller.go
@@ -448,7 +448,7 @@ func (bc *bitrateController) MonitorBandwidth(estimator cc.BandwidthEstimator) {
return
}

glog.Info("bitratecontroller: available bandwidth ", ThousandSeparator(int(bw)), " total bitrate ", ThousandSeparator(int(totalSendBitrates)))
// glog.Info("bitratecontroller: available bandwidth ", ThousandSeparator(int(bw)), " total bitrate ", ThousandSeparator(int(totalSendBitrates)))

bc.fitBitratesToBandwidth(uint32(bw))

@@ -470,15 +470,15 @@ func (bc *bitrateController) fitBitratesToBandwidth(bw uint32) {
if claim.IsAdjustable() &&
claim.Quality() == QualityLevel(i) {

glog.Info("bitratecontroller: reduce bitrate for track ", claim.track.ID(), " from ", claim.Quality(), " to ", claim.Quality()-1)
// glog.Info("bitratecontroller: reduce bitrate for track ", claim.track.ID(), " from ", claim.Quality(), " to ", claim.Quality()-1)
bc.setQuality(claim.track.ID(), claim.Quality()-1)

claim.track.RequestPLI()
totalSentBitrates = bc.totalSentBitrates()

// check if the reduced bitrate now fits the available bandwidth
if totalSentBitrates <= bw {
glog.Info("bitratecontroller: total sent bitrates ", ThousandSeparator(int(totalSentBitrates)), " available bandwidth ", ThousandSeparator(int(bw)))
// glog.Info("bitratecontroller: total sent bitrates ", ThousandSeparator(int(totalSentBitrates)), " available bandwidth ", ThousandSeparator(int(bw)))
return
}
}
@@ -499,7 +499,7 @@ func (bc *bitrateController) fitBitratesToBandwidth(bw uint32) {
return
}

glog.Info("bitratecontroller: increase bitrate for track ", claim.track.ID(), " from ", claim.Quality(), " to ", claim.Quality()+1)
// glog.Info("bitratecontroller: increase bitrate for track ", claim.track.ID(), " from ", claim.Quality(), " to ", claim.Quality()+1)
bc.setQuality(claim.track.ID(), claim.Quality()+1)
// update current total bitrates
totalSentBitrates = bc.totalSentBitrates()
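For context, the quality-fitting strategy these hunks adjust steps adjustable claims down one quality level at a time until the total sent bitrate fits the bandwidth estimate. Below is a minimal, self-contained sketch of that loop; the QualityLevel and claim names follow the diff, while the three-tier bitrate table, the high-to-low sweep order, and main are illustrative assumptions (the real controller also requests a PLI keyframe after each quality change):

package main

import "fmt"

// QualityLevel mirrors the diff's quality tiers (assumed: 0 is lowest).
type QualityLevel int

// claim is a stripped-down stand-in for the controller's per-track claim.
type claim struct {
    id      string
    quality QualityLevel
    bitrate map[QualityLevel]uint32 // assumed per-level bitrates
}

// fitBitratesToBandwidth steps adjustable claims down one quality level at a
// time until the total sent bitrate fits the available bandwidth.
func fitBitratesToBandwidth(claims []*claim, bw uint32) {
    total := func() uint32 {
        var t uint32
        for _, c := range claims {
            t += c.bitrate[c.quality]
        }
        return t
    }
    for level := QualityLevel(2); level > 0; level-- {
        for _, c := range claims {
            if c.quality != level {
                continue
            }
            c.quality-- // reduce bitrate for this track by one level
            if total() <= bw {
                return // the reduced set now fits the estimate
            }
        }
    }
}

func main() {
    rates := map[QualityLevel]uint32{0: 150_000, 1: 500_000, 2: 1_200_000}
    claims := []*claim{
        {id: "a", quality: 2, bitrate: rates},
        {id: "b", quality: 2, bitrate: rates},
    }
    fitBitratesToBandwidth(claims, 1_500_000)
    for _, c := range claims {
        fmt.Println(c.id, "->", c.quality) // a -> 1, b -> 1
    }
}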
113 changes: 80 additions & 33 deletions packetbuffers.go
@@ -3,6 +3,7 @@ package sfu
import (
"container/list"
"errors"
"sort"
"sync"
"time"

@@ -22,6 +23,7 @@ type packetBuffers struct {
mu sync.RWMutex
buffers *list.List
lastSequenceNumber uint16
+ latencyMu sync.RWMutex
// min duration to wait before sending
minLatency time.Duration
// max duration to wait before sending
@@ -41,6 +43,7 @@ func newPacketBuffers(minLatency, maxLatency time.Duration) *packetBuffers {
return &packetBuffers{
mu: sync.RWMutex{},
buffers: list.New(),
+ latencyMu: sync.RWMutex{},
minLatency: minLatency,
maxLatency: maxLatency,
waitTimeMu: sync.RWMutex{},
@@ -50,20 +53,24 @@ func newPacketBuffers(minLatency, maxLatency time.Duration) *packetBuffers {
}

func (p *packetBuffers) MaxLatency() time.Duration {
- p.mu.RLock()
- defer p.mu.RUnlock()
+ p.latencyMu.RLock()
+ defer p.latencyMu.RUnlock()
return p.maxLatency
}

func (p *packetBuffers) MinLatency() time.Duration {
- p.mu.RLock()
- defer p.mu.RUnlock()
+ p.latencyMu.RLock()
+ defer p.latencyMu.RUnlock()
return p.minLatency
}

func (p *packetBuffers) Add(pkt *rtp.Packet) error {
p.mu.Lock()
- defer p.mu.Unlock()
+ defer func() {
+ p.checkPacketWaitTime()
+ p.checkWaitTimeAdjuster()
+ p.mu.Unlock()
+ }()

if p.init &&
(p.lastSequenceNumber == pkt.SequenceNumber || IsRTPPacketLate(pkt.SequenceNumber, p.lastSequenceNumber)) {
@@ -108,10 +115,6 @@ Loop:
}

func (p *packetBuffers) pop(el *list.Element) *rtppool.RetainablePacket {
- defer func() {
- go p.checkWaitTimeAdjuster()
- }()
-
pkt := el.Value.(*rtppool.RetainablePacket)

// make sure packet is not late
@@ -124,23 +127,30 @@ func (p *packetBuffers) pop(el *list.Element) *rtppool.RetainablePacket {
glog.Warning("packet cache: packet sequence ", pkt.Header().SequenceNumber, " has a gap with last sent ", p.lastSequenceNumber)
}

+ p.mu.Lock()
p.lastSequenceNumber = pkt.Header().SequenceNumber
p.packetCount++
+ p.mu.Unlock()

if p.oldestPacket == nil || p.oldestPacket.Header().SequenceNumber == pkt.Header().SequenceNumber {
// the oldest packet will be removed, so find the next oldest packet in the list
p.oldestPacket = nil

+ p.mu.RLock()
for e := el.Next(); e != nil; e = e.Next() {
packet := e.Value.(*rtppool.RetainablePacket)
if p.oldestPacket == nil || packet.AddedTime().Before(p.oldestPacket.AddedTime()) {
p.oldestPacket = packet
}
}
+ p.mu.RUnlock()

}

// remove the packets from the cache
+ p.mu.Lock()
p.buffers.Remove(el)
+ p.mu.Unlock()

return pkt
}
Expand All @@ -164,17 +174,23 @@ Loop:
}

func (p *packetBuffers) fetch(e *list.Element) *rtppool.RetainablePacket {
+ p.latencyMu.RLock()
+ maxLatency := p.maxLatency
+ minLatency := p.minLatency
+ p.latencyMu.RUnlock()

currentPacket := e.Value.(*rtppool.RetainablePacket)

currentSeq := currentPacket.Header().SequenceNumber

latency := time.Since(currentPacket.AddedTime())

- if p.oldestPacket != nil && time.Since(p.oldestPacket.AddedTime()) > p.maxLatency {
+ if p.oldestPacket != nil && time.Since(p.oldestPacket.AddedTime()) > maxLatency {
// we have waited too long, we should send the packets
return p.pop(e)
}

- if !p.init && latency > p.minLatency && e.Next() != nil && !IsRTPPacketLate(e.Next().Value.(*rtppool.RetainablePacket).Header().SequenceNumber, currentSeq) {
+ if !p.init && latency > minLatency && e.Next() != nil && !IsRTPPacketLate(e.Next().Value.(*rtppool.RetainablePacket).Header().SequenceNumber, currentSeq) {
// first packet to send, but make sure we have the packet in order
p.initSequence = currentSeq
p.init = true
@@ -183,15 +199,10 @@ func (p *packetBuffers) fetch(e *list.Element) *rtppool.RetainablePacket {

if (p.lastSequenceNumber < currentSeq || p.lastSequenceNumber-currentSeq > uint16SizeHalf) && currentSeq-p.lastSequenceNumber == 1 {
// the current packet is in sequence with the last packet we popped

- // we record the wait time the packet getting reordered
- p.recordWaitTime(e)
-
- if time.Since(currentPacket.AddedTime()) > p.minLatency {
+ if time.Since(currentPacket.AddedTime()) > minLatency {
// passed the min latency
return p.pop(e)
}

}

// there is a gap between the last packet we popped and the current packet
@@ -200,9 +211,9 @@ func (p *packetBuffers) fetch(e *list.Element) *rtppool.RetainablePacket {
// but check with the latency if there is a packet pass the max latency
packetLatency := time.Since(currentPacket.AddedTime())
// glog.Info("packet latency: ", packetLatency, " gap: ", gap, " currentSeq: ", currentSeq, " nextSeq: ", nextSeq)
- if packetLatency > p.maxLatency {
+ if packetLatency > maxLatency {
// we have waited too long, we should send the packets
glog.Warning("packet cache: packet sequence ", currentPacket.Header().SequenceNumber, " latency ", packetLatency, ", reached max latency ", p.maxLatency, ", will sending the packets")
glog.Warning("packet cache: packet sequence ", currentPacket.Header().SequenceNumber, " latency ", packetLatency, ", reached max latency ", maxLatency, ", will sending the packets")
return p.pop(e)
}

@@ -211,9 +222,8 @@ func (p *packetBuffers) fetch(e *list.Element) *rtppool.RetainablePacket {

func (p *packetBuffers) Pop() *rtppool.RetainablePacket {
p.mu.RLock()
- defer p.mu.RUnlock()

frontElement := p.buffers.Front()
+ p.mu.RUnlock()
if frontElement == nil {
return nil
}
@@ -259,6 +269,27 @@ func (p *packetBuffers) Clear() {
p.oldestPacket = nil
}

+ func (p *packetBuffers) checkPacketWaitTime() {
+ if p.buffers.Len() == 0 {
+ return
+ }
+
+ for e := p.buffers.Front(); e != nil && e.Next() != nil; e = e.Next() {
+
+ currentPacket := e.Value.(*rtppool.RetainablePacket)
+ currentSeq := currentPacket.Header().SequenceNumber
+
+ nextPacket := e.Next().Value.(*rtppool.RetainablePacket)
+ nextSeq := nextPacket.Header().SequenceNumber
+
+ if (currentSeq < nextSeq || currentSeq-nextSeq > uint16SizeHalf) && nextSeq-currentSeq == 1 {
+ p.recordWaitTime(e)
+ } else {
+ break
+ }
+ }
+ }

func (p *packetBuffers) recordWaitTime(el *list.Element) {
p.waitTimeMu.Lock()
defer p.waitTimeMu.Unlock()
@@ -286,22 +317,38 @@ func (p *packetBuffers) checkWaitTimeAdjuster() {
p.waitTimeMu.Lock()
defer p.waitTimeMu.Unlock()

- // calculate the average wait time
- var totalWaitTime time.Duration
- for _, wt := range p.waitTimes {
- totalWaitTime += wt
- }
+ // calculate the 75th percentile of the wait times
+ // sort the wait times
+
+ sortedWaitTimes := make([]time.Duration, len(p.waitTimes))
+ copy(sortedWaitTimes, p.waitTimes)
+
+ sort.Slice(sortedWaitTimes, func(i, j int) bool {
+ return sortedWaitTimes[i] < sortedWaitTimes[j]
+ })

+ // get the 60th, 75th, and 90th percentiles
+ percentile90thIndex := int(float64(len(sortedWaitTimes)) * 0.90)
+ percentile75thIndex := int(float64(len(sortedWaitTimes)) * 0.75)
+ percentile60thIndex := int(float64(len(sortedWaitTimes)) * 0.60)
+ percentile90th := sortedWaitTimes[percentile90thIndex]
+ percentile75th := sortedWaitTimes[percentile75thIndex]
+ percentile60th := sortedWaitTimes[percentile60thIndex]

- averageWaitTime := totalWaitTime / time.Duration(len(p.waitTimes))
+ glog.Info("packet cache: 90th wait time: ", percentile90th, " 75th wait time ", percentile75th, " 60th wait time ", percentile60th)

- if averageWaitTime > p.minLatency {
+ if percentile75th > p.minLatency {
// increase the min latency
- p.minLatency = averageWaitTime
- glog.Info("packet cache: average wait time ", averageWaitTime, ", increasing min latency to ", p.minLatency)
- } else if averageWaitTime < p.minLatency {
+ glog.Info("packet cache: 75th wait time ", percentile75th, ", increasing min latency from ", p.minLatency)
+ p.latencyMu.Lock()
+ p.minLatency = percentile75th
+ p.latencyMu.Unlock()
+ } else if percentile75th < p.minLatency {
// decrease the min latency
- p.minLatency = averageWaitTime
- glog.Info("packet cache: average wait time ", averageWaitTime, ", decreasing min latency to ", p.minLatency)
+ glog.Info("packet cache: 75th wait time ", percentile75th, ", decreasing min latency from ", p.minLatency)
+ p.latencyMu.Lock()
+ p.minLatency = percentile75th
+ p.latencyMu.Unlock()
}

p.waitTimeResetCounter = 0
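The core of this change is checkWaitTimeAdjuster: instead of re-targeting minLatency at the average reorder wait, it sorts the recorded wait times and moves minLatency to the 75th percentile, so a single badly reordered burst no longer drags the floor up. Below is a self-contained sketch of that adjustment; the field names follow the diff, while the standalone type, the sample values, and main are illustrative assumptions:

package main

import (
    "fmt"
    "sort"
    "sync"
    "time"
)

// latencyAdjuster mirrors the diff's percentile-based minLatency update.
type latencyAdjuster struct {
    latencyMu  sync.RWMutex
    minLatency time.Duration
    waitTimes  []time.Duration // reorder-wait samples from checkPacketWaitTime
}

// adjust re-targets minLatency at the 75th percentile of the recorded waits.
func (a *latencyAdjuster) adjust() {
    if len(a.waitTimes) == 0 {
        return
    }
    sorted := make([]time.Duration, len(a.waitTimes))
    copy(sorted, a.waitTimes)
    sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

    // index arithmetic as in the diff: int(n * 0.75) is always a valid
    // index because 0.75 < 1 and the conversion truncates toward zero.
    p75 := sorted[int(float64(len(sorted))*0.75)]

    a.latencyMu.Lock()
    defer a.latencyMu.Unlock()
    a.minLatency = p75 // raise or lower the floor toward observed reordering
}

func main() {
    a := &latencyAdjuster{minLatency: 20 * time.Millisecond}
    for _, ms := range []int{2, 3, 5, 8, 40, 4, 6, 7} {
        a.waitTimes = append(a.waitTimes, time.Duration(ms)*time.Millisecond)
    }
    a.adjust()
    fmt.Println("new min latency:", a.minLatency) // prints: new min latency: 8ms
}

With the eight samples above, the 75th-percentile index is int(8 * 0.75) = 6, so minLatency lands on the 8ms sample rather than the 9.375ms mean that the 40ms outlier would have produced under the old averaging scheme.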
25 changes: 9 additions & 16 deletions pkg/pacer/leakybucket.go
@@ -67,7 +67,10 @@ func (p *LeakyBucketPacer) AddStream(ssrc uint32, writer interceptor.RTPWriter)
defer p.qLock.Unlock()

p.ssrcToWriter[ssrc] = writer
- p.queues[ssrc] = &queue{}
+ p.queues[ssrc] = &queue{
+ List: list.List{},
+ mu: sync.RWMutex{},
+ }
}

// SetTargetBitrate updates the target bitrate at which the pacer is allowed to
@@ -117,17 +120,6 @@ func (p *LeakyBucketPacer) Write(header *rtp.Header, payload []byte, attributes
return header.MarshalSize() + len(payload), nil
}

- func (p *LeakyBucketPacer) Queues() map[uint32]*queue {
- p.qLock.RLock()
- defer p.qLock.RUnlock()
- queues := make(map[uint32]*queue, len(p.queues))
- for k, v := range p.queues {
- queues[k] = v
- }
-
- return p.queues
- }
-
// Run starts the LeakyBucketPacer
func (p *LeakyBucketPacer) Run() {
ticker := time.NewTicker(p.pacingInterval)
@@ -144,10 +136,11 @@ func (p *LeakyBucketPacer) Run() {
for {
emptyQueueCount := 0

- queues := p.Queues()
-
- for _, queue := range queues {
- if queue.Len() == 0 {
+ for _, queue := range p.queues {
+ queue.mu.RLock()
+ queueSize := queue.Len()
+ queue.mu.RUnlock()
+ if queueSize == 0 {
emptyQueueCount++

if emptyQueueCount == len(p.queues) {
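The pacer change drops the Queues() accessor, which copied the whole queue map under qLock on every pacing tick (and, as the removed code shows, then returned p.queues rather than the copy anyway), and instead reads each queue's length under that queue's own read lock. Here is a minimal sketch of the per-queue locking pattern; the queue type follows the diff, while the map setup and main are illustrative, and in the real pacer the map itself is still guarded by qLock when streams are added:

package main

import (
    "container/list"
    "fmt"
    "sync"
)

// queue embeds list.List behind its own lock, as in the diff.
type queue struct {
    list.List
    mu sync.RWMutex
}

func main() {
    queues := map[uint32]*queue{
        1: {List: list.List{}, mu: sync.RWMutex{}},
        2: {List: list.List{}, mu: sync.RWMutex{}},
    }

    queues[1].mu.Lock()
    queues[1].PushBack("packet") // writers hold the queue's write lock
    queues[1].mu.Unlock()

    empty := 0
    for _, q := range queues {
        q.mu.RLock() // take the queue's own read lock, not a map-wide one
        n := q.Len()
        q.mu.RUnlock()
        if n == 0 {
            empty++
        }
    }
    fmt.Println("empty queues:", empty, "of", len(queues))
}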
