Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased

* [FEATURE] StoreGateway: Introduces a new parquet mode. #7046
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` which enables to add `__unit__` and `__type__` labels for remote write v2 and OTLP requests. The `-distributor.otlp.enable-type-and-unit-labels` flag has been consolidated into this flag. #7077
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a breaking change? I think this is fine as that seems an experimental feature. But we do need to mention that it is a breaking change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, since the changes are already integrated and tested together, I'd prefer to keep them in this PR. Also, I added breaking change mention to change log.

* [ENHANCEMENT] Alertmanager: Upgrade alertmanger to 0.29.0 and add a new incidentIO integration. #7092
* [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063
* [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074
Expand Down
10 changes: 5 additions & 5 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3274,11 +3274,6 @@ otlp:
# EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.
# CLI flag: -distributor.otlp.allow-delta-temporality
[allow_delta_temporality: <boolean> | default = false]

# EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for
# the OTLP metrics.
# CLI flag: -distributor.otlp.enable-type-and-unit-labels
[enable_type_and_unit_labels: <boolean> | default = false]
```

### `etcd_config`
Expand Down Expand Up @@ -3998,6 +3993,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -distributor.promote-resource-attributes
[promote_resource_attributes: <list of string> | default = ]

# EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics.
# This applies to remote write v2 and OTLP requests.
# CLI flag: -distributor.enable-type-and-unit-labels
[enable_type_and_unit_labels: <boolean> | default = false]

# The maximum number of active series per user, per ingester. 0 to disable.
# CLI flag: -ingester.max-series-per-user
[max_series_per_user: <int> | default = 5000000]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/spf13/afero v1.11.0
github.com/stretchr/testify v1.11.1
github.com/thanos-io/objstore v0.0.0-20250722142242-922b22272ee3
github.com/thanos-io/promql-engine v0.0.0-20250924193140-e9123dc11264
github.com/thanos-io/promql-engine v0.0.0-20251117105526-bcec363c24e6
github.com/thanos-io/thanos v0.39.3-0.20250729120336-88d0ae8071cb
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1774,8 +1774,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
github.com/thanos-io/objstore v0.0.0-20250722142242-922b22272ee3 h1:P301Anc27aVL7Ls88el92j+qW3PJp8zmiDl+kOUZv3A=
github.com/thanos-io/objstore v0.0.0-20250722142242-922b22272ee3/go.mod h1:uDHLkMKOGDAnlN75EAz8VrRzob1+VbgYSuUleatWuF0=
github.com/thanos-io/promql-engine v0.0.0-20250924193140-e9123dc11264 h1:sOmANo4XVhem4VgvI9w05DBwqMex/qw+cDjuHW2FKWw=
github.com/thanos-io/promql-engine v0.0.0-20250924193140-e9123dc11264/go.mod h1:MOFN0M1nDMcWZg1t4iF39sOard/K4SWgO/HHSODeDIc=
github.com/thanos-io/promql-engine v0.0.0-20251117105526-bcec363c24e6 h1:/8TlVay3hF8LQ7iGLRm9aNRGQsyYOXFPNmtg5dsNwcM=
github.com/thanos-io/promql-engine v0.0.0-20251117105526-bcec363c24e6/go.mod h1:MOFN0M1nDMcWZg1t4iF39sOard/K4SWgO/HHSODeDIc=
github.com/thanos-io/thanos v0.39.3-0.20250729120336-88d0ae8071cb h1:z/ePbn3lo/D4vdHGH8hpa2kgH9M6iLq0kOFtZwuelKM=
github.com/thanos-io/thanos v0.39.3-0.20250729120336-88d0ae8071cb/go.mod h1:gGUG3TDEoRSjTFVs/QO6QnQIILRgNF0P9l7BiiMfmHw=
github.com/tinylib/msgp v1.3.0 h1:ULuf7GPooDaIlbyvgAxBV/FI7ynli6LZ1/nVUNu+0ww=
Expand Down
6 changes: 4 additions & 2 deletions integration/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,18 +465,20 @@ func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Labe
st := writev2.NewSymbolTable()
lb := labels.NewScratchBuilder(0)
lb.Add("__name__", name)

