
[exporterqueue] Bare minimum frame of queue batcher + unit test. #11532

Merged: 10 commits, Oct 26, 2024
Edits according to Dmitrii's comment
sfc-gh-sili committed Oct 25, 2024
commit 095533404f324875ece8470c1eb808da9e95a38d
24 changes: 14 additions & 10 deletions exporter/internal/queue/batcher.go
@@ -28,7 +28,7 @@ type Batcher struct {

exportFunc func(context.Context, internal.Request) error
Member:

Is this not Request.export?

@sfc-gh-sili (Contributor, Author) commented Oct 24, 2024:

Good point. This was modeled after the consumeFunc used in the consumer, but it seems that having req.Export() here is enough for the use case.
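To make the suggestion above concrete, here is a minimal sketch contrasting the two designs: an injected exportFunc field (mirroring the consumeFunc pattern mentioned in the reply) versus calling the request's own Export method directly. The Request interface and type names below are illustrative assumptions, not the collector's actual internal API.

package batcherexample

import "context"

// Request is an assumed stand-in for internal.Request: a request that knows
// how to export itself.
type Request interface {
	Export(ctx context.Context) error
}

// batcherWithFunc keeps an injected export function, the design questioned in
// the review comment above.
type batcherWithFunc struct {
	exportFunc func(context.Context, Request) error
}

func (b *batcherWithFunc) flush(ctx context.Context, req Request) error {
	return b.exportFunc(ctx, req)
}

// batcherDirect drops the function field and calls req.Export() directly,
// which is what the reply concludes is enough for this use case.
type batcherDirect struct{}

func (b *batcherDirect) flush(ctx context.Context, req Request) error {
	return req.Export(ctx)
}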


readingBatch *batch
currentBatch *batch // the batch that is being built
timer *time.Timer
shutdownCh chan bool

@@ -55,8 +55,8 @@ func (qb *Batcher) flush(batchToFlush batch) {
}
}

// allocateFlusher() starts a goroutine that calls flushIfNecessary(). It blocks until a worker is available.
func (qb *Batcher) allocateFlusher(batchToFlush batch) {
// flushAsync() starts a goroutine that calls flushIfNecessary(). It blocks until a worker is available.
func (qb *Batcher) flushAsync(batchToFlush batch) {
// maxWorker = 0 means we don't limit the number of flushers.
if qb.maxWorkers == 0 {
qb.stopWG.Add(1)
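As context for the blocking behavior described in the comment above (flushAsync blocks until a worker is available, and maxWorkers == 0 means the number of flushers is unlimited), a buffered channel used as a token pool is one common way to get that behavior in Go. The sketch below is illustrative only and uses assumed names; it is not the code under review.

package batcherexample

import "sync"

type flusher struct {
	maxWorkers int
	workerPool chan struct{} // one token per allowed concurrent flush
	stopWG     sync.WaitGroup
}

func newFlusher(maxWorkers int) *flusher {
	f := &flusher{maxWorkers: maxWorkers}
	if maxWorkers > 0 {
		f.workerPool = make(chan struct{}, maxWorkers)
		for i := 0; i < maxWorkers; i++ {
			f.workerPool <- struct{}{} // fill the pool with tokens
		}
	}
	return f
}

// flushAsync runs flushFn on a new goroutine. With maxWorkers == 0 it never
// blocks; otherwise it blocks until a worker token is free.
func (f *flusher) flushAsync(flushFn func()) {
	f.stopWG.Add(1)
	if f.maxWorkers == 0 {
		go func() {
			defer f.stopWG.Done()
			flushFn()
		}()
		return
	}
	<-f.workerPool // blocks until a worker is available
	go func() {
		defer f.stopWG.Done()
		flushFn()
		f.workerPool <- struct{}{} // return the token to the pool
	}()
}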
@@ -87,7 +87,10 @@ func (qb *Batcher) Start(ctx context.Context, host component.Host) error {
panic("not implemented")
}

// This goroutine keeps reading until flush is triggered because of request size.
// This goroutine reads and then flushes if request reaches size limit. There are two operations in this
// goroutine that could be blocking:
// 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped.
// 2. flushAsync() blocks until there are idle workers in the worker pool.
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
@@ -96,25 +99,26 @@

if !ok {
qb.shutdownCh <- true
if qb.readingBatch != nil {
panic("batching is supported yet so reading batch should always be nil")
if qb.currentBatch != nil {
panic("batching is not supported yet so reading batch should always be nil")
}

return
}
if !qb.batchCfg.Enabled {
qb.readingBatch = &batch{
qb.currentBatch = &batch{
req: req,
ctx: context.Background(),
idxList: []uint64{idx}}
qb.allocateFlusher(*qb.readingBatch)
qb.readingBatch = nil
qb.flushAsync(*qb.currentBatch)
qb.currentBatch = nil
} else {
panic("not implemented")
}
}
}()

// The following goroutine is in charge of listening to the timer and the shutdown signal. This is a separate goroutine
// from the reading-flushing one, because we want to keep the timer separate from other blocking operations.
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
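The body of this second goroutine is collapsed in the diff above. For readers following along, a minimal sketch of what a timer/shutdown listener of this shape typically looks like is given below; the signature and parameter names are assumptions, not the PR's actual code.

package batcherexample

import (
	"sync"
	"time"
)

// startTimerLoop listens only for timer ticks and the shutdown signal, so the
// read/flush goroutine can block on the queue or on the worker pool without
// delaying timer handling.
func startTimerLoop(stopWG *sync.WaitGroup, timer *time.Timer, shutdownCh chan bool, onTick func()) {
	stopWG.Add(1)
	go func() {
		defer stopWG.Done()
		for {
			select {
			case <-timer.C:
				onTick() // e.g. flush the batch that is currently being built
			case <-shutdownCh:
				return
			}
		}
	}()
}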