Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 22 additions & 45 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package internal

import (
"context"
"errors"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -97,75 +96,53 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
}))
}

func Benchmark_QueueUsage_10000_requests_1_50000(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 1, 50000)
func Benchmark_QueueUsage_1000_requests(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 1000)
}

func Benchmark_QueueUsage_10000_requests_10_50000(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 10, 50000)
func Benchmark_QueueUsage_100000_requests(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 100000)
}

func Benchmark_QueueUsage_50000_requests_1_50000(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 50000, 1, 50000)
func Benchmark_QueueUsage_10000_items(b *testing.B) {
// each request has 10 items: 1000 requests = 10000 items
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000)
}

func Benchmark_QueueUsage_50000_requests_10_50000(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 50000, 10, 50000)
}

func Benchmark_QueueUsage_10000_requests_1_250000(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 1, 250000)
}

func Benchmark_QueueUsage_10000_requests_10_250000(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 10, 250000)
}

func Benchmark_QueueUsage_1M_items_10_250k(b *testing.B) {
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000000, 10, 250000)
}

func Benchmark_QueueUsage_1M_items_10_1M(b *testing.B) {
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000000, 10, 1000000)
}

func Benchmark_QueueUsage_100M_items_10_10M(b *testing.B) {
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 100000000, 10, 10000000)
func Benchmark_QueueUsage_1M_items(b *testing.B) {
// each request has 10 items: 100000 requests = 1M items
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 100000)
}

func TestQueueUsage(t *testing.T) {
t.Run("with enough workers", func(t *testing.T) {
queueUsage(t, &RequestSizer[fakeReq]{}, 10000, 5, 1000)
t.Run("requests_based", func(t *testing.T) {
queueUsage(t, &RequestSizer[fakeReq]{}, 10)
})
t.Run("past capacity", func(t *testing.T) {
queueUsage(t, &RequestSizer[fakeReq]{}, 10000, 2, 50000)
t.Run("items_based", func(t *testing.T) {
queueUsage(t, &ItemsSizer[fakeReq]{}, 10)
})
}

func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], capacity int, numConsumers int,
numberOfItems int) {
func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], requestsCount int) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
queueUsage(b, sizer, capacity, numConsumers, numberOfItems)
queueUsage(b, sizer, requestsCount)
}
}

func queueUsage(tb testing.TB, sizer Sizer[fakeReq], capacity int, numConsumers int, numberOfItems int) {
func queueUsage(tb testing.TB, sizer Sizer[fakeReq], requestsCount int) {
var wg sync.WaitGroup
wg.Add(numberOfItems)
q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: capacity})
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, fakeReq) error {
wg.Add(requestsCount)
q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: 10 * requestsCount})
consumers := NewQueueConsumers(q, 1, func(context.Context, fakeReq) error {
wg.Done()
return nil
})
require.NoError(tb, consumers.Start(context.Background(), componenttest.NewNopHost()))
for j := 0; j < numberOfItems; j++ {
if err := q.Offer(context.Background(), fakeReq{10}); errors.Is(err, ErrQueueIsFull) {
wg.Done()
}
for j := 0; j < requestsCount; j++ {
require.NoError(tb, q.Offer(context.Background(), fakeReq{10}))
}
assert.NoError(tb, consumers.Shutdown(context.Background()))

wg.Wait()
}

Expand Down