output/cloudv2: Flush the aggregated metrics #3083

Merged · 2 commits · May 30, 2023
115 changes: 107 additions & 8 deletions output/cloud/expv2/flush.go
@@ -1,16 +1,115 @@
 package expv2
 
-import "context"
+import (
+	"context"
+
+	"go.k6.io/k6/metrics"
+	"go.k6.io/k6/output/cloud/expv2/pbcloud"
+)
 
-type noopFlusher struct {
-	referenceID string
-	bq          *bucketQ
-
-	// client MetricsClient
-}
+type pusher interface {
+	push(ctx context.Context, referenceID string, samples *pbcloud.MetricSet) error
+}
+
+type metricsFlusher struct {
+	referenceID                string
+	bq                         *bucketQ
+	client                     pusher
+	aggregationPeriodInSeconds uint32
+}
 
-func (f *noopFlusher) Flush(ctx context.Context) error {
+func (f *metricsFlusher) Flush(ctx context.Context) error {
 	// drain the buffer
-	_ = f.bq.PopAll()
-	return nil
+	buckets := f.bq.PopAll()
+	if len(buckets) < 1 {
+		return nil
+	}
+
+	// Pivot the data structure from a slice of time buckets
+	// to a metric set of time series, where each series has nested samples.
+	//
+	// The Protobuf payload structure has the metric at the first level,
+	// while the current slice of buckets knows about the metric
+	// only at deeply nested levels. So we need to go through the buckets
+	// and group them by metric. To avoid too many loops and allocations,
+	// the metricSetBuilder does this grouping while traversing the buckets.
+
+	msb := newMetricSetBuilder(f.referenceID, f.aggregationPeriodInSeconds)
+	for i := 0; i < len(buckets); i++ {
+		msb.addTimeBucket(&buckets[i])
+	}
+
+	// send the MetricSet to the remote service
+	return f.client.push(ctx, f.referenceID, msb.MetricSet)
+}
+
+type metricSetBuilder struct {
+	MetricSet                  *pbcloud.MetricSet
+	TestRunID                  string
+	AggregationPeriodInSeconds uint32
+
+	// TODO: if we introduce a metricID, then we could just use it
+	// as the map's key (map[uint64]pbcloud.Metric), which is faster.
+	// Or maybe, once we have a better vision around dynamic tracking
+	// of metrics (https://github.com/grafana/k6/issues/1321), we could
+	// consider whether an array, with a length equal to the number of
+	// registered metrics, could eventually work.
+	//
+	// TODO: we may evaluate replacing it with
+	// map[*metrics.Metric][]*pbcloud.TimeSeries
+	// and using a sync.Pool for the series slice.
+	// We need dedicated benchmarks before doing it.
+	//
+	// metrics tracks the conversion of each metric
+	// into a protobuf structure.
+	metrics map[*metrics.Metric]*pbcloud.Metric
+
+	// seriesIndex tracks the index of a time series XYZ
+	// in the related slice metrics[XYZ.Metric].TimeSeries.
+	// It supports the iterative process of appending
+	// the aggregated measurements to each time series.
+	seriesIndex map[metrics.TimeSeries]uint
+}
+
+func newMetricSetBuilder(testRunID string, aggrPeriodSec uint32) metricSetBuilder {
+	return metricSetBuilder{
+		TestRunID:                  testRunID,
+		MetricSet:                  &pbcloud.MetricSet{},
+		AggregationPeriodInSeconds: aggrPeriodSec,
+		// TODO: evaluate whether removing the pointer from pbcloud.Metric
+		// is a better trade-off
+		metrics:     make(map[*metrics.Metric]*pbcloud.Metric),
+		seriesIndex: make(map[metrics.TimeSeries]uint),
+	}
+}
+
+func (msb *metricSetBuilder) addTimeBucket(bucket *timeBucket) {
+	for timeSeries, sink := range bucket.Sinks {
+		pbmetric, ok := msb.metrics[timeSeries.Metric]
+		if !ok {
+			pbmetric = &pbcloud.Metric{
+				Name: timeSeries.Metric.Name,
+				Type: mapMetricTypeProto(timeSeries.Metric.Type),
+			}
+			msb.metrics[timeSeries.Metric] = pbmetric
+			msb.MetricSet.Metrics = append(msb.MetricSet.Metrics, pbmetric)
+		}
+
+		var pbTimeSeries *pbcloud.TimeSeries
+		ix, ok := msb.seriesIndex[timeSeries]
+		if !ok {
+			pbTimeSeries = &pbcloud.TimeSeries{
+				AggregationPeriod: msb.AggregationPeriodInSeconds,
+				Labels:            mapTimeSeriesLabelsProto(timeSeries, msb.TestRunID),
+			}
+			pbmetric.TimeSeries = append(pbmetric.TimeSeries, pbTimeSeries)
+			msb.seriesIndex[timeSeries] = uint(len(pbmetric.TimeSeries) - 1)
+		} else {
+			pbTimeSeries = pbmetric.TimeSeries[ix]
+		}
+
+		addBucketToTimeSeriesProto(
+			pbTimeSeries, timeSeries.Metric.Type, bucket.Time, sink)
+	}
+}
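
For context, here is a minimal sketch of how a metricsFlusher could be driven periodically. The runFlushLoop helper, its parameters, and the error callback are hypothetical illustrations, not part of this PR (the real output wires up its own scheduling):

package expv2

import (
	"context"
	"time"
)

// runFlushLoop is a hypothetical driver: it calls Flush on every tick
// until the context is cancelled. The real output's scheduling may differ.
func runFlushLoop(ctx context.Context, f *metricsFlusher, period time.Duration, onErr func(error)) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := f.Flush(ctx); err != nil {
				onErr(err) // e.g. log and continue, or abort the test run
			}
		case <-ctx.Done():
			return
		}
	}
}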
41 changes: 41 additions & 0 deletions output/cloud/expv2/flush_test.go
@@ -0,0 +1,41 @@
package expv2

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.k6.io/k6/metrics"
)

// TODO: additional cases
// case: add when the metric already exists
// case: add when the metric and the time series already exist

func TestMetricSetBuilderAddTimeBucket(t *testing.T) {
	t.Parallel()

	r := metrics.NewRegistry()
	m1 := r.MustNewMetric("metric1", metrics.Counter)
	timeSeries := metrics.TimeSeries{
		Metric: m1,
		Tags:   r.RootTagSet().With("key1", "val1"),
	}

	tb := timeBucket{
		Time: time.Unix(1, 0),
		Sinks: map[metrics.TimeSeries]metrics.Sink{
			timeSeries: &metrics.CounterSink{},
		},
	}
	msb := newMetricSetBuilder("testrunid-123", 1)
	msb.addTimeBucket(&tb)

	assert.Contains(t, msb.metrics, m1)
	require.Contains(t, msb.seriesIndex, timeSeries)
	assert.Equal(t, uint(0), msb.seriesIndex[timeSeries]) // TODO: assert with another number

	require.Len(t, msb.MetricSet.Metrics, 1)
	assert.Len(t, msb.MetricSet.Metrics[0].TimeSeries, 1)
}
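
As a sketch for the TODO cases above (assuming it sits in the same test file and reuses timeBucket, newMetricSetBuilder, and addTimeBucket from this diff): a second bucket with the same time series should not create duplicate metric or series entries. The assertions on the aggregated values are intentionally left out:

func TestMetricSetBuilderAddTimeBucketTwice(t *testing.T) {
	t.Parallel()

	r := metrics.NewRegistry()
	m1 := r.MustNewMetric("metric1", metrics.Counter)
	timeSeries := metrics.TimeSeries{
		Metric: m1,
		Tags:   r.RootTagSet().With("key1", "val1"),
	}

	msb := newMetricSetBuilder("testrunid-123", 1)
	// add two buckets sharing the same time series
	for _, sec := range []int64{1, 2} {
		tb := timeBucket{
			Time: time.Unix(sec, 0),
			Sinks: map[metrics.TimeSeries]metrics.Sink{
				timeSeries: &metrics.CounterSink{},
			},
		}
		msb.addTimeBucket(&tb)
	}

	// the metric and the time series must not be duplicated
	require.Len(t, msb.MetricSet.Metrics, 1)
	assert.Len(t, msb.MetricSet.Metrics[0].TimeSeries, 1)
}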
155 changes: 155 additions & 0 deletions output/cloud/expv2/integration/integration_test.go
@@ -0,0 +1,155 @@
package integration

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"os"
	"testing"
	"time"

	"github.com/klauspost/compress/snappy"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.k6.io/k6/cloudapi"
	"go.k6.io/k6/lib/testutils"
	"go.k6.io/k6/lib/types"
	"go.k6.io/k6/metrics"
	"go.k6.io/k6/output/cloud/expv2"
	"go.k6.io/k6/output/cloud/expv2/pbcloud"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
	"gopkg.in/guregu/null.v3"
)

// This test runs an integration test for the cloud Output.
// It only calls the public API of the Output, and it implements
// a concrete HTTP endpoint for receiving the protobuf flush requests.

func TestOutputFlush(t *testing.T) {
	// TODO: the aggregation period is set to 3s,
	// so the test executes for more than 3s, which is a waste of time
	// because it isn't really required.
	// Reduce the aggregation period (to 1s?) and then always run it.
	t.Skip("Skip the integration test, if required enable it manually")
	t.Parallel()

	results := make(chan *pbcloud.MetricSet)
	ts := httptest.NewServer(metricsHandler(results))
	defer ts.Close()

	// init config
	c := cloudapi.NewConfig()
	c.Host = null.StringFrom(ts.URL)
	c.Token = null.StringFrom("my-secret-token")
	c.AggregationPeriod = types.NullDurationFrom(3 * time.Second)
	c.AggregationWaitPeriod = types.NullDurationFrom(1 * time.Second)

	// init and start the output
	o, err := expv2.New(testutils.NewLogger(t), c)
	require.NoError(t, err)
	o.SetReferenceID("my-test-run-id-123")
	require.NoError(t, o.Start())

	// collect and flush samples
	o.AddMetricSamples([]metrics.SampleContainer{
		testSamples(),
	})

	// wait for results
	mset := <-results
	close(results)
	assert.NoError(t, o.StopWithTestError(nil))

	// read and convert the JSON version
	// of the expected protobuf request
	var exp pbcloud.MetricSet
	expjs, err := os.ReadFile("./testdata/metricset.json") //nolint:forbidigo // ReadFile here is used in a test
	require.NoError(t, err)
	err = protojson.Unmarshal(expjs, &exp)
	require.NoError(t, err)

	msetjs, err := protojson.Marshal(mset)
	require.NoError(t, err)
	assert.JSONEq(t, string(expjs), string(msetjs))
}

func metricsHandler(results chan<- *pbcloud.MetricSet) http.HandlerFunc {
	return func(rw http.ResponseWriter, r *http.Request) {
		token := r.Header.Get("Authorization")
		if token != "Token my-secret-token" {
			http.Error(rw, fmt.Sprintf("token is required; got %q", token), http.StatusUnauthorized)
			return
		}
		b, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(rw, err.Error(), http.StatusInternalServerError)
			return
		}
		mset, err := metricSetFromRequest(b)
		if err != nil {
			http.Error(rw, err.Error(), http.StatusInternalServerError)
			return
		}
		results <- mset
	}
}

func metricSetFromRequest(b []byte) (*pbcloud.MetricSet, error) {
	b, err := snappy.Decode(nil, b)
	if err != nil {
		return nil, err
	}
	var mset pbcloud.MetricSet
	err = proto.Unmarshal(b, &mset)
	if err != nil {
		return nil, err
	}
	return &mset, nil
}

func testSamples() metrics.Samples {
	r := metrics.NewRegistry()
	m1 := r.MustNewMetric("metric_counter_1", metrics.Counter)
	m2 := r.MustNewMetric("metric_gauge_2", metrics.Gauge)
	m3 := r.MustNewMetric("metric_rate_3", metrics.Rate)
	m4 := r.MustNewMetric("metric_trend_4", metrics.Trend)

	samples := []metrics.Sample{
		{
			TimeSeries: metrics.TimeSeries{
				Metric: m1,
				Tags:   r.RootTagSet().With("my_label_1", "my_label_value_1"),
			},
			Time:  time.Date(2023, time.May, 1, 1, 0, 0, 0, time.UTC),
			Value: 42.2,
		},
		{
			TimeSeries: metrics.TimeSeries{
				Metric: m2,
				Tags:   r.RootTagSet().With("my_label_2", "my_label_value_2"),
			},
			Time:  time.Date(2023, time.May, 1, 2, 0, 0, 0, time.UTC),
			Value: 3.14,
		},
		{
			TimeSeries: metrics.TimeSeries{
				Metric: m3,
				Tags:   r.RootTagSet().With("my_label_3", "my_label_value_3"),
			},
			Time:  time.Date(2023, time.May, 1, 3, 0, 0, 0, time.UTC),
			Value: 2.718,
		},
		{
			TimeSeries: metrics.TimeSeries{
				Metric: m4,
				Tags:   r.RootTagSet().With("my_label_4", "my_label_value_4"),
			},
			Time:  time.Date(2023, time.May, 1, 4, 0, 0, 0, time.UTC),
			Value: 186,
		},
	}
	return samples
}
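
For completeness, a sketch of the inverse of metricSetFromRequest: building a body that metricsHandler above would accept. The helper name is hypothetical; it only relies on the protobuf marshalling and the snappy block encoding that the handler decodes:

// requestBodyFromMetricSet is a hypothetical test helper: it marshals the
// MetricSet to protobuf and snappy-compresses it (block format), exactly
// inverting metricSetFromRequest above.
func requestBodyFromMetricSet(mset *pbcloud.MetricSet) ([]byte, error) {
	b, err := proto.Marshal(mset)
	if err != nil {
		return nil, err
	}
	return snappy.Encode(nil, b), nil
}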