Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record admission-blocked event as a span & duration #244

Merged
merged 2 commits into from
Sep 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions collector/admission/boundedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,45 @@ import (

"github.com/google/uuid"
orderedmap "github.com/wk8/go-ordered-map/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
)

var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters")

type BoundedQueue struct {
maxLimitBytes int64
maxLimitBytes int64
maxLimitWaiters int64
currentBytes int64
currentWaiters int64
lock sync.Mutex
waiters *orderedmap.OrderedMap[uuid.UUID, waiter]
currentBytes int64
currentWaiters int64
lock sync.Mutex
waiters *orderedmap.OrderedMap[uuid.UUID, waiter]
tracer trace.Tracer
}

type waiter struct {
readyCh chan struct{}
readyCh chan struct{}
pendingBytes int64
ID uuid.UUID
ID uuid.UUID
}

func NewBoundedQueue(maxLimitBytes, maxLimitWaiters int64) *BoundedQueue {
return &BoundedQueue{
maxLimitBytes: maxLimitBytes,
maxLimitBytes: maxLimitBytes,
maxLimitWaiters: maxLimitWaiters,
waiters: orderedmap.New[uuid.UUID, waiter](),
waiters: orderedmap.New[uuid.UUID, waiter](),
tracer: noop.NewTracerProvider().Tracer(""),
}
}

func NewTracedBoundedQueue(tp trace.TracerProvider, maxLimitBytes, maxLimitWaiters int64) *BoundedQueue {
bq := NewBoundedQueue(maxLimitBytes, maxLimitWaiters)
bq.tracer = tp.Tracer("otel-arrow/admission")
return bq
}

func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) {
bq.lock.Lock()
defer bq.lock.Unlock()
Expand All @@ -42,13 +54,13 @@ func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) {
return false, fmt.Errorf("rejecting request, request size larger than configured limit")
}

if bq.currentBytes + pendingBytes <= bq.maxLimitBytes { // no need to wait to admit
if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { // no need to wait to admit
bq.currentBytes += pendingBytes
return true, nil
}

// since we were unable to admit, check if we can wait.
if bq.currentWaiters + 1 > bq.maxLimitWaiters { // too many waiters
if bq.currentWaiters+1 > bq.maxLimitWaiters { // too many waiters
return false, ErrTooManyWaiters
}

Expand All @@ -66,7 +78,7 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {
// otherwise we need to wait for bytes to be released
curWaiter := waiter{
pendingBytes: pendingBytes,
readyCh: make(chan struct{}),
readyCh: make(chan struct{}),
}

bq.lock.Lock()
Expand All @@ -84,6 +96,9 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {
}

bq.lock.Unlock()
ctx, span := bq.tracer.Start(ctx, "admission_blocked",
trace.WithAttributes(attribute.Int64("pending", pendingBytes)))
defer span.End()

select {
case <-curWaiter.readyCh:
Expand All @@ -93,6 +108,7 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {
bq.lock.Lock()
defer bq.lock.Unlock()
err = fmt.Errorf("context canceled: %w ", ctx.Err())
span.SetStatus(codes.Error, "context canceled")

_, found := bq.waiters.Delete(curWaiter.ID)
if !found {
Expand Down Expand Up @@ -121,7 +137,7 @@ func (bq *BoundedQueue) Release(pendingBytes int64) error {
next := bq.waiters.Oldest()
nextWaiter := next.Value
nextKey := next.Key
if bq.currentBytes + nextWaiter.pendingBytes <= bq.maxLimitBytes {
if bq.currentBytes+nextWaiter.pendingBytes <= bq.maxLimitBytes {
bq.currentBytes += nextWaiter.pendingBytes
bq.currentWaiters -= 1
close(nextWaiter.readyCh)
Expand All @@ -142,9 +158,9 @@ func (bq *BoundedQueue) Release(pendingBytes int64) error {
func (bq *BoundedQueue) TryAcquire(pendingBytes int64) bool {
bq.lock.Lock()
defer bq.lock.Unlock()
if bq.currentBytes + pendingBytes <= bq.maxLimitBytes {
if bq.currentBytes+pendingBytes <= bq.maxLimitBytes {
bq.currentBytes += pendingBytes
return true
}
return false
}
}
Loading