✨ Use a buffer to optimize priority queue AddWithOpts performance #3415

k8s-ci-robot merged 1 commit into kubernetes-sigs:main
Conversation
Hi @zach593. Thanks for your PR. I'm waiting for a kubernetes-sigs member to verify that this patch is reasonable to test. If it is, they should reply with `/ok-to-test`. Once the patch is verified, the new status will be reflected by the `ok-to-test` label.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
/ok-to-test

Sorry for the other pings, I didn't see this PR before. Executing the provided benchmark on main takes more than 100 seconds for me on my M2 Pro, so I agree there is clearly a problem. It is a bit surprising that it takes almost seven times longer on my device than what you got, but that difference probably doesn't matter much; the bottom line is that it's too slow.

Using the provided benchmark to compare this with #3416, I get these results:

Going further and comparing #3416 with the upstream workqueue, I get these results (worth noticing that the upstream queue has a crazy high variance though, so something is weird there, but I didn't manage to figure out what):

Upstream diff for the benchmark:

```diff
diff --git a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go
index 9bf22c4b9ab..0cbc928ad57 100644
--- a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go
+++ b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go
@@ -23,6 +23,33 @@ import (
 	testingclock "k8s.io/utils/clock/testing"
 )
 
+func BenchmarkAddLockContended(b *testing.B) {
+	q := NewTypedWithConfig(TypedQueueConfig[int]{
+		MetricsProvider: noopMetricsProvider{},
+	})
+	defer q.ShutDown()
+
+	for i := range 1000000 {
+		q.Add(i)
+	}
+
+	for range 1000 {
+		go func() {
+			for {
+				item, _ := q.Get()
+				time.Sleep(1 * time.Millisecond)
+				q.Done(item)
+			}
+		}()
+	}
+
+	for b.Loop() {
+		for i := range 1000 {
+			q.Add(i)
+		}
+	}
+}
+
 func TestRateLimitingQueue(t *testing.T) {
 	limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
 	queue := NewRateLimitingQueue(limiter).(*rateLimitingType[any])
```

So #3416 seems to be slower than this, but still somewhat faster than upstream? I am mostly just bringing that up because I'd prefer that we first get that one in; we could still do a version of this afterwards if you want, since it's effectively just trading a bit of memory for reduced lock contention.

/cc @sbueringer
```go
log logr.Logger

bufferLock sync.Mutex
buffer     map[addOpts][]T
```
Why not a slice that combines addOpts and key? That way we'd preserve order and it would simplify the code quite a bit.
Updated. Using a slice does simplify the code structure, but it comes at the cost of additional overhead, since each slice entry needs to carry its own AddOpts. Benchmarks show that switching to a slice increases the overall cost by about 70%. However, given that the baseline latency is very low, this overhead should still be within an acceptable range.
Still with 1,000,000 preloaded items, 1,000 workers, 1ms reconcile cost.
use map:

```console
$ go test sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue -bench=BenchmarkAddLockContended -run=^$ -v -benchtime=10s -count=10
goos: linux
goarch: amd64
pkg: sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue
cpu: 12th Gen Intel(R) Core(TM) i9-12900K
BenchmarkAddLockContended
BenchmarkAddLockContended-8   159739   72763 ns/op
BenchmarkAddLockContended-8   266252   42454 ns/op
BenchmarkAddLockContended-8   187652   74544 ns/op
BenchmarkAddLockContended-8   278227   43991 ns/op
BenchmarkAddLockContended-8   284910   42860 ns/op
BenchmarkAddLockContended-8   290059   43863 ns/op
BenchmarkAddLockContended-8   280312   42673 ns/op
BenchmarkAddLockContended-8   281852   41656 ns/op
BenchmarkAddLockContended-8   280782   46362 ns/op
BenchmarkAddLockContended-8   266382   47360 ns/op
PASS
ok   sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue   125.617s
```

use slice:

