[chore] [exporterhelper] Integrate capacity limiting into the communication channel #9232
Conversation
Codecov Report
Attention: Patch coverage is

Additional details and impacted files:

@@            Coverage Diff             @@
##             main    #9232      +/-   ##
==========================================
- Coverage   91.87%   91.86%   -0.02%
==========================================
  Files         360      361       +1
  Lines       16717    16722       +5
==========================================
+ Hits        15359    15361       +2
- Misses       1020     1024       +4
+ Partials      338      337       -1

View full report in Codecov by Sentry.
This PR was marked stale due to lack of activity. It will be closed in 14 days.
// withPreloadElements puts the elements into the queue with the given size. It's used by the persistent queue to
// initialize the queue with the elements recovered from the disk.
func withPreloadElements[T any](els []T, totalSize int64) sizedElementsChannelOption[T] {
To simplify this, probably in a follow-up PR, we need to have 2 sizes for the persistent queue (a rough sketch of the idea follows after this exchange):
- an in-memory size, which is fixed and smaller than the storage;
- a storage size, which is how much to keep in the storage.
sounds good 👍
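For illustration only, the two-size idea could look roughly like the sketch below; the type and field names are hypothetical and not part of this PR:

```go
// persistentQueueSettings sketches the proposed split between a small, fixed
// in-memory buffer and a separate, larger limit for what is kept in storage.
type persistentQueueSettings struct {
	inMemorySize int64 // fixed in-memory size, smaller than the storage limit
	storageSize  int64 // how much to keep in the storage backend
}
```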
// syncSize updates the used size to 0 if the queue is empty.
// The caller must ensure that this call is not called concurrently with enqueue.
// It's used by the persistent queue to ensure the used value correctly reflects reality, which may not always be
// the case if the queue size is restored from the disk after a crash.
Can you explain why? Because we don't calculate the size correctly?
Because we don't flush the current queue size to disk on every read/write. Should I mention that in the comment?
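A minimal sketch of that reset, on a simplified stand-in for the channel type (the field names are assumptions for illustration, not necessarily the PR's exact layout):

```go
import "sync/atomic"

// sizedElementsChannel is a simplified stand-in used only to illustrate the
// size bookkeeping discussed in this thread.
type sizedElementsChannel[T any] struct {
	used     atomic.Int64 // total size of the elements currently in the channel
	capacity int64        // maximum total size allowed in the channel
	ch       chan T       // the underlying communication channel
}

// syncSize resets the used size to 0 when the channel is empty, compensating
// for a stale size value restored from disk after a crash. It must not be
// called concurrently with enqueue.
func (vcq *sizedElementsChannel[T]) syncSize() {
	if len(vcq.ch) == 0 {
		vcq.used.Store(0)
	}
}
```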
// enqueue puts the element into the queue with the given size if there is enough capacity.
// Returns an error if the queue is full. The callback is called before the element is committed to the queue.
// If the callback returns an error, the element is not put into the queue and the error is returned.
func (vcq *sizedElementsChannel[T]) enqueue(el T, size int64, callback func() error) error {
Add a comment that size MUST be positive. We could even consider changing it to uint64? That may make the -size ugly to use with atomics, but I think it is ok.
Added the comment. The atomic size field used to be uint64, and then you asked to change it to int64 to simplify the subtraction. Do you think we should accept uint64 and convert it to int64?
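For illustration, a sketch of the size accounting with a signed atomic counter, building on the simplified struct shown earlier in this thread; errQueueIsFull is an assumed sentinel error, not the PR's exact API:

```go
import "errors"

var errQueueIsFull = errors.New("sending queue is full") // assumed sentinel error

// enqueue reserves the size up front and rolls back with a negative delta on
// failure. size must be positive for this accounting to hold.
func (vcq *sizedElementsChannel[T]) enqueue(el T, size int64, callback func() error) error {
	if vcq.used.Add(size) > vcq.capacity {
		vcq.used.Add(-size) // roll back the reservation
		return errQueueIsFull
	}
	if err := callback(); err != nil {
		vcq.used.Add(-size) // roll back if the callback rejects the element
		return err
	}
	vcq.ch <- el
	return nil
}
```

With an unsigned counter, the rollback would instead need the documented two's-complement pattern for subtraction (Add(^uint64(size-1))), which is presumably the awkwardness referred to above.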
		opt(sech)
	}
	if sech.ch == nil {
		sech.ch = make(chan T, capacity)
This will be a problem if we switch to using bytes, since the buffer will be very large and use lots of memory.
Right. We can think of a solution outside of this PR?
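One possible direction, sketched only and explicitly outside this PR: keep the channel buffer bounded by element count and let the configured capacity, potentially in bytes, be enforced solely through the atomic counter. maxBufferedElements is a made-up cap used for illustration, on the same simplified struct as above:

```go
// maxBufferedElements is a hypothetical cap on the channel buffer length,
// independent of the size-based capacity.
const maxBufferedElements = 1024

func newSizedElementsChannel[T any](capacity int64) *sizedElementsChannel[T] {
	bufLen := capacity
	if bufLen > maxBufferedElements {
		bufLen = maxBufferedElements // avoid allocating a huge buffer when capacity is measured in bytes
	}
	sech := &sizedElementsChannel[T]{capacity: capacity}
	sech.ch = make(chan T, bufLen)
	return sech
}
```

The trade-off is that with a buffer smaller than the capacity, a plain blocking send in enqueue could stall before the size limit is reached, so this would need a non-blocking send or a different signaling mechanism, which is likely why the thread defers it to a separate change.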
Ship it :)
Integrate capacity limiting into internal channels used by both memory and persistent queues. Otherwise, with the independent capacity limiter, it's hard to ensure that queue size is always accurate going forward.
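To make "integrated" concrete, a hedged sketch of the consuming side on the same simplified struct used in the sketches above; sizeOf is a hypothetical helper, not the PR's API:

```go
// dequeue receives the next element and releases its size from the counter,
// so the used size always tracks the channel contents without a separate,
// independently updated capacity limiter.
func (vcq *sizedElementsChannel[T]) dequeue(sizeOf func(T) int64) (T, bool) {
	el, ok := <-vcq.ch
	if !ok {
		return el, false // channel closed and drained
	}
	vcq.used.Add(-sizeOf(el))
	return el, true
}
```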
[chore] [exporterhelper] Integrate capacity limiting into the communication channel (open-telemetry#9232)

Integrate capacity limiting into internal channels used by both memory and persistent queues. Otherwise, with the independent capacity limiter, it's hard to ensure that queue size is always accurate going forward.

Benchmarks before:

```
goos: darwin
goarch: arm64
Benchmark_QueueUsage_1000_requests-10      3252    325010 ns/op    246059 B/op    10 allocs/op
Benchmark_QueueUsage_100000_requests-10      39  29811116 ns/op  24002870 B/op    10 allocs/op
Benchmark_QueueUsage_10000_items-10        3404    349753 ns/op    246052 B/op    10 allocs/op
Benchmark_QueueUsage_1M_items-10             40  29415583 ns/op  24002858 B/op    10 allocs/op
BenchmarkPersistentQueue_TraceSpans
BenchmarkPersistentQueue_TraceSpans/#traces:_1_#spansPerTrace:_1-10     338180    3836 ns/op    2851 B/op     78 allocs/op
BenchmarkPersistentQueue_TraceSpans/#traces:_1_#spansPerTrace:_10-10     81369   15822 ns/op   14598 B/op    289 allocs/op
BenchmarkPersistentQueue_TraceSpans/#traces:_10_#spansPerTrace:_10-10    13066   90155 ns/op  130087 B/op   2417 allocs/op
```

Benchmarks after:

```
Benchmark_QueueUsage_1000_requests-10      4210    278175 ns/op    246055 B/op    10 allocs/op
Benchmark_QueueUsage_100000_requests-10      42  25835945 ns/op  24002968 B/op    10 allocs/op
Benchmark_QueueUsage_10000_items-10        4376    279571 ns/op    246056 B/op    10 allocs/op
Benchmark_QueueUsage_1M_items-10             42  26483907 ns/op  24002995 B/op    10 allocs/op
BenchmarkPersistentQueue_TraceSpans
BenchmarkPersistentQueue_TraceSpans/#traces:_1_#spansPerTrace:_1-10     328268    4251 ns/op    2854 B/op     78 allocs/op
BenchmarkPersistentQueue_TraceSpans/#traces:_1_#spansPerTrace:_10-10    101683   12238 ns/op   14582 B/op    289 allocs/op
BenchmarkPersistentQueue_TraceSpans/#traces:_10_#spansPerTrace:_10-10    13382   86464 ns/op  130154 B/op   2417 allocs/op
```