Skip to content

Commit

Permalink
[exporter/prometheusremotewrite] Export exemplars for the Monotonic S…
Browse files Browse the repository at this point in the history
…um metric.
  • Loading branch information
kovrus committed Jan 17, 2023
1 parent cee4d97 commit e1a337f
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 180 deletions.
16 changes: 16 additions & 0 deletions .chloggen/prw-translator-support-exemplars-for-sum.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Export exemplars for the Monotonic Sum metric.

# One or more tracking issues related to the change
issues: [ 17573 ]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
43 changes: 1 addition & 42 deletions pkg/translator/prometheusremotewrite/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,47 +258,6 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool {
return false
}

// addSingleNumberDataPoint converts the metric value stored in pt to a Prometheus sample, and add the sample
// to its corresponding time series in tsMap
func addSingleNumberDataPoint(pt pmetric.NumberDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*prompb.TimeSeries) {
// create parameters for addSample
name := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
labels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, name)
sample := &prompb.Sample{
// convert ns to ms
Timestamp: convertTimeStamp(pt.Timestamp()),
}
switch pt.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
sample.Value = float64(pt.IntValue())
case pmetric.NumberDataPointValueTypeDouble:
sample.Value = pt.DoubleValue()
}
if pt.Flags().NoRecordedValue() {
sample.Value = math.Float64frombits(value.StaleNaN)
}
addSample(tsMap, sample, labels, metric.Type().String())

// add _created time series if needed
if settings.ExportCreatedMetric && isMonotonicSum(metric) {
startTimestamp := pt.StartTimestamp()
if startTimestamp != 0 {
createdLabels := createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
nameStr,
name+createdSuffix,
)
addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String())
}
}
}

func isMonotonicSum(metric pmetric.Metric) bool {
return metric.Type() == pmetric.MetricTypeSum && metric.Sum().IsMonotonic()
}

// addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. It
// ignore extra buckets if len(ExplicitBounds) > len(BucketCounts)
func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*prompb.TimeSeries) {
Expand Down Expand Up @@ -389,7 +348,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon
}

type exemplarType interface {
pmetric.ExponentialHistogramDataPoint | pmetric.HistogramDataPoint
pmetric.ExponentialHistogramDataPoint | pmetric.HistogramDataPoint | pmetric.NumberDataPoint
Exemplars() pmetric.ExemplarSlice
}

Expand Down
122 changes: 0 additions & 122 deletions pkg/translator/prometheusremotewrite/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,128 +670,6 @@ func TestMostRecentTimestampInMetric(t *testing.T) {
}
}

func TestAddSingleNumberDataPoint(t *testing.T) {
ts := pcommon.Timestamp(time.Now().UnixNano())
tests := []struct {
name string
metric func() pmetric.Metric
want func() map[string]*prompb.TimeSeries
}{
{
name: "monotonic cumulative sum with start timestamp",
metric: func() pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName("test_sum")
metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
metric.SetEmptySum().SetIsMonotonic(true)

dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleValue(1)
dp.SetTimestamp(ts)
dp.SetStartTimestamp(ts)

return metric
},
want: func() map[string]*prompb.TimeSeries {
labels := []prompb.Label{
{Name: model.MetricNameLabel, Value: "test_sum"},
}
createdLabels := []prompb.Label{
{Name: model.MetricNameLabel, Value: "test_sum" + createdSuffix},
}
return map[string]*prompb.TimeSeries{
timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): {
Labels: labels,
Samples: []prompb.Sample{
{Value: 1, Timestamp: convertTimeStamp(ts)},
},
},
timeSeriesSignature(pmetric.MetricTypeSum.String(), &createdLabels): {
Labels: createdLabels,
Samples: []prompb.Sample{
{Value: float64(convertTimeStamp(ts))},
},
},
}
},
},
{
name: "monotonic cumulative sum with no start time",
metric: func() pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName("test_sum")
metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
metric.SetEmptySum().SetIsMonotonic(true)

dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetTimestamp(ts)

return metric
},
want: func() map[string]*prompb.TimeSeries {
labels := []prompb.Label{
{Name: model.MetricNameLabel, Value: "test_sum"},
}
return map[string]*prompb.TimeSeries{
timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): {
Labels: labels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(ts)},
},
},
}
},
},
{
name: "non-monotonic cumulative sum with start time",
metric: func() pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName("test_sum")
metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
metric.SetEmptySum().SetIsMonotonic(false)

dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetTimestamp(ts)

return metric
},
want: func() map[string]*prompb.TimeSeries {
labels := []prompb.Label{
{Name: model.MetricNameLabel, Value: "test_sum"},
}
return map[string]*prompb.TimeSeries{
timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): {
Labels: labels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(ts)},
},
},
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metric := tt.metric()

