Skip to content

Commit b6ff474

Browse files
committed
Pass logger down to FromMetrics
This commit passes logger down to FromMetrics and extends the converter to printout some information about the incoming series timestamps and start timestamps. Only for sums. Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> format series Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> dont store, log right away Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com>
1 parent 2a2019e commit b6ff474

File tree

5 files changed

+43
-9
lines changed

5 files changed

+43
-9
lines changed

storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ import (
2121
"errors"
2222
"fmt"
2323
"sort"
24+
"strings"
2425

26+
"github.com/go-kit/log"
27+
"github.com/go-kit/log/level"
2528
"go.opentelemetry.io/collector/pdata/pcommon"
2629
"go.opentelemetry.io/collector/pdata/pmetric"
2730
"go.uber.org/multierr"
@@ -42,6 +45,12 @@ type Settings struct {
4245
EnableCreatedTimestampZeroIngestion bool
4346
}
4447

48+
type StartTsAndTs struct {
49+
StartTs int64
50+
Ts int64
51+
Labels []prompb.Label
52+
}
53+
4554
// PrometheusConverter converts from OTel write format to Prometheus remote write format.
4655
type PrometheusConverter struct {
4756
unique map[uint64]*prompb.TimeSeries
@@ -57,7 +66,7 @@ func NewPrometheusConverter() *PrometheusConverter {
5766
}
5867

5968
// FromMetrics converts pmetric.Metrics to Prometheus remote write format.
60-
func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metrics, settings Settings) (annots annotations.Annotations, errs error) {
69+
func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metrics, settings Settings, logger log.Logger) (annots annotations.Annotations, errs error) {
6170
c.everyN = everyNTimes{n: 128}
6271
resourceMetricsSlice := md.ResourceMetrics()
6372
for i := 0; i < resourceMetricsSlice.Len(); i++ {
@@ -108,7 +117,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
108117
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
109118
break
110119
}
111-
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, promName); err != nil {
120+
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, promName, logger); err != nil {
112121
errs = multierr.Append(errs, err)
113122
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
114123
return
@@ -226,3 +235,22 @@ func (c *PrometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Lab
226235
ts.Samples = append(ts.Samples, *sample)
227236
return ts
228237
}
238+
239+
// TODO(jesus.vazquez) This method is for debugging only and its meant to be removed soon.
240+
// trackStartTimestampForSeries logs the start timestamp for a series if it has been seen before.
241+
func (c *PrometheusConverter) trackStartTimestampForSeries(startTs, ts int64, lbls []prompb.Label, logger log.Logger) {
242+
h := timeSeriesSignature(lbls)
243+
if _, ok := c.unique[h]; ok {
244+
var seriesBuilder strings.Builder
245+
seriesBuilder.WriteString("{")
246+
for i, l := range lbls {
247+
if i > 0 {
248+
seriesBuilder.WriteString(",")
249+
}
250+
seriesBuilder.WriteString(fmt.Sprintf("%s=%s", l.Name, l.Value))
251+
252+
}
253+
seriesBuilder.WriteString("}")
254+
level.Debug(logger).Log("labels", seriesBuilder.String(), "start_ts", startTs, "ts", ts)
255+
}
256+
}

storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"testing"
2323
"time"
2424

25+
"github.com/go-kit/log"
2526
"github.com/stretchr/testify/require"
2627
"go.opentelemetry.io/collector/pdata/pcommon"
2728
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -36,7 +37,7 @@ func TestFromMetrics(t *testing.T) {
3637
cancel()
3738
payload := createExportRequest(5, 128, 128, 2, 0)
3839

39-
_, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{})
40+
_, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{}, log.NewNopLogger())
4041
require.ErrorIs(t, err, context.Canceled)
4142
})
4243

@@ -47,7 +48,7 @@ func TestFromMetrics(t *testing.T) {
4748
t.Cleanup(cancel)
4849
payload := createExportRequest(5, 128, 128, 2, 0)
4950

50-
_, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{})
51+
_, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{}, log.NewNopLogger())
5152
require.ErrorIs(t, err, context.DeadlineExceeded)
5253
})
5354

@@ -74,7 +75,7 @@ func TestFromMetrics(t *testing.T) {
7475
}
7576

7677
converter := NewPrometheusConverter()
77-
annots, err := converter.FromMetrics(context.Background(), request.Metrics(), Settings{})
78+
annots, err := converter.FromMetrics(context.Background(), request.Metrics(), Settings{}, log.NewNopLogger())
7879
require.NoError(t, err)
7980
require.NotEmpty(t, annots)
8081
ws, infos := annots.AsStrings("", 0, 0)
@@ -107,7 +108,7 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
107108

108109
for i := 0; i < b.N; i++ {
109110
converter := NewPrometheusConverter()
110-
annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), Settings{})
111+
annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), Settings{}, log.NewNopLogger())
111112
require.NoError(b, err)
112113
require.Empty(b, annots)
113114
require.NotNil(b, converter.TimeSeries())

storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"math"
2222

23+
"github.com/go-kit/log"
2324
"github.com/prometheus/common/model"
2425
"go.opentelemetry.io/collector/pdata/pcommon"
2526
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -65,13 +66,14 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data
6566
}
6667

6768
func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice,
68-
resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string) error {
69+
resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string, logger log.Logger) error {
6970
for x := 0; x < dataPoints.Len(); x++ {
7071
if err := c.everyN.checkContext(ctx); err != nil {
7172
return err
7273
}
7374

7475
pt := dataPoints.At(x)
76+
timestamp := convertTimeStamp(pt.Timestamp())
7577
startTimestampNs := pt.StartTimestamp()
7678
startTimestampMs := convertTimeStamp(pt.StartTimestamp())
7779
lbls := createAttributes(
@@ -83,7 +85,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo
8385
model.MetricNameLabel,
8486
name,
8587
)
86-
timestamp := convertTimeStamp(pt.Timestamp())
88+
8789
sample := &prompb.Sample{
8890
Timestamp: timestamp,
8991
}
@@ -125,6 +127,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo
125127
}
126128
c.addTimeSeriesIfNeeded(createdLabels, startTimestampMs, pt.Timestamp())
127129
}
130+
c.trackStartTimestampForSeries(startTimestampMs, timestamp, lbls, logger)
128131
}
129132

130133
return nil

storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"testing"
2222
"time"
2323

24+
"github.com/go-kit/log"
2425
"github.com/prometheus/common/model"
2526
"github.com/stretchr/testify/assert"
2627
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -255,6 +256,7 @@ func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) {
255256
EnableCreatedTimestampZeroIngestion: true,
256257
},
257258
metric.Name(),
259+
log.NewNopLogger(),
258260
)
259261

260262
assert.Equal(t, tt.want(), converter.unique)

storage/remote/write_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
518518
AddMetricSuffixes: true,
519519
PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes,
520520
EnableCreatedTimestampZeroIngestion: h.enableCTZeroIngestion,
521-
})
521+
}, h.logger)
522522
if err != nil {
523523
level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
524524
}

0 commit comments

Comments
 (0)