From 473cf7a5c007db9cf549c3c156ef0af71c0c16be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 31 Jan 2024 11:06:27 +0200 Subject: [PATCH] Revert "Merge pull request #81 from vinted/handler_write_multiple_tenants" This reverts commit 42121aef357da4d9f69d90bd59e60287794758f8, reversing changes made to 26275788a4f89065ffcd436baa5637f304fe1fa1. --- pkg/receive/handler.go | 37 +++++++++---------------------------- 1 file changed, 9 insertions(+), 28 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 53633a034b..495afa1559 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -28,7 +28,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/common/model" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" @@ -65,8 +64,6 @@ const ( // Labels for metrics. labelSuccess = "success" labelError = "error" - - metaLabelTenantID = model.MetaLabelPrefix + "tenant_id" ) var ( @@ -830,7 +827,7 @@ func (h *Handler) sendWrites( func (h *Handler) sendLocalWrite( ctx context.Context, writeDestination endpointReplica, - tenantHTTP string, + tenant string, trackedSeries trackedSeries, responses chan<- writeResponse, ) { @@ -838,32 +835,16 @@ func (h *Handler) sendLocalWrite( defer span.Finish() span.SetTag("endpoint", writeDestination.endpoint) span.SetTag("replica", writeDestination.replica) - - tenantSeriesMapping := map[string][]prompb.TimeSeries{} - for _, ts := range trackedSeries.timeSeries { - lbls := labelpb.ZLabelsToPromLabels(ts.Labels) - if tenant := lbls.Get(metaLabelTenantID); tenant != "" { - tenantSeriesMapping[tenant] = append(tenantSeriesMapping[tenant], ts) - continue - } else { - tenantSeriesMapping[tenantHTTP] = append(tenantSeriesMapping[tenantHTTP], ts) - continue - } - } - - for tenant, series := range tenantSeriesMapping { - err := h.writer.Write(tracingCtx, tenant, &prompb.WriteRequest{ - Timeseries: series, - }) - if err != nil { - span.SetTag("error", true) - span.SetTag("error.msg", err.Error()) - responses <- newWriteResponse(trackedSeries.seriesIDs, err, writeDestination) - return - } + err := h.writer.Write(tracingCtx, tenant, &prompb.WriteRequest{ + Timeseries: trackedSeries.timeSeries, + }) + if err != nil { + span.SetTag("error", true) + span.SetTag("error.msg", err.Error()) + responses <- newWriteResponse(trackedSeries.seriesIDs, err, writeDestination) + return } responses <- newWriteResponse(trackedSeries.seriesIDs, nil, writeDestination) - } // sendRemoteWrite sends a write request to the remote node. It takes care of checking wether the endpoint is up or not