Skip to content

Commit 0c21f74

Browse files
committed
Pass logger down to FromMetrics
This commit passes logger down to FromMetrics and extends the converter to printout some information about the incoming series timestamps and start timestamps. Only for sums. Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> format series Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> dont store, log right away Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com>
1 parent 2a2019e commit 0c21f74

File tree

5 files changed

+44
-11
lines changed

5 files changed

+44
-11
lines changed

storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ import (
2121
"errors"
2222
"fmt"
2323
"sort"
24+
"strings"
2425

26+
"github.com/go-kit/log"
27+
"github.com/go-kit/log/level"
2528
"go.opentelemetry.io/collector/pdata/pcommon"
2629
"go.opentelemetry.io/collector/pdata/pmetric"
2730
"go.uber.org/multierr"
@@ -42,6 +45,12 @@ type Settings struct {
4245
EnableCreatedTimestampZeroIngestion bool
4346
}
4447

48+
type StartTsAndTs struct {
49+
StartTs int64
50+
Ts int64
51+
Labels []prompb.Label
52+
}
53+
4554
// PrometheusConverter converts from OTel write format to Prometheus remote write format.
4655
type PrometheusConverter struct {
4756
unique map[uint64]*prompb.TimeSeries
@@ -57,7 +66,7 @@ func NewPrometheusConverter() *PrometheusConverter {
5766
}
5867

5968
// FromMetrics converts pmetric.Metrics to Prometheus remote write format.
60-
func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metrics, settings Settings) (annots annotations.Annotations, errs error) {
69+
func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metrics, settings Settings, logger log.Logger) (annots annotations.Annotations, errs error) {
6170
c.everyN = everyNTimes{n: 128}
6271
resourceMetricsSlice := md.ResourceMetrics()
6372
for i := 0; i < resourceMetricsSlice.Len(); i++ {
@@ -108,7 +117,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
108117
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
109118
break
110119
}
111-
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, promName); err != nil {
120+
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, promName, logger); err != nil {
112121
errs = multierr.Append(errs, err)
113122
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
114123
return
@@ -226,3 +235,22 @@ func (c *PrometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Lab
226235
ts.Samples = append(ts.Samples, *sample)
227236
return ts
228237
}
238+
239+
// TODO(jesus.vazquez) This method is for debugging only and its meant to be removed soon.
240+
// trackStartTimestampForSeries logs the start timestamp for a series if it has been seen before.
241+
func (c *PrometheusConverter) trackStartTimestampForSeries(startTs, ts int64, lbls []prompb.Label, logger log.Logger) {
242+
h := timeSeriesSignature(lbls)
243+
if _, ok := c.unique[h]; ok {
244+
var seriesBuilder strings.Builder
245+
seriesBuilder.WriteString("{")
246+
for i, l := range lbls {
247+
if i > 0 {
248+
seriesBuilder.WriteString(",")
249+
}
250+
seriesBuilder.WriteString(fmt.Sprintf("%s=%s", l.Name, l.Value))
251+
252+
}
253+
seriesBuilder.WriteString("}")
254+
level.Debug(logger).Log("labels", seriesBuilder.String(), "start_ts", startTs, "ts", ts)
255+
}
256+
}

storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"testing"
2323
"time"
2424

25+
"github.com/go-kit/log"
2526
"github.com/stretchr/testify/require"
2627
"go.opentelemetry.io/collector/pdata/pcommon"
2728
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -36,7 +37,7 @@ func TestFromMetrics(t *testing.T) {
3637
cancel()
3738
payload := createExportRequest(5, 128, 128, 2, 0)
3839

39-
_, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{})
40+
_, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{}, log.NewNopLogger())
4041
require.ErrorIs(t, err, context.Canceled)
4142
})
4243

@@ -47,7 +48,7 @@ func TestFromMetrics(t *testing.T) {
4748
t.Cleanup(cancel)
4849
payload := createExportRequest(5, 128, 128, 2, 0)
4950

50-
_, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{})
51+
_, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{}, log.NewNopLogger())
5152
require.ErrorIs(t, err, context.DeadlineExceeded)
5253
})
5354

@@ -74,7 +75,7 @@ func TestFromMetrics(t *testing.T) {
7475
}
7576

7677
converter := NewPrometheusConverter()
77-
annots, err := converter.FromMetrics(context.Background(), request.Metrics(), Settings{})
78+
annots, err := converter.FromMetrics(context.Background(), request.Metrics(), Settings{}, log.NewNopLogger())
7879
require.NoError(t, err)
7980
require.NotEmpty(t, annots)
8081
ws, infos := annots.AsStrings("", 0, 0)
@@ -107,7 +108,7 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
107108

108109
for i := 0; i < b.N; i++ {
109110
converter := NewPrometheusConverter()
110-
annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), Settings{})
111+
annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), Settings{}, log.NewNopLogger())
111112
require.NoError(b, err)
112113
require.Empty(b, annots)
113114
require.NotNil(b, converter.TimeSeries())

storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"math"
2222

23+
"github.com/go-kit/log"
2324
"github.com/prometheus/common/model"
2425
"go.opentelemetry.io/collector/pdata/pcommon"
2526
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -65,14 +66,14 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data
6566
}
6667

6768
func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice,
68-
resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string) error {
69+
resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string, logger log.Logger) error {
6970
for x := 0; x < dataPoints.Len(); x++ {
7071
if err := c.everyN.checkContext(ctx); err != nil {
7172
return err
7273
}
7374

7475
pt := dataPoints.At(x)
75-
startTimestampNs := pt.StartTimestamp()
76+
timestamp := convertTimeStamp(pt.Timestamp())
7677
startTimestampMs := convertTimeStamp(pt.StartTimestamp())
7778
lbls := createAttributes(
7879
resource,
@@ -83,7 +84,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo
8384
model.MetricNameLabel,
8485
name,
8586
)
86-
timestamp := convertTimeStamp(pt.Timestamp())
87+
8788
sample := &prompb.Sample{
8889
Timestamp: timestamp,
8990
}
@@ -111,7 +112,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo
111112

112113
// add created time series if needed
113114
if settings.ExportCreatedMetric && isMonotonic {
114-
if startTimestampNs == 0 {
115+
if startTimestampMs == 0 {
115116
return nil
116117
}
117118

@@ -125,6 +126,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo
125126
}
126127
c.addTimeSeriesIfNeeded(createdLabels, startTimestampMs, pt.Timestamp())
127128
}
129+
c.trackStartTimestampForSeries(startTimestampMs, timestamp, lbls, logger)
128130
}
129131

130132
return nil

storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"testing"
2222
"time"
2323

24+
"github.com/go-kit/log"
2425
"github.com/prometheus/common/model"
2526
"github.com/stretchr/testify/assert"
2627
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -255,6 +256,7 @@ func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) {
255256
EnableCreatedTimestampZeroIngestion: true,
256257
},
257258
metric.Name(),
259+
log.NewNopLogger(),
258260
)
259261

260262
assert.Equal(t, tt.want(), converter.unique)

storage/remote/write_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
518518
AddMetricSuffixes: true,
519519
PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes,
520520
EnableCreatedTimestampZeroIngestion: h.enableCTZeroIngestion,
521-
})
521+
}, h.logger)
522522
if err != nil {
523523
level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
524524
}

0 commit comments

Comments
 (0)