HA dedup on every sample #13665
Conversation
Force-pushed from 9a5373e to 656ea69
On the working branch:
Force-pushed from 656ea69 to cbc8c2d
@vaxvms thank you for running the benchmarks, can I ask you to run them with

I see the benchmarks/tests are failing. Is it possible to run the benchmarks without them failing? Also can you run them with

Oh, @dimitarvdimitrov :) I also see that there are benchmark results with

(Sorry, I was so excited that I clicked the wrong button, maybe enough caffeine for me today)
benchmarks/after.txt
Outdated
```
BenchmarkDistributor_Push/too_many_labels_limit_reached-10    2209    540531 ns/op    135548 B/op    3234 allocs/op
PASS

Process finished with the exit code 0
```
Bug: Benchmark output files accidentally committed to repository
The files benchmarks/after.txt and benchmarks/before.txt contain local benchmark output with machine-specific paths (e.g., /Users/dimitar/Library/Caches/JetBrains/GoLand2023.2/...). These appear to be development artifacts that were unintentionally included in the commit.
Additional Locations (1)
committed on purpose
@colega Thanks for pointing me to the procedure for running the benchmarks. The failures seem to differ between main and the feature branch; fixing.

I'll take a look once you've fixed the CI & checked Cursor's comments (at least one of them, about cost attribution, seems legit). Of course feel free to challenge Cursor's statements :)
Force-pushed from cbc8c2d to 7c6a812
pkg/distributor/distributor.go
Outdated
```go
if len(req.Timeseries) > 0 {
	err = next(ctx, pushReq)
}
errs.Add(err)
```
Bug: Duplicate error added to multierror when all samples rejected
When all samples in a request are rejected (all replicas deduped or rejected), the last error from replicaObserved gets added to the multierror twice. Inside the loop at line 1292-1294, err is assigned from replicaObserved and immediately added to errs. After the loop, if len(req.Timeseries) == 0 (all samples rejected), the next() call is skipped, leaving err holding the last loop error. Line 1331 then adds this same error again via errs.Add(err). This causes duplicate errors in the returned multierror, potentially resulting in confusing error messages and logs.
Additional Locations (1)
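To make the reported pattern concrete, here is a minimal, self-contained toy showing the double-add; this is illustrative only, not the PR's actual code:

```go
package main

import "fmt"

func main() {
	var errs []error // stands in for multierror.MultiError
	var err error

	// Per-sample rejections are added to errs as the loop runs.
	rejections := []error{fmt.Errorf("deduped: not the elected replica")}
	for _, e := range rejections {
		err = e
		errs = append(errs, err)
	}

	remaining := 0 // all samples were rejected
	if remaining > 0 {
		err = nil // would be: err = next(ctx, pushReq)
	}

	// Bug: next() was skipped, so err still holds the last loop error
	// and gets appended a second time.
	errs = append(errs, err)
	fmt.Println(errs) // the same error appears twice
}
```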
Should be fixed now
Force-pushed from 7c6a812 to 513a4a2
Force-pushed from 513a4a2 to bc2e330
CHANGELOG.md
Outdated
```
* [ENHANCEMENT] OTLP: Add metric `cortex_distributor_otlp_requests_by_content_type_total` to track content type (json or proto) of OTLP packets. #13525
* [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13525
* [ENHANCEMENT] Ruler: gRPC errors without details are classified as `operator` errors, and rule evaluation failures (such as duplicate labelsets) are classified as `user` errors. #13586
* [ENHANCEMENT] HA: Deduplication per sample instead of per batch. #13665
```
Suggested change:

```diff
-* [ENHANCEMENT] HA: Deduplication per sample instead of per batch. #13665
+* [ENHANCEMENT] HA: Deduplicate per sample instead of per batch. #13665
```
tacole02 left a comment
Changelog looks good! I made a small suggestion.
…tributor push benchmark (#13688)

change introduced by PR #12583

#### What this PR does

Fix benchmark failure

#### Which issue(s) this PR fixes or relates to

Fixes distributor benchmark to run them for #13665

#### Checklist

- [X] Tests updated.
- [ ] Documentation added.
- [ ] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR.
- [ ] [`about-versioning.md`](https://github.com/grafana/mimir/blob/main/docs/sources/mimir/configure/about-versioning.md) updated with experimental features.

> [!NOTE]
> Updates the expected error string for max label value length to include actual (204) and limit (200) values in `BenchmarkDistributor_Push`.
>
> <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 743cb98. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup>
Force-pushed from bc2e330 to b4dd196
Force-pushed from b4dd196 to 0a7033a
I would suggest you run

cursor also left a few comments, which look legit, can you take a look at those too?
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
```
goos: darwin
goarch: arm64
pkg: github.com/grafana/mimir/pkg/distributor
BenchmarkDistributor_Push
BenchmarkDistributor_Push/max_label_name_length_limit_reached
BenchmarkDistributor_Push/max_label_name_length_limit_reached-10             121   9844396 ns/op   124584 B/op   2158 allocs/op
BenchmarkDistributor_Push/max_label_value_length_limit_reached
BenchmarkDistributor_Push/max_label_value_length_limit_reached-10            153   7800602 ns/op   109439 B/op   2158 allocs/op
BenchmarkDistributor_Push/timestamp_too_new
BenchmarkDistributor_Push/timestamp_too_new-10                               243   5008468 ns/op    89449 B/op   2085 allocs/op
BenchmarkDistributor_Push/HA_dedup;_all_samples_same_replica
BenchmarkDistributor_Push/HA_dedup;_all_samples_same_replica-10             1263    924608 ns/op   150549 B/op     43 allocs/op
BenchmarkDistributor_Push/HA_dedup;_4_clusters_8_replicas_evenly_split
BenchmarkDistributor_Push/HA_dedup;_4_clusters_8_replicas_evenly_split-10   1407    806527 ns/op    85525 B/op     99 allocs/op
BenchmarkDistributor_Push/all_samples_successfully_pushed
BenchmarkDistributor_Push/all_samples_successfully_pushed-10                1398    809442 ns/op   150418 B/op     42 allocs/op
BenchmarkDistributor_Push/ingestion_rate_limit_reached
BenchmarkDistributor_Push/ingestion_rate_limit_reached-10                   1965    603205 ns/op    25324 B/op     47 allocs/op
BenchmarkDistributor_Push/too_many_labels_limit_reached
BenchmarkDistributor_Push/too_many_labels_limit_reached-10                   213   5744598 ns/op    80979 B/op   2188 allocs/op
PASS
```

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Signed-off-by: juliette.orain <juliette.orain@ovhcloud.com> Co-authored-by: Nicolas DUPEUX <nicolas.dupeux@corp.ovh.com> Signed-off-by: juliette.orain <juliette.orain@ovhcloud.com>
Signed-off-by: juliette.orain <juliette.orain@ovhcloud.com> Co-authored-by: Nicolas DUPEUX <nicolas.dupeux@ovhcloud.com>
Signed-off-by: juliette.orain <juliette.orain@ovhcloud.com>
Signed-off-by: juliette.orain <juliette.orain@ovhcloud.com>
Signed-off-by: juliette.orain <juliette.orain@ovhcloud.com>
Force-pushed from 12b4d15 to 61e13ea
```
Distributor_Push/cost_attribution=disabled/scenario=HA_dedup;_all_samples_same_replica-5             1.934m ± 4%
Distributor_Push/cost_attribution=disabled/scenario=HA_dedup;_4_clusters_8_replicas_evenly_split-5   1.971m ± 4%
Distributor_Push/cost_attribution=enabled/scenario=HA_dedup;_all_samples_same_replica-5              2.078m ± 4%
Distributor_Push/cost_attribution=enabled/scenario=HA_dedup;_4_clusters_8_replicas_evenly_split-5    2.167m ± 4%
```
is it possible to run these with the previous implementation too? i understand that they won't do the same, but it would show us how much slower the new path is
pkg/distributor/distributor_test.go
Outdated
```go
switch i % 8 {
case 0:
	cluster, replica = "c1", "r1"
case 1:
	cluster, replica = "c1", "r2"
case 2:
	cluster, replica = "c2", "r1"
case 3:
	cluster, replica = "c2", "r2"
case 4:
	cluster, replica = "c3", "r1"
case 5:
	cluster, replica = "c3", "r2"
case 6:
	cluster, replica = "c4", "r1"
case 7:
	cluster, replica = "c4", "r2"
default:
	panic("in the disco")
}
```
nitpick to make this `cluster, replica = i/2 + 1, i%2 + 1`
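Since `cluster`/`replica` are strings like `c1`/`r1` in the switch above, the one-liner presumably needs formatting; a runnable sketch of what the nitpick might look like (the `fmt.Sprintf` form is an assumption):

```go
package main

import "fmt"

func main() {
	for i := 0; i < 8; i++ {
		// Same mapping as the eight-case switch, derived from the index.
		cluster, replica := fmt.Sprintf("c%d", i/2+1), fmt.Sprintf("r%d", i%2+1)
		fmt.Println(cluster, replica) // c1 r1, c1 r2, c2 r1, ..., c4 r2
	}
}
```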
can you delete these before merging?
thanks for running the benchmarks. it looks like there's nothing to worry about, but imo worth running the cases with HA tracker without your diff too.
normally it's enough to post this as a comment, but committed works too. Just don't forget to delete it before this PR is merged
pkg/distributor/distributor.go
Outdated
```go
// replicaObserved checks if a sample from a given replica should be accepted for ingestion based on HA deduplication rules.
//
// Returns a replicaState indicating the acceptance status and classification of the replica:
//   - replicaIsPrimary: sample is from the elected primary replica and should be accepted
```
i would move these descriptions as a godoc comment on each enum value
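A minimal sketch of that suggestion, assuming `replicaState` is an integer enum; only `replicaIsPrimary` appears in the diff above, so the second value and its wording are hypothetical:

```go
type replicaState int

const (
	// replicaIsPrimary: the sample is from the elected primary replica and should be accepted.
	replicaIsPrimary replicaState = iota
	// replicaIsSecondary: the sample is from a non-elected replica and should be deduped.
	// (Hypothetical name; the PR may use a different one.)
	replicaIsSecondary
)
```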
pkg/distributor/distributor.go
Outdated
```diff
 haReplicaLabel := d.limits.HAReplicaLabel(userID)
-cluster, replica := findHALabels(haReplicaLabel, d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
+haClusterLabel := d.limits.HAClusterLabel(userID)
+cluster, replica := findHALabels(haReplicaLabel, haClusterLabel, req.Timeseries[0].Labels)
```
i think it's a bit confusing that we log only the first replicas even though we'll now ingest data from multiple replicas. can we add the tracing instrumentation further down as we're looping over the replica states?
pkg/distributor/distributor.go
Outdated
```go
// These samples have been deduped.
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
var errs multierror.MultiError
replicaInfos := make(map[haReplica]*replicaInfo)
```
can you add a helper function which returns you replicaInfos?
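A sketch of such a helper, under the assumption that `haReplica` holds a (cluster, replica) pair and `replicaInfo` tracks the samples belonging to it; the field and function names here are illustrative, not the PR's, and `replicaState` is the enum sketched earlier:

```go
// Illustrative stand-ins for the PR's types.
type haReplica struct{ cluster, replica string }

type replicaInfo struct {
	state         replicaState // hypothetical: dedup decision for this replica
	sampleIndexes []int        // indexes of the series that came from this replica
}

// buildReplicaInfos groups series indexes by their (cluster, replica) pair.
// replicaForSample abstracts over findHALabels on each series' labels.
func buildReplicaInfos(numSeries int, replicaForSample func(i int) haReplica) map[haReplica]*replicaInfo {
	infos := make(map[haReplica]*replicaInfo, 1)
	for i := 0; i < numSeries; i++ {
		r := replicaForSample(i)
		info, ok := infos[r]
		if !ok {
			info = &replicaInfo{}
			infos[r] = info
		}
		info.sampleIndexes = append(info.sampleIndexes, i)
	}
	return infos
}
```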
pkg/distributor/distributor.go
Outdated
```go
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
var errs multierror.MultiError
replicaInfos := make(map[haReplica]*replicaInfo)
samplesPerState := make(map[replicaState]int)
```
can you move all of the logic for building `samplesPerState` (purely based on `replicaInfos` as far as i can tell) and then using it (updating metrics from what i can see) to a function of their own? or maybe add the creation of `samplesPerState` (say, call it `countSamplesPerState()`) as a first step in `updateHADedupeMetrics`. you'd need to change the if which now contains the two calls to `updateHADedupeMetrics` - that you may be able to do based on whether `lastAccepted` is a valid index or not. will that work?
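A sketch of the suggested `countSamplesPerState()`, reusing the illustrative `haReplica`/`replicaInfo` types from the helper sketch above (the `state` and `sampleIndexes` fields are assumptions):

```go
// countSamplesPerState tallies samples by replica state, purely from replicaInfos.
func countSamplesPerState(infos map[haReplica]*replicaInfo) map[replicaState]int {
	samplesPerState := make(map[replicaState]int, len(infos))
	for _, info := range infos {
		samplesPerState[info.state] += len(info.sampleIndexes)
	}
	return samplesPerState
}
```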
pkg/distributor/distributor.go
Outdated
```go
samplesPerState := make(map[replicaState]int)
// Check if all timeseries belong to the same replica
firstReplica := getReplicaForSample(0)
isOneReplica := true
```
do you think we can move all the logic of getReplicaForSample, replicaInfos and getReplicaState into a struct of their own? something like

```go
type replicaInfos struct {
	isOneReplica     bool
	theOneReplica    *haReplica
	multipleReplicas map[haReplica]*replicaInfo
	// maybe more things?
}

func newReplicaInfos(Timeseries []PreallocTimeseries) replicaInfos {...}
func (replicaInfos) replicaState() {...}
func (replicaInfos) replicaForSample() {...}
```

maybe even add the countSamplesPerState i suggested above

```go
func (replicaInfos) countSamplesPerState() map[replicaState]int {...}
```

```go
	},
	expectDetails: []*mimirpb.ErrorDetails{nil, replicasDidNotMatchDetails, tooManyClusterDetails, tooManyClusterDetails},
}, {
	name: "perform partial HA deduplication",
```
can you add a few more test cases? for example where we have mixed series from multiple primary replicas in a single request and where those are mixed with series which don't have a cluster label.
thanks for rebasing this! i'm sorry if most of this code is code i already wrote, but i think it needs a bit of cleanup before we can merge it 😅

i forgot to mention - can you also update docs/sources/mimir/configure/configure-high-availability-deduplication.md:48 to mention that now we check all samples in a batch?
Signed-off-by: juliette.orain <juliette.orain@ovhcloud.com>
Signed-off-by: juliette.orain <juliette.orain@ovhcloud.com>
We lose the cluster, replica pairing, might be a bad idea
Signed-off-by: juliette.orain <juliette.orain@ovhcloud.com>
Signed-off-by: juliette.orain <juliette.orain@ovhcloud.com>
Signed-off-by: juliette.orain <juliette.orain@ovhcloud.com>
@vaxvms @julietteO is this ready for another pass or are there still things you want to address?
What this PR does
This PR aims to handle the case where several replicas/clusters push metrics in a single request.
Without these changes, deduplication is done on a per-request basis; with them, it is handled per sample.
We rebased @dimitarvdimitrov's branch onto current main, updated the tests, and handled some remaining todos.
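To illustrate the idea, a self-contained toy (not the PR's code): instead of deciding accept/drop once per request from the first series' HA labels, each series is checked against the elected replica of its own cluster.

```go
package main

import "fmt"

type series struct{ cluster, replica string }

func main() {
	// Stand-in for the HA tracker's per-cluster replica election.
	elected := map[string]string{"c1": "r1", "c2": "r2"}

	// One request mixing two clusters and both of their replicas.
	req := []series{{"c1", "r1"}, {"c1", "r2"}, {"c2", "r1"}, {"c2", "r2"}}

	var accepted []series
	for _, s := range req {
		// Per-sample decision: keep only series from the elected replica.
		if elected[s.cluster] == s.replica {
			accepted = append(accepted, s)
		}
	}
	fmt.Println(accepted) // [{c1 r1} {c2 r2}]
}
```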
Which issue(s) this PR fixes or relates to
Fixes #3199
Checklist
- [ ] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR.
- [ ] `about-versioning.md` updated with experimental features.

Note
Refactors HA deduplication to operate per-series instead of per-request, updating distributor middleware, HA label handling, docs, tests, and benchmarks.
- Rework `prePushHaDedupeMiddleware` with replica state tracking, request slicing, and label removal (`replicaState`, `haReplica`, `sortByAccepted()`, `removeHAReplicaLabels()`, etc.).
- Collect errors with `multierror`.
- Change `findHALabels()` to return a `haReplica` struct; update call sites accordingly.
- Increase `numSeriesPerRequest` to `1024`.
- Use a `noopIngester` for test stability.

Written by Cursor Bugbot for commit 7739ac3. This will update automatically on new commits. Configure here.