Skip to content

Commit

Permalink
SRTP and SRTCP has been ported to streams API
Browse files Browse the repository at this point in the history
Send/Recv and RTCP is complete

Relates to #272
  • Loading branch information
Sean-Der committed Jan 3, 2019
1 parent da908cc commit b647be8
Showing 1 changed file with 74 additions and 60 deletions.
134 changes: 74 additions & 60 deletions session.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package srtp

import (
"bytes"
"fmt"
"io"
"net"
"sync"

"github.com/pions/webrtc/pkg/rtcp"
"github.com/pions/webrtc/pkg/rtp"
)

Expand All @@ -18,17 +21,17 @@ type SessionSRTP struct {
}

// CreateSessionSRTP creates a new SessionSRTP
func CreateSessionSRTP(localMasterKey, localMasterSalt, remoteMasterKey, remoteMasterSalt []byte, profile ProtectionProfile, nextConn net.Conn) (*SessionSRTP, error) {
s := &SessionSRTP{
session: session{nextConn: nextConn},
}
func CreateSessionSRTP() *SessionSRTP {
s := &SessionSRTP{}
s.writeStream = &WriteStream{s}
s.session.initalize()
return s
}

if err := s.session.initalize(localMasterKey, localMasterSalt, remoteMasterKey, remoteMasterSalt, profile, s); err != nil {
return nil, err
}

return s, nil
// Start initializes any crypto context and allows reading/writing to begin
func (s *SessionSRTP) Start(localMasterKey, localMasterSalt, remoteMasterKey, remoteMasterSalt []byte, profile ProtectionProfile, nextConn net.Conn) error {
s.session.nextConn = nextConn
return s.session.start(localMasterKey, localMasterSalt, remoteMasterKey, remoteMasterSalt, profile, s)
}

// OpenWriteStream returns the global write stream for the Session
Expand Down Expand Up @@ -59,6 +62,11 @@ func (s *SessionSRTP) Close() error {
}

func (s *SessionSRTP) write(buf []byte) (int, error) {
_, ok := <-s.session.started
if ok {
return 0, fmt.Errorf("started channel used incorrectly, should only be closed")
}

s.session.localContextMutex.Lock()
defer s.session.localContextMutex.Unlock()

Expand Down Expand Up @@ -101,17 +109,17 @@ type SessionSRTCP struct {
}

// CreateSessionSRTCP creates a new SessionSRTCP
func CreateSessionSRTCP(localMasterKey, localMasterSalt, remoteMasterKey, remoteMasterSalt []byte, profile ProtectionProfile, nextConn net.Conn) (*SessionSRTCP, error) {
s := &SessionSRTCP{
session: session{nextConn: nextConn},
}
func CreateSessionSRTCP() *SessionSRTCP {
s := &SessionSRTCP{}
s.writeStream = &WriteStream{s}
s.session.initalize()
return s
}

if err := s.session.initalize(localMasterKey, localMasterSalt, remoteMasterKey, remoteMasterSalt, profile, s); err != nil {
return nil, err
}

return s, nil
// Start initializes any crypto context and allows reading/writing to begin
func (s *SessionSRTCP) Start(localMasterKey, localMasterSalt, remoteMasterKey, remoteMasterSalt []byte, profile ProtectionProfile, nextConn net.Conn) error {
s.session.nextConn = nextConn
return s.session.start(localMasterKey, localMasterSalt, remoteMasterKey, remoteMasterSalt, profile, s)
}

// OpenWriteStream returns the global write stream for the Session
Expand Down Expand Up @@ -142,6 +150,11 @@ func (s *SessionSRTCP) Close() error {
}

func (s *SessionSRTCP) write(buf []byte) (int, error) {
_, ok := <-s.session.started
if ok {
return 0, fmt.Errorf("started channel used incorrectly, should only be closed")
}

s.session.localContextMutex.Lock()
defer s.session.localContextMutex.Unlock()

Expand All @@ -153,44 +166,38 @@ func (s *SessionSRTCP) write(buf []byte) (int, error) {
}

func (s *SessionSRTCP) decrypt(buf []byte) error {
fmt.Println("TODO SessionSRTCP.decrypt")
// func handleRTCP(getBufferTransports func(uint32) *TransportPair, buffer []byte) {
// //decrypted packets can also be compound packets, so we have to nest our reader loop here.
// compoundPacket := rtcp.NewReader(bytes.NewReader(buffer))
// for {
// _, rawrtcp, err := compoundPacket.ReadPacket()
//
// if err != nil {
// if err == io.EOF {
// break
// }
// fmt.Println(err)
// return
// }
//
// var report rtcp.Packet
// report, _, err = rtcp.Unmarshal(rawrtcp)
// if err != nil {
// fmt.Println(err)
// return
// }
//
// f := func(ssrc uint32) {
// bufferTransport := getBufferTransports(ssrc)
// if bufferTransport != nil && bufferTransport.RTCP != nil {
// select {
// case bufferTransport.RTCP <- report:
// default:
// }
// }
// }
//
// for _, ssrc := range report.DestinationSSRC() {
// f(ssrc)
// }
// }
// }
return nil
decrypted, err := s.remoteContext.DecryptRTCP(buf)
if err != nil {
return err
}

compoundPacket := rtcp.NewReader(bytes.NewReader(decrypted))
for {
_, rawrtcp, err := compoundPacket.ReadPacket()
if err != nil {
if err == io.EOF {
return nil
}
return err
}

var report rtcp.Packet
report, _, err = rtcp.Unmarshal(rawrtcp)
if err != nil {
return err
}

for _, ssrc := range report.DestinationSSRC() {
r, isNew := s.session.getOrCreateReadStream(ssrc, s)
if r == nil {
return nil // Session has been closed
} else if isNew {
s.session.newStream <- r // Notify AcceptStream
}

r.decrypted <- decrypted
}
}
}

/*
Expand All @@ -201,6 +208,7 @@ type session struct {
localContext, remoteContext *Context

newStream chan *ReadStream
started chan interface{}

readStreamsClosed bool
readStreams map[uint32]*ReadStream
Expand Down Expand Up @@ -229,7 +237,13 @@ func (s *session) getOrCreateReadStream(ssrc uint32, child streamSession) (*Read
return r, isNew
}

func (s *session) initalize(localMasterKey, localMasterSalt, remoteMasterKey, remoteMasterSalt []byte, profile ProtectionProfile, child streamSession) error {
func (s *session) initalize() {
s.readStreams = map[uint32]*ReadStream{}
s.newStream = make(chan *ReadStream)
s.started = make(chan interface{})
}

func (s *session) start(localMasterKey, localMasterSalt, remoteMasterKey, remoteMasterSalt []byte, profile ProtectionProfile, child streamSession) error {
var err error
s.localContext, err = CreateContext(localMasterKey, localMasterSalt, profile)
if err != nil {
Expand All @@ -241,9 +255,6 @@ func (s *session) initalize(localMasterKey, localMasterSalt, remoteMasterKey, re
return err
}

s.readStreams = map[uint32]*ReadStream{}
s.newStream = make(chan *ReadStream)

go func() {
defer func() {
close(s.newStream)
Expand Down Expand Up @@ -271,5 +282,8 @@ func (s *session) initalize(localMasterKey, localMasterSalt, remoteMasterKey, re
}
}
}()

close(s.started)

return nil
}

0 comments on commit b647be8

Please sign in to comment.