HA dedup on every sample #13665
base: main
Changes from all commits
f5690c5
c4b3b2e
d436cf4
6a36c51
3851429
8eb17c0
35e94e6
453e84f
978afb8
dc8f311
901c578
4a0fc66
61e13ea
7a6c088
730490e
291897a
0f77395
03e673e
bb177a5
6299787
dc76acf
7739ac3
Contributor
Thanks for running the benchmarks. It looks like there's nothing to worry about, but IMO it's worth running the cases with the HA tracker without your diff too. Normally it's enough to post this as a comment, but committing it works too. Just don't forget to delete it before this PR is merged.
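For context, the with/without comparison the reviewer asks for is typically structured as Go sub-benchmarks of the same push path so the two numbers are directly comparable. The sketch below is not Mimir code: the `sample` type and the `pushWithDedup`/`pushWithoutDedup` helpers are hypothetical stand-ins that only illustrate the shape of such a comparison.

```go
// Self-contained _test.go sketch (not Mimir code): pushWithDedup, pushWithoutDedup
// and the sample type are hypothetical stand-ins for the real push path.
package main

import (
	"strconv"
	"testing"
)

type sample struct{ cluster, replica string }

var sink int // prevents the compiler from discarding the benchmarked calls

// Baseline path: every sample is accepted, no HA tracker involved.
func pushWithoutDedup(batch []sample) int { return len(batch) }

// HA path: each sample is checked against the elected replica of its cluster.
func pushWithDedup(batch []sample, elected map[string]string) int {
	accepted := 0
	for _, s := range batch {
		if elected[s.cluster] == s.replica {
			accepted++
		}
	}
	return accepted
}

func BenchmarkHADedup(b *testing.B) {
	batch := make([]sample, 1000)
	elected := map[string]string{}
	for i := range batch {
		c := "cluster-" + strconv.Itoa(i%4)
		batch[i] = sample{cluster: c, replica: "replica-" + strconv.Itoa(i%2)}
		elected[c] = "replica-0"
	}

	b.Run("without-ha-tracker", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			sink = pushWithoutDedup(batch)
		}
	})
	b.Run("with-ha-tracker", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			sink = pushWithDedup(batch, elected)
		}
	})
}
```

Running both sub-benchmarks several times (for example with `-count=6`) and comparing the results gives the kind of with/without comparison requested above.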
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,7 @@ import ( | |
| "github.com/grafana/dskit/limiter" | ||
| "github.com/grafana/dskit/middleware" | ||
| "github.com/grafana/dskit/mtime" | ||
| "github.com/grafana/dskit/multierror" | ||
| "github.com/grafana/dskit/ring" | ||
| ring_client "github.com/grafana/dskit/ring/client" | ||
| "github.com/grafana/dskit/services" | ||
|
|
@@ -1132,6 +1133,149 @@ func (d *Distributor) wrapPushWithMiddlewares(next PushFunc) PushFunc { | |
|
|
||
| } | ||
|
|
||
| type replicaState int | ||
|
|
||
| const ( | ||
| // replicaRejectedUnknown sample is rejected due to an unknown error. | ||
| replicaRejectedUnknown replicaState = 0 | ||
| // replicaIsPrimary sample is from the elected primary replica and should be accepted. | ||
| replicaIsPrimary replicaState = 1 << iota | ||
| // replicaNotHA sample doesn't have both HA labels and should be accepted. | ||
| replicaNotHA | ||
| // replicaDeduped sample is from a non-primary replica and should be deduplicated. | ||
| replicaDeduped | ||
| // replicaRejectedTooManyClusters sample is rejected because the tenant has too many HA clusters. | ||
| replicaRejectedTooManyClusters | ||
|
|
||
| replicaAccepted = replicaIsPrimary | replicaNotHA | ||
| ) | ||
|
|
||
| type haReplica struct { | ||
| cluster, replica string | ||
| } | ||
|
|
||
| type replicaInfo struct { | ||
| state replicaState | ||
| sampleCount int | ||
| } | ||
|
|
||
| // replicaObserved checks if a sample from a given replica should be accepted for ingestion based on HA deduplication rules. | ||
| func (d *Distributor) replicaObserved(ctx context.Context, userID string, replica haReplica, ts int64) (replicaState, error) { | ||
| span := trace.SpanFromContext(ctx) | ||
| span.SetAttributes( | ||
| // Make a copy of these, since they may be retained as tags | ||
| attribute.String("cluster", strings.Clone(replica.cluster)), | ||
| attribute.String("replica", strings.Clone(replica.replica)), | ||
| ) | ||
|
|
||
| isAccepted, err := d.checkSample(ctx, userID, replica.cluster, replica.replica, ts) | ||
| if err != nil { | ||
| switch { | ||
| case errors.As(err, &replicasDidNotMatchError{}): | ||
| // These samples have been deduped. | ||
| return replicaDeduped, err | ||
| case errors.As(err, &tooManyClustersError{}): | ||
| return replicaRejectedTooManyClusters, err | ||
| default: | ||
| return replicaRejectedUnknown, err | ||
| } | ||
| } | ||
|
|
||
| if isAccepted { | ||
| return replicaIsPrimary, nil | ||
| } | ||
| // If there wasn't an error but isAccepted is false that means we didn't find both HA labels. | ||
| return replicaNotHA, nil | ||
| } | ||
|
|
||
| func getReplicasFromRequest(req *mimirpb.WriteRequest, haReplicaLabel, haClusterLabel string) []haReplica { | ||
| replicas := make([]haReplica, len(req.Timeseries)) | ||
| for i, ts := range req.Timeseries { | ||
| // Make a copy of these, since they may be retained as labels on our metrics, e.g. dedupedSamples. | ||
| replicas[i] = findHALabels(haReplicaLabel, haClusterLabel, ts.Labels) | ||
| } | ||
| return replicas | ||
| } | ||
|
|
||
| func getEarliestSampleTimestamp(req *mimirpb.WriteRequest, defaultTimestamp int64) int64 { | ||
| earliestSampleTimestamp := defaultTimestamp | ||
| for _, ts := range req.Timeseries { | ||
| if len(ts.Samples) > 0 { | ||
| tsms := ts.Samples[0].TimestampMs | ||
| if tsms < earliestSampleTimestamp { | ||
| earliestSampleTimestamp = tsms | ||
| } | ||
| } | ||
| if len(ts.Histograms) > 0 { | ||
| tsms := ts.Histograms[0].Timestamp | ||
| if tsms < earliestSampleTimestamp { | ||
| earliestSampleTimestamp = tsms | ||
| } | ||
| } | ||
| } | ||
| return earliestSampleTimestamp | ||
| } | ||
|
|
||
| func (d *Distributor) processHaReplicas(ctx context.Context, userID string, sampleTimestamp int64, replicaInfos map[haReplica]*replicaInfo) (map[replicaState]int, error) { | ||
| var errs multierror.MultiError | ||
| samplesPerState := make(map[replicaState]int) | ||
| for replicaKey, info := range replicaInfos { | ||
| if info.state == replicaRejectedUnknown { | ||
| state, replicaErr := d.replicaObserved(ctx, userID, replicaKey, sampleTimestamp) | ||
| info.state = state | ||
| if replicaErr != nil { | ||
| errs.Add(replicaErr) | ||
| } | ||
| } | ||
| samplesPerState[info.state] += info.sampleCount | ||
| } | ||
| return samplesPerState, errs.Err() | ||
| } | ||
|
|
||
| func getReplicaInfos(req *mimirpb.WriteRequest, replicas []haReplica) map[haReplica]*replicaInfo { | ||
| replicaInfos := make(map[haReplica]*replicaInfo) | ||
| // Check if all timeseries belong to the same replica | ||
| firstReplica := replicas[0] | ||
| isOneReplica := true | ||
| for i := 1; i < len(req.Timeseries); i++ { | ||
| if replicas[i] != firstReplica { | ||
| isOneReplica = false | ||
| break | ||
| } | ||
| } | ||
|
|
||
| // Count samples per replica | ||
| if isOneReplica { | ||
| numSamples := 0 | ||
| for _, ts := range req.Timeseries { | ||
| numSamples += len(ts.Samples) + len(ts.Histograms) | ||
| } | ||
| // The replica info is stored in a map where the key is the replica itself. | ||
| // The replica labels are references to the request buffer, which will be reused. | ||
| // To safely use the replica as map key, we need to clone its labels. | ||
| firstReplica.cluster = strings.Clone(firstReplica.cluster) | ||
| firstReplica.replica = strings.Clone(firstReplica.replica) | ||
| replicaInfos[firstReplica] = &replicaInfo{sampleCount: numSamples} | ||
| } else { | ||
| for i, ts := range req.Timeseries { | ||
| r := replicas[i] | ||
| info := replicaInfos[r] | ||
| if info == nil { | ||
| // The replica info is stored in a map where the key is the replica itself. | ||
| // The replica labels are references to the request buffer, which will be reused. | ||
| // To safely use the replica as map key, we need to clone its labels. | ||
| r.cluster = strings.Clone(r.cluster) | ||
| r.replica = strings.Clone(r.replica) | ||
|
|
||
| info = &replicaInfo{} | ||
| replicaInfos[r] = info | ||
| } | ||
| info.sampleCount += len(ts.Samples) + len(ts.Histograms) | ||
| } | ||
| } | ||
| return replicaInfos | ||
| } | ||
|
|
||
| func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc { | ||
|
Contributor
There are way too many things happening in this function already; let's break it up. I'll try to leave some suggestions.
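Before the function body continues, here is a minimal standalone sketch of the `replicaState` bitmask that this diff introduces. The constants mirror the ones added earlier in the diff; only the `main()` demo is new, and it simply shows which states satisfy the `state & replicaAccepted != 0` check used later in `removeHAReplicaLabels` and `sortByAccepted`.

```go
// Standalone sketch: the replicaState constants are copied from the diff above;
// only the demo in main() is added here.
package main

import "fmt"

type replicaState int

const (
	replicaRejectedUnknown replicaState = 0
	// iota is 1 on the next line, so replicaIsPrimary == 2, replicaNotHA == 4, and so on.
	replicaIsPrimary replicaState = 1 << iota
	replicaNotHA
	replicaDeduped
	replicaRejectedTooManyClusters

	replicaAccepted = replicaIsPrimary | replicaNotHA
)

func main() {
	for _, s := range []replicaState{replicaIsPrimary, replicaNotHA, replicaDeduped, replicaRejectedTooManyClusters} {
		fmt.Printf("state=%05b accepted=%t\n", s, s&replicaAccepted != 0)
	}
	// Output:
	// state=00010 accepted=true
	// state=00100 accepted=true
	// state=01000 accepted=false
	// state=10000 accepted=false
}
```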
||
| return WithCleanup(next, func(next PushFunc, ctx context.Context, pushReq *Request) error { | ||
| req, err := pushReq.WriteRequest() | ||
|
|
@@ -1149,75 +1293,147 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc { | |
| } | ||
|
|
||
| haReplicaLabel := d.limits.HAReplicaLabel(userID) | ||
| cluster, replica := findHALabels(haReplicaLabel, d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels) | ||
| // Make a copy of these, since they may be retained as labels on our metrics, e.g. dedupedSamples. | ||
| cluster, replica = strings.Clone(cluster), strings.Clone(replica) | ||
| haClusterLabel := d.limits.HAClusterLabel(userID) | ||
|
|
||
| replicas := getReplicasFromRequest(req, haReplicaLabel, haClusterLabel) | ||
|
|
||
| span := trace.SpanFromContext(ctx) | ||
| span.SetAttributes( | ||
| attribute.String("cluster", cluster), | ||
| attribute.String("replica", replica), | ||
| ) | ||
|
|
||
| numSamples := 0 | ||
| now := time.Now() | ||
|
|
||
| group := d.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(d.limits, userID, req.Timeseries), now) | ||
| sampleTimestamp := timestamp.FromTime(now) | ||
| if d.limits.HATrackerUseSampleTimeForFailover(userID) { | ||
| earliestSampleTimestamp := sampleTimestamp | ||
| for _, ts := range req.Timeseries { | ||
| if len(ts.Samples) > 0 { | ||
| tsms := ts.Samples[0].TimestampMs | ||
| if tsms < earliestSampleTimestamp { | ||
| earliestSampleTimestamp = tsms | ||
| } | ||
| } | ||
| if len(ts.Histograms) > 0 { | ||
| tsms := ts.Histograms[0].Timestamp | ||
| if tsms < earliestSampleTimestamp { | ||
| earliestSampleTimestamp = tsms | ||
| } | ||
| } | ||
| } | ||
| sampleTimestamp = earliestSampleTimestamp | ||
| sampleTimestamp = getEarliestSampleTimestamp(req, sampleTimestamp) | ||
| } | ||
| for _, ts := range req.Timeseries { | ||
| numSamples += len(ts.Samples) + len(ts.Histograms) | ||
|
|
||
| var errs multierror.MultiError | ||
| replicaInfos := getReplicaInfos(req, replicas) | ||
|
|
||
| var clusters []string | ||
| var replicasAsStrings []string | ||
| for replicaKey := range replicaInfos { | ||
| clusters = append(clusters, replicaKey.cluster) | ||
| replicasAsStrings = append(replicasAsStrings, replicaKey.replica) | ||
| } | ||
| span.SetAttributes( | ||
| attribute.StringSlice("clusters", clusters), | ||
| attribute.StringSlice("replicas", replicasAsStrings), | ||
| ) | ||
|
|
||
| removeReplica, err := d.checkSample(ctx, userID, cluster, replica, sampleTimestamp) | ||
| if err != nil { | ||
| if errors.As(err, &replicasDidNotMatchError{}) { | ||
| // These samples have been deduped. | ||
| d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples)) | ||
| } | ||
| samplesPerState, processErr := d.processHaReplicas(ctx, userID, sampleTimestamp, replicaInfos) | ||
| if processErr != nil { | ||
| errs.Add(processErr) | ||
| } | ||
|
|
||
| if errors.As(err, &tooManyClustersError{}) { | ||
| d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(numSamples)) | ||
| d.costAttributionMgr.SampleTracker(userID).IncrementDiscardedSamples(req.Timeseries[0].Labels, float64(numSamples), reasonTooManyHAClusters, now) | ||
| } | ||
| lastAccepted := sortByAccepted(req, replicaInfos, replicas) | ||
| removeHAReplicaLabels(req, lastAccepted, replicas, replicaInfos, haReplicaLabel) | ||
|
|
||
| return err | ||
| // We don't want to send samples beyond the last accepted sample - that was deduplicated | ||
| if samplesPerState[replicaRejectedTooManyClusters] > 0 { | ||
| d.updateHADedupeMetrics(userID, group, replicaInfos, samplesPerState, req.Timeseries[lastAccepted+1].Labels) | ||
| } else { | ||
| d.updateHADedupeMetrics(userID, group, replicaInfos, samplesPerState, nil) | ||
| } | ||
| pushReq.AddCleanup(sliceUnacceptedRequests(req, lastAccepted)) | ||
|
|
||
| if removeReplica { | ||
| // If we found both the cluster and replica labels, we only want to include the cluster label when | ||
| // storing series in Mimir. If we kept the replica label we would end up with another series for the same | ||
| // series we're trying to dedupe when HA tracking moves over to a different replica. | ||
| for ix := range req.Timeseries { | ||
| req.Timeseries[ix].RemoveLabel(haReplicaLabel) | ||
| if len(req.Timeseries) > 0 { | ||
| if pushErr := next(ctx, pushReq); pushErr != nil { | ||
| errs.Add(pushErr) | ||
| } | ||
| } else { | ||
| // If there wasn't an error but removeReplica is false that means we didn't find both HA labels. | ||
| d.nonHASamples.WithLabelValues(userID).Add(float64(numSamples)) | ||
| } | ||
|
|
||
| return next(ctx, pushReq) | ||
| return errs.Err() | ||
| }) | ||
| } | ||
|
|
||
| func removeHAReplicaLabels(req *mimirpb.WriteRequest, lastAccepted int, replicas []haReplica, replicaInfos map[haReplica]*replicaInfo, haReplicaLabel string) { | ||
| for i := 0; i <= lastAccepted; i++ { | ||
| r := replicas[i] | ||
| s := replicaInfos[r].state | ||
| if s&replicaIsPrimary == 0 { | ||
| continue | ||
| } | ||
| // If we found both the cluster and replica labels, we only want to include the cluster label when | ||
| // storing series in Mimir. If we kept the replica label we would end up with another series for the same | ||
| // series we're trying to dedupe when HA tracking moves over to a different replica. | ||
| req.Timeseries[i].RemoveLabel(haReplicaLabel) | ||
| } | ||
| } | ||
|
|
||
| func sliceUnacceptedRequests(req *mimirpb.WriteRequest, lastAccepted int) func() { | ||
| originalLen := len(req.Timeseries) | ||
| req.Timeseries = req.Timeseries[:lastAccepted+1] | ||
| return func() { | ||
| // Restore the length so that we can put back all the series in the request to the memory pool | ||
| req.Timeseries = req.Timeseries[:originalLen] | ||
| } | ||
|
|
||
| } | ||
|
|
||
| // updateHADedupeMetrics updates metrics related to HA deduplication. | ||
| func (d *Distributor) updateHADedupeMetrics(userID, group string, replicaInfos map[haReplica]*replicaInfo, samplesPerState map[replicaState]int, labels []mimirpb.LabelAdapter) { | ||
| for replica, info := range replicaInfos { | ||
| if info.state&replicaDeduped != 0 && info.sampleCount > 0 { | ||
| cluster := strings.Clone(replica.cluster) // Make a copy of this, since it may be retained as labels on our metrics | ||
| d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(info.sampleCount)) | ||
| } | ||
| } | ||
| if samplesPerState[replicaNotHA] > 0 { | ||
| d.nonHASamples.WithLabelValues(userID).Add(float64(samplesPerState[replicaNotHA])) | ||
| } | ||
| if samplesPerState[replicaRejectedTooManyClusters] > 0 { | ||
| d.costAttributionMgr.SampleTracker(userID).IncrementDiscardedSamples(labels, float64(samplesPerState[replicaRejectedTooManyClusters]), reasonTooManyHAClusters, time.Now()) | ||
| d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(samplesPerState[replicaRejectedTooManyClusters])) | ||
| } | ||
|
||
| } | ||
|
|
||
| // sortByAccepted returns the index of the last accepted timeseries in the write request based on the ha dedup states of the replicas | ||
| func sortByAccepted(req *mimirpb.WriteRequest, replicaInfos map[haReplica]*replicaInfo, replicas []haReplica) int { | ||
| numAcceptedReplicas := 0 | ||
| for _, info := range replicaInfos { | ||
| if info.state&replicaAccepted != 0 { | ||
| numAcceptedReplicas++ | ||
| } | ||
| } | ||
| if numAcceptedReplicas == len(replicaInfos) { | ||
| return len(req.Timeseries) - 1 | ||
| } | ||
| if numAcceptedReplicas == 0 { | ||
| return -1 | ||
| } | ||
| findPreviousAccepted := func(i int) int { | ||
| for i > 0 { | ||
| state := replicaInfos[replicas[i]].state | ||
| if state&replicaAccepted != 0 { | ||
| break | ||
| } | ||
| i-- | ||
| } | ||
| return i | ||
| } | ||
| lastAccepted := findPreviousAccepted(len(req.Timeseries) - 1) | ||
| // next we shift all accepted samples to the front of the timeseries slice | ||
| for i := range req.Timeseries { | ||
| if i > lastAccepted { | ||
| break | ||
| } | ||
| state := replicaInfos[replicas[i]].state | ||
| if state&replicaAccepted == 0 { | ||
| req.Timeseries[i], req.Timeseries[lastAccepted] = req.Timeseries[lastAccepted], req.Timeseries[i] | ||
| replicas[i], replicas[lastAccepted] = replicas[lastAccepted], replicas[i] | ||
| lastAccepted-- | ||
| lastAccepted = findPreviousAccepted(lastAccepted) | ||
| } | ||
|
|
||
| } | ||
|
|
||
| return lastAccepted | ||
| } | ||
|
|
||
| func (d *Distributor) prePushRelabelMiddleware(next PushFunc) PushFunc { | ||
| return WithCleanup(next, func(next PushFunc, ctx context.Context, pushReq *Request) error { | ||
|
|
||
| req, err := pushReq.WriteRequest() | ||
| if err != nil { | ||
| return err | ||
|
|
||
can you delete these before merging?
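For reference, `sortByAccepted` in this diff compacts accepted series to the front of `req.Timeseries` by swapping rejected entries toward the back and tracking the index of the last accepted one, which the request is later truncated to. The sketch below is a standalone illustration of that swap-to-front partition on a plain string slice, not Mimir code; `partitionAccepted` and the example labels are hypothetical.

```go
// Standalone illustration of the swap-to-front partition idea behind sortByAccepted:
// accepted entries end up before the returned index, rejected ones after it.
package main

import "fmt"

func partitionAccepted(items []string, accepted func(string) bool) int {
	last := len(items) - 1
	// Walk last back to the right-most accepted element (-1 if there is none).
	for last >= 0 && !accepted(items[last]) {
		last--
	}
	for i := 0; i <= last; i++ {
		if !accepted(items[i]) {
			// Swap the rejected element behind the accepted region…
			items[i], items[last] = items[last], items[i]
			// …and shrink the region to the previous accepted element.
			last--
			for last > i && !accepted(items[last]) {
				last--
			}
		}
	}
	return last // index of the last accepted element, -1 if none
}

func main() {
	series := []string{"primary", "deduped", "non-ha", "deduped", "primary"}
	accepted := func(s string) bool { return s != "deduped" }
	last := partitionAccepted(series, accepted)
	fmt.Println(series[:last+1], "would be pushed; the rest is dropped:", series[last+1:])
}
```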