```console
$ go test sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue -bench=BenchmarkAddLockContended -run=^$ -v -benchtime=10s -count=10
goos: linux
goarch: amd64
pkg: sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue
cpu: 12th Gen Intel(R) Core(TM) i9-12900K
BenchmarkAddLockContended
BenchmarkAddLockContended-8   153133    92869 ns/op
BenchmarkAddLockContended-8   163017   100041 ns/op
BenchmarkAddLockContended-8   214800    80015 ns/op
BenchmarkAddLockContended-8   150663    86073 ns/op
BenchmarkAddLockContended-8   213728    65746 ns/op
BenchmarkAddLockContended-8   145710    94048 ns/op
BenchmarkAddLockContended-8   195998    80824 ns/op
BenchmarkAddLockContended-8   132121    87749 ns/op
BenchmarkAddLockContended-8   165680    81779 ns/op
BenchmarkAddLockContended-8   144142    85884 ns/op
PASS
ok   sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue   155.121s
```
@alvaroaleman Indeed, #3416 appears to address the cost of `spin()`.
@zach593 It is merged
Force-pushed from f4f4ee2 to 2e3ff1a.
updated benchmark result, test function:

```go
func BenchmarkAddLockContended(b *testing.B) {
	q := New[int]("")
	defer q.ShutDown()
	for i := range 1000000 {
		q.Add(i)
	}
	for range 1000 {
		go func() {
			for {
				item, _ := q.Get()
				time.Sleep(1 * time.Millisecond)
				q.Done(item)
			}
		}()
	}
	for b.Loop() {
		for i := range 1000 {
			q.Add(i)
		}
	}
}
```

upstream (5de4c4f):

optimized:

With the same benchmark setup, the optimized version brings `BenchmarkAddLockContended()` down from ~1 ms/op to ~80 µs/op, showing a 10–15× improvement under high contention.
```go
// primarily for unit testing purposes.
// Successfully adding a ready item may result in an additional call to handleReadyItems(),
// but the cost is negligible.
w.flushAddBuffer()
```
Should we also do this in handleWaitingItems?
I don't think it's necessary. Items won't stay in the buffer for long, because once an item enters the buffer, handleAddBuffer() will be triggered. There might be a tiny delay, but that should be negligible for items carrying a non-zero `after`.

In fact, in most real usage scenarios, adding a flush in Len() and handleReadyItems() is also unnecessary, since everything runs in an asynchronous environment. The only reason to keep it is to avoid odd behavior in synchronous setups and to make unit tests simpler; otherwise, we would have to write `Eventually(q.addBufferLen).Should(BeZero())` everywhere.
Signed-off-by: zach593 <zach_li@outlook.com>
LGTM label has been added.

Git tree hash: 3d6913c3326dac3a935c8948958cf72f49b549d0
/assign Will review later today |
All good. Thank you very much!

/hold cancel
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: alvaroaleman, sbueringer, zach593

The full list of commands accepted by this bot can be found here. The pull request process is described here.
@zach593 Thank you very much for the feedback and the optimizations! |
@sbueringer I just updated the description. |
Before #3416: in the implementation of the priority queue, when there are a large number of items and many workers blocked in `GetWithPriority()`, calls to `spin()` can take a significant amount of time. This method shares the same lock with `AddWithOpts()`, which causes `AddWithOpts()` to spend 10 ms or more contending for the lock during writes.

Although `AddWithOpts()` supports enqueuing multiple items in a single call, the event handler, as the caller, passes only one item at a time. As a result, when the enqueue volume is high, a large number of items end up being blocked in the `processorListener`'s ring buffer.

After merging #3416, the time consumption of `handleReadyItems()` (i.e., `spin()`) has been significantly reduced, but due to the remaining lock contention, there is still room to optimize the time spent in `AddWithOpts()`.

benchmark function:

upstream (5de4c4f) with 1,000,000 preloaded items, 1,000 workers, 1ms reconcile cost:

optimized:

With the same benchmark setup, the optimized version brings `BenchmarkAddLockContended()` down from ~1 ms/op to ~80 µs/op, showing a 10–15× improvement under high contention.