got := make(map[string]*prompb.TimeSeries)
for x := 0; x < metric.Sum().DataPoints().Len(); x++ {
addSingleNumberDataPoint(
metric.Sum().DataPoints().At(x),
pcommon.NewResource(),
metric,
Settings{
ExportCreatedMetric: true,
},
got,
)
}

assert.Equal(t, tt.want(), got)
})
}
}

func TestAddSingleSummaryDataPoint(t *testing.T) {
ts := pcommon.Timestamp(time.Now().UnixNano())
tests := []struct {
Expand Down
26 changes: 10 additions & 16 deletions pkg/translator/prometheusremotewrite/metrics_to_prw.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,19 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*promp
switch metric.Type() {
case pmetric.MetricTypeGauge:
dataPoints := metric.Gauge().DataPoints()
if err := addNumberDataPointSlice(dataPoints, resource, metric, settings, tsMap); err != nil {
errs = multierr.Append(errs, err)
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap)
}
case pmetric.MetricTypeSum:
dataPoints := metric.Sum().DataPoints()
if err := addNumberDataPointSlice(dataPoints, resource, metric, settings, tsMap); err != nil {
errs = multierr.Append(errs, err)
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleSumNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap)
}
case pmetric.MetricTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
Expand Down Expand Up @@ -115,15 +121,3 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*promp

return
}

func addNumberDataPointSlice(dataPoints pmetric.NumberDataPointSlice,
resource pcommon.Resource, metric pmetric.Metric,
settings Settings, tsMap map[string]*prompb.TimeSeries) error {
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap)
}
return nil
}
113 changes: 113 additions & 0 deletions pkg/translator/prometheusremotewrite/number_data_points.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"

import (
"math"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)

// addSingleSumNumberDataPoint converts the Gauge metric data point to a
// Prometheus time series with samples and labels. The result is stored in the
// series map.
func addSingleGaugeNumberDataPoint(
pt pmetric.NumberDataPoint,
resource pcommon.Resource,
metric pmetric.Metric,
settings Settings,
series map[string]*prompb.TimeSeries,
) {
name := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
labels := createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
model.MetricNameLabel, name,
)
sample := &prompb.Sample{
// convert ns to ms
Timestamp: convertTimeStamp(pt.Timestamp()),
}
switch pt.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
sample.Value = float64(pt.IntValue())
case pmetric.NumberDataPointValueTypeDouble:
sample.Value = pt.DoubleValue()
}
if pt.Flags().NoRecordedValue() {
sample.Value = math.Float64frombits(value.StaleNaN)
}
addSample(series, sample, labels, metric.Type().String())
}

// addSingleSumNumberDataPoint converts the Sum metric data point to a Prometheus
// time series with samples, labels and exemplars. The result is stored in the
// series map.
func addSingleSumNumberDataPoint(
pt pmetric.NumberDataPoint,
resource pcommon.Resource,
metric pmetric.Metric,
settings Settings,
series map[string]*prompb.TimeSeries,
) {
name := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
labels := createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
model.MetricNameLabel, name,
)
sample := &prompb.Sample{
// convert ns to ms
Timestamp: convertTimeStamp(pt.Timestamp()),
}
switch pt.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
sample.Value = float64(pt.IntValue())
case pmetric.NumberDataPointValueTypeDouble:
sample.Value = pt.DoubleValue()
}
if pt.Flags().NoRecordedValue() {
sample.Value = math.Float64frombits(value.StaleNaN)
}
sig := addSample(series, sample, labels, metric.Type().String())

if ts, ok := series[sig]; sig != "" && ok {
exemplars := getPromExemplars[pmetric.NumberDataPoint](pt)
ts.Exemplars = append(ts.Exemplars, exemplars...)
}

// add _created time series if needed
if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() {
startTimestamp := pt.StartTimestamp()
if startTimestamp != 0 {
createdLabels := createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
nameStr,
name+createdSuffix,
)
addCreatedTimeSeriesIfNeeded(series, createdLabels, startTimestamp, metric.Type().String())
}
}
}
Loading

0 comments on commit e1a337f

Please sign in to comment.