Skip to content

Commit

Permalink
Convert packetIO buffer to an interface
Browse files Browse the repository at this point in the history
PacketIO buffer interface will allow to use custom buffer for
rtp/rtcp streams
  • Loading branch information
OrlandoCo authored and tarrencev committed Jan 4, 2021
1 parent 740f8a4 commit a14e41b
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 11 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ Check out the **[contributing wiki](https://github.com/pion/webrtc/wiki/Contribu
* [Jerko Steiner](https://github.com/jeremija)
* [Juliusz Chroboczek](https://github.com/jech)
* [Mission Liao](https://github.com/mission-liao)
* [Orlando](https://github.com/OrlandoCo)
* [Tarrence van As](https://github.com/tarrencev)

### License
MIT License - see [LICENSE](LICENSE) for full text
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ require (
github.com/pion/logging v0.2.2
github.com/pion/rtcp v1.2.6
github.com/pion/rtp v1.6.2
github.com/pion/transport v0.12.1
github.com/pion/transport v0.12.2
github.com/stretchr/testify v1.6.1
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo=
github.com/pion/rtcp v1.2.6/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U=
github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/transport v0.12.1 h1:6v8lxQGVZpwSICEZjhl/CCv6aErINZlrm3O5ncFXj/c=
github.com/pion/transport v0.12.1/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q=
github.com/pion/transport v0.12.2 h1:WYEjhloRHt1R86LhUKjC5y+P52Y11/QqEUalvtzVoys=
github.com/pion/transport v0.12.2/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
5 changes: 4 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/pion/logging"
"github.com/pion/transport/packetio"
)

type streamSession interface {
Expand All @@ -28,7 +29,8 @@ type session struct {
readStreams map[uint32]readStream
readStreamsLock sync.Mutex

log logging.LeveledLogger
log logging.LeveledLogger
bufferFactory func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser

nextConn net.Conn
}
Expand All @@ -40,6 +42,7 @@ type session struct {
type Config struct {
Keys SessionKeys
Profile ProtectionProfile
BufferFactory func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser
LoggerFactory logging.LoggerFactory

// List of local/remote context options.
Expand Down
1 change: 1 addition & 0 deletions session_srtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func NewSessionSRTCP(conn net.Conn, config *Config) (*SessionSRTCP, error) { //n
newStream: make(chan readStream),
started: make(chan interface{}),
closed: make(chan interface{}),
bufferFactory: config.BufferFactory,
log: loggerFactory.NewLogger("srtp"),
},
}
Expand Down
1 change: 1 addition & 0 deletions session_srtp.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func NewSessionSRTP(conn net.Conn, config *Config) (*SessionSRTP, error) { //nol
newStream: make(chan readStream),
started: make(chan interface{}),
closed: make(chan interface{}),
bufferFactory: config.BufferFactory,
log: loggerFactory.NewLogger("srtp"),
},
}
Expand Down
14 changes: 10 additions & 4 deletions stream_srtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package srtp

import (
"errors"
"io"
"sync"
"time"

Expand All @@ -22,7 +23,7 @@ type ReadStreamSRTCP struct {
session *SessionSRTCP
ssrc uint32

buffer *packetio.Buffer
buffer io.ReadWriteCloser
}

func (r *ReadStreamSRTCP) write(buf []byte) (n int, err error) {
Expand Down Expand Up @@ -107,9 +108,14 @@ func (r *ReadStreamSRTCP) init(child streamSession, ssrc uint32) error {
r.isInited = true
r.isClosed = make(chan bool)

// Create a buffer and limit it to 100KB
r.buffer = packetio.NewBuffer()
r.buffer.SetLimitSize(srtcpBufferSize)
if r.session.bufferFactory != nil {
r.buffer = r.session.bufferFactory(packetio.RTCPBufferPacket, ssrc)
} else {
// Create a buffer and limit it to 100KB
buff := packetio.NewBuffer()
buff.SetLimitSize(srtcpBufferSize)
r.buffer = buff
}

return nil
}
Expand Down
12 changes: 9 additions & 3 deletions stream_srtp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package srtp

import (
"errors"
"io"
"sync"
"time"

Expand All @@ -22,7 +23,7 @@ type ReadStreamSRTP struct {
session *SessionSRTP
ssrc uint32

buffer *packetio.Buffer
buffer io.ReadWriteCloser
}

// Used by getOrCreateReadStream
Expand All @@ -48,8 +49,13 @@ func (r *ReadStreamSRTP) init(child streamSession, ssrc uint32) error {
r.isClosed = make(chan bool)

// Create a buffer with a 1MB limit
r.buffer = packetio.NewBuffer()
r.buffer.SetLimitSize(srtpBufferSize)
if r.session.bufferFactory != nil {
r.buffer = r.session.bufferFactory(packetio.RTPBufferPacket, ssrc)
} else {
buff := packetio.NewBuffer()
buff.SetLimitSize(srtpBufferSize)
r.buffer = buff
}

return nil
}
Expand Down
40 changes: 40 additions & 0 deletions stream_srtp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package srtp
import (
"io"
"net"
"sync"
"testing"
"time"

"github.com/pion/rtp"
"github.com/pion/transport/packetio"
"github.com/stretchr/testify/assert"
)

type noopConn struct{ closed chan struct{} }
Expand All @@ -21,6 +24,43 @@ func (c *noopConn) SetDeadline(t time.Time) error { return nil }
func (c *noopConn) SetReadDeadline(t time.Time) error { return nil }
func (c *noopConn) SetWriteDeadline(t time.Time) error { return nil }

func TestBufferFactory(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(2)
conn := newNoopConn()
bf := func(_ packetio.BufferPacketType, _ uint32) io.ReadWriteCloser {
wg.Done()
return packetio.NewBuffer()
}
rtpSession, err := NewSessionSRTP(conn, &Config{
Keys: SessionKeys{
LocalMasterKey: make([]byte, 16),
LocalMasterSalt: make([]byte, 14),
RemoteMasterKey: make([]byte, 16),
RemoteMasterSalt: make([]byte, 14),
},
BufferFactory: bf,
Profile: ProtectionProfileAes128CmHmacSha1_80,
})
assert.NoError(t, err)
rtcpSession, err := NewSessionSRTCP(conn, &Config{
Keys: SessionKeys{
LocalMasterKey: make([]byte, 16),
LocalMasterSalt: make([]byte, 14),
RemoteMasterKey: make([]byte, 16),
RemoteMasterSalt: make([]byte, 14),
},
BufferFactory: bf,
Profile: ProtectionProfileAes128CmHmacSha1_80,
})
assert.NoError(t, err)

_, _ = rtpSession.OpenReadStream(123)
_, _ = rtcpSession.OpenReadStream(123)

wg.Wait()
}

func BenchmarkWrite(b *testing.B) {
conn := newNoopConn()

Expand Down

0 comments on commit a14e41b

Please sign in to comment.