Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporterqueue] Bare minimum frame of queue batcher + unit test. #11532

Merged
merged 10 commits into from
Oct 26, 2024
Prev Previous commit
Next Next commit
addressing bogdan's comments
  • Loading branch information
sfc-gh-sili committed Oct 25, 2024
commit 3bbfd6a29597a8d6226f7552491b7f287f78952c
25 changes: 14 additions & 11 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,46 @@ type batch struct {
idxList []uint64
}

// TODO
type Batcher struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on the struct please.

batchCfg exporterbatcher.Config

queue Queue[internal.Request]
maxWorkers int

exportFunc func(context.Context, internal.Request) error

currentBatch *batch // the batch that is being built
timer *time.Timer
shutdownCh chan bool

stopWG sync.WaitGroup
}

func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request],
maxWorkers int, exportFunc func(context.Context, internal.Request) error) *Batcher {
func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) *Batcher {
return &Batcher{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
exportFunc: exportFunc,
stopWG: sync.WaitGroup{},
shutdownCh: make(chan bool, 1),
}
}

// If preconditions pass, flush() take an item from the head of batch list and exports it.
// flush take an item from the head of batch list and exports it.
func (qb *Batcher) flush(batchToFlush batch) {
err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
err := batchToFlush.req.Export(batchToFlush.ctx)
for _, idx := range batchToFlush.idxList {
qb.queue.OnProcessingFinished(idx, err)
}
}

// flushAsync() starts a goroutine that calls flushIfNecessary(). It blocks until a worker is available.
// flushAsync starts a goroutine that calls flushIfNecessary. It blocks until a worker is available.
func (qb *Batcher) flushAsync(batchToFlush batch) {
// maxWorker = 0 means we don't limit the number of flushers.
if qb.maxWorkers == 0 {
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
qb.flush(batchToFlush)
qb.stopWG.Done()
}()
return
}
Expand All @@ -71,6 +68,9 @@ func (qb *Batcher) flushAsync(batchToFlush batch) {

// Start ensures that queue and all consumers are started.
func (qb *Batcher) Start(ctx context.Context, host component.Host) error {
// TODO: queue start is done here to keep the behavior similar to queue consumer.
// However batcher should not be responsible for starting down the queue. Move this up to
// queue sender once queue consumer is cleaned up.
if err := qb.queue.Start(ctx, host); err != nil {
return err
}
Expand Down Expand Up @@ -117,8 +117,8 @@ func (qb *Batcher) Start(ctx context.Context, host component.Host) error {
}
}()

// The following goroutine is in charge of listening to timer and shutdown signal. This is a seperate goroutine
// from the reading-flushing, because we want to keep timer seperate blocking operations.
// The following goroutine is in charge of listening to timer and shutdown signal. This is a separate goroutine
// from the reading-flushing, because we want to keep timer separate blocking operations.
qb.stopWG.Add(1)
go func() {
dmitryax marked this conversation as resolved.
Show resolved Hide resolved
defer qb.stopWG.Done()
Expand All @@ -137,6 +137,9 @@ func (qb *Batcher) Start(ctx context.Context, host component.Host) error {

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *Batcher) Shutdown(ctx context.Context) error {
// TODO: queue shutdown is done here to keep the behavior similar to queue consumer.
// However batcher should not be responsible for shutting down the queue. Move this up to
// queue sender once queue consumer is cleaned up.
if err := qb.queue.Shutdown(ctx); err != nil {
return err
}
Expand Down
6 changes: 1 addition & 5 deletions exporter/internal/queue/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ import (
"go.opentelemetry.io/collector/exporter/internal"
)

func testExportFunc(ctx context.Context, req internal.Request) error {
return req.Export(ctx)
}

func TestBatcher_BatchNotEnabled_InfiniteWorkerPool(t *testing.T) {
cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = false
Expand All @@ -32,7 +28,7 @@ func TestBatcher_BatchNotEnabled_InfiniteWorkerPool(t *testing.T) {
})

maxWorkers := 0
ba := NewBatcher(cfg, q, maxWorkers, testExportFunc)
ba := NewBatcher(cfg, q, maxWorkers)

require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
Expand Down