Skip to content

Commit

Permalink
feat: catch, discard & warn about the labels that have reserved names (
Browse files Browse the repository at this point in the history
…grafana#3162)

* feat: catch, discard & warn about the labels that have reserved names

---------

Co-authored-by: Ivan <2103732+codebien@users.noreply.github.com>
  • Loading branch information
olegbespalov and codebien authored Jul 6, 2023
1 parent e1e22ba commit 0fb962b
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 11 deletions.
52 changes: 48 additions & 4 deletions output/cloud/expv2/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package expv2
import (
"context"

"github.com/sirupsen/logrus"
"go.k6.io/k6/cloudapi/insights"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
Expand All @@ -16,6 +17,8 @@ type metricsFlusher struct {
testRunID string
bq *bucketQ
client pusher
logger logrus.FieldLogger
discardedLabels map[string]struct{}
aggregationPeriodInSeconds uint32
maxSeriesInBatch int
}
Expand Down Expand Up @@ -47,7 +50,7 @@ func (f *metricsFlusher) flush(_ context.Context) error {
}

// we hit the chunk size, let's flush
err := f.client.push(msb.MetricSet)
err := f.push(msb)
if err != nil {
return err
}
Expand All @@ -59,6 +62,21 @@ func (f *metricsFlusher) flush(_ context.Context) error {
}

// send the last (or the unique) MetricSet chunk to the remote service
return f.push(msb)
}

// push sends the metric set to the remote service.
// it also checks if the labels are discarded and logs a warning if so.
func (f *metricsFlusher) push(msb metricSetBuilder) error {
for key := range msb.discardedLabels {
if _, ok := f.discardedLabels[key]; ok {
continue
}

f.discardedLabels[key] = struct{}{}
f.logger.Warnf("Tag %s has been discarded since it is reserved for Cloud operations.", key)
}

return f.client.push(msb.MetricSet)
}

Expand Down Expand Up @@ -87,15 +105,20 @@ type metricSetBuilder struct {
// It supports the iterative process for appending
// the aggregated measurements for each time series.
seriesIndex map[metrics.TimeSeries]uint

// discardedLabels tracks the labels that have been discarded
// since they are reserved for internal usage by the Cloud service.
discardedLabels map[string]struct{}
}

func newMetricSetBuilder(testRunID string, aggrPeriodSec uint32) metricSetBuilder {
builder := metricSetBuilder{
MetricSet: &pbcloud.MetricSet{},
// TODO: evaluate if removing the pointer from pbcloud.Metric
// is a better trade-off
metrics: make(map[*metrics.Metric]*pbcloud.Metric),
seriesIndex: make(map[metrics.TimeSeries]uint),
metrics: make(map[*metrics.Metric]*pbcloud.Metric),
seriesIndex: make(map[metrics.TimeSeries]uint),
discardedLabels: nil,
}
builder.MetricSet.TestRunId = testRunID
builder.MetricSet.AggregationPeriod = aggrPeriodSec
Expand All @@ -117,8 +140,11 @@ func (msb *metricSetBuilder) addTimeBucket(bucket timeBucket) {
var pbTimeSeries *pbcloud.TimeSeries
ix, ok := msb.seriesIndex[timeSeries]
if !ok {
labels, discardedLabels := mapTimeSeriesLabelsProto(timeSeries.Tags)
msb.recordDiscardedLabels(discardedLabels)

pbTimeSeries = &pbcloud.TimeSeries{
Labels: mapTimeSeriesLabelsProto(timeSeries.Tags),
Labels: labels,
}
pbmetric.TimeSeries = append(pbmetric.TimeSeries, pbTimeSeries)
msb.seriesIndex[timeSeries] = uint(len(pbmetric.TimeSeries) - 1)
Expand All @@ -131,6 +157,24 @@ func (msb *metricSetBuilder) addTimeBucket(bucket timeBucket) {
}
}

func (msb *metricSetBuilder) recordDiscardedLabels(labels []string) {
if len(labels) == 0 {
return
}

if msb.discardedLabels == nil {
msb.discardedLabels = make(map[string]struct{})
}

for _, key := range labels {
if _, ok := msb.discardedLabels[key]; ok {
continue
}

msb.discardedLabels[key] = struct{}{}
}
}

// insightsClient is an interface for sending request metadatas to the Insights API.
type insightsClient interface {
IngestRequestMetadatasBatch(context.Context, insights.RequestMetadatas) error
Expand Down
89 changes: 87 additions & 2 deletions output/cloud/expv2/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.k6.io/k6/cloudapi/insights"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
)
Expand Down Expand Up @@ -127,13 +129,16 @@ func TestMetricsFlusherFlushChunk(t *testing.T) {

r := metrics.NewRegistry()
m1 := r.MustNewMetric("metric1", metrics.Counter)

for _, tc := range testCases {
logger, _ := testutils.NewLoggerWithHook(t)

bq := &bucketQ{}
pm := &pusherMock{}
mf := metricsFlusher{
bq: bq,
client: pm,
logger: logger,
discardedLabels: make(map[string]struct{}),
maxSeriesInBatch: 3,
}

Expand All @@ -160,11 +165,91 @@ func TestMetricsFlusherFlushChunk(t *testing.T) {
}
}

func TestFlushWithReservedLabels(t *testing.T) {
t.Parallel()

logger, hook := testutils.NewLoggerWithHook(t)

collected := make([]*pbcloud.MetricSet, 0)

bq := &bucketQ{}
pm := &pusherMock{
hook: func(ms *pbcloud.MetricSet) {
collected = append(collected, ms)
},
}

mf := metricsFlusher{
bq: bq,
client: pm,
maxSeriesInBatch: 2,
logger: logger,
discardedLabels: make(map[string]struct{}),
}

r := metrics.NewRegistry()
m1 := r.MustNewMetric("metric1", metrics.Counter)

ts1 := metrics.TimeSeries{
Metric: m1,
Tags: r.RootTagSet().With("key1", "val1").With("__name__", "val2").With("test_run_id", "testrunid-123"),
}
bq.Push([]timeBucket{
{
Time: 1,
Sinks: map[metrics.TimeSeries]metricValue{
ts1: &counter{Sum: float64(1)},
},
},
})

ts2 := metrics.TimeSeries{
Metric: m1,
Tags: r.RootTagSet().With("key1", "val2").With("__name__", "val2"),
}
bq.Push([]timeBucket{
{
Time: 2,
Sinks: map[metrics.TimeSeries]metricValue{
ts2: &counter{Sum: float64(1)},
},
},
})

err := mf.flush(context.Background())
require.NoError(t, err)

loglines := hook.Drain()
assert.Equal(t, 1, len(collected))

// check that warnings sown only once per label
require.Len(t, loglines, 2)
testutils.LogContains(loglines, logrus.WarnLevel, "Tag __name__ has been discarded since it is reserved for Cloud operations.")
testutils.LogContains(loglines, logrus.WarnLevel, "Tag test_run_id has been discarded since it is reserved for Cloud operations.")

// check that flusher is not sending labels with reserved names
require.Len(t, collected[0].Metrics, 1)

ts := collected[0].Metrics[0].TimeSeries
require.Len(t, ts[0].Labels, 1)
assert.Equal(t, "key1", ts[0].Labels[0].Name)
assert.Equal(t, "val1", ts[0].Labels[0].Value)

require.Len(t, ts[1].Labels, 1)
assert.Equal(t, "key1", ts[1].Labels[0].Name)
assert.Equal(t, "val2", ts[1].Labels[0].Value)
}

type pusherMock struct {
hook func(*pbcloud.MetricSet)
pushCalled int
}

func (pm *pusherMock) push(_ *pbcloud.MetricSet) error {
func (pm *pusherMock) push(ms *pbcloud.MetricSet) error {
if pm.hook != nil {
pm.hook(ms)
}

pm.pushCalled++
return nil
}
Expand Down
31 changes: 26 additions & 5 deletions output/cloud/expv2/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package expv2

import (
"fmt"
"strings"

"github.com/mstoykov/atlas"
"go.k6.io/k6/metrics"
Expand All @@ -10,21 +11,41 @@ import (
)

// TODO: unit test
func mapTimeSeriesLabelsProto(tags *metrics.TagSet) []*pbcloud.Label {
func mapTimeSeriesLabelsProto(tags *metrics.TagSet) ([]*pbcloud.Label, []string) {
labels := make([]*pbcloud.Label, 0, ((*atlas.Node)(tags)).Len())
var discardedLabels []string

// TODO: move it as a shared func
// TODO: move this as a shared func
// https://github.com/grafana/k6/issues/2764
n := (*atlas.Node)(tags)
if n.Len() < 1 {
return labels
return labels, discardedLabels
}
for !n.IsRoot() {
prev, key, value := n.Data()
labels = append(labels, &pbcloud.Label{Name: key, Value: value})

n = prev
if !isReservedLabelName(key) {
labels = append(labels, &pbcloud.Label{Name: key, Value: value})
continue
}

if discardedLabels == nil {
discardedLabels = make([]string, 0, 1)
}

discardedLabels = append(discardedLabels, key)
}
return labels
return labels, discardedLabels
}

func isReservedLabelName(name string) bool {
// this is a reserved label prefix for the prometheus
if strings.HasPrefix(name, "__") {
return true
}

return name == "test_run_id"
}

// TODO: unit test
Expand Down
2 changes: 2 additions & 0 deletions output/cloud/expv2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ func (o *Output) Start() error {
testRunID: o.testRunID,
bq: &o.collector.bq,
client: mc,
logger: o.logger,
discardedLabels: make(map[string]struct{}),
aggregationPeriodInSeconds: uint32(o.config.AggregationPeriod.TimeDuration().Seconds()),
maxSeriesInBatch: int(o.config.MaxTimeSeriesInBatch.Int64),
}
Expand Down

0 comments on commit 0fb962b

Please sign in to comment.