22 commits
f5690c5
Add benchmarks and tests for per-sample HA dedup
dimitarvdimitrov Oct 5, 2022
c4b3b2e
Implement partial HA dedup
dimitarvdimitrov Oct 23, 2022
d436cf4
Add comments
dimitarvdimitrov Oct 1, 2023
6a36c51
Optimize for single replica
dimitarvdimitrov Oct 1, 2023
3851429
optimize for single replica, improve findHALabels
dimitarvdimitrov Oct 1, 2023
8eb17c0
Rebase and complete per-sample ha deduplication
julietteO Nov 26, 2025
35e94e6
typo
vaxvms Nov 27, 2025
453e84f
Restore costAttribution manager call
vaxvms Nov 27, 2025
978afb8
Rebase and complete per-sample ha deduplication
julietteO Nov 26, 2025
dc8f311
Add updated benchmarks and stats
julietteO Dec 1, 2025
901c578
Correctly count samples for multi replica case
vaxvms Dec 1, 2025
4a0fc66
Clone string in haReplica to avoid gRPC buffer issues
julietteO Dec 1, 2025
61e13ea
Replace deprecated opentracing
julietteO Dec 1, 2025
7a6c088
Update documentation about ha tracker behavior
vaxvms Dec 3, 2025
730490e
Refactor to remove useless switch in tests
julietteO Dec 3, 2025
291897a
Add testcase on `TestHaDedupeMiddleware`
vaxvms Dec 3, 2025
0f77395
Move comment as godoc for replicaState
julietteO Dec 3, 2025
03e673e
loop is useless
vaxvms Dec 3, 2025
bb177a5
Set all seen clusters and replicas value as attribute of the trace
vaxvms Dec 3, 2025
6299787
Refactor findHALabels so it returns an haReplica
julietteO Dec 4, 2025
dc76acf
Break down prePushHaDedupeMiddleware to use helper functions
julietteO Dec 4, 2025
7739ac3
Only clone values if and when needed
julietteO Dec 4, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -71,6 +71,7 @@
* [ENHANCEMENT] OTLP: Add metric `cortex_distributor_otlp_requests_by_content_type_total` to track content type (json or proto) of OTLP packets. #13525
* [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13525
* [ENHANCEMENT] Ruler: gRPC errors without details are classified as `operator` errors, and rule evaluation failures (such as duplicate labelsets) are classified as `user` errors. #13586
* [ENHANCEMENT] HA: Deduplicate per sample instead of per batch. #13665
* [BUGFIX] Compactor: Fix potential concurrent map writes. #13053
* [BUGFIX] Query-frontend: Fix issue where queries sometimes fail with `failed to receive query result stream message: rpc error: code = Canceled desc = context canceled` if remote execution is enabled. #13084
* [BUGFIX] Query-frontend: Fix issue where query stats, such as series read, did not include the parameters to the `histogram_quantile` and `histogram_fraction` functions if remote execution was enabled. #13084
672 changes: 672 additions & 0 deletions benchmarks/after.txt
Reviewer comment: can you delete these before merging?

Large diffs are not rendered by default.

648 changes: 648 additions & 0 deletions benchmarks/before.txt

Large diffs are not rendered by default.

186 changes: 186 additions & 0 deletions benchmarks/bench_stat.txt
Reviewer comment: thanks for running the benchmarks. It looks like there's nothing to worry about, but it's worth also running the HA tracker cases without your diff. Normally it's enough to post this as a comment, but committing it works too; just don't forget to delete it before this PR is merged.

Large diffs are not rendered by default.

@@ -45,7 +45,7 @@ Incoming samples are considered duplicated (and thus dropped) if they are receiv

If the HA tracker is enabled but incoming samples contain only one or none of the cluster and replica labels, these samples are accepted by default and never deduplicated.

> Note: for performance reasons, the HA tracker only checks the cluster and replica label of the first series in the request to determine whether all series in the request should be deduplicated. This assumes that all series inside the request have the same cluster and replica labels, which is typically true when Prometheus is configured with external labels. Ensure this requirement is honored if you have a non-standard Prometheus setup (for example, you're using Prometheus federation or have a metrics proxy in between).
> Note: the HA tracker checks the cluster and replica labels of every series in the request to determine, per series, whether it should be deduplicated.
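
To make the per-series behavior concrete, here is a minimal sketch (illustrative pseudocode, not Mimir code; it assumes the commonly used `cluster` and `__replica__` label names and hypothetical helper names) of the rule applied to each series: series carrying both HA labels are deduplicated unless they come from the elected replica, while series missing either label are accepted unchanged.

```go
package main

import "fmt"

// decide is a hypothetical helper illustrating the per-series rule; it is not part of Mimir.
// elected maps cluster -> the replica currently elected for that cluster.
func decide(labels map[string]string, clusterLabel, replicaLabel string, elected map[string]string) string {
	cluster, hasCluster := labels[clusterLabel]
	replica, hasReplica := labels[replicaLabel]
	if !hasCluster || !hasReplica {
		return "accepted (not an HA sample, labels kept as-is)"
	}
	if elected[cluster] == replica {
		return "accepted (elected replica, replica label removed before storage)"
	}
	return "deduplicated (non-elected replica)"
}

func main() {
	elected := map[string]string{"prod": "replica-1"}
	series := []map[string]string{
		{"__name__": "up", "cluster": "prod", "__replica__": "replica-1"},
		{"__name__": "up", "cluster": "prod", "__replica__": "replica-2"},
		{"__name__": "up"}, // no HA labels: always accepted
	}
	for _, s := range series {
		fmt.Printf("%v -> %s\n", s, decide(s, "cluster", "__replica__", elected))
	}
}
```

Because the decision is made independently for every series, a single remote-write request can mix accepted, deduplicated, and non-HA series.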

## Configuration

310 changes: 263 additions & 47 deletions pkg/distributor/distributor.go
@@ -29,6 +29,7 @@ import (
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/mtime"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
@@ -1132,6 +1133,149 @@ func (d *Distributor) wrapPushWithMiddlewares(next PushFunc) PushFunc {

}

type replicaState int

const (
// replicaRejectedUnknown means the sample is rejected due to an unknown error.
replicaRejectedUnknown replicaState = 0
// replicaIsPrimary means the sample is from the elected primary replica and should be accepted.
replicaIsPrimary replicaState = 1 << iota
// replicaNotHA means the sample doesn't have both HA labels and should be accepted.
replicaNotHA
// replicaDeduped means the sample is from a non-primary replica and should be deduplicated.
replicaDeduped
// replicaRejectedTooManyClusters means the sample is rejected because the tenant has too many HA clusters.
replicaRejectedTooManyClusters

replicaAccepted = replicaIsPrimary | replicaNotHA
)
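
// Illustrative note (not part of this change): because the accepting states are bit flags,
// callers test acceptance with a mask rather than with equality, for example:
//
//	if info.state&replicaAccepted != 0 {
//		// The sample came from the elected replica or is not an HA sample: forward it.
//	}
//
// replicaDeduped, replicaRejectedTooManyClusters and replicaRejectedUnknown never match this mask.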

type haReplica struct {
cluster, replica string
}

type replicaInfo struct {
state replicaState
sampleCount int
}

// replicaObserved checks if a sample from a given replica should be accepted for ingestion based on HA deduplication rules.
func (d *Distributor) replicaObserved(ctx context.Context, userID string, replica haReplica, ts int64) (replicaState, error) {
span := trace.SpanFromContext(ctx)
span.SetAttributes(
// Make a copy of these, since they may be retained as tags
attribute.String("cluster", strings.Clone(replica.cluster)),
attribute.String("replica", strings.Clone(replica.replica)),
)

isAccepted, err := d.checkSample(ctx, userID, replica.cluster, replica.replica, ts)
if err != nil {
switch {
case errors.As(err, &replicasDidNotMatchError{}):
// These samples have been deduped.
return replicaDeduped, err
case errors.As(err, &tooManyClustersError{}):
return replicaRejectedTooManyClusters, err
default:
return replicaRejectedUnknown, err
}
}

if isAccepted {
return replicaIsPrimary, nil
}
// If there wasn't an error but isAccepted is false that means we didn't find both HA labels.
return replicaNotHA, nil
}

func getReplicasFromRequest(req *mimirpb.WriteRequest, haReplicaLabel, haClusterLabel string) []haReplica {
replicas := make([]haReplica, len(req.Timeseries))
for i, ts := range req.Timeseries {
// Note: the label values returned by findHALabels still reference the request buffers; they are
// cloned later (in getReplicaInfos, and again when used as metric labels) only if they need to be retained.
replicas[i] = findHALabels(haReplicaLabel, haClusterLabel, ts.Labels)
}
return replicas
}

func getEarliestSampleTimestamp(req *mimirpb.WriteRequest, defaultTimestamp int64) int64 {
earliestSampleTimestamp := defaultTimestamp
for _, ts := range req.Timeseries {
if len(ts.Samples) > 0 {
tsms := ts.Samples[0].TimestampMs
if tsms < earliestSampleTimestamp {
earliestSampleTimestamp = tsms
}
}
if len(ts.Histograms) > 0 {
tsms := ts.Histograms[0].Timestamp
if tsms < earliestSampleTimestamp {
earliestSampleTimestamp = tsms
}
}
}
return earliestSampleTimestamp
}

func (d *Distributor) processHaReplicas(ctx context.Context, userID string, sampleTimestamp int64, replicaInfos map[haReplica]*replicaInfo) (map[replicaState]int, error) {
var errs multierror.MultiError
samplesPerState := make(map[replicaState]int)
for replicaKey, info := range replicaInfos {
if info.state == replicaRejectedUnknown {
state, replicaErr := d.replicaObserved(ctx, userID, replicaKey, sampleTimestamp)
info.state = state
if replicaErr != nil {
errs.Add(replicaErr)
}
}
samplesPerState[info.state] += info.sampleCount
}
return samplesPerState, errs.Err()
}

func getReplicaInfos(req *mimirpb.WriteRequest, replicas []haReplica) map[haReplica]*replicaInfo {
replicaInfos := make(map[haReplica]*replicaInfo)
// Check if all timeseries belong to the same replica
firstReplica := replicas[0]
isOneReplica := true
for i := 1; i < len(req.Timeseries); i++ {
if replicas[i] != firstReplica {
isOneReplica = false
break
}
}

// Count samples per replica
if isOneReplica {
numSamples := 0
for _, ts := range req.Timeseries {
numSamples += len(ts.Samples) + len(ts.Histograms)
}
// The replica info is stored in a map where the key is the replica itself.
// The replica labels are references to the request buffer, which will be reused.
// To safely use the replica as map key, we need to clone its labels.
firstReplica.cluster = strings.Clone(firstReplica.cluster)
firstReplica.replica = strings.Clone(firstReplica.replica)
replicaInfos[firstReplica] = &replicaInfo{sampleCount: numSamples}
} else {
for i, ts := range req.Timeseries {
r := replicas[i]
info := replicaInfos[r]
if info == nil {
// The replica info is stored in a map where the key is the replica itself.
// The replica labels are references to the request buffer, which will be reused.
// To safely use the replica as map key, we need to clone its labels.
r.cluster = strings.Clone(r.cluster)
r.replica = strings.Clone(r.replica)

info = &replicaInfo{}
replicaInfos[r] = info
}
info.sampleCount += len(ts.Samples) + len(ts.Histograms)
}
}
return replicaInfos
}
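
// Illustrative note (not part of this change): for a request whose series carry the replicas
// {c1, r1}, {c1, r1}, {c1, r2}, getReplicaInfos returns two entries:
//
//	{c1, r1} -> &replicaInfo{sampleCount: samples+histograms of series 0 and 1}
//	{c1, r2} -> &replicaInfo{sampleCount: samples+histograms of series 2}
//
// Each entry's state is left at its zero value (replicaRejectedUnknown), which is what
// processHaReplicas uses to detect replicas that have not yet been checked against the HA tracker.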

func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
Reviewer comment: there's way too much happening in this function already; let's break it up. I'll try to leave some suggestions.

return WithCleanup(next, func(next PushFunc, ctx context.Context, pushReq *Request) error {
req, err := pushReq.WriteRequest()
@@ -1149,75 +1293,147 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
}

haReplicaLabel := d.limits.HAReplicaLabel(userID)
cluster, replica := findHALabels(haReplicaLabel, d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
// Make a copy of these, since they may be retained as labels on our metrics, e.g. dedupedSamples.
cluster, replica = strings.Clone(cluster), strings.Clone(replica)
haClusterLabel := d.limits.HAClusterLabel(userID)

replicas := getReplicasFromRequest(req, haReplicaLabel, haClusterLabel)

span := trace.SpanFromContext(ctx)
span.SetAttributes(
attribute.String("cluster", cluster),
attribute.String("replica", replica),
)

numSamples := 0
now := time.Now()

group := d.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(d.limits, userID, req.Timeseries), now)
sampleTimestamp := timestamp.FromTime(now)
if d.limits.HATrackerUseSampleTimeForFailover(userID) {
earliestSampleTimestamp := sampleTimestamp
for _, ts := range req.Timeseries {
if len(ts.Samples) > 0 {
tsms := ts.Samples[0].TimestampMs
if tsms < earliestSampleTimestamp {
earliestSampleTimestamp = tsms
}
}
if len(ts.Histograms) > 0 {
tsms := ts.Histograms[0].Timestamp
if tsms < earliestSampleTimestamp {
earliestSampleTimestamp = tsms
}
}
}
sampleTimestamp = earliestSampleTimestamp
sampleTimestamp = getEarliestSampleTimestamp(req, sampleTimestamp)
}
for _, ts := range req.Timeseries {
numSamples += len(ts.Samples) + len(ts.Histograms)

var errs multierror.MultiError
replicaInfos := getReplicaInfos(req, replicas)

var clusters []string
var replicasAsStrings []string
for replicaKey := range replicaInfos {
clusters = append(clusters, replicaKey.cluster)
replicasAsStrings = append(replicasAsStrings, replicaKey.replica)
}
span.SetAttributes(
attribute.StringSlice("clusters", clusters),
attribute.StringSlice("replicas", replicasAsStrings),
)

removeReplica, err := d.checkSample(ctx, userID, cluster, replica, sampleTimestamp)
if err != nil {
if errors.As(err, &replicasDidNotMatchError{}) {
// These samples have been deduped.
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
}
samplesPerState, processErr := d.processHaReplicas(ctx, userID, sampleTimestamp, replicaInfos)
if processErr != nil {
errs.Add(processErr)
}

if errors.As(err, &tooManyClustersError{}) {
d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(numSamples))
d.costAttributionMgr.SampleTracker(userID).IncrementDiscardedSamples(req.Timeseries[0].Labels, float64(numSamples), reasonTooManyHAClusters, now)
}
lastAccepted := sortByAccepted(req, replicaInfos, replicas)
removeHAReplicaLabels(req, lastAccepted, replicas, replicaInfos, haReplicaLabel)

return err
// We don't want to send samples beyond the last accepted sample - that was deduplicated
if samplesPerState[replicaRejectedTooManyClusters] > 0 {
d.updateHADedupeMetrics(userID, group, replicaInfos, samplesPerState, req.Timeseries[lastAccepted+1].Labels)
} else {
d.updateHADedupeMetrics(userID, group, replicaInfos, samplesPerState, nil)
}
pushReq.AddCleanup(sliceUnacceptedRequests(req, lastAccepted))

if removeReplica {
// If we found both the cluster and replica labels, we only want to include the cluster label when
// storing series in Mimir. If we kept the replica label we would end up with another series for the same
// series we're trying to dedupe when HA tracking moves over to a different replica.
for ix := range req.Timeseries {
req.Timeseries[ix].RemoveLabel(haReplicaLabel)
if len(req.Timeseries) > 0 {
if pushErr := next(ctx, pushReq); pushErr != nil {
errs.Add(pushErr)
}
} else {
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
d.nonHASamples.WithLabelValues(userID).Add(float64(numSamples))
}

return next(ctx, pushReq)
return errs.Err()
})
}

func removeHAReplicaLabels(req *mimirpb.WriteRequest, lastAccepted int, replicas []haReplica, replicaInfos map[haReplica]*replicaInfo, haReplicaLabel string) {
for i := 0; i <= lastAccepted; i++ {
r := replicas[i]
s := replicaInfos[r].state
if s&replicaIsPrimary == 0 {
continue
}
// If we found both the cluster and replica labels, we only want to include the cluster label when
// storing series in Mimir. If we kept the replica label we would end up with another series for the same
// series we're trying to dedupe when HA tracking moves over to a different replica.
req.Timeseries[i].RemoveLabel(haReplicaLabel)
}
}

func sliceUnacceptedRequests(req *mimirpb.WriteRequest, lastAccepted int) func() {
originalLen := len(req.Timeseries)
req.Timeseries = req.Timeseries[:lastAccepted+1]
return func() {
// Restore the length so that we can put back all the series in the request to the memory pool
req.Timeseries = req.Timeseries[:originalLen]
}

}

// updateHADedupeMetrics updates metrics related to HA deduplication.
func (d *Distributor) updateHADedupeMetrics(userID, group string, replicaInfos map[haReplica]*replicaInfo, samplesPerState map[replicaState]int, labels []mimirpb.LabelAdapter) {
for replica, info := range replicaInfos {
if info.state&replicaDeduped != 0 && info.sampleCount > 0 {
cluster := strings.Clone(replica.cluster) // Make a copy of this, since it may be retained as labels on our metrics
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(info.sampleCount))
}
}
if samplesPerState[replicaNotHA] > 0 {
d.nonHASamples.WithLabelValues(userID).Add(float64(samplesPerState[replicaNotHA]))
}
if samplesPerState[replicaRejectedTooManyClusters] > 0 {
d.costAttributionMgr.SampleTracker(userID).IncrementDiscardedSamples(labels, float64(samplesPerState[replicaRejectedTooManyClusters]), reasonTooManyHAClusters, time.Now())
d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(samplesPerState[replicaRejectedTooManyClusters]))
}
}

// sortByAccepted partitions req.Timeseries (and the parallel replicas slice) so that accepted series come
// first, and returns the index of the last accepted timeseries based on the HA dedup state of each replica.
func sortByAccepted(req *mimirpb.WriteRequest, replicaInfos map[haReplica]*replicaInfo, replicas []haReplica) int {
numAcceptedReplicas := 0
for _, info := range replicaInfos {
if info.state&replicaAccepted != 0 {
numAcceptedReplicas++
}
}
if numAcceptedReplicas == len(replicaInfos) {
return len(req.Timeseries) - 1
}
if numAcceptedReplicas == 0 {
return -1
}
findPreviousAccepted := func(i int) int {
for i > 0 {
state := replicaInfos[replicas[i]].state
if state&replicaAccepted != 0 {
break
}
i--
}
return i
}
lastAccepted := findPreviousAccepted(len(req.Timeseries) - 1)
// next we shift all accepted samples to the front of the timeseries slice
for i := range req.Timeseries {
if i > lastAccepted {
break
}
state := replicaInfos[replicas[i]].state
if state&replicaAccepted == 0 {
req.Timeseries[i], req.Timeseries[lastAccepted] = req.Timeseries[lastAccepted], req.Timeseries[i]
replicas[i], replicas[lastAccepted] = replicas[lastAccepted], replicas[i]
lastAccepted--
lastAccepted = findPreviousAccepted(lastAccepted)
}

}

return lastAccepted
}
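
// Worked example (illustrative, not part of this change): for five series whose replica states
// are [accepted, deduped, accepted, deduped, accepted], the loop swaps index 1 with index 4,
// index 3 then falls past the boundary found by findPreviousAccepted, and the function returns
// lastAccepted = 2 with the slice reordered to [accepted, accepted, accepted, deduped, deduped].
// Only req.Timeseries[:3] is then pushed; the deduplicated tail is excluded by sliceUnacceptedRequests.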

func (d *Distributor) prePushRelabelMiddleware(next PushFunc) PushFunc {
return WithCleanup(next, func(next PushFunc, ctx context.Context, pushReq *Request) error {

req, err := pushReq.WriteRequest()
if err != nil {
return err