Distributor pushes to ingesters through a goroutines pool #6660
Conversation
@@ -292,6 +293,22 @@ func TestDistributor_Push(t *testing.T) {
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
`,
},
"A push to 3 happy ingesters using batch worker gouroutines should succeed": { |
WDYT if we add another test in which we show, similarly to what is done in dskit, that setting `cfg.IngesterPushWorkerGoroutines` to a positive value actually uses that number of workers?
I don't like the idea of counting goroutines here: with all the services and stuff going on, that can easily break. However, I've added another test below that wraps the workers with a function that counts the calls and checks the counter. This also helped me see that the service wasn't properly created: this test wasn't calling `configure`, and the workers weren't actually used 🤦
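For illustration, a minimal sketch of the counting-wrapper approach described above; `countingSpawner` and the test wiring are hypothetical, not the PR's actual test code:

```go
package distributor_test

import (
	"sync/atomic"
	"testing"
)

// countingSpawner is a hypothetical wrapper around the function that hands
// work to the push workers: it counts every call before delegating, so a
// test can assert that the worker path was actually taken.
type countingSpawner struct {
	calls atomic.Int64
	next  func(func())
}

func (c *countingSpawner) spawn(f func()) {
	c.calls.Add(1)
	c.next(f)
}

func TestPushUsesWorkers(t *testing.T) {
	s := &countingSpawner{next: func(f func()) { go f() }}

	// In the real test the distributor would be configured so that its
	// worker function is s.spawn; here we only exercise the wrapper itself.
	done := make(chan struct{})
	s.spawn(func() { close(done) })
	<-done

	if s.calls.Load() == 0 {
		t.Fatal("expected pushes to go through the wrapped workers")
	}
}
```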
Thank you for adding this test.
pkg/distributor/distributor.go
Outdated
@@ -205,6 +212,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", false, "Enable pooling of buffers used for marshaling write requests.")
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "distributor.limit-inflight-requests-using-grpc-method-limiter", false, "Use experimental method of limiting push requests.")
f.IntVar(&cfg.IngesterPushWorkerGoroutines, "distributor.ingester-push-worker-goroutines", 0, "Number of pre-allocated worker goroutines used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned.")
`-distributor.ingester-push-worker-goroutines` sounds like "I am going to use at most this number of goroutines for ingester push requests", whereas it means "I will reuse this number of goroutines, and spawn new ones if needed". WDYT if we call this `-distributor.reusable-ingester-push-workers` or `-distributor.reusable-ingester-push-worker-goroutines`?
`distributor.reusable-ingester-push-workers` or `distributor.ingester-push-workers` is my preference. I don't really like including "goroutines" in the name of the option; it seems like it leaks implementation details.
Renamed to `distributor.reusable-ingester-push-workers` in 690cc0c.
Thank you 🙏
Thank you
I'm curious about the behavior of this option. Right now, if there are no workers available we spawn new goroutines. It seems that if there are no workers available, the distributor is under load, and spawning new goroutines would further increase the load.
Edit: Nevermind, this is a performance improvement, not intended for load shedding or anything like that.
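For context on that behaviour, a simplified sketch of the reusable-goroutines idea from grafana/dskit#431 (illustrative only, not dskit's actual implementation): a fixed set of workers drains a jobs channel, and when none of them can take a job immediately, the pool falls back to spawning a one-off goroutine rather than blocking the caller.

```go
// Simplified sketch of a reusable goroutines pool (illustrative, not dskit's code).
type reusablePool struct {
	jobs chan func()
}

func newReusablePool(size int) *reusablePool {
	p := &reusablePool{jobs: make(chan func())}
	for i := 0; i < size; i++ {
		go func() {
			// Each worker reuses its goroutine (and stack) for many jobs.
			for f := range p.jobs {
				f()
			}
		}()
	}
	return p
}

func (p *reusablePool) Go(f func()) {
	select {
	case p.jobs <- f: // an idle worker picks the job up
	default:
		go f() // all workers busy: fall back to a fresh goroutine instead of blocking
	}
}
```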
pkg/distributor/distributor.go
Outdated
// ingesterDoBatchPushGo is the Go function passed to ring.DoBatchWithOptions.
// It can be nil, in which case a simple `go f()` will be used.
// See Config.IngesterPushWorkerGoroutines on how to configure this.
ingesterDoBatchPushGo func(func())
nit: Can we use something else besides `Go` here? I'm struggling to understand what this is/does.
Renamed to `ingesterDoBatchPushWorkers`, does that work? I called it `Go` because that's how I called the option in dskit; I'm also open to suggestions on naming there.
Since the new CLI flag is called `distributor.reusable-ingester-push-workers`, my suggestion is:
- replace `ingesterDoBatchPushWorkers func(func())` with `reusableIngesterPushWorkersPool *concurrency.ReusableGoroutinesPool`
- instantiate it as `if cfg.ReusableIngesterPushWorkers > 0 { d.reusableIngesterPushWorkersPool := concurrency.NewReusableGoroutinesPool(cfg.ReusableIngesterPushWorkers) ... }`
- and use it as `ring.DoBatchOptions{ Cleanup: func() { pushReq.CleanUp(); cancel() }, IsClientError: isClientError, Go: d.reusableIngesterPushWorkersPool.Go, }`

It is more intuitive what the type `*concurrency.ReusableGoroutinesPool` is compared to just `func(func())`.
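Put together, the suggestion above would look roughly like this (a sketch only; field names follow the snippets in this comment, and the surrounding distributor code is elided):

```go
// Keep the pool itself on the distributor when the flag is set
// (sketch; error handling and closing the pool on shutdown are omitted).
if cfg.ReusableIngesterPushWorkers > 0 {
	d.reusableIngesterPushWorkersPool = concurrency.NewReusableGoroutinesPool(cfg.ReusableIngesterPushWorkers)
}

// ...and hand its Go method to the batch options when pushing to ingesters.
opts := ring.DoBatchOptions{
	Cleanup:       func() { pushReq.CleanUp(); cancel() },
	IsClientError: isClientError,
	// If the pool is optional, this is where the extra nil check the
	// author mentions below would be needed.
	Go: d.reusableIngesterPushWorkersPool.Go,
}
```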
But then I need to add extra logic on each call checking whether there's a pool or not.
I don't want to block you on this. I approved your PR.
If we don't manage to find a better attribute name, let's merge it like this.
Thank you for your reviews, @duricanikolic, @56quarters. I've addressed your feedback, please take a look when possible 🙏.
LGTM
Thank you
I'm going to close this PR and take a different approach, described in golang/go#18138: just growing the stack in the distributor call. That seems less complex, more local, and easier to revert in the long term.
@56quarters merging this as I don't see any blocking feedback; I'll be happy to apply any changes afterwards if you find something.
What this PR does
This updates dskit to include grafana/dskit#431 and takes advantage of that to introduce a new flag, `distributor.ingester-push-worker-goroutines`, that can be configured to use a pool of worker goroutines when sending push requests from the distributors to the ingesters, instead of creating new goroutines all the time. This should reduce the amount of stack allocations happening, which take around 6% of distributor CPU time.
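To make the idea concrete, here is a small self-contained illustration of what changes at the call site (names and the ingester addresses are made up; see the pool sketch earlier in the thread for how the reusable workers behave):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	push := func(addr string) {
		defer wg.Done()
		fmt.Println("pushing to", addr)
	}

	// Old behaviour: every ingester push in a batch starts a brand-new
	// goroutine, paying for a fresh stack allocation each time.
	spawn := func(f func()) { go f() }
	// New behaviour (when the flag is > 0): spawn would instead be the Go
	// method of a reusable goroutines pool, e.g. spawn = pool.Go.

	for _, addr := range []string{"ingester-0", "ingester-1", "ingester-2"} {
		addr := addr
		wg.Add(1)
		spawn(func() { push(addr) })
	}
	wg.Wait()
}
```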
Which issue(s) this PR fixes or relates to
Ref: golang/go#18138
Checklist
CHANGELOG.md updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`