forked from inlivedev/sfu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
packetcache.go
157 lines (129 loc) · 3.61 KB
/
packetcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package sfu
import (
"container/list"
"errors"
"sync"
"github.com/pion/logging"
)
type packetCaches struct {
mu sync.RWMutex
caches *list.List
init bool
Size uint16
log logging.LeveledLogger
}
type Cache struct {
SeqNum uint16
Timestamp uint32
BaseSeq uint16
// packet TID
PTID uint8
// packet SID
PSID uint8
// stream TID
TID uint8
// stream SID
SID uint8
}
type OperationType uint8
const (
KEEPLAYER = OperationType(0)
SCALEDOWNLAYER = OperationType(1)
SCALEUPLAYER = OperationType(2)
)
var (
ErrCantDecide = errors.New("can't decide if upscale or downscale the layer")
ErrDuplicate = errors.New("packet sequence already exists in the cache")
)
func newPacketCaches(log logging.LeveledLogger) *packetCaches {
return &packetCaches{
mu: sync.RWMutex{},
caches: &list.List{},
init: false,
Size: 1000,
log: log,
}
}
func (p *packetCaches) Add(seqNum, baseSequence uint16, ts uint32, psid, tsid, sid, tid uint8) {
p.mu.Lock()
defer func() {
p.mu.Unlock()
if uint16(p.caches.Len()) > p.Size {
p.caches.Remove(p.caches.Front())
}
}()
newCache := Cache{
SeqNum: seqNum,
BaseSeq: baseSequence,
Timestamp: ts,
PSID: psid,
PTID: tsid,
TID: tid,
SID: sid,
}
if p.caches.Len() == 0 {
p.caches.PushBack(newCache)
return
}
// add packet in order
Loop:
for e := p.caches.Back(); e != nil; e = e.Prev() {
currentCache := e.Value.(Cache)
if currentCache.SeqNum == seqNum {
p.log.Warnf("packet cache: packet sequence ", seqNum, " already exists in the cache, will not adding the packet")
return
}
if currentCache.SeqNum < seqNum && seqNum-currentCache.SeqNum < uint16SizeHalf {
p.caches.InsertAfter(newCache, e)
break Loop
} else if currentCache.SeqNum-seqNum > uint16SizeHalf {
p.caches.InsertAfter(newCache, e)
break Loop
} else if e.Prev() == nil {
p.caches.PushFront(newCache)
break Loop
}
}
}
// This will provide decided sid and tid that can be used for the current packet
// it can return the same sid and tid if the sequence number is in sequence
// it will decide to upscale or downscale the sid and tid based on the sequence number
func (p *packetCaches) GetDecision(currentSeqNum, currentBaseSeq uint16, currentSID, currentTID uint8) (baseSeq uint16, sid uint8, tid uint8, err error) {
p.mu.RLock()
defer p.mu.RUnlock()
for e := p.caches.Back(); e != nil; e = e.Prev() {
currentCache := e.Value.(Cache)
if currentCache.SeqNum == currentSeqNum {
return currentBaseSeq, currentSID, currentTID, ErrDuplicate
}
// the current packet is not late
if currentCache.SeqNum < currentSeqNum &&
currentCache.SeqNum-currentSeqNum > uint16SizeHalf && currentSeqNum-currentCache.SeqNum == 1 {
// next packet is the next sequence number, allowed to upscale or downscale
return currentCache.BaseSeq, currentSID, currentTID, nil
}
if currentCache.SeqNum < currentSeqNum &&
currentCache.SeqNum-currentSeqNum > uint16SizeHalf && currentSeqNum-currentCache.SeqNum > 1 {
// next packet is has a gap, can't decide keep the current SID and TID
return currentCache.BaseSeq, currentCache.SID, currentCache.TID, ErrCantDecide
}
}
// can't decide could be because the cache is empty
if p.caches.Len() == 0 {
return currentBaseSeq, currentSID, currentTID, ErrCantDecide
}
return currentBaseSeq, currentSID, currentTID, ErrCantDecide
}
func (p *packetCaches) IsAllowToUpscaleDownscale(seqNum uint16) bool {
if p.caches.Back() == nil {
return true
}
cache, ok := p.caches.Back().Value.(Cache)
if !ok {
return false
}
if cache.SeqNum < seqNum {
return true
}
return false
}