From f9984f2d4ec2a09f42358406d3bac8d83b589494 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 10 Nov 2020 07:44:42 -0800 Subject: [PATCH] Support configurable AggregationTemporality in exporters; add OTLP missing sum point temporality/monotonic fields (#1296) * Restructure ExportKindSelector helpers; eliminate PassThroughExportKind; add StatelessExportKindSelector() * WithExportKindSelector(); Additional testing * Changelog update * Test the new selectors * From review feedback Co-authored-by: Tyler Yahn --- CHANGELOG.md | 4 +- exporters/metric/prometheus/prometheus.go | 4 +- exporters/otlp/internal/transform/metric.go | 38 +++-- .../otlp/internal/transform/metric_test.go | 43 ++++-- exporters/otlp/options.go | 11 ++ exporters/otlp/otlp.go | 14 +- exporters/otlp/otlp_integration_test.go | 4 +- exporters/otlp/otlp_metric_test.go | 142 +++++++++++++++--- exporters/stdout/metric.go | 4 +- sdk/export/metric/exportkind_string.go | 26 +--- sdk/export/metric/exportkind_test.go | 43 +++--- sdk/export/metric/metric.go | 70 +++++++-- sdk/metric/controller/pull/pull_test.go | 14 +- sdk/metric/controller/push/push_test.go | 2 +- sdk/metric/processor/basic/basic.go | 18 +-- sdk/metric/processor/basic/basic_test.go | 66 ++++---- sdk/metric/processor/processortest/test.go | 2 +- .../processor/processortest/test_test.go | 2 +- sdk/metric/processor/reducer/reducer_test.go | 2 +- 19 files changed, 341 insertions(+), 168 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 262d87c7da5..20a503020dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `ErrorOption` has been changed to an interface to conform with project design standards which included adding a `NewErrorConfig` function. - `EmptySpanContext` is removed. - Move the `go.opentelemetry.io/otel/api/trace/tracetest` package into `go.opentelemetry.io/otel/oteltest`. (#1229) -- OTLP Exporter supports OTLP v0.5.0. (#1230) +- OTLP Exporter updates: + - supports OTLP v0.5.0 (#1230) + - supports configurable aggregation temporality (default: Cumulative, optional: Stateless). (#1296) - The Sampler is now called on local child spans. (#1233) - The `Kind` type from the `go.opentelemetry.io/otel/api/metric` package was renamed to `InstrumentKind` to more specifically describe what it is and avoid semantic ambiguity. (#1240) - The `MetricKind` method of the `Descriptor` type in the `go.opentelemetry.io/otel/api/metric` package was renamed to `Descriptor.InstrumentKind`. diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index 5730e71460a..6d047954f0f 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -166,13 +166,13 @@ func (e *Exporter) Controller() *pull.Controller { return e.controller } -func (e *Exporter) ExportKindFor(*otel.Descriptor, aggregation.Kind) export.ExportKind { +func (e *Exporter) ExportKindFor(desc *otel.Descriptor, kind aggregation.Kind) export.ExportKind { // NOTE: Summary values should use Delta aggregation, then be // combined into a sliding window, see the TODO below. // NOTE: Prometheus also supports a "GaugeDelta" exposition format, // which is expressed as a delta histogram. Need to understand if this // should be a default behavior for ValueRecorder/ValueObserver. - return export.CumulativeExporter + return export.CumulativeExportKindSelector().ExportKindFor(desc, kind) } func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) { diff --git a/exporters/otlp/internal/transform/metric.go b/exporters/otlp/internal/transform/metric.go index 1bcffcf19c6..04bd3fccdd9 100644 --- a/exporters/otlp/internal/transform/metric.go +++ b/exporters/otlp/internal/transform/metric.go @@ -86,7 +86,7 @@ func CheckpointSet(ctx context.Context, exportSelector export.ExportKindSelector for i := uint(0); i < numWorkers; i++ { go func() { defer wg.Done() - transformer(ctx, records, transformed) + transformer(ctx, exportSelector, records, transformed) }() } go func() { @@ -131,9 +131,9 @@ func source(ctx context.Context, exportSelector export.ExportKindSelector, cps e // transformer transforms records read from the passed in chan into // OTLP Metrics which are sent on the out chan. -func transformer(ctx context.Context, in <-chan export.Record, out chan<- result) { +func transformer(ctx context.Context, exportSelector export.ExportKindSelector, in <-chan export.Record, out chan<- result) { for r := range in { - m, err := Record(r) + m, err := Record(exportSelector, r) // Propagate errors, but do not send empty results. if err == nil && m == nil { continue @@ -250,7 +250,7 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.ResourceMetrics, e // Record transforms a Record into an OTLP Metric. An ErrIncompatibleAgg // error is returned if the Record Aggregator is not supported. -func Record(r export.Record) (*metricpb.Metric, error) { +func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricpb.Metric, error) { agg := r.Aggregation() switch agg.Kind() { case aggregation.MinMaxSumCountKind: @@ -265,7 +265,7 @@ func Record(r export.Record) (*metricpb.Metric, error) { if !ok { return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg) } - return histogram(r, h) + return histogramPoint(r, exportSelector.ExportKindFor(r.Descriptor(), aggregation.HistogramKind), h) case aggregation.SumKind: s, ok := agg.(aggregation.Sum) @@ -276,7 +276,7 @@ func Record(r export.Record) (*metricpb.Metric, error) { if err != nil { return nil, err } - return scalar(r, sum, r.StartTime(), r.EndTime()) + return sumPoint(r, sum, r.StartTime(), r.EndTime(), exportSelector.ExportKindFor(r.Descriptor(), aggregation.SumKind), r.Descriptor().InstrumentKind().Monotonic()) case aggregation.LastValueKind: lv, ok := agg.(aggregation.LastValue) @@ -287,14 +287,14 @@ func Record(r export.Record) (*metricpb.Metric, error) { if err != nil { return nil, err } - return gauge(r, value, time.Time{}, tm) + return gaugePoint(r, value, time.Time{}, tm) default: return nil, fmt.Errorf("%w: %T", ErrUnimplementedAgg, agg) } } -func gauge(record export.Record, num otel.Number, start, end time.Time) (*metricpb.Metric, error) { +func gaugePoint(record export.Record, num otel.Number, start, end time.Time) (*metricpb.Metric, error) { desc := record.Descriptor() labels := record.Labels() @@ -338,9 +338,17 @@ func gauge(record export.Record, num otel.Number, start, end time.Time) (*metric return m, nil } -// scalar transforms a Sum or LastValue Aggregator into an OTLP Metric. -// For LastValue (Gauge), use start==time.Time{}. -func scalar(record export.Record, num otel.Number, start, end time.Time) (*metricpb.Metric, error) { +func exportKindToTemporality(ek export.ExportKind) metricpb.AggregationTemporality { + switch ek { + case export.DeltaExportKind: + return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA + case export.CumulativeExportKind: + return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE + } + return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED +} + +func sumPoint(record export.Record, num otel.Number, start, end time.Time, ek export.ExportKind, monotonic bool) (*metricpb.Metric, error) { desc := record.Descriptor() labels := record.Labels() @@ -354,6 +362,8 @@ func scalar(record export.Record, num otel.Number, start, end time.Time) (*metri case otel.Int64NumberKind: m.Data = &metricpb.Metric_IntSum{ IntSum: &metricpb.IntSum{ + IsMonotonic: monotonic, + AggregationTemporality: exportKindToTemporality(ek), DataPoints: []*metricpb.IntDataPoint{ { Value: num.CoerceToInt64(n), @@ -367,6 +377,8 @@ func scalar(record export.Record, num otel.Number, start, end time.Time) (*metri case otel.Float64NumberKind: m.Data = &metricpb.Metric_DoubleSum{ DoubleSum: &metricpb.DoubleSum{ + IsMonotonic: monotonic, + AggregationTemporality: exportKindToTemporality(ek), DataPoints: []*metricpb.DoubleDataPoint{ { Value: num.CoerceToFloat64(n), @@ -473,7 +485,7 @@ func histogramValues(a aggregation.Histogram) (boundaries []float64, counts []fl } // histogram transforms a Histogram Aggregator into an OTLP Metric. -func histogram(record export.Record, a aggregation.Histogram) (*metricpb.Metric, error) { +func histogramPoint(record export.Record, ek export.ExportKind, a aggregation.Histogram) (*metricpb.Metric, error) { desc := record.Descriptor() labels := record.Labels() boundaries, counts, err := histogramValues(a) @@ -504,6 +516,7 @@ func histogram(record export.Record, a aggregation.Histogram) (*metricpb.Metric, case otel.Int64NumberKind: m.Data = &metricpb.Metric_IntHistogram{ IntHistogram: &metricpb.IntHistogram{ + AggregationTemporality: exportKindToTemporality(ek), DataPoints: []*metricpb.IntHistogramDataPoint{ { Sum: sum.CoerceToInt64(n), @@ -520,6 +533,7 @@ func histogram(record export.Record, a aggregation.Histogram) (*metricpb.Metric, case otel.Float64NumberKind: m.Data = &metricpb.Metric_DoubleHistogram{ DoubleHistogram: &metricpb.DoubleHistogram{ + AggregationTemporality: exportKindToTemporality(ek), DataPoints: []*metricpb.DoubleHistogramDataPoint{ { Sum: sum.CoerceToFloat64(n), diff --git a/exporters/otlp/internal/transform/metric_test.go b/exporters/otlp/internal/transform/metric_test.go index 1d1bdb5b978..2122ef3dec4 100644 --- a/exporters/otlp/internal/transform/metric_test.go +++ b/exporters/otlp/internal/transform/metric_test.go @@ -48,6 +48,11 @@ var ( intervalEnd = intervalStart.Add(time.Hour) ) +const ( + otelCumulative = metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE + otelDelta = metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA +) + func TestStringKeyValues(t *testing.T) { tests := []struct { kvs []label.KeyValue @@ -167,14 +172,17 @@ func TestSumIntDataPoints(t *testing.T) { value, err := sum.Sum() require.NoError(t, err) - if m, err := scalar(record, value, record.StartTime(), record.EndTime()); assert.NoError(t, err) { + if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), export.CumulativeExportKind, true); assert.NoError(t, err) { assert.Nil(t, m.GetIntGauge()) assert.Nil(t, m.GetIntHistogram()) - assert.Equal(t, []*metricpb.IntDataPoint{{ - Value: 1, - StartTimeUnixNano: uint64(intervalStart.UnixNano()), - TimeUnixNano: uint64(intervalEnd.UnixNano()), - }}, m.GetIntSum().DataPoints) + assert.Equal(t, &metricpb.IntSum{ + AggregationTemporality: otelCumulative, + IsMonotonic: true, + DataPoints: []*metricpb.IntDataPoint{{ + Value: 1, + StartTimeUnixNano: uint64(intervalStart.UnixNano()), + TimeUnixNano: uint64(intervalEnd.UnixNano()), + }}}, m.GetIntSum()) assert.Nil(t, m.GetDoubleGauge()) assert.Nil(t, m.GetDoubleHistogram()) } @@ -192,17 +200,20 @@ func TestSumFloatDataPoints(t *testing.T) { value, err := sum.Sum() require.NoError(t, err) - if m, err := scalar(record, value, record.StartTime(), record.EndTime()); assert.NoError(t, err) { + if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), export.DeltaExportKind, false); assert.NoError(t, err) { assert.Nil(t, m.GetIntGauge()) assert.Nil(t, m.GetIntHistogram()) assert.Nil(t, m.GetIntSum()) assert.Nil(t, m.GetDoubleGauge()) assert.Nil(t, m.GetDoubleHistogram()) - assert.Equal(t, []*metricpb.DoubleDataPoint{{ - Value: 1, - StartTimeUnixNano: uint64(intervalStart.UnixNano()), - TimeUnixNano: uint64(intervalEnd.UnixNano()), - }}, m.GetDoubleSum().DataPoints) + assert.Equal(t, &metricpb.DoubleSum{ + IsMonotonic: false, + AggregationTemporality: otelDelta, + DataPoints: []*metricpb.DoubleDataPoint{{ + Value: 1, + StartTimeUnixNano: uint64(intervalStart.UnixNano()), + TimeUnixNano: uint64(intervalEnd.UnixNano()), + }}}, m.GetDoubleSum()) } } @@ -218,7 +229,7 @@ func TestLastValueIntDataPoints(t *testing.T) { value, timestamp, err := sum.LastValue() require.NoError(t, err) - if m, err := gauge(record, value, time.Time{}, timestamp); assert.NoError(t, err) { + if m, err := gaugePoint(record, value, time.Time{}, timestamp); assert.NoError(t, err) { assert.Equal(t, []*metricpb.IntDataPoint{{ Value: 100, StartTimeUnixNano: 0, @@ -240,7 +251,7 @@ func TestSumErrUnknownValueType(t *testing.T) { value, err := s.Sum() require.NoError(t, err) - _, err = scalar(record, value, record.StartTime(), record.EndTime()) + _, err = sumPoint(record, value, record.StartTime(), record.EndTime(), export.CumulativeExportKind, true) assert.Error(t, err) if !errors.Is(err, ErrUnknownValueType) { t.Errorf("expected ErrUnknownValueType, got %v", err) @@ -325,7 +336,7 @@ func TestRecordAggregatorIncompatibleErrors(t *testing.T) { kind: kind, agg: agg, } - return Record(export.NewRecord(&desc, &labels, res, test, intervalStart, intervalEnd)) + return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, res, test, intervalStart, intervalEnd)) } mpb, err := makeMpb(aggregation.SumKind, &lastvalue.New(1)[0]) @@ -358,7 +369,7 @@ func TestRecordAggregatorUnexpectedErrors(t *testing.T) { desc := otel.NewDescriptor("things", otel.CounterInstrumentKind, otel.Int64NumberKind) labels := label.NewSet() res := resource.Empty() - return Record(export.NewRecord(&desc, &labels, res, agg, intervalStart, intervalEnd)) + return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, res, agg, intervalStart, intervalEnd)) } errEx := fmt.Errorf("timeout") diff --git a/exporters/otlp/options.go b/exporters/otlp/options.go index 9cd375005f4..737f56b8c08 100644 --- a/exporters/otlp/options.go +++ b/exporters/otlp/options.go @@ -19,6 +19,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" + + metricsdk "go.opentelemetry.io/otel/sdk/export/metric" ) const ( @@ -83,6 +85,7 @@ type config struct { headers map[string]string clientCredentials credentials.TransportCredentials numWorkers uint + exportKindSelector metricsdk.ExportKindSelector } // WorkerCount sets the number of Goroutines to use when processing telemetry. @@ -165,3 +168,11 @@ func WithGRPCDialOption(opts ...grpc.DialOption) ExporterOption { cfg.grpcDialOptions = opts } } + +// WithMetricExportKindSelector defines the ExportKindSelector used for selecting +// AggregationTemporality (i.e., Cumulative vs. Delta aggregation). +func WithMetricExportKindSelector(selector metricsdk.ExportKindSelector) ExporterOption { + return func(cfg *config) { + cfg.exportKindSelector = selector + } +} diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 3cd43ca0a9f..78d20a66f86 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -70,6 +70,11 @@ func newConfig(opts ...ExporterOption) config { cfg := config{ numWorkers: DefaultNumWorkers, grpcServiceConfig: DefaultGRPCServiceConfig, + + // Note: the default ExportKindSelector is specified + // as Cumulative: + // https://github.com/open-telemetry/opentelemetry-specification/issues/731 + exportKindSelector: metricsdk.CumulativeExportKindSelector(), } for _, opt := range opts { opt(&cfg) @@ -93,9 +98,6 @@ func NewUnstartedExporter(opts ...ExporterOption) *Exporter { if len(e.c.headers) > 0 { e.metadata = metadata.New(e.c.headers) } - - // TODO (rghetia): add resources - return e } @@ -286,9 +288,9 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e } // ExportKindFor reports back to the OpenTelemetry SDK sending this Exporter -// metric telemetry that it needs to be provided in a pass-through format. -func (e *Exporter) ExportKindFor(*otel.Descriptor, aggregation.Kind) metricsdk.ExportKind { - return metricsdk.PassThroughExporter +// metric telemetry that it needs to be provided in a cumulative format. +func (e *Exporter) ExportKindFor(desc *otel.Descriptor, kind aggregation.Kind) metricsdk.ExportKind { + return e.c.exportKindSelector.ExportKindFor(desc, kind) } // ExportSpans exports a batch of SpanData. diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go index 84a0f3e8d87..7999659ea4a 100644 --- a/exporters/otlp/otlp_integration_test.go +++ b/exporters/otlp/otlp_integration_test.go @@ -120,7 +120,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) } selector := simple.NewWithInexpensiveDistribution() - processor := processor.New(selector, metricsdk.PassThroughExporter) + processor := processor.New(selector, metricsdk.StatelessExportKindSelector()) pusher := push.New(processor, exp) pusher.Start() @@ -509,7 +509,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { span.End() selector := simple.NewWithInexpensiveDistribution() - processor := processor.New(selector, metricsdk.PassThroughExporter) + processor := processor.New(selector, metricsdk.StatelessExportKindSelector()) pusher := push.New(processor, exp) pusher.Start() diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index c82b605edf0..96d517b29e7 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -159,6 +159,7 @@ var ( func TestNoGroupingExport(t *testing.T) { runMetricExportTests( t, + nil, []record{ { "int64-count", @@ -280,7 +281,7 @@ func TestValuerecorderMetricGroupingExport(t *testing.T) { }, }, } - runMetricExportTests(t, []record{r, r}, expected) + runMetricExportTests(t, nil, []record{r, r}, expected) } func TestCountInt64MetricGroupingExport(t *testing.T) { @@ -294,6 +295,7 @@ func TestCountInt64MetricGroupingExport(t *testing.T) { } runMetricExportTests( t, + nil, []record{r, r}, []metricpb.ResourceMetrics{ { @@ -343,6 +345,7 @@ func TestCountFloat64MetricGroupingExport(t *testing.T) { } runMetricExportTests( t, + nil, []record{r, r}, []metricpb.ResourceMetrics{ { @@ -402,6 +405,7 @@ func TestCountFloat64MetricGroupingExport(t *testing.T) { func TestResourceMetricGroupingExport(t *testing.T) { runMetricExportTests( t, + nil, []record{ { "int64-count", @@ -519,6 +523,7 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) { } runMetricExportTests( t, + nil, []record{ { "int64-count", @@ -695,13 +700,78 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) { ) } +func TestStatelessExportKind(t *testing.T) { + type testcase struct { + name string + instrumentKind otel.InstrumentKind + aggTemporality metricpb.AggregationTemporality + monotonic bool + } + + for _, k := range []testcase{ + {"counter", otel.CounterInstrumentKind, metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, true}, + {"updowncounter", otel.UpDownCounterInstrumentKind, metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, false}, + {"sumobserver", otel.SumObserverInstrumentKind, metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, true}, + {"updownsumobserver", otel.UpDownSumObserverInstrumentKind, metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, false}, + } { + t.Run(k.name, func(t *testing.T) { + runMetricExportTests( + t, + []ExporterOption{ + WithMetricExportKindSelector( + metricsdk.StatelessExportKindSelector(), + ), + }, + []record{ + { + "instrument", + k.instrumentKind, + otel.Int64NumberKind, + testInstA, + nil, + append(baseKeyValues, cpuKey.Int(1)), + }, + }, + []metricpb.ResourceMetrics{ + { + Resource: testerAResource, + InstrumentationLibraryMetrics: []*metricpb.InstrumentationLibraryMetrics{ + { + Metrics: []*metricpb.Metric{ + { + Name: "instrument", + Data: &metricpb.Metric_IntSum{ + IntSum: &metricpb.IntSum{ + IsMonotonic: k.monotonic, + AggregationTemporality: k.aggTemporality, + DataPoints: []*metricpb.IntDataPoint{ + { + Value: 11, + Labels: cpu1Labels, + StartTimeUnixNano: startTime(), + TimeUnixNano: pointTime(), + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + ) + }) + } +} + // What works single-threaded should work multi-threaded -func runMetricExportTests(t *testing.T, rs []record, expected []metricpb.ResourceMetrics) { +func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) { t.Run("1 goroutine", func(t *testing.T) { - runMetricExportTest(t, NewUnstartedExporter(WorkerCount(1)), rs, expected) + runMetricExportTest(t, NewUnstartedExporter(append(opts[:len(opts):len(opts)], WorkerCount(1))...), rs, expected) }) t.Run("20 goroutines", func(t *testing.T) { - runMetricExportTest(t, NewUnstartedExporter(WorkerCount(20)), rs, expected) + runMetricExportTest(t, NewUnstartedExporter(append(opts[:len(opts):len(opts)], WorkerCount(20))...), rs, expected) }) } @@ -713,27 +783,41 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me recs := map[label.Distinct][]metricsdk.Record{} resources := map[label.Distinct]*resource.Resource{} for _, r := range rs { + lcopy := make([]label.KeyValue, len(r.labels)) + copy(lcopy, r.labels) desc := otel.NewDescriptor(r.name, r.iKind, r.nKind, r.opts...) - labs := label.NewSet(r.labels...) + labs := label.NewSet(lcopy...) var agg, ckpt metricsdk.Aggregator - switch r.iKind { - case otel.CounterInstrumentKind: + if r.iKind.Adding() { agg, ckpt = metrictest.Unslice2(sum.New(2)) - default: + } else { agg, ckpt = metrictest.Unslice2(histogram.New(2, &desc, testHistogramBoundaries)) } ctx := context.Background() - switch r.nKind { - case otel.Int64NumberKind: - require.NoError(t, agg.Update(ctx, otel.NewInt64Number(1), &desc)) - require.NoError(t, agg.Update(ctx, otel.NewInt64Number(10), &desc)) - case otel.Float64NumberKind: - require.NoError(t, agg.Update(ctx, otel.NewFloat64Number(1), &desc)) - require.NoError(t, agg.Update(ctx, otel.NewFloat64Number(10), &desc)) - default: - t.Fatalf("invalid number kind: %v", r.nKind) + if r.iKind.Synchronous() { + // For synchronous instruments, perform two updates: 1 and 10 + switch r.nKind { + case otel.Int64NumberKind: + require.NoError(t, agg.Update(ctx, otel.NewInt64Number(1), &desc)) + require.NoError(t, agg.Update(ctx, otel.NewInt64Number(10), &desc)) + case otel.Float64NumberKind: + require.NoError(t, agg.Update(ctx, otel.NewFloat64Number(1), &desc)) + require.NoError(t, agg.Update(ctx, otel.NewFloat64Number(10), &desc)) + default: + t.Fatalf("invalid number kind: %v", r.nKind) + } + } else { + // For asynchronous instruments, perform a single update: 11 + switch r.nKind { + case otel.Int64NumberKind: + require.NoError(t, agg.Update(ctx, otel.NewInt64Number(11), &desc)) + case otel.Float64NumberKind: + require.NoError(t, agg.Update(ctx, otel.NewFloat64Number(11), &desc)) + default: + t.Fatalf("invalid number kind: %v", r.nKind) + } } require.NoError(t, agg.SynchronizedMove(ckpt, &desc)) @@ -787,14 +871,38 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me case *metricpb.Metric_IntGauge: assert.ElementsMatch(t, expected.GetIntGauge().DataPoints, g[i].GetIntGauge().DataPoints) case *metricpb.Metric_IntHistogram: + assert.Equal(t, + expected.GetIntHistogram().GetAggregationTemporality(), + g[i].GetIntHistogram().GetAggregationTemporality(), + ) assert.ElementsMatch(t, expected.GetIntHistogram().DataPoints, g[i].GetIntHistogram().DataPoints) case *metricpb.Metric_IntSum: + assert.Equal(t, + expected.GetIntSum().GetAggregationTemporality(), + g[i].GetIntSum().GetAggregationTemporality(), + ) + assert.Equal(t, + expected.GetIntSum().GetIsMonotonic(), + g[i].GetIntSum().GetIsMonotonic(), + ) assert.ElementsMatch(t, expected.GetIntSum().DataPoints, g[i].GetIntSum().DataPoints) case *metricpb.Metric_DoubleGauge: assert.ElementsMatch(t, expected.GetDoubleGauge().DataPoints, g[i].GetDoubleGauge().DataPoints) case *metricpb.Metric_DoubleHistogram: + assert.Equal(t, + expected.GetDoubleHistogram().GetAggregationTemporality(), + g[i].GetDoubleHistogram().GetAggregationTemporality(), + ) assert.ElementsMatch(t, expected.GetDoubleHistogram().DataPoints, g[i].GetDoubleHistogram().DataPoints) case *metricpb.Metric_DoubleSum: + assert.Equal(t, + expected.GetDoubleSum().GetAggregationTemporality(), + g[i].GetDoubleSum().GetAggregationTemporality(), + ) + assert.Equal(t, + expected.GetDoubleSum().GetIsMonotonic(), + g[i].GetDoubleSum().GetIsMonotonic(), + ) assert.ElementsMatch(t, expected.GetDoubleSum().DataPoints, g[i].GetDoubleSum().DataPoints) default: assert.Failf(t, "unknown data type", g[i].Name) diff --git a/exporters/stdout/metric.go b/exporters/stdout/metric.go index 91f6876a209..a7c02c5aa35 100644 --- a/exporters/stdout/metric.go +++ b/exporters/stdout/metric.go @@ -52,8 +52,8 @@ type quantile struct { Value interface{} `json:"Value"` } -func (e *metricExporter) ExportKindFor(*otel.Descriptor, aggregation.Kind) exportmetric.ExportKind { - return exportmetric.PassThroughExporter +func (e *metricExporter) ExportKindFor(desc *otel.Descriptor, kind aggregation.Kind) exportmetric.ExportKind { + return exportmetric.StatelessExportKindSelector().ExportKindFor(desc, kind) } func (e *metricExporter) Export(_ context.Context, checkpointSet exportmetric.CheckpointSet) error { diff --git a/sdk/export/metric/exportkind_string.go b/sdk/export/metric/exportkind_string.go index c2f8bcf8dff..a92c1c1f2de 100644 --- a/sdk/export/metric/exportkind_string.go +++ b/sdk/export/metric/exportkind_string.go @@ -8,28 +8,18 @@ func _() { // An "invalid array index" compiler error signifies that the constant values have changed. // Re-run the stringer command to generate them again. var x [1]struct{} - _ = x[CumulativeExporter-1] - _ = x[DeltaExporter-2] - _ = x[PassThroughExporter-4] + _ = x[CumulativeExportKind-1] + _ = x[DeltaExportKind-2] } -const ( - _ExportKind_name_0 = "CumulativeExporterDeltaExporter" - _ExportKind_name_1 = "PassThroughExporter" -) +const _ExportKind_name = "CumulativeExportKindDeltaExportKind" -var ( - _ExportKind_index_0 = [...]uint8{0, 18, 31} -) +var _ExportKind_index = [...]uint8{0, 20, 35} func (i ExportKind) String() string { - switch { - case 1 <= i && i <= 2: - i -= 1 - return _ExportKind_name_0[_ExportKind_index_0[i]:_ExportKind_index_0[i+1]] - case i == 4: - return _ExportKind_name_1 - default: - return "ExportKind(" + strconv.FormatInt(int64(i), 10) + ")" + i -= 1 + if i < 0 || i >= ExportKind(len(_ExportKind_index)-1) { + return "ExportKind(" + strconv.FormatInt(int64(i+1), 10) + ")" } + return _ExportKind_name[_ExportKind_index[i]:_ExportKind_index[i+1]] } diff --git a/sdk/export/metric/exportkind_test.go b/sdk/export/metric/exportkind_test.go index 19ef5542ae4..030aff57547 100644 --- a/sdk/export/metric/exportkind_test.go +++ b/sdk/export/metric/exportkind_test.go @@ -23,18 +23,9 @@ import ( "go.opentelemetry.io/otel/sdk/export/metric/aggregation" ) -func TestExportKindIdentity(t *testing.T) { - akind := aggregation.Kind("Noop") - - require.Equal(t, CumulativeExporter, CumulativeExporter.ExportKindFor(nil, akind)) - require.Equal(t, DeltaExporter, DeltaExporter.ExportKindFor(nil, akind)) - require.Equal(t, PassThroughExporter, PassThroughExporter.ExportKindFor(nil, akind)) -} - func TestExportKindIncludes(t *testing.T) { - require.True(t, CumulativeExporter.Includes(CumulativeExporter)) - require.True(t, DeltaExporter.Includes(CumulativeExporter|DeltaExporter)) - require.False(t, DeltaExporter.Includes(PassThroughExporter|CumulativeExporter)) + require.True(t, CumulativeExportKind.Includes(CumulativeExportKind)) + require.True(t, DeltaExportKind.Includes(CumulativeExportKind|DeltaExportKind)) } var deltaMemoryKinds = []otel.InstrumentKind{ @@ -51,14 +42,32 @@ var cumulativeMemoryKinds = []otel.InstrumentKind{ func TestExportKindMemoryRequired(t *testing.T) { for _, kind := range deltaMemoryKinds { - require.True(t, DeltaExporter.MemoryRequired(kind)) - require.False(t, CumulativeExporter.MemoryRequired(kind)) - require.False(t, PassThroughExporter.MemoryRequired(kind)) + require.True(t, DeltaExportKind.MemoryRequired(kind)) + require.False(t, CumulativeExportKind.MemoryRequired(kind)) } for _, kind := range cumulativeMemoryKinds { - require.True(t, CumulativeExporter.MemoryRequired(kind)) - require.False(t, DeltaExporter.MemoryRequired(kind)) - require.False(t, PassThroughExporter.MemoryRequired(kind)) + require.True(t, CumulativeExportKind.MemoryRequired(kind)) + require.False(t, DeltaExportKind.MemoryRequired(kind)) + } +} + +func TestExportKindSelectors(t *testing.T) { + ceks := CumulativeExportKindSelector() + deks := DeltaExportKindSelector() + seks := StatelessExportKindSelector() + + for _, ikind := range append(deltaMemoryKinds, cumulativeMemoryKinds...) { + desc := otel.NewDescriptor("instrument", ikind, otel.Int64NumberKind) + + var akind aggregation.Kind + if ikind.Adding() { + akind = aggregation.SumKind + } else { + akind = aggregation.HistogramKind + } + require.Equal(t, CumulativeExportKind, ceks.ExportKindFor(&desc, akind)) + require.Equal(t, DeltaExportKind, deks.ExportKindFor(&desc, akind)) + require.False(t, seks.ExportKindFor(&desc, akind).MemoryRequired(ikind)) } } diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 1f7fa5e5077..a89a19db449 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -358,18 +358,13 @@ func (r Record) EndTime() time.Time { type ExportKind int const ( - // CumulativeExporter indicates that the Exporter expects a + // CumulativeExportKind indicates that an Exporter expects a // Cumulative Aggregation. - CumulativeExporter ExportKind = 1 // e.g., Prometheus + CumulativeExportKind ExportKind = 1 - // DeltaExporter indicates that the Exporter expects a + // DeltaExportKind indicates that an Exporter expects a // Delta Aggregation. - DeltaExporter ExportKind = 2 // e.g., StatsD - - // PassThroughExporter indicates that the Exporter expects - // either a Cumulative or a Delta Aggregation, whichever does - // not require maintaining state for the given instrument. - PassThroughExporter ExportKind = 4 // e.g., OTLP + DeltaExportKind ExportKind = 2 ) // Includes tests whether `kind` includes a specific kind of @@ -378,11 +373,6 @@ func (kind ExportKind) Includes(has ExportKind) bool { return kind&has != 0 } -// ExportKindFor returns a constant, as an implementation of ExportKindSelector. -func (kind ExportKind) ExportKindFor(_ *otel.Descriptor, _ aggregation.Kind) ExportKind { - return kind -} - // MemoryRequired returns whether an exporter of this kind requires // memory to export correctly. func (kind ExportKind) MemoryRequired(mkind otel.InstrumentKind) bool { @@ -390,14 +380,62 @@ func (kind ExportKind) MemoryRequired(mkind otel.InstrumentKind) bool { case otel.ValueRecorderInstrumentKind, otel.ValueObserverInstrumentKind, otel.CounterInstrumentKind, otel.UpDownCounterInstrumentKind: // Delta-oriented instruments: - return kind.Includes(CumulativeExporter) + return kind.Includes(CumulativeExportKind) case otel.SumObserverInstrumentKind, otel.UpDownSumObserverInstrumentKind: // Cumulative-oriented instruments: - return kind.Includes(DeltaExporter) + return kind.Includes(DeltaExportKind) } // Something unexpected is happening--we could panic. This // will become an error when the exporter tries to access a // checkpoint, presumably, so let it be. return false } + +type ( + constantExportKindSelector ExportKind + statelessExportKindSelector struct{} +) + +var ( + _ ExportKindSelector = constantExportKindSelector(0) + _ ExportKindSelector = statelessExportKindSelector{} +) + +// ConstantExportKindSelector returns an ExportKindSelector that returns +// a constant ExportKind, one that is either always cumulative or always delta. +func ConstantExportKindSelector(kind ExportKind) ExportKindSelector { + return constantExportKindSelector(kind) +} + +// CumulativeExportKindSelector returns an ExportKindSelector that +// always returns CumulativeExportKind. +func CumulativeExportKindSelector() ExportKindSelector { + return ConstantExportKindSelector(CumulativeExportKind) +} + +// DeltaExportKindSelector returns an ExportKindSelector that +// always returns DeltaExportKind. +func DeltaExportKindSelector() ExportKindSelector { + return ConstantExportKindSelector(DeltaExportKind) +} + +// StatelessExportKindSelector returns an ExportKindSelector that +// always returns the ExportKind that avoids long-term memory +// requirements. +func StatelessExportKindSelector() ExportKindSelector { + return statelessExportKindSelector{} +} + +// ExportKindFor implements ExportKindSelector. +func (c constantExportKindSelector) ExportKindFor(_ *otel.Descriptor, _ aggregation.Kind) ExportKind { + return ExportKind(c) +} + +// ExportKindFor implements ExportKindSelector. +func (s statelessExportKindSelector) ExportKindFor(desc *otel.Descriptor, kind aggregation.Kind) ExportKind { + if kind == aggregation.SumKind && desc.InstrumentKind().PrecomputedSum() { + return CumulativeExportKind + } + return DeltaExportKind +} diff --git a/sdk/metric/controller/pull/pull_test.go b/sdk/metric/controller/pull/pull_test.go index a9502d673a6..f5d987e5785 100644 --- a/sdk/metric/controller/pull/pull_test.go +++ b/sdk/metric/controller/pull/pull_test.go @@ -36,7 +36,7 @@ func TestPullNoCache(t *testing.T) { puller := pull.New( basic.New( selector.NewWithExactDistribution(), - export.CumulativeExporter, + export.CumulativeExportKindSelector(), basic.WithMemory(true), ), pull.WithCachePeriod(0), @@ -50,7 +50,7 @@ func TestPullNoCache(t *testing.T) { require.NoError(t, puller.Collect(ctx)) records := processortest.NewOutput(label.DefaultEncoder()) - require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) + require.NoError(t, puller.ForEach(export.CumulativeExportKindSelector(), records.AddRecord)) require.EqualValues(t, map[string]float64{ "counter.sum/A=B/": 10, @@ -60,7 +60,7 @@ func TestPullNoCache(t *testing.T) { require.NoError(t, puller.Collect(ctx)) records = processortest.NewOutput(label.DefaultEncoder()) - require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) + require.NoError(t, puller.ForEach(export.CumulativeExportKindSelector(), records.AddRecord)) require.EqualValues(t, map[string]float64{ "counter.sum/A=B/": 20, @@ -71,7 +71,7 @@ func TestPullWithCache(t *testing.T) { puller := pull.New( basic.New( selector.NewWithExactDistribution(), - export.CumulativeExporter, + export.CumulativeExportKindSelector(), basic.WithMemory(true), ), pull.WithCachePeriod(time.Second), @@ -87,7 +87,7 @@ func TestPullWithCache(t *testing.T) { require.NoError(t, puller.Collect(ctx)) records := processortest.NewOutput(label.DefaultEncoder()) - require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) + require.NoError(t, puller.ForEach(export.CumulativeExportKindSelector(), records.AddRecord)) require.EqualValues(t, map[string]float64{ "counter.sum/A=B/": 10, @@ -98,7 +98,7 @@ func TestPullWithCache(t *testing.T) { // Cached value! require.NoError(t, puller.Collect(ctx)) records = processortest.NewOutput(label.DefaultEncoder()) - require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) + require.NoError(t, puller.ForEach(export.CumulativeExportKindSelector(), records.AddRecord)) require.EqualValues(t, map[string]float64{ "counter.sum/A=B/": 10, @@ -110,7 +110,7 @@ func TestPullWithCache(t *testing.T) { // Re-computed value! require.NoError(t, puller.Collect(ctx)) records = processortest.NewOutput(label.DefaultEncoder()) - require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) + require.NoError(t, puller.ForEach(export.CumulativeExportKindSelector(), records.AddRecord)) require.EqualValues(t, map[string]float64{ "counter.sum/A=B/": 20, diff --git a/sdk/metric/controller/push/push_test.go b/sdk/metric/controller/push/push_test.go index ddb325a7bf3..879d794481a 100644 --- a/sdk/metric/controller/push/push_test.go +++ b/sdk/metric/controller/push/push_test.go @@ -66,7 +66,7 @@ func init() { func newExporter() *processorTest.Exporter { return processorTest.NewExporter( - export.PassThroughExporter, + export.StatelessExportKindSelector(), label.DefaultEncoder(), ) } diff --git a/sdk/metric/processor/basic/basic.go b/sdk/metric/processor/basic/basic.go index 120830a3c60..9d1eba2ebdc 100644 --- a/sdk/metric/processor/basic/basic.go +++ b/sdk/metric/processor/basic/basic.go @@ -120,7 +120,7 @@ var _ export.Processor = &Processor{} var _ export.Checkpointer = &Processor{} var _ export.CheckpointSet = &state{} var ErrInconsistentState = fmt.Errorf("inconsistent processor state") -var ErrInvalidExporterKind = fmt.Errorf("invalid exporter kind") +var ErrInvalidExportKind = fmt.Errorf("invalid export kind") // New returns a basic Processor that is also a Checkpointer using the provided // AggregatorSelector to select Aggregators. The ExportKindSelector @@ -338,17 +338,7 @@ func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record ekind := exporter.ExportKindFor(key.descriptor, value.current.Aggregation().Kind()) switch ekind { - case export.PassThroughExporter: - // No state is required, pass through the checkpointed value. - agg = value.current.Aggregation() - - if mkind.PrecomputedSum() { - start = b.processStart - } else { - start = b.intervalStart - } - - case export.CumulativeExporter: + case export.CumulativeExportKind: // If stateful, the sum has been computed. If stateless, the // input was already cumulative. Either way, use the checkpointed // value: @@ -359,7 +349,7 @@ func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record } start = b.processStart - case export.DeltaExporter: + case export.DeltaExportKind: // Precomputed sums are a special case. if mkind.PrecomputedSum() { agg = value.delta.Aggregation() @@ -369,7 +359,7 @@ func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record start = b.intervalStart default: - return fmt.Errorf("%v: %w", ekind, ErrInvalidExporterKind) + return fmt.Errorf("%v: %w", ekind, ErrInvalidExportKind) } if err := f(export.NewRecord( diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go index 71e9d3870d4..42c94a4412f 100644 --- a/sdk/metric/processor/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -50,9 +50,8 @@ func TestProcessor(t *testing.T) { } for _, tc := range []exportCase{ - {kind: export.PassThroughExporter}, - {kind: export.CumulativeExporter}, - {kind: export.DeltaExporter}, + {kind: export.CumulativeExportKind}, + {kind: export.DeltaExportKind}, } { t.Run(tc.kind.String(), func(t *testing.T) { for _, ic := range []instrumentCase{ @@ -127,7 +126,7 @@ func testProcessor( labs2 := []label.KeyValue{label.String("L2", "V")} testBody := func(t *testing.T, hasMemory bool, nAccum, nCheckpoint int) { - processor := basic.New(selector, ekind, basic.WithMemory(hasMemory)) + processor := basic.New(selector, export.ConstantExportKindSelector(ekind), basic.WithMemory(hasMemory)) instSuffix := fmt.Sprint(".", strings.ToLower(akind.String())) @@ -159,7 +158,7 @@ func testProcessor( _, canSub := subr.(export.Subtractor) // Allow unsupported subraction case only when it is called for. - require.True(t, mkind.PrecomputedSum() && ekind == export.DeltaExporter && !canSub) + require.True(t, mkind.PrecomputedSum() && ekind == export.DeltaExportKind && !canSub) return } else if err != nil { t.Fatal("unexpected FinishCollection error: ", err) @@ -183,7 +182,7 @@ func testProcessor( // Test the final checkpoint state. records1 := processorTest.NewOutput(label.DefaultEncoder()) - err = checkpointSet.ForEach(ekind, records1.AddRecord) + err = checkpointSet.ForEach(export.ConstantExportKindSelector(ekind), records1.AddRecord) // Test for an allowed error: if err != nil && err != aggregation.ErrNoSubtraction { @@ -196,7 +195,7 @@ func testProcessor( // number of Accumulators, unless LastValue aggregation. // If a precomputed sum, we expect cumulative inputs. if mkind.PrecomputedSum() { - if ekind == export.DeltaExporter && akind != aggregation.LastValueKind { + if ekind == export.DeltaExportKind && akind != aggregation.LastValueKind { multiplier = int64(nAccum) } else if akind == aggregation.LastValueKind { multiplier = cumulativeMultiplier @@ -204,7 +203,7 @@ func testProcessor( multiplier = cumulativeMultiplier * int64(nAccum) } } else { - if ekind == export.CumulativeExporter && akind != aggregation.LastValueKind { + if ekind == export.CumulativeExportKind && akind != aggregation.LastValueKind { multiplier = cumulativeMultiplier * int64(nAccum) } else if akind == aggregation.LastValueKind { multiplier = 1 @@ -216,7 +215,7 @@ func testProcessor( // Synchronous accumulate results from multiple accumulators, // use that number as the baseline multiplier. multiplier = int64(nAccum) - if ekind == export.CumulativeExporter { + if ekind == export.CumulativeExportKind { // If a cumulative exporter, include prior checkpoints. multiplier *= cumulativeMultiplier } @@ -268,39 +267,39 @@ func (bogusExporter) Export(context.Context, export.CheckpointSet) error { func TestBasicInconsistent(t *testing.T) { // Test double-start - b := basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter) + b := basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector()) b.StartCollection() b.StartCollection() require.Equal(t, basic.ErrInconsistentState, b.FinishCollection()) // Test finish without start - b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter) + b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector()) require.Equal(t, basic.ErrInconsistentState, b.FinishCollection()) // Test no finish - b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter) + b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector()) b.StartCollection() require.Equal( t, basic.ErrInconsistentState, b.ForEach( - export.PassThroughExporter, + export.StatelessExportKindSelector(), func(export.Record) error { return nil }, ), ) // Test no start - b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter) + b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector()) desc := otel.NewDescriptor("inst", otel.CounterInstrumentKind, otel.Int64NumberKind) accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), metrictest.NoopAggregator{}) require.Equal(t, basic.ErrInconsistentState, b.Process(accum)) // Test invalid kind: - b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter) + b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector()) b.StartCollection() require.NoError(t, b.Process(accum)) require.NoError(t, b.FinishCollection()) @@ -309,13 +308,13 @@ func TestBasicInconsistent(t *testing.T) { bogusExporter{}, func(export.Record) error { return nil }, ) - require.True(t, errors.Is(err, basic.ErrInvalidExporterKind)) + require.True(t, errors.Is(err, basic.ErrInvalidExportKind)) } func TestBasicTimestamps(t *testing.T) { beforeNew := time.Now() - b := basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter) + b := basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector()) afterNew := time.Now() desc := otel.NewDescriptor("inst", otel.CounterInstrumentKind, otel.Int64NumberKind) @@ -327,7 +326,7 @@ func TestBasicTimestamps(t *testing.T) { var start1, end1 time.Time - require.NoError(t, b.ForEach(export.PassThroughExporter, func(rec export.Record) error { + require.NoError(t, b.ForEach(export.StatelessExportKindSelector(), func(rec export.Record) error { start1 = rec.StartTime() end1 = rec.EndTime() return nil @@ -344,7 +343,7 @@ func TestBasicTimestamps(t *testing.T) { var start2, end2 time.Time - require.NoError(t, b.ForEach(export.PassThroughExporter, func(rec export.Record) error { + require.NoError(t, b.ForEach(export.StatelessExportKindSelector(), func(rec export.Record) error { start2 = rec.StartTime() end2 = rec.EndTime() return nil @@ -362,12 +361,12 @@ func TestBasicTimestamps(t *testing.T) { func TestStatefulNoMemoryCumulative(t *testing.T) { res := resource.NewWithAttributes(label.String("R", "V")) - ekind := export.CumulativeExporter + ekindSel := export.CumulativeExportKindSelector() desc := otel.NewDescriptor("inst.sum", otel.CounterInstrumentKind, otel.Int64NumberKind) selector := processorTest.AggregatorSelector() - processor := basic.New(selector, ekind, basic.WithMemory(false)) + processor := basic.New(selector, ekindSel, basic.WithMemory(false)) checkpointSet := processor.CheckpointSet() for i := 1; i < 3; i++ { @@ -377,7 +376,7 @@ func TestStatefulNoMemoryCumulative(t *testing.T) { // Verify zero elements records := processorTest.NewOutput(label.DefaultEncoder()) - require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) + require.NoError(t, checkpointSet.ForEach(ekindSel, records.AddRecord)) require.EqualValues(t, map[string]float64{}, records.Map()) // Add 10 @@ -387,7 +386,7 @@ func TestStatefulNoMemoryCumulative(t *testing.T) { // Verify one element records = processorTest.NewOutput(label.DefaultEncoder()) - require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) + require.NoError(t, checkpointSet.ForEach(ekindSel, records.AddRecord)) require.EqualValues(t, map[string]float64{ "inst.sum/A=B/R=V": float64(i * 10), }, records.Map()) @@ -396,12 +395,12 @@ func TestStatefulNoMemoryCumulative(t *testing.T) { func TestStatefulNoMemoryDelta(t *testing.T) { res := resource.NewWithAttributes(label.String("R", "V")) - ekind := export.DeltaExporter + ekindSel := export.DeltaExportKindSelector() desc := otel.NewDescriptor("inst.sum", otel.SumObserverInstrumentKind, otel.Int64NumberKind) selector := processorTest.AggregatorSelector() - processor := basic.New(selector, ekind, basic.WithMemory(false)) + processor := basic.New(selector, ekindSel, basic.WithMemory(false)) checkpointSet := processor.CheckpointSet() for i := 1; i < 3; i++ { @@ -411,7 +410,7 @@ func TestStatefulNoMemoryDelta(t *testing.T) { // Verify zero elements records := processorTest.NewOutput(label.DefaultEncoder()) - require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) + require.NoError(t, checkpointSet.ForEach(ekindSel, records.AddRecord)) require.EqualValues(t, map[string]float64{}, records.Map()) // Add 10 @@ -421,7 +420,7 @@ func TestStatefulNoMemoryDelta(t *testing.T) { // Verify one element records = processorTest.NewOutput(label.DefaultEncoder()) - require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) + require.NoError(t, checkpointSet.ForEach(ekindSel, records.AddRecord)) require.EqualValues(t, map[string]float64{ "inst.sum/A=B/R=V": 10, }, records.Map()) @@ -429,17 +428,16 @@ func TestStatefulNoMemoryDelta(t *testing.T) { } func TestMultiObserverSum(t *testing.T) { - for _, ekind := range []export.ExportKind{ - export.PassThroughExporter, - export.CumulativeExporter, - export.DeltaExporter, + for _, ekindSel := range []export.ExportKindSelector{ + export.CumulativeExportKindSelector(), + export.DeltaExportKindSelector(), } { res := resource.NewWithAttributes(label.String("R", "V")) desc := otel.NewDescriptor("observe.sum", otel.SumObserverInstrumentKind, otel.Int64NumberKind) selector := processorTest.AggregatorSelector() - processor := basic.New(selector, ekind, basic.WithMemory(false)) + processor := basic.New(selector, ekindSel, basic.WithMemory(false)) checkpointSet := processor.CheckpointSet() for i := 1; i < 3; i++ { @@ -452,13 +450,13 @@ func TestMultiObserverSum(t *testing.T) { // Multiplier is 1 for deltas, otherwise i. multiplier := i - if ekind == export.DeltaExporter { + if ekindSel.ExportKindFor(&desc, aggregation.SumKind) == export.DeltaExportKind { multiplier = 1 } // Verify one element records := processorTest.NewOutput(label.DefaultEncoder()) - require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) + require.NoError(t, checkpointSet.ForEach(ekindSel, records.AddRecord)) require.EqualValues(t, map[string]float64{ "observe.sum/A=B/R=V": float64(3 * 10 * multiplier), }, records.Map()) diff --git a/sdk/metric/processor/processortest/test.go b/sdk/metric/processor/processortest/test.go index 5ee33f75db6..642bdee0248 100644 --- a/sdk/metric/processor/processortest/test.go +++ b/sdk/metric/processor/processortest/test.go @@ -259,7 +259,7 @@ func (o *Output) AddRecord(rec export.Record) error { // is chosen, whichever is implemented by the underlying Aggregator. func (o *Output) Map() map[string]float64 { r := make(map[string]float64) - err := o.ForEach(export.PassThroughExporter, func(record export.Record) error { + err := o.ForEach(export.StatelessExportKindSelector(), func(record export.Record) error { for key, value := range o.m { encoded := value.labels.Encoded(o.labelEncoder) rencoded := value.resource.Encoded(o.labelEncoder) diff --git a/sdk/metric/processor/processortest/test_test.go b/sdk/metric/processor/processortest/test_test.go index d7ab2eee872..32b4a09e406 100644 --- a/sdk/metric/processor/processortest/test_test.go +++ b/sdk/metric/processor/processortest/test_test.go @@ -74,7 +74,7 @@ func TestProcessorTesting(t *testing.T) { // Export the data and validate it again. exporter := processorTest.NewExporter( - export.PassThroughExporter, + export.StatelessExportKindSelector(), label.DefaultEncoder(), ) diff --git a/sdk/metric/processor/reducer/reducer_test.go b/sdk/metric/processor/reducer/reducer_test.go index 6c0a87301d2..4f60097d901 100644 --- a/sdk/metric/processor/reducer/reducer_test.go +++ b/sdk/metric/processor/reducer/reducer_test.go @@ -89,7 +89,7 @@ func TestFilterProcessor(t *testing.T) { // Test a filter with the ../basic Processor. func TestFilterBasicProcessor(t *testing.T) { - basicProc := basic.New(processorTest.AggregatorSelector(), export.CumulativeExporter) + basicProc := basic.New(processorTest.AggregatorSelector(), export.CumulativeExportKindSelector()) accum := metricsdk.NewAccumulator( reducer.New(testFilter{}, basicProc), resource.NewWithAttributes(label.String("R", "V")),