Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions pending_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ func (q *pendingBaseQueue) pop() *chunkPayloadData {
if len(q.queue) == 0 {
return nil
}

c := q.queue[0]
q.queue[0] = nil
q.queue = q.queue[1:]

return c
Expand Down Expand Up @@ -55,9 +57,14 @@ type pendingQueue struct {

// Pending queue errors.
var (
ErrUnexpectedChuckPoppedUnordered = errors.New("unexpected chunk popped (unordered)")
ErrUnexpectedChuckPoppedOrdered = errors.New("unexpected chunk popped (ordered)")
ErrUnexpectedChunkPoppedUnordered = errors.New("unexpected chunk popped (unordered)")
ErrUnexpectedChunkPoppedOrdered = errors.New("unexpected chunk popped (ordered)")
ErrUnexpectedQState = errors.New("unexpected q state (should've been selected)")

// Deprecated: use ErrUnexpectedChunkPoppedUnordered.
ErrUnexpectedChuckPoppedUnordered = ErrUnexpectedChunkPoppedUnordered
// Deprecated: use ErrUnexpectedChunkPoppedOrdered.
ErrUnexpectedChuckPoppedOrdered = ErrUnexpectedChunkPoppedOrdered
)

func newPendingQueue() *pendingQueue {
Expand Down Expand Up @@ -98,12 +105,12 @@ func (q *pendingQueue) pop(chunkPayload *chunkPayloadData) error { //nolint:cycl
if q.unorderedIsSelected {
popped = q.unorderedQueue.pop()
if popped != chunkPayload {
return ErrUnexpectedChuckPoppedUnordered
return ErrUnexpectedChunkPoppedUnordered
}
} else {
popped = q.orderedQueue.pop()
if popped != chunkPayload {
return ErrUnexpectedChuckPoppedOrdered
return ErrUnexpectedChunkPoppedOrdered
}
}
if popped.endingFragment {
Expand All @@ -116,7 +123,7 @@ func (q *pendingQueue) pop(chunkPayload *chunkPayloadData) error { //nolint:cycl
if chunkPayload.unordered {
popped := q.unorderedQueue.pop()
if popped != chunkPayload {
return ErrUnexpectedChuckPoppedUnordered
return ErrUnexpectedChunkPoppedUnordered
}
if !popped.endingFragment {
q.selected = true
Expand All @@ -125,15 +132,20 @@ func (q *pendingQueue) pop(chunkPayload *chunkPayloadData) error { //nolint:cycl
} else {
popped := q.orderedQueue.pop()
if popped != chunkPayload {
return ErrUnexpectedChuckPoppedOrdered
return ErrUnexpectedChunkPoppedOrdered
}
if !popped.endingFragment {
q.selected = true
q.unorderedIsSelected = false
}
}
}

// guard against negative values (should never happen, but just in case).
q.nBytes -= len(chunkPayload.userData)
if q.nBytes < 0 {
q.nBytes = 0
}

return nil
}
Expand Down
85 changes: 84 additions & 1 deletion pending_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestPendingBaseQueue(t *testing.T) {
}
})

t.Run("out of bounce", func(t *testing.T) {
t.Run("out of bounds", func(t *testing.T) {
pq := newPendingBaseQueue()
assert.Nil(t, pq.pop(), "should be nil")
assert.Nil(t, pq.get(0), "should be nil")
Expand Down Expand Up @@ -192,3 +192,86 @@ func TestPendingQueue(t *testing.T) {
}
})
}

func TestPendingQueue_PopErrors(t *testing.T) {
t.Run("ErrUnexpectedQState when not selected and not beginningFragment", func(t *testing.T) {
pq := newPendingQueue()

mid := makeDataChunk(100, false, fragMiddle)
err := pq.pop(mid)
assert.ErrorIs(t, err, ErrUnexpectedQState)
})

t.Run("ErrUnexpectedChunkPoppedUnordered (not selected path)", func(t *testing.T) {
pq := newPendingQueue()

u1 := makeDataChunk(1, true, noFragment)
u2 := makeDataChunk(2, true, noFragment)
pq.push(u1)
pq.push(u2)

err := pq.pop(u2)
assert.ErrorIs(t, err, ErrUnexpectedChunkPoppedUnordered)
})

t.Run("ErrUnexpectedChunkPoppedOrdered (not selected path)", func(t *testing.T) {
pq := newPendingQueue()

o1 := makeDataChunk(10, false, noFragment)
o2 := makeDataChunk(11, false, noFragment)
pq.push(o1)
pq.push(o2)

err := pq.pop(o2)
assert.ErrorIs(t, err, ErrUnexpectedChunkPoppedOrdered)
})

t.Run("ErrUnexpectedChunkPoppedUnordered (selected unordered path)", func(t *testing.T) {
pq := newPendingQueue()

uBegin := makeDataChunk(21, true, fragBegin)
uMid := makeDataChunk(22, true, fragMiddle)
uEnd := makeDataChunk(23, true, fragEnd)
pq.push(uBegin)
pq.push(uMid)
pq.push(uEnd)

err := pq.pop(uBegin)
assert.NoError(t, err)

err = pq.pop(uEnd)
assert.ErrorIs(t, err, ErrUnexpectedChunkPoppedUnordered)
})

t.Run("ErrUnexpectedChunkPoppedOrdered (selected ordered path)", func(t *testing.T) {
pq := newPendingQueue()

oBegin := makeDataChunk(31, false, fragBegin)
oMid := makeDataChunk(32, false, fragMiddle)
oEnd := makeDataChunk(33, false, fragEnd)
pq.push(oBegin)
pq.push(oMid)
pq.push(oEnd)

err := pq.pop(oBegin)
assert.NoError(t, err)

err = pq.pop(oEnd)
assert.ErrorIs(t, err, ErrUnexpectedChunkPoppedOrdered)
})

t.Run("nBytes guard clamps to zero when underflows", func(t *testing.T) {
pq := newPendingQueue()

c := makeDataChunk(40, false, noFragment)
pq.push(c)

pq.nBytes = 5

peek := pq.peek()
err := pq.pop(peek)
assert.NoError(t, err)

assert.Equal(t, 0, pq.getNumBytes())
})
}
58 changes: 58 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
package sctp

import (
"runtime"
"runtime/debug"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -44,3 +48,57 @@ func TestQueue(t *testing.T) {
assert.Equal(t, i, queu.PopFront())
}
}

// waitForFinalizers spins until at least target have run or timeout hits.
func waitForFinalizers(got *int32, target int32, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
runtime.GC()

if atomic.LoadInt32(got) >= target {
return true
}

time.Sleep(10 * time.Millisecond)
}

return atomic.LoadInt32(got) >= target
}

func TestPendingBaseQueuePopReleasesReferences(t *testing.T) {
// Make GC more aggressive for the duration of this test.
prev := debug.SetGCPercent(10)
defer debug.SetGCPercent(prev)

bufSize := 256 << 10
queue := newPendingBaseQueue()
var finalized int32

// add 64 chunks, each with a finalizer to count collection.
for i := 0; i < 64; i++ {
c := &chunkPayloadData{
userData: make([]byte, bufSize),
}

// count when the chunk struct becomes unreachable.
runtime.SetFinalizer(c, func(*chunkPayloadData) {
atomic.AddInt32(&finalized, 1)
})
queue.push(c)
}

// pop 63 chunks so only 1 is left
for i := 0; i < 63; i++ {
queue.pop()
}

assert.Equal(t, queue.size(), 1)

wantAtLeast := int32(64 - 4) // wait for GC scheduling
ok := waitForFinalizers(&finalized, wantAtLeast, 3*time.Second)
assert.True(t, ok)

// Now pop the last element; queue should be empty.
queue.pop()
assert.Equal(t, queue.size(), 0)
}
Loading