Distributor pushes to ingesters through a goroutines pool #6660

Merged
colega merged 9 commits into main from distributor-pushes-with-goroutines-pool
Nov 17, 2023

Contributor

@colega colega commented Nov 14, 2023

What this PR does

This updates dskit to include grafana/dskit#431 and uses it to introduce a new flag, distributor.ingester-push-worker-goroutines, which configures a pool of worker goroutines for sending push requests from the distributors to the ingesters, instead of spawning a new goroutine for every request.

This should reduce the number of stack allocations, which take around 6% of distributor CPU time:

image
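
For readers unfamiliar with the pattern, here is a minimal sketch of a reusable worker pool with a spawn-on-demand fallback. It is an illustration under assumptions, not the dskit implementation: the names workerPool, newWorkerPool and runJobs are hypothetical, and the only behaviour taken from this PR is "reuse a fixed number of goroutines, and spawn a new one when no worker is free".

```go
package main

import "sync"

// workerPool is a hypothetical stand-in for a reusable goroutines pool.
type workerPool struct {
	jobs chan func()
}

// newWorkerPool starts size long-lived workers that wait for jobs.
// With size == 0 there are no workers, so every job gets its own goroutine.
func newWorkerPool(size int) *workerPool {
	p := &workerPool{jobs: make(chan func())}
	for i := 0; i < size; i++ {
		go func() {
			for f := range p.jobs {
				f()
			}
		}()
	}
	return p
}

// Go hands f to a worker that is currently idle. The channel is unbuffered,
// so the send only succeeds if a worker is waiting; otherwise f runs in a
// freshly spawned goroutine.
func (p *workerPool) Go(f func()) {
	select {
	case p.jobs <- f:
	default:
		go f()
	}
}

// Close stops the workers. It must not be called concurrently with Go.
func (p *workerPool) Close() { close(p.jobs) }

// runJobs submits n trivial jobs through the pool and returns how many ran.
func runJobs(p *workerPool, n int) int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		done int
	)
	wg.Add(n)
	for i := 0; i < n; i++ {
		p.Go(func() {
			defer wg.Done()
			mu.Lock()
			done++
			mu.Unlock()
		})
	}
	wg.Wait()
	return done
}
```

Because the fallback always spawns a goroutine, the pool size only bounds how many goroutines are reused, not how many pushes run concurrently.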

Which issue(s) this PR fixes or relates to

Ref: golang/go#18138

Checklist

  • Tests updated
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
@colega colega marked this pull request as ready for review November 14, 2023 17:46
@colega colega requested review from grafanabot and a team as code owners November 14, 2023 17:46
@@ -292,6 +293,22 @@ func TestDistributor_Push(t *testing.T) {
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
`,
},
"A push to 3 happy ingesters using batch worker goroutines should succeed": {
Contributor

WDYT if we add another test that shows, similarly to what is done in dskit, that setting cfg.IngesterPushWorkerGoroutines to a positive value actually uses that number of workers?

Contributor Author

I don't like the idea of counting goroutines here: with all the services and other things running, that can easily break. However, I've added another test below that wraps the workers with a function that counts the calls and checks the counter. This also helped me see that the service wasn't properly created, this test wasn't calling configure, and the workers weren't actually used 🤦
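
A minimal sketch of that counting-wrapper idea, assuming the workers are exposed as a func(func()). The names countCalls, pushToIngesters and countedPushes are hypothetical and are not the helpers used in the actual test.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// countCalls wraps a "go" function so that every submitted job is counted
// before it is forwarded to the wrapped function.
func countCalls(next func(func()), calls *atomic.Int64) func(func()) {
	return func(f func()) {
		calls.Add(1)
		next(f)
	}
}

// pushToIngesters is a hypothetical stand-in for the code under test: it
// submits one job per ingester through goFn and waits for all of them.
func pushToIngesters(ingesters int, goFn func(func())) {
	var wg sync.WaitGroup
	wg.Add(ingesters)
	for i := 0; i < ingesters; i++ {
		goFn(func() { wg.Done() })
	}
	wg.Wait()
}

// countedPushes returns how many times the wrapped function was invoked
// for a push to the given number of ingesters.
func countedPushes(ingesters int) int64 {
	var calls atomic.Int64
	plainGo := func(f func()) { go f() }
	pushToIngesters(ingesters, countCalls(plainGo, &calls))
	return calls.Load()
}
```

Asserting on the call counter checks that the configured function is really on the push path, without depending on how many goroutines the rest of the process happens to be running.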

Contributor

Thank you for adding this test.

@@ -205,6 +212,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", false, "Enable pooling of buffers used for marshaling write requests.")
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "distributor.limit-inflight-requests-using-grpc-method-limiter", false, "Use experimental method of limiting push requests.")
f.IntVar(&cfg.IngesterPushWorkerGoroutines, "distributor.ingester-push-worker-goroutines", 0, "Number of pre-allocated worker goroutines used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned.")
Contributor

-distributor.ingester-push-worker-goroutines sounds like "I am going to use at most this number of goroutines for ingester push requests", whereas it means "I will reuse this number of goroutines, and spawn new ones if it is needed".
WDYT if we call this -distributor.reusable-ingester-push-workers or -distributor.reusable-ingester-push-worker-goroutines?

Contributor

distributor.reusable-ingester-push-workers or distributor.ingester-push-workers is my preference. I don't really like including "goroutines" in the name of the option, seems like it leaks implementation details.

Contributor Author

Renamed to distributor.reusable-ingester-push-workers in 690cc0c

Thank you 🙏

Contributor

Thank you
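
As a rough illustration of how a flag with the agreed name behaves, here is a sketch using only the standard library flag package. The config struct, registerFlags and parseConfig are hypothetical, trimmed-down stand-ins, and the help text is paraphrased from the diff above rather than copied from the merged code.

```go
package main

import "flag"

// config is a hypothetical, trimmed-down stand-in for the distributor config.
type config struct {
	ReusableIngesterPushWorkers int
}

// registerFlags registers the flag under the name agreed in this thread.
// The default of 0 means "no workers: spawn a goroutine per push request".
func (c *config) registerFlags(f *flag.FlagSet) {
	f.IntVar(&c.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 0,
		"Number of pre-allocated workers used to forward push requests to the ingesters. "+
			"If 0, no workers are used and a new goroutine is spawned for each ingester push request.")
}

// parseConfig parses command-line style arguments into a config.
func parseConfig(args []string) (config, error) {
	var cfg config
	fs := flag.NewFlagSet("distributor", flag.ContinueOnError)
	cfg.registerFlags(fs)
	err := fs.Parse(args)
	return cfg, err
}
```

For example, parseConfig([]string{"-distributor.reusable-ingester-push-workers=100"}) yields a config with 100 reusable workers, while omitting the flag keeps the default of 0.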

Contributor

@56quarters 56quarters left a comment

I'm curious about the behavior of this option. Right now, if there are no workers available, we spawn new goroutines. It seems that if no workers are available, the distributor is under load, so spawning new goroutines would further increase the load.

Edit: Never mind, this is a performance improvement, not intended for load shedding or anything like that.

pkg/distributor/distributor.go (outdated)
// ingesterDoBatchPushGo is the Go function passed to ring.DoBatchWithOptions.
// It can be nil, in which case a simple `go f()` will be used.
// See Config.IngesterPushWorkerGoroutines on how to configure this.
ingesterDoBatchPushGo func(func())
Contributor

nit: Can we use something else besides Go here? I'm struggling to understand what this is/does.

Contributor Author

Renamed to ingesterDoBatchPushWorkers, does that work?
I called it Go because that's what I named the option in dskit; I'm also open to suggestions on naming there.

Contributor

Since the new CLI flag is called distributor.reusable-ingester-push-workers, my suggestion is:

  • replace
      ingesterDoBatchPushWorkers func(func())
    
    with
      reusableIngesterPushWorkersPool *concurrency.ReusableGoroutinesPool
    
  • instantiate it as
      if cfg.ReusableIngesterPushWorkers > 0 {
        d.reusableIngesterPushWorkersPool = concurrency.NewReusableGoroutinesPool(cfg.ReusableIngesterPushWorkers)
        ...
      }
    
  • and use it as
      ring.DoBatchOptions{
        Cleanup:       func() { pushReq.CleanUp(); cancel() },
        IsClientError: isClientError,
        Go:            d.reusableIngesterPushWorkersPool.Go,
      }
    

The type *concurrency.ReusableGoroutinesPool makes the intent clearer than a bare func(func()).

Contributor Author

But then I'd need to add extra logic on each call to check whether there's a pool or not.

Contributor

I don't want to block you on this. I approved your PR.
If we don't manage to find a better attribute name, let's merge it like this.
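
To make the trade-off in this thread concrete, here is a sketch of the func(func()) variant, where a nil function means "use a plain go f()". Everything in it is a hypothetical stand-in: batchOptions and doBatch only imitate the shape of an options struct with a Go field and are not the dskit ring API, and distributor, push and runPushes are illustrative names.

```go
package main

import "sync"

// batchOptions is a hypothetical stand-in for an options struct that
// accepts a custom "go" function. A nil Go means "use a plain go f()".
type batchOptions struct {
	Go func(func())
}

// doBatch is a hypothetical stand-in for the batching call. It runs every
// job through opts.Go, falling back to `go f()` when opts.Go is nil, and
// waits for all jobs to finish.
func doBatch(jobs []func(), opts batchOptions) {
	goFn := opts.Go
	if goFn == nil {
		goFn = func(f func()) { go f() }
	}
	var wg sync.WaitGroup
	wg.Add(len(jobs))
	for _, job := range jobs {
		job := job // capture the loop variable for older Go versions
		goFn(func() {
			defer wg.Done()
			job()
		})
	}
	wg.Wait()
}

// distributor stores only the function. It is set once at construction
// time (for example to a pool's Go method) or left nil.
type distributor struct {
	pushGo func(func())
}

// push passes the field straight through, so the call site needs no
// "is there a pool?" branch.
func (d *distributor) push(jobs []func()) {
	doBatch(jobs, batchOptions{Go: d.pushGo})
}

// runPushes pushes n trivial jobs and returns how many of them ran.
func runPushes(d *distributor, n int) int {
	var (
		mu   sync.Mutex
		done int
	)
	jobs := make([]func(), n)
	for i := range jobs {
		jobs[i] = func() {
			mu.Lock()
			done++
			mu.Unlock()
		}
	}
	d.push(jobs)
	return done
}
```

With a typed pool pointer as the field, the call site would instead have to decide between the pool's method and nil on every push, which is the extra logic mentioned above; the cost of the func(func()) field is that its type says less about what it holds.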

colega and others added 5 commits November 15, 2023 09:17
Co-authored-by: Nick Pillitteri <56quarters@users.noreply.github.com>
Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
…th-goroutines-pool

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
Contributor Author

colega commented Nov 15, 2023

Thank you for your reviews, @duricanikolic, @56quarters, I've addressed your feedback, please take a look when possible 🙏.

Contributor

@duricanikolic duricanikolic left a comment

LGTM
Thank you

Contributor Author

colega commented Nov 16, 2023

I'm going to close this PR and try a different approach described in golang/go#18138: just growing the stack in the distributor call.

That seems less complex, more local, and easier to revert in the long term.
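
The comment above does not show what "growing the stack" looked like in code. One common form of the idea, sketched here purely as an assumption, is to call a non-inlined function with a large stack frame at the start of the goroutine, so the stack is enlarged in one step rather than through several successive growths. The names growStack, goWithGrownStack and ranWithGrownStack, and the 16 KiB frame size, are hypothetical.

```go
package main

// growStack declares a large local array so that calling it requires a
// stack frame of at least 16 KiB. The noinline directive keeps the frame
// from being merged into the caller.
//
//go:noinline
func growStack() byte {
	var buf [16 << 10]byte
	// Touch the buffer so the compiler has no reason to discard it.
	for i := range buf {
		buf[i] = byte(i)
	}
	return buf[len(buf)-1]
}

// goWithGrownStack starts f in a new goroutine that calls growStack first,
// so the work in f begins on an already enlarged stack.
func goWithGrownStack(f func()) {
	go func() {
		_ = growStack()
		f()
	}()
}

// ranWithGrownStack reports whether a function started this way ran.
func ranWithGrownStack() bool {
	done := make(chan bool)
	goWithGrownStack(func() { done <- true })
	return <-done
}
```

Whether this pays off depends on the runtime and the workload; this sketch makes no performance claim.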

@colega colega marked this pull request as draft November 16, 2023 11:38
@colega colega marked this pull request as ready for review November 17, 2023 09:23
Contributor Author

colega commented Nov 17, 2023

The growStack approach didn't work, so I'm continuing the work on this.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
Contributor Author

colega commented Nov 17, 2023

@56quarters I'm merging this as I don't see any blocking feedback; I'll be happy to apply any changes afterwards if you find something.

@colega colega merged commit 1280630 into main Nov 17, 2023
28 checks passed
@colega colega deleted the distributor-pushes-with-goroutines-pool branch November 17, 2023 09:49