Skip to content

Commit

Permalink
Revert "Merge pull request #81 from vinted/handler_write_multiple_ten…
Browse files Browse the repository at this point in the history
…ants"

This reverts commit 42121ae, reversing
changes made to 2627578.
  • Loading branch information
GiedriusS committed Jan 31, 2024
1 parent 42121ae commit 473cf7a
Showing 1 changed file with 9 additions and 28 deletions.
37 changes: 9 additions & 28 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -65,8 +64,6 @@ const (
// Labels for metrics.
labelSuccess = "success"
labelError = "error"

metaLabelTenantID = model.MetaLabelPrefix + "tenant_id"
)

var (
Expand Down Expand Up @@ -830,40 +827,24 @@ func (h *Handler) sendWrites(
func (h *Handler) sendLocalWrite(
ctx context.Context,
writeDestination endpointReplica,
tenantHTTP string,
tenant string,
trackedSeries trackedSeries,
responses chan<- writeResponse,
) {
span, tracingCtx := tracing.StartSpan(ctx, "receive_local_tsdb_write")
defer span.Finish()
span.SetTag("endpoint", writeDestination.endpoint)
span.SetTag("replica", writeDestination.replica)

tenantSeriesMapping := map[string][]prompb.TimeSeries{}
for _, ts := range trackedSeries.timeSeries {
lbls := labelpb.ZLabelsToPromLabels(ts.Labels)
if tenant := lbls.Get(metaLabelTenantID); tenant != "" {
tenantSeriesMapping[tenant] = append(tenantSeriesMapping[tenant], ts)
continue
} else {
tenantSeriesMapping[tenantHTTP] = append(tenantSeriesMapping[tenantHTTP], ts)
continue
}
}

for tenant, series := range tenantSeriesMapping {
err := h.writer.Write(tracingCtx, tenant, &prompb.WriteRequest{
Timeseries: series,
})
if err != nil {
span.SetTag("error", true)
span.SetTag("error.msg", err.Error())
responses <- newWriteResponse(trackedSeries.seriesIDs, err, writeDestination)
return
}
err := h.writer.Write(tracingCtx, tenant, &prompb.WriteRequest{
Timeseries: trackedSeries.timeSeries,
})
if err != nil {
span.SetTag("error", true)
span.SetTag("error.msg", err.Error())
responses <- newWriteResponse(trackedSeries.seriesIDs, err, writeDestination)
return
}
responses <- newWriteResponse(trackedSeries.seriesIDs, nil, writeDestination)

}

// sendRemoteWrite sends a write request to the remote node. It takes care of checking wether the endpoint is up or not
Expand Down

0 comments on commit 473cf7a

Please sign in to comment.