Update queue size after the element is done exported #12399

Merged
25 changes: 25 additions & 0 deletions .chloggen/ensure-queue-size-update.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Update queue size after the element is done exported

# One or more tracking issues or pull requests related to the change
issues: [12399]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: After this change the active queue size will include elements in the process of being exported.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
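
To make the changelog entry concrete, the sketch below is a minimal, self-contained Go illustration of the new accounting (toy names such as toyQueue; this is not the collector's actual exporterqueue API): the size is incremented on Offer and released only by the element's done callback, so Size() now also counts elements that are currently being exported.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// toyQueue mimics the accounting change: size is incremented on Offer and
// only decremented when the element's done callback fires, so in-flight
// elements still count toward Size().
type toyQueue struct {
	size atomic.Int64
	ch   chan string
}

func newToyQueue(capacity int) *toyQueue {
	return &toyQueue{ch: make(chan string, capacity)}
}

func (q *toyQueue) Offer(el string) {
	q.size.Add(1)
	q.ch <- el
}

// Read hands out the element together with a done callback that releases
// the size only after the export attempt finished.
func (q *toyQueue) Read() (string, func(error)) {
	el := <-q.ch
	return el, func(error) { q.size.Add(-1) }
}

func (q *toyQueue) Size() int64 { return q.size.Load() }

func main() {
	q := newToyQueue(10)
	q.Offer("spans-1")

	el, done := q.Read()
	fmt.Println(el, "in-flight size:", q.Size()) // 1: still counted while exporting
	done(nil)
	fmt.Println("size after done:", q.Size()) // 0
}
```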
24 changes: 12 additions & 12 deletions exporter/exporterhelper/internal/batch_sender_test.go
@@ -90,7 +90,7 @@ func TestBatchSender_Merge(t *testing.T) {
assert.Equal(t, int64(1), sink.RequestsCount())
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 2 && sink.ItemsCount() == 15
- }, 100*time.Millisecond, 10*time.Millisecond)
+ }, 1*time.Second, 10*time.Millisecond)
})
}
for _, tt := range tests {
@@ -160,12 +160,12 @@ func TestBatchSender_BatchExportError(t *testing.T) {
errReq := &requesttest.FakeRequest{Items: 20, ExportErr: errors.New("transient error"), Sink: sink}
require.NoError(t, be.Send(context.Background(), errReq))

- // the batch should be dropped since the queue doesn't have requeuing enabled.
+ // the batch should be dropped since the queue doesn't have re-queuing enabled.
assert.Eventually(t, func() bool {
return sink.RequestsCount() == tt.expectedRequests &&
sink.ItemsCount() == tt.expectedItems &&
be.queue.Size() == 0
- }, 100*time.Millisecond, 10*time.Millisecond)
+ }, 1*time.Second, 10*time.Millisecond)

require.NoError(t, be.Shutdown(context.Background()))
})
Expand Down Expand Up @@ -194,13 +194,13 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 1 && sink.ItemsCount() == 8
- }, 500*time.Millisecond, 10*time.Millisecond)
+ }, 1*time.Second, 10*time.Millisecond)

// big request should be broken down into two requests, both are sent right away.
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 17, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 3 && sink.ItemsCount() == 25
- }, 500*time.Millisecond, 10*time.Millisecond)
+ }, 1*time.Second, 10*time.Millisecond)

// request that cannot be split should be dropped.
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{
@@ -212,7 +212,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 5 && sink.ItemsCount() == 38
- }, 500*time.Millisecond, 10*time.Millisecond)
+ }, 1*time.Second, 10*time.Millisecond)
require.NoError(t, be.Shutdown(context.Background()))
})
}
@@ -370,20 +370,20 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {

assert.Eventually(t, func() bool {
return sink.RequestsCount() == 1 && sink.ItemsCount() == 4
- }, 100*time.Millisecond, 10*time.Millisecond)
+ }, 1*time.Second, 10*time.Millisecond)

// the 3rd request should be flushed by itself due to flush interval
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 2 && sink.ItemsCount() == 6
- }, 100*time.Millisecond, 10*time.Millisecond)
+ }, 1*time.Second, 10*time.Millisecond)

// the 4th and 5th request should be flushed in the same batched request by max concurrency limit.
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 3 && sink.ItemsCount() == 10
- }, 100*time.Millisecond, 10*time.Millisecond)
+ }, 1*time.Second, 10*time.Millisecond)

// do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling.
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 5, Sink: sink}))
@@ -392,15 +392,15 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
// in case of MaxSizeItems=10, wait for the leftover request to send
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 5 && sink.ItemsCount() == 21
- }, 50*time.Millisecond, 10*time.Millisecond)
+ }, 1*time.Second, 10*time.Millisecond)
}

require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink}))
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 6, Sink: sink}))
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 20, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == tt.expectedRequests && sink.ItemsCount() == tt.expectedItems
- }, 100*time.Millisecond, 10*time.Millisecond)
+ }, 1*time.Second, 10*time.Millisecond)
})
}
}
@@ -648,7 +648,7 @@ func TestBatchSenderTimerResetNoConflict(t *testing.T) {
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.LessOrEqual(c, int64(1), sink.RequestsCount())
assert.EqualValues(c, 8, sink.ItemsCount())
- }, 500*time.Millisecond, 10*time.Millisecond)
+ }, 1*time.Second, 10*time.Millisecond)

require.NoError(t, be.Shutdown(context.Background()))
}
2 changes: 1 addition & 1 deletion exporter/exporterqueue/async_queue_test.go
@@ -70,7 +70,7 @@ func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
- for j := 0; j < 11; j++ {
+ for j := 0; j < 10; j++ {
assert.NoError(t, ac.Offer(ctx, 1))
}
assert.ErrorIs(t, ac.Offer(ctx, 3), context.Canceled)
20 changes: 17 additions & 3 deletions exporter/exporterqueue/disabled_queue.go
@@ -19,6 +19,7 @@ var donePool = sync.Pool{

func newDisabledQueue[T any](consumeFunc ConsumeFunc[T]) Queue[T] {
return &disabledQueue[T]{
sizer: &requestSizer[T]{},
consumeFunc: consumeFunc,
size: &atomic.Int64{},
}
@@ -28,14 +28,18 @@ type disabledQueue[T any] struct {
component.StartFunc
component.ShutdownFunc
consumeFunc ConsumeFunc[T]
sizer sizer[T]
size *atomic.Int64
}

func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
elSize := d.sizer.Sizeof(req)
d.size.Add(elSize)

done := donePool.Get().(*blockingDone)
d.size.Add(1)
done.queue = d
done.elSize = elSize
d.consumeFunc(ctx, req, done)
defer d.size.Add(-1)
// Only re-add the blockingDone instance back to the pool if successfully received the
// message from the consumer which guarantees consumer will not use that anymore,
// otherwise no guarantee about when the consumer will add the message to the channel so cannot reuse or close.
@@ -48,6 +53,10 @@ func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
}
}

func (d *disabledQueue[T]) onDone(elSize int64) {
d.size.Add(-elSize)
}

// Size returns the current number of blocked requests waiting to be processed.
func (d *disabledQueue[T]) Size() int64 {
return d.size.Load()
@@ -59,9 +68,14 @@ func (d *disabledQueue[T]) Capacity() int64 {
}

type blockingDone struct {
ch chan error
queue interface {
onDone(int64)
}
elSize int64
ch chan error
}

func (d *blockingDone) OnDone(err error) {
d.queue.onDone(d.elSize)
d.ch <- err
}
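
For context on how the Done value returned by these queues is meant to be used, here is a hedged consumer-side sketch; the Done interface name matches the diff, while consume, export, and printDone are hypothetical stand-ins rather than collector code. The point is that OnDone is called exactly once after the export attempt, and that call is what releases the element's size.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Done mirrors the callback contract the queues in this PR rely on: the
// consumer must invoke OnDone exactly once after the export attempt, which
// is the moment the queue releases the element's size.
type Done interface {
	OnDone(error)
}

// printDone is an illustrative stand-in for blockingDone / sizeDone / indexDone.
type printDone struct{ name string }

func (d *printDone) OnDone(err error) {
	fmt.Printf("%s released, err=%v\n", d.name, err)
}

// consume shows the expected consumer-side usage: export first, then signal.
func consume(ctx context.Context, req string, done Done) {
	err := export(ctx, req)
	done.OnDone(err) // queue size is decremented here, not at read time
}

func export(_ context.Context, req string) error {
	if req == "" {
		return errors.New("empty request")
	}
	return nil
}

func main() {
	consume(context.Background(), "traces payload", &printDone{name: "req-1"})
}
```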
44 changes: 37 additions & 7 deletions exporter/exporterqueue/memory_queue.go
@@ -11,6 +11,12 @@ import (
"go.opentelemetry.io/collector/component"
)

var sizeDonePool = sync.Pool{
New: func() any {
return &sizeDone{}
},
}

var errInvalidSize = errors.New("invalid element size")

// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
@@ -91,11 +97,11 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool)
defer sq.mu.Unlock()

for {
if sq.size > 0 {
if sq.items.hasElements() {
elCtx, el, elSize := sq.items.pop()
sq.size -= elSize
sq.hasMoreSpace.Signal()
return elCtx, el, noopDoneInst, true
sd := sizeDonePool.Get().(*sizeDone)
sd.reset(elSize, sq)
return elCtx, el, sd, true
}

if sq.stopped {
@@ -109,6 +115,13 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool)
}
}

func (sq *memoryQueue[T]) onDone(elSize int64) {
sq.mu.Lock()
defer sq.mu.Unlock()
sq.size -= elSize
sq.hasMoreSpace.Signal()
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (sq *memoryQueue[T]) Shutdown(context.Context) error {
sq.mu.Lock()
@@ -142,6 +155,7 @@ type linkedQueue[T any] struct {

func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
n := &node[T]{ctx: ctx, data: data, size: size}
// If tail is nil means list is empty so update both head and tail to point to same element.
if l.tail == nil {
l.head = n
l.tail = n
@@ -151,18 +165,34 @@ func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
l.tail = n
}

func (l *linkedQueue[T]) hasElements() bool {
return l.head != nil
}

func (l *linkedQueue[T]) pop() (context.Context, T, int64) {
n := l.head
l.head = n.next
// If it gets to the last element, then update tail as well.
if l.head == nil {
l.tail = nil
}
n.next = nil
return n.ctx, n.data, n.size
}

type noopDone struct{}
type sizeDone struct {
size int64
queue interface {
onDone(int64)
}
}

func (*noopDone) OnDone(error) {}
func (sd *sizeDone) reset(size int64, queue interface{ onDone(int64) }) {
sd.size = size
sd.queue = queue
}

var noopDoneInst = &noopDone{}
func (sd *sizeDone) OnDone(error) {
defer sizeDonePool.Put(sd)
sd.queue.onDone(sd.size)
}
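
The sizeDonePool added above follows the usual sync.Pool lifecycle: Get, reset with the element's size and owning queue, invoke OnDone once, Put back. A minimal sketch of that pattern, using hypothetical names (reusableDone, reusablePool) rather than the collector's types:

```go
package main

import (
	"fmt"
	"sync"
)

// reusableDone mirrors the pooled-callback pattern: objects are taken from a
// sync.Pool, re-initialized via reset, and returned to the pool once OnDone
// has run, avoiding a heap allocation per queue element.
type reusableDone struct {
	size   int64
	onDone func(int64)
}

var reusablePool = sync.Pool{
	New: func() any { return &reusableDone{} },
}

func (d *reusableDone) reset(size int64, onDone func(int64)) {
	d.size = size
	d.onDone = onDone
}

func (d *reusableDone) OnDone(error) {
	// Return to the pool only after the callback has fully run.
	defer reusablePool.Put(d)
	d.onDone(d.size)
}

func main() {
	release := func(size int64) { fmt.Println("released size:", size) }

	d := reusablePool.Get().(*reusableDone)
	d.reset(42, release)
	d.OnDone(nil)
}
```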
47 changes: 32 additions & 15 deletions exporter/exporterqueue/persistent_queue.go
@@ -38,6 +38,12 @@ var (
errWrongExtensionType = errors.New("requested extension is not a storage extension")
)

var indexDonePool = sync.Pool{
New: func() any {
return &indexDone{}
},
}

type persistentQueueSettings[T any] struct {
sizer sizer[T]
capacity int64
@@ -292,16 +298,9 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
pq.hasMoreSpace.Signal()
}
if consumed {
pq.queueSize -= pq.set.sizer.Sizeof(req)
// The size might be not in sync with the queue in case it's restored from the disk
// because we don't flush the current queue size on the disk on every read/write.
// In that case we need to make sure it doesn't go below 0.
if pq.queueSize < 0 {
pq.queueSize = 0
}
pq.hasMoreSpace.Signal()

return context.Background(), req, indexDone[T]{index: index, pq: pq}, true
id := indexDonePool.Get().(*indexDone)
id.reset(index, pq.set.sizer.Sizeof(req), pq)
return context.Background(), req, id, true
}
}

@@ -348,7 +347,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
}

// onDone should be called to remove the item of the given index from the queue once processing is finished.
- func (pq *persistentQueue[T]) onDone(index uint64, consumeErr error) {
+ func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr error) {
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
@@ -359,6 +358,15 @@ func (pq *persistentQueue[T]) onDone(index uint64, consumeErr error) {
pq.mu.Unlock()
}()

pq.queueSize -= elSize
// The size might be not in sync with the queue in case it's restored from the disk
// because we don't flush the current queue size on the disk on every read/write.
// In that case we need to make sure it doesn't go below 0.
if pq.queueSize < 0 {
pq.queueSize = 0
}
pq.hasMoreSpace.Signal()

if experr.IsShutdownErr(consumeErr) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
@@ -555,11 +563,20 @@ func bytesToItemIndexArray(buf []byte) ([]uint64, error) {
return val, nil
}

type indexDone[T any] struct {
type indexDone struct {
index uint64
pq *persistentQueue[T]
size int64
queue interface {
onDone(uint64, int64, error)
}
}

func (id *indexDone) reset(index uint64, size int64, queue interface{ onDone(uint64, int64, error) }) {
id.index = index
id.size = size
id.queue = queue
}

- func (id indexDone[T]) OnDone(err error) {
- id.pq.onDone(id.index, err)
+ func (id *indexDone) OnDone(err error) {
+ id.queue.onDone(id.index, id.size, err)
}
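
Finally, a compact sketch of the size bookkeeping that persistentQueue.onDone now performs (simplified, with hypothetical field names): the element size is subtracted only after the export attempt, clamped at zero because the size restored from storage is not flushed on every read/write and can undercount, and a producer blocked on capacity is signalled.

```go
package main

import (
	"fmt"
	"sync"
)

// persistentSize is a simplified stand-in for the queueSize bookkeeping in
// persistentQueue: the size is reduced in onDone, clamped at zero, and
// blocked producers waiting for space are woken up.
type persistentSize struct {
	mu           sync.Mutex
	hasMoreSpace *sync.Cond
	queueSize    int64
}

func newPersistentSize(restored int64) *persistentSize {
	ps := &persistentSize{queueSize: restored}
	ps.hasMoreSpace = sync.NewCond(&ps.mu)
	return ps
}

func (ps *persistentSize) onDone(elSize int64) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.queueSize -= elSize
	if ps.queueSize < 0 { // the size restored from storage may undercount
		ps.queueSize = 0
	}
	ps.hasMoreSpace.Signal() // wake a producer blocked on capacity
}

func main() {
	ps := newPersistentSize(5) // pretend 5 was restored from storage
	ps.onDone(7)               // the element was larger than the restored size
	fmt.Println("size after done:", ps.queueSize) // clamped to 0
}
```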