Skip to content

Commit

Permalink
prometheusremotewrite: Introduce pluggable conversion architecture
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Apr 29, 2024
1 parent 3f64154 commit f6b5ed8
Show file tree
Hide file tree
Showing 15 changed files with 242 additions and 202 deletions.
17 changes: 9 additions & 8 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,23 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
case <-prwe.closeChan:
return errors.New("shutdown has been called")
default:

tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
converter := prometheusremotewrite.NewPrometheusConverter()
err := converter.FromMetrics(md, prwe.exporterSettings)
timeSeries := converter.TimeSeries()
if err != nil {
prwe.telemetry.recordTranslationFailure(ctx)
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(timeSeries)))
}

prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))
prwe.telemetry.recordTranslatedTimeSeries(ctx, len(timeSeries))

var m []*prompb.MetricMetadata
if prwe.exporterSettings.SendMetadata {
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
}

// Call export even if a conversion error, since there may be points that were successfully converted.
return prwe.handleExport(ctx, tsMap, m)
return prwe.handleExport(ctx, timeSeries, m)
}
}

Expand All @@ -210,14 +211,14 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) {
return sanitizedLabels, nil
}

func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []*prompb.MetricMetadata) error {
func (prwe *prwExporter) handleExport(ctx context.Context, timeSeries []prompb.TimeSeries, m []*prompb.MetricMetadata) error {
// There are no metrics to export, so return.
if len(tsMap) == 0 {
if len(timeSeries) == 0 {
return nil
}

// Calls the helper function to convert and batch the time series to the desired format
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m)
requests, err := batchTimeSeries(timeSeries, prwe.maxBatchSizeBytes, m)
if err != nil {
return err
}
Expand Down
21 changes: 9 additions & 12 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,9 @@ func TestNoMetricsNoError(t *testing.T) {

func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
// First we will construct a TimeSeries array from the testutils package
testmap := make(map[string]*prompb.TimeSeries)
var timeSeries []prompb.TimeSeries
if ts != nil {
testmap["test"] = ts
timeSeries = append(timeSeries, *ts)
}

cfg := createDefaultConfig().(*Config)
Expand Down Expand Up @@ -369,7 +369,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
return err
}

return prwe.handleExport(context.Background(), testmap, nil)
return prwe.handleExport(context.Background(), timeSeries, nil)
}

type mockPRWTelemetry struct {
Expand Down Expand Up @@ -944,19 +944,16 @@ func TestWALOnExporterRoundTrip(t *testing.T) {
})
require.NotNil(t, prwe.wal)

ts1 := &prompb.TimeSeries{
ts1 := prompb.TimeSeries{
Labels: []prompb.Label{{Name: "ts1l1", Value: "ts1k1"}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 100}},
}
ts2 := &prompb.TimeSeries{
ts2 := prompb.TimeSeries{
Labels: []prompb.Label{{Name: "ts2l1", Value: "ts2k1"}},
Samples: []prompb.Sample{{Value: 2, Timestamp: 200}},
}
tsMap := map[string]*prompb.TimeSeries{
"timeseries1": ts1,
"timeseries2": ts2,
}
errs := prwe.handleExport(ctx, tsMap, nil)
timeSeries := []prompb.TimeSeries{ts1, ts2}
errs := prwe.handleExport(ctx, timeSeries, nil)
assert.NoError(t, errs)
// Shutdown after we've written to the WAL. This ensures that our
// exported data in-flight will be flushed to the WAL before exiting.
Expand Down Expand Up @@ -988,12 +985,12 @@ func TestWALOnExporterRoundTrip(t *testing.T) {
reqs = append(reqs, req)
}
assert.Equal(t, 1, len(reqs))
// We MUST have 2 time series as were passed into tsMap.
// We MUST have 2 time series as were passed into timeSeries.
gotFromWAL := reqs[0]
assert.Equal(t, 2, len(gotFromWAL.Timeseries))
want := &prompb.WriteRequest{
Timeseries: orderBySampleTimestamp([]prompb.TimeSeries{
*ts1, *ts2,
ts1, ts2,
}),
}

Expand Down
18 changes: 9 additions & 9 deletions exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@ import (
)

// batchTimeSeries splits series into multiple batch write requests.
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) {
if len(tsMap) == 0 {
return nil, errors.New("invalid tsMap: cannot be empty map")
func batchTimeSeries(timeSeries []prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) {
if len(timeSeries) == 0 {
return nil, errors.New("invalid timeSeries: cannot be empty")
}

requests := make([]*prompb.WriteRequest, 0, len(tsMap)+len(m))
tsArray := make([]prompb.TimeSeries, 0, len(tsMap))
requests := make([]*prompb.WriteRequest, 0, len(timeSeries)+len(m))
tsArray := make([]prompb.TimeSeries, 0, len(timeSeries))
sizeOfCurrentBatch := 0

i := 0
for _, v := range tsMap {
sizeOfSeries := v.Size()
for _, ts := range timeSeries {
sizeOfSeries := ts.Size()

if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
wrapped := convertTimeseriesToRequest(tsArray)
requests = append(requests, wrapped)

tsArray = make([]prompb.TimeSeries, 0, len(tsMap)-i)
tsArray = make([]prompb.TimeSeries, 0, len(timeSeries)-i)
sizeOfCurrentBatch = 0
}

tsArray = append(tsArray, *v)
tsArray = append(tsArray, ts)
sizeOfCurrentBatch += sizeOfSeries
i++
}
Expand Down
16 changes: 8 additions & 8 deletions exporter/prometheusremotewriteexporter/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,34 @@ func Test_batchTimeSeries(t *testing.T) {
ts1 := getTimeSeries(labels, sample1, sample2)
ts2 := getTimeSeries(labels, sample1, sample2, sample3)

tsMap1 := getTimeseriesMap([]*prompb.TimeSeries{})
tsMap2 := getTimeseriesMap([]*prompb.TimeSeries{ts1})
tsMap3 := getTimeseriesMap([]*prompb.TimeSeries{ts1, ts2})
timeSeries1 := []prompb.TimeSeries{}
timeSeries2 := []prompb.TimeSeries{*ts1}
timeSeries3 := []prompb.TimeSeries{*ts1, *ts2}

tests := []struct {
name string
tsMap map[string]*prompb.TimeSeries
timeSeries []prompb.TimeSeries
maxBatchByteSize int
numExpectedRequests int
returnErr bool
}{
{
"no_timeseries",
tsMap1,
timeSeries1,
100,
-1,
true,
},
{
"normal_case",
tsMap2,
timeSeries2,
300,
1,
false,
},
{
"two_requests",
tsMap3,
timeSeries3,
300,
2,
false,
Expand All @@ -57,7 +57,7 @@ func Test_batchTimeSeries(t *testing.T) {
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil)
requests, err := batchTimeSeries(tt.timeSeries, tt.maxBatchByteSize, nil)
if tt.returnErr {
assert.Error(t, err)
return
Expand Down
9 changes: 0 additions & 9 deletions exporter/prometheusremotewriteexporter/testutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package prometheusremotewriteexporter

import (
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -383,11 +382,3 @@ func getQuantiles(bounds []float64, values []float64) pmetric.SummaryDataPointVa

return quantiles
}

// getTimeseriesMap builds a map of the given time series, each keyed by a
// synthetic name carrying the element's index in the input slice.
func getTimeseriesMap(timeseries []*prompb.TimeSeries) map[string]*prompb.TimeSeries {
	result := make(map[string]*prompb.TimeSeries, len(timeseries))
	for idx, series := range timeseries {
		result[fmt.Sprintf("%s%d", "timeseries_name", idx)] = series
	}
	return result
}
94 changes: 94 additions & 0 deletions pkg/translator/prometheusremotewrite/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"

import (
"errors"
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"

prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)

// ConvertMetrics converts pmetric.Metrics to another metrics format via the converter.
// Per-metric conversion failures are accumulated (via multierr) into the returned
// error while the remaining metrics continue to be processed, so a non-nil error
// does not mean that nothing was converted.
func ConvertMetrics(md pmetric.Metrics, settings Settings, converter MetricsConverter) (errs error) {
	resourceMetricsSlice := md.ResourceMetrics()
	for i := 0; i < resourceMetricsSlice.Len(); i++ {
		resourceMetrics := resourceMetricsSlice.At(i)
		resource := resourceMetrics.Resource()
		scopeMetricsSlice := resourceMetrics.ScopeMetrics()
		// keep track of the most recent timestamp in the ResourceMetrics for
		// use with the "target" info metric
		var mostRecentTimestamp pcommon.Timestamp
		for j := 0; j < scopeMetricsSlice.Len(); j++ {
			metricSlice := scopeMetricsSlice.At(j).Metrics()

			// TODO: decide if instrumentation library information should be exported as labels
			for k := 0; k < metricSlice.Len(); k++ {
				metric := metricSlice.At(k)
				mostRecentTimestamp = maxTimestamp(mostRecentTimestamp, mostRecentTimestampInMetric(metric))

				// Metrics with an unsupported temporality/type combination are
				// skipped rather than aborting the whole conversion.
				if !isValidAggregationTemporality(metric) {
					errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name()))
					continue
				}

				// Translate the OTel metric name into a Prometheus-compliant one.
				promName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes)

				// handle individual metrics based on type
				//exhaustive:enforce
				switch metric.Type() {
				case pmetric.MetricTypeGauge:
					dataPoints := metric.Gauge().DataPoints()
					if dataPoints.Len() == 0 {
						errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
						break
					}
					errs = multierr.Append(errs, converter.AddGaugeNumberDataPoints(dataPoints, resource, settings, promName))
				case pmetric.MetricTypeSum:
					dataPoints := metric.Sum().DataPoints()
					if dataPoints.Len() == 0 {
						errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
						break
					}
					errs = multierr.Append(errs, converter.AddSumNumberDataPoints(dataPoints, resource, metric, settings, promName))
				case pmetric.MetricTypeHistogram:
					dataPoints := metric.Histogram().DataPoints()
					if dataPoints.Len() == 0 {
						errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
						break
					}
					errs = multierr.Append(errs, converter.AddHistogramDataPoints(dataPoints, resource, settings, promName))
				case pmetric.MetricTypeExponentialHistogram:
					dataPoints := metric.ExponentialHistogram().DataPoints()
					if dataPoints.Len() == 0 {
						errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
						break
					}
					errs = multierr.Append(errs, converter.AddExponentialHistogramDataPoints(
						dataPoints,
						resource,
						settings,
						promName,
					))
				case pmetric.MetricTypeSummary:
					dataPoints := metric.Summary().DataPoints()
					if dataPoints.Len() == 0 {
						errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
						break
					}
					errs = multierr.Append(errs, converter.AddSummaryDataPoints(dataPoints, resource, settings, promName))
				default:
					errs = multierr.Append(errs, errors.New("unsupported metric type"))
				}
			}
		}
		// Emit the "target" info metric for this resource, stamped with the most
		// recent timestamp observed among its metrics.
		addResourceTargetInfo(resource, settings, mostRecentTimestamp, converter)
	}

	return
}
Loading

0 comments on commit f6b5ed8

Please sign in to comment.