Use LastValue by default for ValueObserver instruments (#1165)
* Use LastValue by default for ValueObserver instruments

* Update test

* Update test with feedback

* Update fix

* Update changelog

* sum->scalar

* Tests pass

* Add a test

* Undo incorrect proto adds

* Restore origin protos

* Restore

* Upstream

* Add more tests

* Precommit

* Typo

* More test feedback
jmacd authored Sep 24, 2020
1 parent 304d4cd commit 8c3cc43
Showing 7 changed files with 298 additions and 55 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -45,6 +45,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Don't consider unset environment variable for resource detection to be an error. (#1170)
- Rename `go.opentelemetry.io/otel/api/metric.ConfigureInstrument` to `NewInstrumentConfig` and
`go.opentelemetry.io/otel/api/metric.ConfigureMeter` to `NewMeterConfig`.
- ValueObserver instruments use LastValue aggregator by default. (#1165)
- OTLP Metric exporter supports LastValue aggregation. (#1165)
- Move the `go.opentelemetry.io/otel/api/unit` package to `go.opentelemetry.io/otel/unit`. (#1185)
- Rename `Provider` to `MeterProvider` in the `go.opentelemetry.io/otel/api/metric` package. (#1190)
- Rename `NoopProvider` to `NoopMeterProvider` in the `go.opentelemetry.io/otel/api/metric` package. (#1190)
2 changes: 1 addition & 1 deletion exporters/otlp/README.md
@@ -48,7 +48,7 @@ func main() {
// ),
// )
tracerProvider := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
pusher := push.New(simple.NewWithExactDistribution(), exporter)
pusher := push.New(simple.NewWithInexpensiveDistribution(), exporter)
pusher.Start()
metricProvider := pusher.MeterProvider()

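For context, a minimal sketch (not part of this commit) of the instrument this change affects, continuing the README snippet above after the pusher is created. It assumes the `go.opentelemetry.io/otel/api/metric` API of this release (`metric.Must`, `NewFloat64ValueObserver`, `Float64ObserverResult`) plus a `context` import; `readQueueDepth` is a hypothetical application function.

```go
// Observe a gauge-like value. With this change, ValueObserver instruments
// use the LastValue aggregator by default, so each collection exports only
// the most recent observation rather than a sum of observations.
meter := pusher.MeterProvider().Meter("example")
metric.Must(meter).NewFloat64ValueObserver(
	"queue.depth",
	func(_ context.Context, result metric.Float64ObserverResult) {
		result.Observe(readQueueDepth()) // hypothetical application helper
	},
)
```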
79 changes: 58 additions & 21 deletions exporters/otlp/internal/transform/metric.go
@@ -22,6 +22,7 @@ import (
"fmt"
"strings"
"sync"
"time"

commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1"
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
@@ -40,6 +41,11 @@ var (
// aggregator is attempted.
ErrUnimplementedAgg = errors.New("unimplemented aggregator")

// ErrIncompatibleAgg is returned when the
// aggregation.Kind implies an interface conversion that
// has failed.
ErrIncompatibleAgg = errors.New("incompatible aggregation type")

// ErrUnknownValueType is returned when a transformation of an unknown value
// is attempted.
ErrUnknownValueType = errors.New("invalid value type")
@@ -60,6 +66,14 @@ type result struct {
Err error
}

// toNanos returns the number of nanoseconds since the UNIX epoch.
func toNanos(t time.Time) uint64 {
if t.IsZero() {
return 0
}
return uint64(t.UnixNano())
}

// CheckpointSet transforms all records contained in a checkpoint into
// batched OTLP ResourceMetrics.
func CheckpointSet(ctx context.Context, exportSelector export.ExportKindSelector, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
@@ -231,27 +245,50 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.ResourceMetrics, e
return rms, nil
}

// Record transforms a Record into an OTLP Metric. An ErrUnimplementedAgg
// Record transforms a Record into an OTLP Metric. An ErrIncompatibleAgg
// error is returned if the Record Aggregator is not supported.
func Record(r export.Record) (*metricpb.Metric, error) {
switch a := r.Aggregation().(type) {
case aggregation.MinMaxSumCount:
return minMaxSumCount(r, a)
case aggregation.Sum:
return sum(r, a)
agg := r.Aggregation()
switch agg.Kind() {
case aggregation.MinMaxSumCountKind:
mmsc, ok := agg.(aggregation.MinMaxSumCount)
if !ok {
return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg)
}
return minMaxSumCount(r, mmsc)

case aggregation.SumKind:
s, ok := agg.(aggregation.Sum)
if !ok {
return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg)
}
sum, err := s.Sum()
if err != nil {
return nil, err
}
return scalar(r, sum, r.StartTime(), r.EndTime())

case aggregation.LastValueKind:
lv, ok := agg.(aggregation.LastValue)
if !ok {
return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg)
}
value, tm, err := lv.LastValue()
if err != nil {
return nil, err
}
return scalar(r, value, time.Time{}, tm)

default:
return nil, fmt.Errorf("%w: %v", ErrUnimplementedAgg, a)
return nil, fmt.Errorf("%w: %T", ErrUnimplementedAgg, agg)
}
}

// sum transforms a Sum Aggregator into an OTLP Metric.
func sum(record export.Record, a aggregation.Sum) (*metricpb.Metric, error) {
// scalar transforms a Sum or LastValue Aggregator into an OTLP Metric.
// For LastValue (Gauge), use start==time.Time{}.
func scalar(record export.Record, num metric.Number, start, end time.Time) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()
sum, err := a.Sum()
if err != nil {
return nil, err
}

m := &metricpb.Metric{
MetricDescriptor: &metricpb.MetricDescriptor{
@@ -266,20 +303,20 @@ func sum(record export.Record, a aggregation.Sum) (*metricpb.Metric, error) {
m.MetricDescriptor.Type = metricpb.MetricDescriptor_INT64
m.Int64DataPoints = []*metricpb.Int64DataPoint{
{
Value: sum.CoerceToInt64(n),
Value: num.CoerceToInt64(n),
Labels: stringKeyValues(labels.Iter()),
StartTimeUnixNano: uint64(record.StartTime().UnixNano()),
TimeUnixNano: uint64(record.EndTime().UnixNano()),
StartTimeUnixNano: toNanos(start),
TimeUnixNano: toNanos(end),
},
}
case metric.Float64NumberKind:
m.MetricDescriptor.Type = metricpb.MetricDescriptor_DOUBLE
m.DoubleDataPoints = []*metricpb.DoubleDataPoint{
{
Value: sum.CoerceToFloat64(n),
Value: num.CoerceToFloat64(n),
Labels: stringKeyValues(labels.Iter()),
StartTimeUnixNano: uint64(record.StartTime().UnixNano()),
TimeUnixNano: uint64(record.EndTime().UnixNano()),
StartTimeUnixNano: toNanos(start),
TimeUnixNano: toNanos(end),
},
}
default:
@@ -339,8 +376,8 @@ func minMaxSumCount(record export.Record, a aggregation.MinMaxSumCount) (*metric
Value: max.CoerceToFloat64(numKind),
},
},
StartTimeUnixNano: uint64(record.StartTime().UnixNano()),
TimeUnixNano: uint64(record.EndTime().UnixNano()),
StartTimeUnixNano: toNanos(record.StartTime()),
TimeUnixNano: toNanos(record.EndTime()),
},
},
}, nil
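As a reference for the new `scalar` path above, a small standalone sketch (not part of the commit) of the timestamp convention it relies on: Sum points carry the record's start and end times, while LastValue points pass `start == time.Time{}`, which `toNanos` maps to a `StartTimeUnixNano` of 0.

```go
package main

import (
	"fmt"
	"time"
)

// toNanos mirrors the helper added in metric.go: the zero time maps to 0.
func toNanos(t time.Time) uint64 {
	if t.IsZero() {
		return 0
	}
	return uint64(t.UnixNano())
}

func main() {
	end := time.Now()

	// Sum (cumulative): both StartTimeUnixNano and TimeUnixNano are set.
	fmt.Println(toNanos(end.Add(-time.Minute)), toNanos(end))

	// LastValue (gauge): scalar is called with start == time.Time{},
	// so StartTimeUnixNano is reported as 0.
	fmt.Println(toNanos(time.Time{}), toNanos(end))
}
```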