Skip to content

Commit

Permalink
Support configurable AggregationTemporality in exporters; add OTLP missing sum point temporality/monotonic fields (#1296)
Browse files Browse the repository at this point in the history

* Restructure ExportKindSelector helpers; eliminate PassThroughExportKind; add StatelessExportKindSelector()

* WithExportKindSelector(); Additional testing

* Changelog update

* Test the new selectors

* From review feedback

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
jmacd and MrAlias authored Nov 10, 2020
1 parent 3a06b39 commit f9984f2
Show file tree
Hide file tree
Showing 19 changed files with 341 additions and 168 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `ErrorOption` has been changed to an interface to conform with project design standards which included adding a `NewErrorConfig` function.
- `EmptySpanContext` is removed.
- Move the `go.opentelemetry.io/otel/api/trace/tracetest` package into `go.opentelemetry.io/otel/oteltest`. (#1229)
- OTLP Exporter supports OTLP v0.5.0. (#1230)
- OTLP Exporter updates:
- supports OTLP v0.5.0 (#1230)
- supports configurable aggregation temporality (default: Cumulative, optional: Stateless). (#1296)
- The Sampler is now called on local child spans. (#1233)
- The `Kind` type from the `go.opentelemetry.io/otel/api/metric` package was renamed to `InstrumentKind` to more specifically describe what it is and avoid semantic ambiguity. (#1240)
- The `MetricKind` method of the `Descriptor` type in the `go.opentelemetry.io/otel/api/metric` package was renamed to `Descriptor.InstrumentKind`.
Expand Down
4 changes: 2 additions & 2 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,13 @@ func (e *Exporter) Controller() *pull.Controller {
return e.controller
}

func (e *Exporter) ExportKindFor(*otel.Descriptor, aggregation.Kind) export.ExportKind {
func (e *Exporter) ExportKindFor(desc *otel.Descriptor, kind aggregation.Kind) export.ExportKind {
// NOTE: Summary values should use Delta aggregation, then be
// combined into a sliding window, see the TODO below.
// NOTE: Prometheus also supports a "GaugeDelta" exposition format,
// which is expressed as a delta histogram. Need to understand if this
// should be a default behavior for ValueRecorder/ValueObserver.
return export.CumulativeExporter
return export.CumulativeExportKindSelector().ExportKindFor(desc, kind)
}

func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down
38 changes: 26 additions & 12 deletions exporters/otlp/internal/transform/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func CheckpointSet(ctx context.Context, exportSelector export.ExportKindSelector
for i := uint(0); i < numWorkers; i++ {
go func() {
defer wg.Done()
transformer(ctx, records, transformed)
transformer(ctx, exportSelector, records, transformed)
}()
}
go func() {
Expand Down Expand Up @@ -131,9 +131,9 @@ func source(ctx context.Context, exportSelector export.ExportKindSelector, cps e

// transformer transforms records read from the passed in chan into
// OTLP Metrics which are sent on the out chan.
func transformer(ctx context.Context, in <-chan export.Record, out chan<- result) {
func transformer(ctx context.Context, exportSelector export.ExportKindSelector, in <-chan export.Record, out chan<- result) {
for r := range in {
m, err := Record(r)
m, err := Record(exportSelector, r)
// Propagate errors, but do not send empty results.
if err == nil && m == nil {
continue
Expand Down Expand Up @@ -250,7 +250,7 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.ResourceMetrics, e

// Record transforms a Record into an OTLP Metric. An ErrIncompatibleAgg
// error is returned if the Record Aggregator is not supported.
func Record(r export.Record) (*metricpb.Metric, error) {
func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricpb.Metric, error) {
agg := r.Aggregation()
switch agg.Kind() {
case aggregation.MinMaxSumCountKind:
Expand All @@ -265,7 +265,7 @@ func Record(r export.Record) (*metricpb.Metric, error) {
if !ok {
return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg)
}
return histogram(r, h)
return histogramPoint(r, exportSelector.ExportKindFor(r.Descriptor(), aggregation.HistogramKind), h)

case aggregation.SumKind:
s, ok := agg.(aggregation.Sum)
Expand All @@ -276,7 +276,7 @@ func Record(r export.Record) (*metricpb.Metric, error) {
if err != nil {
return nil, err
}
return scalar(r, sum, r.StartTime(), r.EndTime())
return sumPoint(r, sum, r.StartTime(), r.EndTime(), exportSelector.ExportKindFor(r.Descriptor(), aggregation.SumKind), r.Descriptor().InstrumentKind().Monotonic())

case aggregation.LastValueKind:
lv, ok := agg.(aggregation.LastValue)
Expand All @@ -287,14 +287,14 @@ func Record(r export.Record) (*metricpb.Metric, error) {
if err != nil {
return nil, err
}
return gauge(r, value, time.Time{}, tm)
return gaugePoint(r, value, time.Time{}, tm)

default:
return nil, fmt.Errorf("%w: %T", ErrUnimplementedAgg, agg)
}
}

func gauge(record export.Record, num otel.Number, start, end time.Time) (*metricpb.Metric, error) {
func gaugePoint(record export.Record, num otel.Number, start, end time.Time) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()

Expand Down Expand Up @@ -338,9 +338,17 @@ func gauge(record export.Record, num otel.Number, start, end time.Time) (*metric
return m, nil
}

// scalar transforms a Sum or LastValue Aggregator into an OTLP Metric.
// For LastValue (Gauge), use start==time.Time{}.
func scalar(record export.Record, num otel.Number, start, end time.Time) (*metricpb.Metric, error) {
// exportKindToTemporality maps an SDK ExportKind onto the equivalent OTLP
// AggregationTemporality enum value. Any kind other than Cumulative or
// Delta maps to UNSPECIFIED.
func exportKindToTemporality(ek export.ExportKind) metricpb.AggregationTemporality {
	switch ek {
	case export.CumulativeExportKind:
		return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE
	case export.DeltaExportKind:
		return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA
	default:
		return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED
	}
}

func sumPoint(record export.Record, num otel.Number, start, end time.Time, ek export.ExportKind, monotonic bool) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()

Expand All @@ -354,6 +362,8 @@ func scalar(record export.Record, num otel.Number, start, end time.Time) (*metri
case otel.Int64NumberKind:
m.Data = &metricpb.Metric_IntSum{
IntSum: &metricpb.IntSum{
IsMonotonic: monotonic,
AggregationTemporality: exportKindToTemporality(ek),
DataPoints: []*metricpb.IntDataPoint{
{
Value: num.CoerceToInt64(n),
Expand All @@ -367,6 +377,8 @@ func scalar(record export.Record, num otel.Number, start, end time.Time) (*metri
case otel.Float64NumberKind:
m.Data = &metricpb.Metric_DoubleSum{
DoubleSum: &metricpb.DoubleSum{
IsMonotonic: monotonic,
AggregationTemporality: exportKindToTemporality(ek),
DataPoints: []*metricpb.DoubleDataPoint{
{
Value: num.CoerceToFloat64(n),
Expand Down Expand Up @@ -473,7 +485,7 @@ func histogramValues(a aggregation.Histogram) (boundaries []float64, counts []fl
}

// histogram transforms a Histogram Aggregator into an OTLP Metric.
func histogram(record export.Record, a aggregation.Histogram) (*metricpb.Metric, error) {
func histogramPoint(record export.Record, ek export.ExportKind, a aggregation.Histogram) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()
boundaries, counts, err := histogramValues(a)
Expand Down Expand Up @@ -504,6 +516,7 @@ func histogram(record export.Record, a aggregation.Histogram) (*metricpb.Metric,
case otel.Int64NumberKind:
m.Data = &metricpb.Metric_IntHistogram{
IntHistogram: &metricpb.IntHistogram{
AggregationTemporality: exportKindToTemporality(ek),
DataPoints: []*metricpb.IntHistogramDataPoint{
{
Sum: sum.CoerceToInt64(n),
Expand All @@ -520,6 +533,7 @@ func histogram(record export.Record, a aggregation.Histogram) (*metricpb.Metric,
case otel.Float64NumberKind:
m.Data = &metricpb.Metric_DoubleHistogram{
DoubleHistogram: &metricpb.DoubleHistogram{
AggregationTemporality: exportKindToTemporality(ek),
DataPoints: []*metricpb.DoubleHistogramDataPoint{
{
Sum: sum.CoerceToFloat64(n),
Expand Down
43 changes: 27 additions & 16 deletions exporters/otlp/internal/transform/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ var (
intervalEnd = intervalStart.Add(time.Hour)
)

const (
	// Short aliases for the OTLP aggregation temporality enum values,
	// used in the expected-value assertions of the tests below.
	otelCumulative = metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE
	otelDelta      = metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA
)

func TestStringKeyValues(t *testing.T) {
tests := []struct {
kvs []label.KeyValue
Expand Down Expand Up @@ -167,14 +172,17 @@ func TestSumIntDataPoints(t *testing.T) {
value, err := sum.Sum()
require.NoError(t, err)

if m, err := scalar(record, value, record.StartTime(), record.EndTime()); assert.NoError(t, err) {
if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), export.CumulativeExportKind, true); assert.NoError(t, err) {
assert.Nil(t, m.GetIntGauge())
assert.Nil(t, m.GetIntHistogram())
assert.Equal(t, []*metricpb.IntDataPoint{{
Value: 1,
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
TimeUnixNano: uint64(intervalEnd.UnixNano()),
}}, m.GetIntSum().DataPoints)
assert.Equal(t, &metricpb.IntSum{
AggregationTemporality: otelCumulative,
IsMonotonic: true,
DataPoints: []*metricpb.IntDataPoint{{
Value: 1,
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
TimeUnixNano: uint64(intervalEnd.UnixNano()),
}}}, m.GetIntSum())
assert.Nil(t, m.GetDoubleGauge())
assert.Nil(t, m.GetDoubleHistogram())
}
Expand All @@ -192,17 +200,20 @@ func TestSumFloatDataPoints(t *testing.T) {
value, err := sum.Sum()
require.NoError(t, err)

if m, err := scalar(record, value, record.StartTime(), record.EndTime()); assert.NoError(t, err) {
if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), export.DeltaExportKind, false); assert.NoError(t, err) {
assert.Nil(t, m.GetIntGauge())
assert.Nil(t, m.GetIntHistogram())
assert.Nil(t, m.GetIntSum())
assert.Nil(t, m.GetDoubleGauge())
assert.Nil(t, m.GetDoubleHistogram())
assert.Equal(t, []*metricpb.DoubleDataPoint{{
Value: 1,
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
TimeUnixNano: uint64(intervalEnd.UnixNano()),
}}, m.GetDoubleSum().DataPoints)
assert.Equal(t, &metricpb.DoubleSum{
IsMonotonic: false,
AggregationTemporality: otelDelta,
DataPoints: []*metricpb.DoubleDataPoint{{
Value: 1,
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
TimeUnixNano: uint64(intervalEnd.UnixNano()),
}}}, m.GetDoubleSum())
}
}

Expand All @@ -218,7 +229,7 @@ func TestLastValueIntDataPoints(t *testing.T) {
value, timestamp, err := sum.LastValue()
require.NoError(t, err)

if m, err := gauge(record, value, time.Time{}, timestamp); assert.NoError(t, err) {
if m, err := gaugePoint(record, value, time.Time{}, timestamp); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.IntDataPoint{{
Value: 100,
StartTimeUnixNano: 0,
Expand All @@ -240,7 +251,7 @@ func TestSumErrUnknownValueType(t *testing.T) {
value, err := s.Sum()
require.NoError(t, err)

_, err = scalar(record, value, record.StartTime(), record.EndTime())
_, err = sumPoint(record, value, record.StartTime(), record.EndTime(), export.CumulativeExportKind, true)
assert.Error(t, err)
if !errors.Is(err, ErrUnknownValueType) {
t.Errorf("expected ErrUnknownValueType, got %v", err)
Expand Down Expand Up @@ -325,7 +336,7 @@ func TestRecordAggregatorIncompatibleErrors(t *testing.T) {
kind: kind,
agg: agg,
}
return Record(export.NewRecord(&desc, &labels, res, test, intervalStart, intervalEnd))
return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, res, test, intervalStart, intervalEnd))
}

mpb, err := makeMpb(aggregation.SumKind, &lastvalue.New(1)[0])
Expand Down Expand Up @@ -358,7 +369,7 @@ func TestRecordAggregatorUnexpectedErrors(t *testing.T) {
desc := otel.NewDescriptor("things", otel.CounterInstrumentKind, otel.Int64NumberKind)
labels := label.NewSet()
res := resource.Empty()
return Record(export.NewRecord(&desc, &labels, res, agg, intervalStart, intervalEnd))
return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, res, agg, intervalStart, intervalEnd))
}

errEx := fmt.Errorf("timeout")
Expand Down
11 changes: 11 additions & 0 deletions exporters/otlp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
)

const (
Expand Down Expand Up @@ -83,6 +85,7 @@ type config struct {
headers map[string]string
clientCredentials credentials.TransportCredentials
numWorkers uint
exportKindSelector metricsdk.ExportKindSelector
}

// WorkerCount sets the number of Goroutines to use when processing telemetry.
Expand Down Expand Up @@ -165,3 +168,11 @@ func WithGRPCDialOption(opts ...grpc.DialOption) ExporterOption {
cfg.grpcDialOptions = opts
}
}

// WithMetricExportKindSelector defines the ExportKindSelector used for selecting
// AggregationTemporality (i.e., Cumulative vs. Delta aggregation).
func WithMetricExportKindSelector(selector metricsdk.ExportKindSelector) ExporterOption {
	apply := func(cfg *config) {
		cfg.exportKindSelector = selector
	}
	return apply
}
14 changes: 8 additions & 6 deletions exporters/otlp/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func newConfig(opts ...ExporterOption) config {
cfg := config{
numWorkers: DefaultNumWorkers,
grpcServiceConfig: DefaultGRPCServiceConfig,

// Note: the default ExportKindSelector is specified
// as Cumulative:
// https://github.com/open-telemetry/opentelemetry-specification/issues/731
exportKindSelector: metricsdk.CumulativeExportKindSelector(),
}
for _, opt := range opts {
opt(&cfg)
Expand All @@ -93,9 +98,6 @@ func NewUnstartedExporter(opts ...ExporterOption) *Exporter {
if len(e.c.headers) > 0 {
e.metadata = metadata.New(e.c.headers)
}

// TODO (rghetia): add resources

return e
}

Expand Down Expand Up @@ -286,9 +288,9 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e
}

// ExportKindFor reports back to the OpenTelemetry SDK sending this Exporter
// metric telemetry that it needs to be provided in a pass-through format.
func (e *Exporter) ExportKindFor(*otel.Descriptor, aggregation.Kind) metricsdk.ExportKind {
return metricsdk.PassThroughExporter
// metric telemetry that it needs to be provided in a cumulative format.
func (e *Exporter) ExportKindFor(desc *otel.Descriptor, kind aggregation.Kind) metricsdk.ExportKind {
	// Delegate to the selector held in the exporter config; newConfig
	// installs CumulativeExportKindSelector() as the default, so unless
	// WithMetricExportKindSelector was used, this reports Cumulative.
	return e.c.exportKindSelector.ExportKindFor(desc, kind)
}

// ExportSpans exports a batch of SpanData.
Expand Down
4 changes: 2 additions & 2 deletions exporters/otlp/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
}

selector := simple.NewWithInexpensiveDistribution()
processor := processor.New(selector, metricsdk.PassThroughExporter)
processor := processor.New(selector, metricsdk.StatelessExportKindSelector())
pusher := push.New(processor, exp)
pusher.Start()

Expand Down Expand Up @@ -509,7 +509,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) {
span.End()

selector := simple.NewWithInexpensiveDistribution()
processor := processor.New(selector, metricsdk.PassThroughExporter)
processor := processor.New(selector, metricsdk.StatelessExportKindSelector())
pusher := push.New(processor, exp)
pusher.Start()

Expand Down
Loading

0 comments on commit f9984f2

Please sign in to comment.