Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/cumulativetodelta] Convert cumulative Histograms to delta temporality (#12423) #12563

Merged
merged 15 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions processor/cumulativetodeltaprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

## Description

The cumulative to delta processor (`cumulativetodeltaprocessor`) converts monotonic, cumulative sum metrics to monotonic, delta sum metrics. Non-monotonic sums are excluded.
The cumulative to delta processor (`cumulativetodeltaprocessor`) converts monotonic, cumulative sum and histogram metrics to monotonic, delta metrics. Non-monotonic sums and exponential histograms are excluded.

## Configuration

Expand All @@ -31,7 +31,7 @@ processors:
# processor name: cumulativetodelta
cumulativetodelta:

# list the exact cumulative sum metrics to convert to delta
# list the exact cumulative sum or histogram metrics to convert to delta
include:
metrics:
- <metric_1_name>
Expand All @@ -47,8 +47,8 @@ processors:
# processor name: cumulativetodelta
cumulativetodelta:

# Convert cumulative sum metrics to delta
# if and only if 'metric' is in the name
# Convert cumulative sum or histogram metrics to delta
# if and only if 'metric' is in the name
include:
metrics:
- "*metric*"
Expand All @@ -60,8 +60,8 @@ processors:
# processor name: cumulativetodelta
cumulativetodelta:

# Convert cumulative sum metrics to delta
# if and only if 'metric' is not in the name
# Convert cumulative sum or histogram metrics to delta
# if and only if 'metric' is not in the name
exclude:
metrics:
- "*metric*"
Expand All @@ -73,7 +73,7 @@ processors:
# processor name: cumulativetodelta
cumulativetodelta:
# If include/exclude are not specified
# convert all cumulative sum metrics to delta
# convert all cumulative sum or histogram metrics to delta
```

## Warnings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ type MetricIdentity struct {
StartTimestamp pcommon.Timestamp
Attributes pcommon.Map
MetricValueType pmetric.NumberDataPointValueType
MetricField string
}

// HistogramIdentities bundles the per-component tracking identities for a
// single histogram series: one identity for the observation count, one for
// the sum, and one per bucket counter.
type HistogramIdentities struct {
	// CountIdentity tracks the histogram's observation count.
	CountIdentity MetricIdentity
	// SumIdentity tracks the histogram's sum of observations.
	SumIdentity MetricIdentity
	// BucketIdentities tracks each bucket counter, indexed by bucket position.
	BucketIdentities []MetricIdentity
}

const A = int32('A')
Expand Down Expand Up @@ -75,12 +82,17 @@ func (mi *MetricIdentity) Write(b *bytes.Buffer) {
})
b.WriteByte(SEP)
b.WriteString(strconv.FormatInt(int64(mi.StartTimestamp), 36))

if mi.MetricField != "" {
b.WriteByte(SEP)
b.WriteString(mi.MetricField)
}
}

// IsFloatVal reports whether this identity's numeric value type is
// double-precision floating point.
func (mi *MetricIdentity) IsFloatVal() bool {
	switch mi.MetricValueType {
	case pmetric.NumberDataPointValueTypeDouble:
		return true
	default:
		return false
	}
}

// IsSupportedMetricType reports whether this identity's data type can be
// converted from cumulative to delta temporality. Sums and histograms are
// supported; all other types (gauge, exponential histogram, summary) are not.
func (mi *MetricIdentity) IsSupportedMetricType() bool {
	// The diff-extracted source retained both the pre-change (Sum-only) and
	// post-change return statements; only the post-change form is valid here.
	return mi.MetricDataType == pmetric.MetricDataTypeSum || mi.MetricDataType == pmetric.MetricDataTypeHistogram
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestMetricIdentity_Write(t *testing.T) {
StartTimestamp pcommon.Timestamp
Attributes pcommon.Map
MetricValueType pmetric.NumberDataPointValueType
MetricField string
}
tests := []struct {
name string
Expand Down Expand Up @@ -72,6 +73,18 @@ func TestMetricIdentity_Write(t *testing.T) {
},
want: []string{"C" + SEPSTR + "B", "Y"},
},
{
name: "histogram sum",
fields: fields{
Resource: resource,
InstrumentationLibrary: il,
Attributes: attributes,
MetricDataType: pmetric.MetricDataTypeHistogram,
MetricValueType: pmetric.NumberDataPointValueTypeInt,
MetricField: "bound_100",
},
want: []string{"D" + SEPSTR + "B", "bound_100"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -85,6 +98,7 @@ func TestMetricIdentity_Write(t *testing.T) {
StartTimestamp: tt.fields.StartTimestamp,
Attributes: tt.fields.Attributes,
MetricValueType: tt.fields.MetricValueType,
MetricField: tt.fields.MetricField,
}
b := &bytes.Buffer{}
mi.Write(b)
Expand Down Expand Up @@ -159,6 +173,34 @@ func TestMetricIdentity_IsSupportedMetricType(t *testing.T) {
fields: fields{
MetricDataType: pmetric.MetricDataTypeHistogram,
},
want: true,
},
{
name: "none",
fields: fields{
MetricDataType: pmetric.MetricDataTypeNone,
},
want: false,
},
{
name: "gauge",
fields: fields{
MetricDataType: pmetric.MetricDataTypeGauge,
},
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
want: false,
},
{
name: "exponential_histogram",
fields: fields{
MetricDataType: pmetric.MetricDataTypeExponentialHistogram,
},
want: false,
},
{
name: "summary",
fields: fields{
MetricDataType: pmetric.MetricDataTypeSummary,
},
want: false,
},
}
Expand Down
154 changes: 145 additions & 9 deletions processor/cumulativetodeltaprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,47 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele

import (
"context"
"fmt"
"math"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/service/featuregate"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking"
)

// enableHistogramSupportGateID identifies the feature gate controlling
// cumulative-to-delta conversion of histogram metrics.
const enableHistogramSupportGateID = "processor.cumulativetodeltaprocessor.EnableHistogramSupport"

// enableHistogramSupportGate guards histogram conversion; it is disabled by
// default and must be enabled explicitly via the feature-gate registry.
var enableHistogramSupportGate = featuregate.Gate{
	ID:      enableHistogramSupportGateID,
	Enabled: false,
	// Replaced the "wip" placeholder with a meaningful description, as it is
	// surfaced to users by feature-gate tooling.
	Description: "Enables conversion of cumulative histogram metrics to delta temporality in the cumulativetodelta processor.",
}

func init() {
	featuregate.GetRegistry().MustRegister(enableHistogramSupportGate)
}

type cumulativeToDeltaProcessor struct {
metrics map[string]struct{}
includeFS filterset.FilterSet
excludeFS filterset.FilterSet
logger *zap.Logger
deltaCalculator *tracking.MetricTracker
cancelFunc context.CancelFunc
metrics map[string]struct{}
includeFS filterset.FilterSet
excludeFS filterset.FilterSet
logger *zap.Logger
deltaCalculator *tracking.MetricTracker
cancelFunc context.CancelFunc
histogramSupportEnabled bool
}

func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor {
ctx, cancel := context.WithCancel(context.Background())
p := &cumulativeToDeltaProcessor{
logger: logger,
deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness),
cancelFunc: cancel,
logger: logger,
deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness),
cancelFunc: cancel,
histogramSupportEnabled: featuregate.GetRegistry().IsEnabled(enableHistogramSupportGateID),
}
if len(config.Metrics) > 0 {
p.logger.Warn("The 'metrics' configuration is deprecated. Use 'include'/'exclude' instead.")
Expand Down Expand Up @@ -89,6 +106,47 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme
MetricIsMonotonic: ms.IsMonotonic(),
}
ctdp.convertDataPoints(ms.DataPoints(), baseIdentity)
ms.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
return ms.DataPoints().Len() == 0
case pmetric.MetricDataTypeHistogram:
if !ctdp.histogramSupportEnabled {
return false
}

ms := m.Histogram()
if ms.AggregationTemporality() != pmetric.MetricAggregationTemporalityCumulative {
return false
}

if ms.DataPoints().Len() == 0 {
return false
}

countIdentity := tracking.MetricIdentity{
Resource: rm.Resource(),
InstrumentationLibrary: ilm.Scope(),
MetricDataType: m.DataType(),
MetricName: m.Name(),
MetricUnit: m.Unit(),
MetricIsMonotonic: true,
MetricValueType: pmetric.NumberDataPointValueTypeInt,
MetricField: "count",
}

sumIdentity := countIdentity
sumIdentity.MetricField = "sum"
sumIdentity.MetricValueType = pmetric.NumberDataPointValueTypeDouble

bucketIdentities := makeBucketIdentities(countIdentity, ms.DataPoints().At(0))

histogramIdentities := tracking.HistogramIdentities{
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
CountIdentity: countIdentity,
SumIdentity: sumIdentity,
BucketIdentities: bucketIdentities,
}

ctdp.convertHistogramDataPoints(ms.DataPoints(), &histogramIdentities)

ms.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
return ms.DataPoints().Len() == 0
default:
Expand All @@ -102,6 +160,19 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme
return md, nil
}

// makeBucketIdentities derives one tracking identity per histogram bucket by
// copying baseIdentity and tagging each copy with its bucket index
// ("bucket_0", "bucket_1", ...). The bucket count is taken from dp.
func makeBucketIdentities(baseIdentity tracking.MetricIdentity, dp pmetric.HistogramDataPoint) []tracking.MetricIdentity {
	identities := make([]tracking.MetricIdentity, dp.BucketCounts().Len())

	for i := range identities {
		id := baseIdentity
		id.MetricField = fmt.Sprintf("bucket_%d", i)
		identities[i] = id
	}

	return identities
}

// shutdown cancels the processor's background context (created in the
// constructor), stopping the delta calculator's tracking work. It never
// returns an error.
func (ctdp *cumulativeToDeltaProcessor) shutdown(context.Context) error {
	ctdp.cancelFunc()
	return nil
}
(ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metricName))
}

// convertHistogramFloatValue runs a float-valued histogram component (the
// sum) through the delta calculator, stamping the identity with the data
// point's start timestamp and attributes. It returns the computed delta and
// whether the conversion yielded a valid value.
func (ctdp *cumulativeToDeltaProcessor) convertHistogramFloatValue(id tracking.MetricIdentity, dp pmetric.HistogramDataPoint, value float64) (tracking.DeltaValue, bool) {
	id.StartTimestamp = dp.StartTimestamp()
	id.Attributes = dp.Attributes()
	point := tracking.ValuePoint{
		ObservedTimestamp: dp.Timestamp(),
		FloatValue:        value,
	}
	return ctdp.deltaCalculator.Convert(tracking.MetricPoint{
		Identity: id,
		Value:    point,
	})
}

// convertHistogramIntValue runs an integer-valued histogram component (the
// count or a bucket counter) through the delta calculator, stamping the
// identity with the data point's start timestamp and attributes. It returns
// the computed delta and whether the conversion yielded a valid value.
func (ctdp *cumulativeToDeltaProcessor) convertHistogramIntValue(id tracking.MetricIdentity, dp pmetric.HistogramDataPoint, value int64) (tracking.DeltaValue, bool) {
	id.StartTimestamp = dp.StartTimestamp()
	id.Attributes = dp.Attributes()
	point := tracking.ValuePoint{
		ObservedTimestamp: dp.Timestamp(),
		IntValue:          value,
	}
	return ctdp.deltaCalculator.Convert(tracking.MetricPoint{
		Identity: id,
		Value:    point,
	})
}

func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) {

if dps, ok := in.(pmetric.NumberDataPointSlice); ok {
Expand Down Expand Up @@ -159,3 +256,42 @@ func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseId
})
}
}

// convertHistogramDataPoints converts each cumulative histogram data point in
// the given slice to delta temporality by independently running the count,
// the (optional) sum, and every bucket counter through the delta calculator.
// A point is rewritten in place when all of its components produce valid
// deltas; otherwise it is removed from the slice (typically the first
// observation of a series, which has no prior value to diff against).
func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{}, baseIdentities *tracking.HistogramIdentities) {

	if dps, ok := in.(pmetric.HistogramDataPointSlice); ok {
		dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool {
			countID := baseIdentities.CountIdentity
			countDelta, countValid := ctdp.convertHistogramIntValue(countID, dp, int64(dp.Count()))

			// The sum is optional per the data model; a NaN sum is treated as absent.
			hasSum := dp.HasSum() && !math.IsNaN(dp.Sum())
			sumDelta, sumValid := tracking.DeltaValue{}, true

			if hasSum {
				sumID := baseIdentities.SumIdentity
				sumDelta, sumValid = ctdp.convertHistogramFloatValue(sumID, dp, dp.Sum())
			}

			bucketsValid := true
			rawBucketCounts := dp.BucketCounts().AsRaw()
			// NOTE(review): BucketIdentities is sized from the first data point of
			// the metric; a point with more buckets than the first would index out
			// of range here — confirm all points in a series share a bucket layout.
			for index := 0; index < len(rawBucketCounts); index++ {
				bucketID := baseIdentities.BucketIdentities[index]
				bucketDelta, bucketValid := ctdp.convertHistogramIntValue(bucketID, dp, int64(rawBucketCounts[index]))
				// rawBucketCounts is a detached copy (AsRaw), so this write does not
				// touch the data point until SetBucketCounts below.
				rawBucketCounts[index] = uint64(bucketDelta.IntValue)
				bucketsValid = bucketsValid && bucketValid
			}

			// Rewrite the point only when every component produced a valid delta.
			if countValid && sumValid && bucketsValid {
				dp.SetStartTimestamp(countDelta.StartTimestamp)
				dp.SetCount(uint64(countDelta.IntValue))
				if hasSum {
					dp.SetSum(sumDelta.FloatValue)
				}
				dp.SetBucketCounts(pcommon.NewImmutableUInt64Slice(rawBucketCounts))
				return false
			}

			// Returning true removes the data point from the slice.
			return true
		})
	}
}
Loading