for _, label := range additionalLabels {
lb.Add(label.Name, label.Value)
}

series = append(series, writev2.TimeSeries{
// Generate the series
LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil),
Samples: []writev2.Sample{
{Value: value, Timestamp: tsMillis},
},
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
HelpRef: 2, // equal to name
UnitRef: 2, // equal to name
},
})
symbols = st.Symbols()
Expand Down
2 changes: 1 addition & 1 deletion integration/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestOTLPEnableTypeAndUnitLabels(t *testing.T) {
"-auth.enabled": "true",

// OTLP
"-distributor.otlp.enable-type-and-unit-labels": "true",
"-distributor.enable-type-and-unit-labels": "true",

// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
Expand Down
71 changes: 71 additions & 0 deletions integration/remote_write_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,77 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) {
require.Empty(t, result)
}

func TestIngest_EnableTypeAndUnitLabels(t *testing.T) {
const blockRangePeriod = 5 * time.Second

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))

flags := mergeFlags(
AlertmanagerLocalFlags(),
map[string]string{
"-store.engine": blocksStorageEngine,
"-blocks-storage.backend": "filesystem",
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.bucket-store.sync-interval": "15m",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-querier.query-store-for-labels-enabled": "true",
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-blocks-storage.tsdb.enable-native-histograms": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
"-distributor.remote-writev2-enabled": "true",
"-distributor.enable-type-and-unit-labels": "true",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
},
)

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

path := path.Join(s.SharedDir(), "cortex-1")

flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
// Start Cortex replicas.
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

now := time.Now()

// series push
symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
writeStats, err := c.PushV2(symbols1, series)
require.NoError(t, err)
testPushHeader(t, writeStats, 1, 0, 0)

value, err := c.Query("test_series", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, value.Type())
vec := value.(model.Vector)
require.True(t, vec[0].Metric["__unit__"] != "")
require.True(t, vec[0].Metric["__type__"] != "")
}

func TestIngest(t *testing.T) {
const blockRangePeriod = 5 * time.Second

Expand Down
10 changes: 5 additions & 5 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
Expand All @@ -295,7 +295,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")

// Legacy Routes
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
}
Expand All @@ -313,7 +313,7 @@ type Ingester interface {
}

// RegisterIngester registers the ingesters HTTP and GRPC service
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, overrides *validation.Overrides) {
client.RegisterIngesterServer(a.server.GRPC, i)

a.indexPage.AddLink(SectionAdminEndpoints, "/ingester/all_user_stats", "Usage Statistics")
Expand All @@ -328,12 +328,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.

// Legacy Routes
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
}

func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
}

func (t *Cortex) initIngester() (serv services.Service, err error) {
t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor)
t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor, t.Overrides)

return nil, nil
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/cortexpb/compatv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cortexpb
import (
"fmt"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)

Expand Down Expand Up @@ -32,3 +33,26 @@ func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []s
b.Sort()
return b.Labels(), nil
}

func MetadataV2MetricTypeToMetricType(mt MetadataV2_MetricType) model.MetricType {
switch mt {
case METRIC_TYPE_UNSPECIFIED:
return model.MetricTypeUnknown
case METRIC_TYPE_COUNTER:
return model.MetricTypeCounter
case METRIC_TYPE_GAUGE:
return model.MetricTypeGauge
case METRIC_TYPE_HISTOGRAM:
return model.MetricTypeHistogram
case METRIC_TYPE_GAUGEHISTOGRAM:
return model.MetricTypeGaugeHistogram
case METRIC_TYPE_SUMMARY:
return model.MetricTypeSummary
case METRIC_TYPE_INFO:
return model.MetricTypeInfo
case METRIC_TYPE_STATESET:
return model.MetricTypeStateset
default:
return model.MetricTypeUnknown
}
}
8 changes: 3 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,9 @@ type InstanceLimits struct {
}

