Skip to content

Commit

Permalink
Fix race in nack responder
Browse files Browse the repository at this point in the history
  • Loading branch information
aalekseevx committed Feb 6, 2025
1 parent 60fb984 commit 3755e63
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
9 changes: 5 additions & 4 deletions pkg/nack/responder_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,12 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri

// error is already checked in NewGeneratorInterceptor
rtpBuffer, _ := rtpbuffer.NewRTPBuffer(n.size)
n.streamsMu.Lock()
n.streams[info.SSRC] = &localStream{
stream := &localStream{
rtpBuffer: rtpBuffer,
rtpWriter: writer,
}
n.streamsMu.Lock()
n.streams[info.SSRC] = stream
n.streamsMu.Unlock()

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
Expand All @@ -122,8 +123,8 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri
if err != nil {
return 0, err
}
n.streams[info.SSRC].rtpBufferMutex.Lock()
defer n.streams[info.SSRC].rtpBufferMutex.Unlock()
stream.rtpBufferMutex.Lock()
defer stream.rtpBufferMutex.Unlock()

rtpBuffer.Add(pkt)

Expand Down
32 changes: 32 additions & 0 deletions pkg/nack/responder_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package nack

import (
"encoding/binary"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -153,6 +154,37 @@ func TestResponderInterceptor_Race(t *testing.T) {
}
}

// this test is only useful when being run with the race detector, it won't fail otherwise:
//
// go test -race ./pkg/nack/
func TestResponderInterceptor_RaceConcurrentStreams(t *testing.T) {
f, err := NewResponderInterceptor(
ResponderSize(32768),
ResponderLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
require.NoError(t, err)

i, err := f.NewInterceptor("")
require.NoError(t, err)

var wg sync.WaitGroup
for j := 0; j < 5; j++ {
stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 1,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
}, i)
wg.Add(1)
go func() {
for seqNum := uint16(0); seqNum < 500; seqNum++ {
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
}
wg.Done()
}()
}

wg.Wait()
}

func TestResponderInterceptor_StreamFilter(t *testing.T) {
f, err := NewResponderInterceptor(
ResponderSize(8),
Expand Down

0 comments on commit 3755e63

Please sign in to comment.