diff --git a/.chloggen/prw-translator-support-exemplars-for-sum.yaml b/.chloggen/prw-translator-support-exemplars-for-sum.yaml new file mode 100644 index 000000000000..bb7a88533f87 --- /dev/null +++ b/.chloggen/prw-translator-support-exemplars-for-sum.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/prometheusremotewriteexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Export exemplars for the Monotonic Sum metric. + +# One or more tracking issues related to the change +issues: [ 17573 ] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/pkg/translator/prometheusremotewrite/helper.go b/pkg/translator/prometheusremotewrite/helper.go index f39fd769df46..1e6e733eb475 100644 --- a/pkg/translator/prometheusremotewrite/helper.go +++ b/pkg/translator/prometheusremotewrite/helper.go @@ -258,47 +258,6 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool { return false } -// addSingleNumberDataPoint converts the metric value stored in pt to a Prometheus sample, and add the sample -// to its corresponding time series in tsMap -func addSingleNumberDataPoint(pt pmetric.NumberDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*prompb.TimeSeries) { - // create parameters for addSample - name := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace) - labels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, name) - sample := &prompb.Sample{ - // convert ns to ms - Timestamp: convertTimeStamp(pt.Timestamp()), - } - switch pt.ValueType() { - case pmetric.NumberDataPointValueTypeInt: - sample.Value = float64(pt.IntValue()) - case pmetric.NumberDataPointValueTypeDouble: - sample.Value = pt.DoubleValue() - } - if pt.Flags().NoRecordedValue() { - sample.Value = math.Float64frombits(value.StaleNaN) - } - addSample(tsMap, sample, labels, metric.Type().String()) - - // add _created time series if needed - if settings.ExportCreatedMetric && isMonotonicSum(metric) { - startTimestamp := pt.StartTimestamp() - if startTimestamp != 0 { - createdLabels := createAttributes( - resource, - pt.Attributes(), - settings.ExternalLabels, - nameStr, - name+createdSuffix, - ) - addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String()) - } - } -} - -func isMonotonicSum(metric pmetric.Metric) bool { - return metric.Type() == pmetric.MetricTypeSum && metric.Sum().IsMonotonic() -} - // addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. It // ignore extra buckets if len(ExplicitBounds) > len(BucketCounts) func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*prompb.TimeSeries) { @@ -389,7 +348,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon } type exemplarType interface { - pmetric.ExponentialHistogramDataPoint | pmetric.HistogramDataPoint + pmetric.ExponentialHistogramDataPoint | pmetric.HistogramDataPoint | pmetric.NumberDataPoint Exemplars() pmetric.ExemplarSlice } diff --git a/pkg/translator/prometheusremotewrite/helper_test.go b/pkg/translator/prometheusremotewrite/helper_test.go index 987bbca70446..c841192468f3 100644 --- a/pkg/translator/prometheusremotewrite/helper_test.go +++ b/pkg/translator/prometheusremotewrite/helper_test.go @@ -670,128 +670,6 @@ func TestMostRecentTimestampInMetric(t *testing.T) { } } -func TestAddSingleNumberDataPoint(t *testing.T) { - ts := pcommon.Timestamp(time.Now().UnixNano()) - tests := []struct { - name string - metric func() pmetric.Metric - want func() map[string]*prompb.TimeSeries - }{ - { - name: "monotonic cumulative sum with start timestamp", - metric: func() pmetric.Metric { - metric := pmetric.NewMetric() - metric.SetName("test_sum") - metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - metric.SetEmptySum().SetIsMonotonic(true) - - dp := metric.Sum().DataPoints().AppendEmpty() - dp.SetDoubleValue(1) - dp.SetTimestamp(ts) - dp.SetStartTimestamp(ts) - - return metric - }, - want: func() map[string]*prompb.TimeSeries { - labels := []prompb.Label{ - {Name: model.MetricNameLabel, Value: "test_sum"}, - } - createdLabels := []prompb.Label{ - {Name: model.MetricNameLabel, Value: "test_sum" + createdSuffix}, - } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): { - Labels: labels, - Samples: []prompb.Sample{ - {Value: 1, Timestamp: convertTimeStamp(ts)}, - }, - }, - timeSeriesSignature(pmetric.MetricTypeSum.String(), &createdLabels): { - Labels: createdLabels, - Samples: []prompb.Sample{ - {Value: float64(convertTimeStamp(ts))}, - }, - }, - } - }, - }, - { - name: "monotonic cumulative sum with no start time", - metric: func() pmetric.Metric { - metric := pmetric.NewMetric() - metric.SetName("test_sum") - metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - metric.SetEmptySum().SetIsMonotonic(true) - - dp := metric.Sum().DataPoints().AppendEmpty() - dp.SetTimestamp(ts) - - return metric - }, - want: func() map[string]*prompb.TimeSeries { - labels := []prompb.Label{ - {Name: model.MetricNameLabel, Value: "test_sum"}, - } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): { - Labels: labels, - Samples: []prompb.Sample{ - {Value: 0, Timestamp: convertTimeStamp(ts)}, - }, - }, - } - }, - }, - { - name: "non-monotonic cumulative sum with start time", - metric: func() pmetric.Metric { - metric := pmetric.NewMetric() - metric.SetName("test_sum") - metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - metric.SetEmptySum().SetIsMonotonic(false) - - dp := metric.Sum().DataPoints().AppendEmpty() - dp.SetTimestamp(ts) - - return metric - }, - want: func() map[string]*prompb.TimeSeries { - labels := []prompb.Label{ - {Name: model.MetricNameLabel, Value: "test_sum"}, - } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): { - Labels: labels, - Samples: []prompb.Sample{ - {Value: 0, Timestamp: convertTimeStamp(ts)}, - }, - }, - } - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - metric := tt.metric() - - got := make(map[string]*prompb.TimeSeries) - for x := 0; x < metric.Sum().DataPoints().Len(); x++ { - addSingleNumberDataPoint( - metric.Sum().DataPoints().At(x), - pcommon.NewResource(), - metric, - Settings{ - ExportCreatedMetric: true, - }, - got, - ) - } - - assert.Equal(t, tt.want(), got) - }) - } -} - func TestAddSingleSummaryDataPoint(t *testing.T) { ts := pcommon.Timestamp(time.Now().UnixNano()) tests := []struct { diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw.go b/pkg/translator/prometheusremotewrite/metrics_to_prw.go index 912f3dfeccfa..0bebe4e158f1 100644 --- a/pkg/translator/prometheusremotewrite/metrics_to_prw.go +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw.go @@ -63,13 +63,19 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*promp switch metric.Type() { case pmetric.MetricTypeGauge: dataPoints := metric.Gauge().DataPoints() - if err := addNumberDataPointSlice(dataPoints, resource, metric, settings, tsMap); err != nil { - errs = multierr.Append(errs, err) + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + } + for x := 0; x < dataPoints.Len(); x++ { + addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) } case pmetric.MetricTypeSum: dataPoints := metric.Sum().DataPoints() - if err := addNumberDataPointSlice(dataPoints, resource, metric, settings, tsMap); err != nil { - errs = multierr.Append(errs, err) + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + } + for x := 0; x < dataPoints.Len(); x++ { + addSingleSumNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) } case pmetric.MetricTypeHistogram: dataPoints := metric.Histogram().DataPoints() @@ -115,15 +121,3 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*promp return } - -func addNumberDataPointSlice(dataPoints pmetric.NumberDataPointSlice, - resource pcommon.Resource, metric pmetric.Metric, - settings Settings, tsMap map[string]*prompb.TimeSeries) error { - if dataPoints.Len() == 0 { - return fmt.Errorf("empty data points. %s is dropped", metric.Name()) - } - for x := 0; x < dataPoints.Len(); x++ { - addSingleNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) - } - return nil -} diff --git a/pkg/translator/prometheusremotewrite/number_data_points.go b/pkg/translator/prometheusremotewrite/number_data_points.go new file mode 100644 index 000000000000..e57ac75581d8 --- /dev/null +++ b/pkg/translator/prometheusremotewrite/number_data_points.go @@ -0,0 +1,113 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite" + +import ( + "math" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/prompb" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus" +) + +// addSingleSumNumberDataPoint converts the Gauge metric data point to a +// Prometheus time series with samples and labels. The result is stored in the +// series map. +func addSingleGaugeNumberDataPoint( + pt pmetric.NumberDataPoint, + resource pcommon.Resource, + metric pmetric.Metric, + settings Settings, + series map[string]*prompb.TimeSeries, +) { + name := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace) + labels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + model.MetricNameLabel, name, + ) + sample := &prompb.Sample{ + // convert ns to ms + Timestamp: convertTimeStamp(pt.Timestamp()), + } + switch pt.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + sample.Value = float64(pt.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + sample.Value = pt.DoubleValue() + } + if pt.Flags().NoRecordedValue() { + sample.Value = math.Float64frombits(value.StaleNaN) + } + addSample(series, sample, labels, metric.Type().String()) +} + +// addSingleSumNumberDataPoint converts the Sum metric data point to a Prometheus +// time series with samples, labels and exemplars. The result is stored in the +// series map. +func addSingleSumNumberDataPoint( + pt pmetric.NumberDataPoint, + resource pcommon.Resource, + metric pmetric.Metric, + settings Settings, + series map[string]*prompb.TimeSeries, +) { + name := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace) + labels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + model.MetricNameLabel, name, + ) + sample := &prompb.Sample{ + // convert ns to ms + Timestamp: convertTimeStamp(pt.Timestamp()), + } + switch pt.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + sample.Value = float64(pt.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + sample.Value = pt.DoubleValue() + } + if pt.Flags().NoRecordedValue() { + sample.Value = math.Float64frombits(value.StaleNaN) + } + sig := addSample(series, sample, labels, metric.Type().String()) + + if ts, ok := series[sig]; sig != "" && ok { + exemplars := getPromExemplars[pmetric.NumberDataPoint](pt) + ts.Exemplars = append(ts.Exemplars, exemplars...) + } + + // add _created time series if needed + if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() { + startTimestamp := pt.StartTimestamp() + if startTimestamp != 0 { + createdLabels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + nameStr, + name+createdSuffix, + ) + addCreatedTimeSeriesIfNeeded(series, createdLabels, startTimestamp, metric.Type().String()) + } + } +} diff --git a/pkg/translator/prometheusremotewrite/number_data_points_test.go b/pkg/translator/prometheusremotewrite/number_data_points_test.go new file mode 100644 index 000000000000..255e128904e4 --- /dev/null +++ b/pkg/translator/prometheusremotewrite/number_data_points_test.go @@ -0,0 +1,256 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheusremotewrite + +import ( + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func TestAddSingleGaugeNumberDataPoint(t *testing.T) { + ts := uint64(time.Now().UnixNano()) + tests := []struct { + name string + metric func() pmetric.Metric + want func() map[string]*prompb.TimeSeries + }{ + { + name: "gauge", + metric: func() pmetric.Metric { + return getIntGaugeMetric( + "test", + pcommon.NewMap(), + 1, ts, + ) + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test"}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeGauge.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{ + { + Value: 1, + Timestamp: convertTimeStamp(pcommon.Timestamp(ts)), + }}, + }, + } + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metric := tt.metric() + + gotSeries := make(map[string]*prompb.TimeSeries) + + for x := 0; x < metric.Gauge().DataPoints().Len(); x++ { + addSingleGaugeNumberDataPoint( + metric.Gauge().DataPoints().At(x), + pcommon.NewResource(), + metric, + Settings{}, + gotSeries, + ) + } + assert.Equal(t, tt.want(), gotSeries) + }) + } +} + +func TestAddSingleSumNumberDataPoint(t *testing.T) { + ts := pcommon.Timestamp(time.Now().UnixNano()) + //ts := uint64(time.Now().UnixNano()) + tests := []struct { + name string + metric func() pmetric.Metric + want func() map[string]*prompb.TimeSeries + }{ + { + name: "sum", + metric: func() pmetric.Metric { + return getIntSumMetric( + "test", + pcommon.NewMap(), + pmetric.AggregationTemporalityCumulative, + 1, uint64(ts.AsTime().UnixNano()), + ) + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test"}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{ + { + Value: 1, + Timestamp: convertTimeStamp(pcommon.Timestamp(ts)), + }}, + }, + } + }, + }, + { + name: "sum with exemplars", + metric: func() pmetric.Metric { + m := getIntSumMetric( + "test", + pcommon.NewMap(), + pmetric.AggregationTemporalityCumulative, + 1, uint64(ts.AsTime().UnixNano()), + ) + m.Sum().DataPoints().At(0).Exemplars().AppendEmpty().SetDoubleValue(2) + return m + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test"}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{{ + Value: 1, + Timestamp: convertTimeStamp(pcommon.Timestamp(ts)), + }}, + Exemplars: []prompb.Exemplar{ + {Value: 2}, + }, + }, + } + }, + }, + { + name: "monotonic cumulative sum with start timestamp", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_sum") + metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + metric.SetEmptySum().SetIsMonotonic(true) + + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetDoubleValue(1) + dp.SetTimestamp(ts) + dp.SetStartTimestamp(ts) + + return metric + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_sum"}, + } + createdLabels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_sum" + createdSuffix}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{ + {Value: 1, Timestamp: convertTimeStamp(ts)}, + }, + }, + timeSeriesSignature(pmetric.MetricTypeSum.String(), &createdLabels): { + Labels: createdLabels, + Samples: []prompb.Sample{ + {Value: float64(convertTimeStamp(ts))}, + }, + }, + } + }, + }, + { + name: "monotonic cumulative sum with no start time", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_sum") + metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + metric.SetEmptySum().SetIsMonotonic(true) + + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetTimestamp(ts) + + return metric + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_sum"}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(ts)}, + }, + }, + } + }, + }, + { + name: "non-monotonic cumulative sum with start time", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_sum") + metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + metric.SetEmptySum().SetIsMonotonic(false) + + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetTimestamp(ts) + + return metric + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_sum"}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(ts)}, + }, + }, + } + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metric := tt.metric() + + got := make(map[string]*prompb.TimeSeries) + + for x := 0; x < metric.Sum().DataPoints().Len(); x++ { + addSingleSumNumberDataPoint( + metric.Sum().DataPoints().At(x), + pcommon.NewResource(), + metric, + Settings{ExportCreatedMetric: true}, + got, + ) + } + assert.Equal(t, tt.want(), got) + }) + } +}