[exporterqueue] Bare minimum frame of queue batcher + unit test. #11532
Conversation
Force-pushed from 7365e27 to 7be491d.
Force-pushed from 7be491d to a5458ed.
Codecov Report

Coverage Diff:

| | main | #11532 | +/- |
|---|---|---|---|
| Coverage | 91.46% | 91.45% | -0.01% |
| Files | 435 | 438 | +3 |
| Lines | 23757 | 23827 | +70 |
| Hits | 21729 | 21791 | +62 |
| Misses | 1650 | 1658 | +8 |
| Partials | 378 | 378 | |
exporter/internal/queue/batcher.go (outdated):

    queue      Queue[internal.Request]
    maxWorkers int

    exportFunc func(context.Context, internal.Request) error
Is this not `Request.export`?
Good point. This was modeled after `consumeFunc` used in the consumer, but it seems having `req.Export()` here is enough for the use case.
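For context, a minimal sketch of the simplification being discussed, using a hypothetical stand-in `Request` interface and a hypothetical helper `flushOne` rather than the PR's actual types:

```go
package queue

import "context"

// Request is a stand-in for internal.Request; the real interface
// already exposes an Export method, which is why a separate
// exportFunc field on the batcher is unnecessary.
type Request interface {
	Export(ctx context.Context) error
}

// flushOne is a hypothetical helper showing the simplification:
// call the request's own Export method instead of storing an
// exportFunc func(context.Context, internal.Request) error field.
func flushOne(ctx context.Context, req Request) error {
	return req.Export(ctx)
}
```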
exporter/internal/queue/batcher.go (outdated):

    idxList []uint64
    }

    type Batcher struct {
Comment on the struct please.
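A hedged sketch of what such a doc comment could look like; the field set mirrors the diff snippets above, with `Config` and `Queue` as simplified stand-ins for `exporterbatcher.Config` and the package's queue type, and the wording is illustrative rather than the PR's final comment:

```go
package queue

import "sync"

// Config and Queue are simplified stand-ins for this sketch only.
type Config struct{ Enabled bool }
type Queue[T any] interface{}

// Batcher pulls requests from the queue, groups them into batches
// according to the batcher configuration, and exports each batch on a
// worker goroutine. Construct it with NewBatcher; the zero value is
// not usable.
type Batcher struct {
	batchCfg   Config
	queue      Queue[any]
	maxWorkers int
	stopWG     sync.WaitGroup
}
```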
exporter/internal/queue/batcher.go (outdated):

    }
    }

    // If preconditions pass, flush() take an item from the head of batch list and exports it.
In Go, comments start with the func name: `flush ...`
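Roughly what that convention looks like applied here; the wording of the comment and the `batch` stand-in type are illustrative, not the PR's exact code:

```go
package queue

// batch is a stand-in for the PR's internal batch type.
type batch struct{}

// flush takes an item from the head of the batch list and exports it,
// provided the preconditions pass. (Go doc comments start with the
// name of the thing they document.)
func flush(batchToFlush batch) {
	_ = batchToFlush // export logic elided in this sketch
}
```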
exporter/internal/queue/batcher.go (outdated):

    go func() {
        qb.flush(batchToFlush)
        qb.stopWG.Done()
    }()
Using `defer` is a best practice to ensure the function gets called on all possible return paths. Even though this is a simple func now, it may get more complicated.
Suggested change:

    go func() {
        defer qb.stopWG.Done()
        qb.flush(batchToFlush)
    }()
Thanks for the advice!
exporter/internal/queue/batcher.go (outdated):

    return err
    }

    if qb.batchCfg.Enabled {
I think it would be simpler if Batcher were an interface, with the "not enabled" and "enabled" cases as two different implementations.
That's a good idea. I implemented a special-case batcher called `DisabledBatcher` to handle the "not enabled" case. Let me know what you think of it.
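A minimal sketch of the shape this suggestion leads to, with simplified stand-in types and an illustrative method set (not necessarily the PR's exact interface):

```go
package queue

import "context"

// Batcher is the common interface; the "enabled" and "not enabled"
// cases become two implementations instead of branches inside one type.
type Batcher interface {
	Start(ctx context.Context) error
	Shutdown(ctx context.Context) error
}

// DisabledBatcher covers the "batching disabled" case: every request
// pulled from the queue is exported as-is, with no merging, splitting,
// or timeout-based flushing.
type DisabledBatcher struct{ /* queue, maxWorkers, stopWG, ... */ }

func (b *DisabledBatcher) Start(ctx context.Context) error    { return nil /* spawn workers */ }
func (b *DisabledBatcher) Shutdown(ctx context.Context) error { return nil /* wait on stopWG */ }

// DefaultBatcher would hold the merge/split/timeout-flush logic added
// in later PRs.
type DefaultBatcher struct{ /* batchCfg, queue, maxWorkers, stopWG, ... */ }

func (b *DefaultBatcher) Start(ctx context.Context) error    { return nil }
func (b *DefaultBatcher) Shutdown(ctx context.Context) error { return nil }
```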
Force-pushed from 20a8bb1 to fc658a0.
Force-pushed from 654fb15 to e42ea72.
exporter/internal/queue/batcher.go (outdated):

    stopWG sync.WaitGroup
    }

    func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) Batcher {
Make it return an error instead of panicking.
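A hedged sketch of the constructor shape being asked for, with stand-in types; which configurations return an error is illustrative, based on the limitations listed in the PR description:

```go
package queue

import (
	"errors"
	"sync"
)

// Config and Queue are simplified stand-ins for exporterbatcher.Config
// and the package's queue type.
type Config struct{ Enabled bool }
type Queue[T any] interface{}

type Batcher struct {
	batchCfg   Config
	queue      Queue[any]
	maxWorkers int
	stopWG     sync.WaitGroup
}

// NewBatcher rejects the not-yet-implemented configurations with an
// error instead of panicking, so callers can fail cleanly at setup time.
func NewBatcher(batchCfg Config, queue Queue[any], maxWorkers int) (*Batcher, error) {
	if batchCfg.Enabled {
		return nil, errors.New("batching is not yet implemented")
	}
	if maxWorkers != 0 {
		return nil, errors.New("limiting the number of workers is not yet implemented")
	}
	return &Batcher{batchCfg: batchCfg, queue: queue, maxWorkers: maxWorkers}, nil
}
```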
Follow-up: open-telemetry#11540 builds on this PR and implements support for a limited worker pool for the queue batcher (same design doc and tracking issues as below).
Description
This PR is a bare minimum implementation of a component called queue batcher. On completion, this component will replace `consumers` in `queue_sender`, thus moving queue-batch to a pulling model instead of a pushing model.

Limitations of the current code:

- It implements only the case where batching is disabled, which means no merging or splitting of requests and no timeout flushing.
- It does not enforce an upper bound on concurrency.

All of these code paths currently panic; they will be replaced with actual implementations in coming PRs. This PR is split from #11507 for easier review.
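For readers unfamiliar with the distinction, a sketch of what the pulling model means in practice, using hypothetical stand-in `Queue` and `Request` types rather than the PR's actual code:

```go
package queue

import "context"

// Request and Queue are simplified stand-ins for this sketch only.
type Request interface {
	Export(ctx context.Context) error
}

type Queue interface {
	// Read blocks until a request is available or the queue is shut down.
	Read(ctx context.Context) (Request, bool)
}

// pullLoop shows the essence of the pulling model: the batcher actively
// reads from the queue and exports, instead of the queue pushing items
// into a fixed set of consumers.
func pullLoop(ctx context.Context, q Queue) {
	for {
		req, ok := q.Read(ctx)
		if !ok {
			return // queue shut down
		}
		_ = req.Export(ctx) // error handling elided in this sketch
	}
}
```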
Design doc:
https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing
Link to tracking issue
#8122
#10368
Testing
Documentation