type OTLPConfig struct {
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
DisableTargetInfo bool `yaml:"disable_target_info"`
AllowDeltaTemporality bool `yaml:"allow_delta_temporality"`
EnableTypeAndUnitLabels bool `yaml:"enable_type_and_unit_labels"`
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
DisableTargetInfo bool `yaml:"disable_target_info"`
AllowDeltaTemporality bool `yaml:"allow_delta_temporality"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand Down Expand Up @@ -229,7 +228,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.")
f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)")
f.BoolVar(&cfg.OTLPConfig.AllowDeltaTemporality, "distributor.otlp.allow-delta-temporality", false, "EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.")
f.BoolVar(&cfg.OTLPConfig.EnableTypeAndUnitLabels, "distributor.otlp.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for the OTLP metrics.")
}

// Validate config and returns error on failure
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distribu
AddMetricSuffixes: true,
DisableTargetInfo: cfg.DisableTargetInfo,
AllowDeltaTemporality: cfg.AllowDeltaTemporality,
EnableTypeAndUnitLabels: cfg.EnableTypeAndUnitLabels,
EnableTypeAndUnitLabels: overrides.EnableTypeAndUnitLabels(userID),
}

var annots annotations.Annotations
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ func TestOTLP_EnableTypeAndUnitLabels(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
cfg := distributor.OTLPConfig{
EnableTypeAndUnitLabels: test.enableTypeAndUnitLabels,
AllowDeltaTemporality: test.allowDeltaTemporality,
AllowDeltaTemporality: test.allowDeltaTemporality,
}
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
Expand All @@ -90,6 +89,7 @@ func TestOTLP_EnableTypeAndUnitLabels(t *testing.T) {
test.otlpSeries.CopyTo(sm.Metrics().AppendEmpty())

limits := validation.Limits{}
limits.EnableTypeAndUnitLabels = test.enableTypeAndUnitLabels
overrides := validation.NewOverrides(limits, nil)
promSeries, metadata, err := convertToPromTS(ctx, metrics, cfg, overrides, "user-1", logger)
require.NoError(t, err)
Expand Down
31 changes: 27 additions & 4 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/exp/api/remote"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/schema"
"github.com/prometheus/prometheus/util/compression"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/validation"
)

const (
Expand All @@ -36,7 +39,7 @@ const (
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)

// Handler is a http.Handler which accepts WriteRequests.
func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := log.WithContext(ctx, log.Logger)
Expand Down Expand Up @@ -78,8 +81,13 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware
}

handlePRW2 := func() {
userID, err := tenant.TenantID(ctx)
if err != nil {
return
}

var req cortexpb.PreallocWriteRequestV2
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -91,7 +99,7 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware
req.Source = cortexpb.API
}

v1Req, err := convertV2RequestToV1(&req)
v1Req, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID))
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -169,7 +177,7 @@ func setPRW2RespHeader(w http.ResponseWriter, samples, histograms, exemplars int
w.Header().Set(rw20WrittenExemplarsHeader, strconv.FormatInt(exemplars, 10))
}

func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2) (cortexpb.PreallocWriteRequest, error) {
func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUnitLabels bool) (cortexpb.PreallocWriteRequest, error) {
var v1Req cortexpb.PreallocWriteRequest
v1Timeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
var v1Metadata []*cortexpb.MetricMetadata
Expand All @@ -181,10 +189,25 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2) (cortexpb.Preall
if err != nil {
return v1Req, err
}

unit := symbols[v2Ts.Metadata.UnitRef]
metricType := v2Ts.Metadata.Type
shouldAttachTypeAndUnitLabels := enableTypeAndUnitLabels && (metricType != cortexpb.METRIC_TYPE_UNSPECIFIED || unit != "")
if shouldAttachTypeAndUnitLabels {
slb := labels.NewScratchBuilder(lbs.Len() + 2) // for __type__ and __unit__
lbs.Range(func(l labels.Label) {
slb.Add(l.Name, l.Value)
})
schema.Metadata{Type: cortexpb.MetadataV2MetricTypeToMetricType(metricType), Unit: unit}.AddToLabels(&slb)
slb.Sort()
lbs = slb.Labels()
}

exemplars, err := convertV2ToV1Exemplars(&b, symbols, v2Ts.Exemplars)
if err != nil {
return v1Req, err
}

v1Timeseries = append(v1Timeseries, cortexpb.PreallocTimeseries{
TimeSeries: &cortexpb.TimeSeries{
Labels: cortexpb.FromLabelsToLabelAdapters(lbs),
Expand Down
Loading
Loading