Skip to content

Commit

Permalink
Merge pull request #3 from cockroachdb/queue-safer
Browse files Browse the repository at this point in the history
queue: make queue pools safer
  • Loading branch information
RaduBerinde authored Jun 6, 2024
2 parents 32ef7c3 + 543ddf9 commit 0bbfbd9
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 27 deletions.
46 changes: 22 additions & 24 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ type Queue[T any] struct {
len int
head, tail *queueNode[T]

pool *queueBackingPool[T]
pool *QueueBackingPool[T]
}

// MakeQueue constructs a new Queue.
func MakeQueue[T any]() Queue[T] {
//
// The pool should be a singleton object initialized with MakeQueueBackingPool.
// A single pool can and should be used by all queues of that type.
func MakeQueue[T any](pool *QueueBackingPool[T]) Queue[T] {
return Queue[T]{
pool: getQueueBackingPool[T](),
pool: pool,
}
}

Expand Down Expand Up @@ -91,35 +94,30 @@ func (q *Queue[T]) PopFront() {
q.len--
}

// queueBackingPool is a sync.Pool that used to allocate internal nodes
// QueueBackingPool is a sync.Pool that used to allocate internal nodes
// for Queue[T].
type queueBackingPool[T any] sync.Pool
type QueueBackingPool[T any] struct {
pool sync.Pool
}

func newQueueBackingPool[T any]() *queueBackingPool[T] {
return &queueBackingPool[T]{
New: func() interface{} { return &queueNode[T]{} },
// MakeQueueBackingPool makes a queue backing pool. It is intented to be used to
// initialize a singleton (global) variable. A single pool can and should be
// used by all queues of that type.
func MakeQueueBackingPool[T any]() QueueBackingPool[T] {
return QueueBackingPool[T]{
pool: sync.Pool{
New: func() interface{} { return &queueNode[T]{} },
},
}
}

func (qp *queueBackingPool[T]) get() *queueNode[T] {
return (*sync.Pool)(qp).Get().(*queueNode[T])
func (qp *QueueBackingPool[T]) get() *queueNode[T] {
return qp.pool.Get().(*queueNode[T])
}

func (qp *queueBackingPool[T]) put(n *queueNode[T]) {
func (qp *QueueBackingPool[T]) put(n *queueNode[T]) {
*n = queueNode[T]{}
(*sync.Pool)(qp).Put(n)
}

// queueBackingPools stores singleton queue backing pools, keyed by a nil pointer of the
// respective type.
var queueBackingPools sync.Map

func getQueueBackingPool[T any]() *queueBackingPool[T] {
p, ok := queueBackingPools.Load((*T)(nil))
if !ok {
p, _ = queueBackingPools.LoadOrStore((*T)(nil), newQueueBackingPool[T]())
}
return p.(*queueBackingPool[T])
qp.pool.Put(n)
}

// We batch the allocation of this many queue objects. The value was chosen
Expand Down
6 changes: 4 additions & 2 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"github.com/stretchr/testify/require"
)

var pool = MakeQueueBackingPool[int]()

func TestQueue(t *testing.T) {
q := MakeQueue[int]()
q := MakeQueue[int](&pool)
require.Nil(t, q.PeekFront())
require.Equal(t, 0, q.Len())
q.PushBack(1)
Expand All @@ -49,7 +51,7 @@ func TestQueue(t *testing.T) {
}

func TestQueueRand(t *testing.T) {
q := MakeQueue[int]()
q := MakeQueue[int](&pool)
l, r := 0, 0
for iteration := 0; iteration < 100; iteration++ {
for n := rand.Intn(100); n > 0; n-- {
Expand Down
4 changes: 3 additions & 1 deletion semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ func NewSemaphore(capacity int64) *Semaphore {
}
s := &Semaphore{}
s.mu.capacity = capacity
s.mu.waiters = MakeQueue[semaWaiter]()
s.mu.waiters = MakeQueue[semaWaiter](&semaQueuePool)
return s
}

var semaQueuePool = MakeQueueBackingPool[semaWaiter]()

var ErrRequestExceedsCapacity = errors.New("request exceeds semaphore capacity")

// TryAcquire attempts to acquire n units from the semaphore without waiting. On
Expand Down

0 comments on commit 0bbfbd9

Please sign in to comment.