diff --git a/lightstep/sdk/metric/internal/viewstate/accumulators.go b/lightstep/sdk/metric/internal/viewstate/accumulators.go new file mode 100644 index 00000000..f6705612 --- /dev/null +++ b/lightstep/sdk/metric/internal/viewstate/accumulators.go @@ -0,0 +1,76 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package viewstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" + +import ( + "sync" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" +) + +// multiAccumulator +type multiAccumulator[N number.Any] []Accumulator + +func (acc multiAccumulator[N]) SnapshotAndProcess() { + for _, coll := range acc { + coll.SnapshotAndProcess() + } +} + +func (acc multiAccumulator[N]) Update(value N) { + for _, coll := range acc { + coll.(Updater[N]).Update(value) + } +} + +// syncAccumulator +type syncAccumulator[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { + current Storage + snapshot Storage + findStorage func() *Storage +} + +func (acc *syncAccumulator[N, Storage, Methods]) Update(number N) { + var methods Methods + methods.Update(&acc.current, number) +} + +func (acc *syncAccumulator[N, Storage, Methods]) SnapshotAndProcess() { + var methods Methods + methods.SynchronizedMove(&acc.current, &acc.snapshot) + methods.Merge(acc.findStorage(), &acc.snapshot) +} + +// asyncAccumulator +type asyncAccumulator[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { + lock sync.Mutex + current N + findStorage func() *Storage +} + +func (acc *asyncAccumulator[N, Storage, Methods]) Update(number N) { + acc.lock.Lock() + defer acc.lock.Unlock() + acc.current = number +} + +func (acc *asyncAccumulator[N, Storage, Methods]) SnapshotAndProcess() { + acc.lock.Lock() + defer acc.lock.Unlock() + + var methods Methods + methods.Update(acc.findStorage(), acc.current) +} diff --git a/lightstep/sdk/metric/internal/viewstate/base_instrument.go b/lightstep/sdk/metric/internal/viewstate/base_instrument.go new file mode 100644 index 00000000..7ed0babc --- /dev/null +++ b/lightstep/sdk/metric/internal/viewstate/base_instrument.go @@ -0,0 +1,144 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
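The accumulators above are driven by the meter implementation, which owns one Accumulator per attribute set, type-asserts the Updater side to record values, and calls SnapshotAndProcess at every collection. A minimal in-package sketch of that contract, assuming inst is an Instrument compiled for an int64 Counter (the function and variable names here are illustrative, not part of this patch):

func recordExample(inst Instrument) {
	acc := inst.NewAccumulator(attribute.NewSet(attribute.String("k", "v")))

	// Synchronous path: each Update passes through to the aggregator.
	acc.(Updater[int64]).Update(10)
	acc.(Updater[int64]).Update(5)

	// Collection: the accumulated total (15) is merged into the
	// per-attribute output storage and the accumulator is reset.
	acc.SnapshotAndProcess()
}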
+ +package viewstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" + +import ( + "sync" + "time" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/data" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" + "go.opentelemetry.io/otel/attribute" +) + +// instrumentBase is the common type embedded in any of the compiled instrument views. +type instrumentBase[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { + lock sync.Mutex + fromName string + desc sdkinstrument.Descriptor + acfg aggregator.Config + data map[attribute.Set]*Storage + + keysSet *attribute.Set + keysFilter *attribute.Filter +} + +func (metric *instrumentBase[N, Storage, Methods]) Aggregation() aggregation.Kind { + var methods Methods + return methods.Kind() +} + +func (metric *instrumentBase[N, Storage, Methods]) OriginalName() string { + return metric.fromName +} + +func (metric *instrumentBase[N, Storage, Methods]) Descriptor() sdkinstrument.Descriptor { + return metric.desc +} + +func (metric *instrumentBase[N, Storage, Methods]) Keys() *attribute.Set { + return metric.keysSet +} + +func (metric *instrumentBase[N, Storage, Methods]) Config() aggregator.Config { + return metric.acfg +} + +func (metric *instrumentBase[N, Storage, Methods]) initStorage(s *Storage) { + var methods Methods + methods.Init(s, metric.acfg) +} + +func (metric *instrumentBase[N, Storage, Methods]) mergeDescription(d string) { + metric.lock.Lock() + defer metric.lock.Unlock() + if len(d) > len(metric.desc.Description) { + metric.desc.Description = d + } +} + +// storageFinder searches for and possibly allocates an output Storage +// for this metric. Filtered keys, if a filter is provided, will be +// computed once. +func (metric *instrumentBase[N, Storage, Methods]) storageFinder( + kvs attribute.Set, +) func() *Storage { + if metric.keysFilter != nil { + kvs, _ = attribute.NewSetWithFiltered(kvs.ToSlice(), *metric.keysFilter) + } + + return func() *Storage { + metric.lock.Lock() + defer metric.lock.Unlock() + + storage, has := metric.data[kvs] + if has { + return storage + } + + ns := metric.newStorage() + metric.data[kvs] = ns + return ns + } +} + +// newStorage allocates and initializes a new Storage. +func (metric *instrumentBase[N, Storage, Methods]) newStorage() *Storage { + ns := new(Storage) + metric.initStorage(ns) + return ns +} + +// appendInstrument adds a new instrument to the output. Note that +// this is expected to be called unconditionally (even when there are +// no points); it means that the same list of instruments will always +// be produced (in the same order); consumers of delta temporality +// should expect to see empty instruments in the output for metric +// data that is unchanged. 
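Because storageFinder applies the keys filter before the map lookup, accumulators whose attribute sets differ only in filtered-out keys end up merging into one output series. A hedged sketch of that effect, assuming inst is an int64 Counter Instrument compiled under a view that keeps only key "a" (names are illustrative):

func filteredExample(inst Instrument) {
	// Both sets reduce to {a="1"} after filtering, so both accumulators
	// resolve to the same output Storage.
	acc1 := inst.NewAccumulator(attribute.NewSet(attribute.String("a", "1"), attribute.String("b", "x")))
	acc2 := inst.NewAccumulator(attribute.NewSet(attribute.String("a", "1"), attribute.String("b", "y")))

	acc1.(Updater[int64]).Update(1)
	acc2.(Updater[int64]).Update(2)

	acc1.SnapshotAndProcess()
	acc2.SnapshotAndProcess()
	// The next collection reports a single series {a="1"} with value 3.
}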
+func (metric *instrumentBase[N, Storage, Methods]) appendInstrument(output *[]data.Instrument) *data.Instrument {
+	inst := data.ReallocateFrom(output)
+	inst.Descriptor = metric.desc
+	return inst
+}
+
+// appendPoint is used in cases where the output Aggregation is the
+// stored object; use appendOrReusePoint in the case where the output
+// Aggregation is a copy of the stored object (in case the stored
+// object will be reset on collection, as opposed to a second pass to
+// reset delta temporality outputs before the next accumulation).
+func (metric *instrumentBase[N, Storage, Methods]) appendPoint(inst *data.Instrument, set attribute.Set, agg aggregation.Aggregation, tempo aggregation.Temporality, start, end time.Time) {
+	point := data.ReallocateFrom(&inst.Points)
+
+	point.Attributes = set
+	point.Aggregation = agg
+	point.Temporality = tempo
+	point.Start = start
+	point.End = end
+}
+
+// appendOrReusePoint is an alternative to appendPoint; this form is used when
+// the storage will be reset on collection.
+func (metric *instrumentBase[N, Storage, Methods]) appendOrReusePoint(inst *data.Instrument) (*data.Point, *Storage) {
+	point := data.ReallocateFrom(&inst.Points)
+
+	var methods Methods
+	if s, ok := methods.ToStorage(point.Aggregation); ok {
+		return point, s
+	}
+	return point, nil
+}
diff --git a/lightstep/sdk/metric/internal/viewstate/collectors.go b/lightstep/sdk/metric/internal/viewstate/collectors.go
new file mode 100644
index 00000000..3c026abe
--- /dev/null
+++ b/lightstep/sdk/metric/internal/viewstate/collectors.go
@@ -0,0 +1,182 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package viewstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate"
+
+import (
+	"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator"
+	"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation"
+	"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/data"
+	"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number"
+	"go.opentelemetry.io/otel/attribute"
+)
+
+// compiledSyncBase is any synchronous instrument view.
+type compiledSyncBase[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct {
+	instrumentBase[N, Storage, Methods]
+}
+
+// NewAccumulator returns an Accumulator for a synchronous instrument view.
+func (csv *compiledSyncBase[N, Storage, Methods]) NewAccumulator(kvs attribute.Set) Accumulator {
+	sc := &syncAccumulator[N, Storage, Methods]{}
+	csv.initStorage(&sc.current)
+	csv.initStorage(&sc.snapshot)
+
+	sc.findStorage = csv.storageFinder(kvs)
+
+	return sc
+}
+
+// compiledAsyncBase is any asynchronous instrument view.
+type compiledAsyncBase[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct {
+	instrumentBase[N, Storage, Methods]
+}
+
+// NewAccumulator returns an Accumulator for an asynchronous instrument view.
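appendInstrument and the point helpers above rely on data.ReallocateFrom to grow the output slice while reusing capacity left over from the previous collection. That helper is defined in the data package and is not part of this diff; a plausible shape for it, shown only to make the reuse pattern concrete, is:

// Assumed shape of data.ReallocateFrom (an illustration, not the actual
// implementation): extend the slice by one element, reusing capacity when
// possible, and return a pointer to the new last element.
func reallocateFrom[T any](p *[]T) *T {
	if len(*p) < cap(*p) {
		// Reusing capacity keeps the element's previous contents, which is
		// what appendOrReusePoint exploits via ToStorage.
		*p = (*p)[:len(*p)+1]
	} else {
		var zero T
		*p = append(*p, zero)
	}
	return &(*p)[len(*p)-1]
}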
+func (cav *compiledAsyncBase[N, Storage, Methods]) NewAccumulator(kvs attribute.Set) Accumulator { + ac := &asyncAccumulator[N, Storage, Methods]{} + + ac.findStorage = cav.storageFinder(kvs) + + return ac +} + +// statefulSyncInstrument is a synchronous instrument that maintains cumulative state. +type statefulSyncInstrument[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { + compiledSyncBase[N, Storage, Methods] +} + +// Collect for synchronous cumulative temporality. +func (p *statefulSyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence, output *[]data.Instrument) { + var methods Methods + + p.lock.Lock() + defer p.lock.Unlock() + + ioutput := p.appendInstrument(output) + + for set, storage := range p.data { + p.appendPoint(ioutput, set, methods.ToAggregation(storage), aggregation.CumulativeTemporality, seq.Start, seq.Now) + } +} + +// statelessSyncInstrument is a synchronous instrument that maintains no state. +type statelessSyncInstrument[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { + compiledSyncBase[N, Storage, Methods] +} + +// Collect for synchronous delta temporality. +func (p *statelessSyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence, output *[]data.Instrument) { + var methods Methods + + p.lock.Lock() + defer p.lock.Unlock() + + ioutput := p.appendInstrument(output) + + for set, storage := range p.data { + if !methods.HasChange(storage) { + delete(p.data, set) + continue + } + + // Possibly re-use the underlying storage. For + // synchronous instruments, where accumulation happens + // between collection events (e.g., due to other + // readers collecting), we must reset the storage now + // or completely clear the map. + point, exists := p.appendOrReusePoint(ioutput) + if exists == nil { + exists = p.newStorage() + } else { + methods.Reset(exists) + } + + // Note: This can be improved with a Copy() or Swap() + // operation in the Methods, since Merge() may be + // relatively expensive by comparison. + methods.Merge(exists, storage) + + point.Attributes = set + point.Aggregation = methods.ToAggregation(exists) + point.Temporality = aggregation.DeltaTemporality + point.Start = seq.Last + point.End = seq.Now + + methods.Reset(storage) + } +} + +// statelessAsyncInstrument is an asynchronous instrument that keeps +// maintains no state. +type statelessAsyncInstrument[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { + compiledAsyncBase[N, Storage, Methods] +} + +// Collect for asynchronous cumulative temporality. +func (p *statelessAsyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence, output *[]data.Instrument) { + var methods Methods + + p.lock.Lock() + defer p.lock.Unlock() + + ioutput := p.appendInstrument(output) + + for set, storage := range p.data { + // Copy the underlying storage. + p.appendPoint(ioutput, set, methods.ToAggregation(storage), aggregation.CumulativeTemporality, seq.Start, seq.Now) + } + + // Reset the entire map. + p.data = map[attribute.Set]*Storage{} +} + +// statefulAsyncInstrument is an instrument that keeps asynchronous instrument state +// in order to perform cumulative to delta translation. +type statefulAsyncInstrument[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { + compiledAsyncBase[N, Storage, Methods] + prior map[attribute.Set]*Storage +} + +// Collect for asynchronous delta temporality. 
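The Collect method below converts cumulative asynchronous observations to deltas by subtracting the prior value in place and skipping series that have not changed. Reduced to plain integers (the sum case only; a Gauge reports the new value instead of the difference), the arithmetic is just the following hypothetical sketch, which assumes only the fmt package:

func deltaSketch() {
	prior := int64(0)
	for _, cumulative := range []int64{10, 25, 25} {
		delta := cumulative - prior
		prior = cumulative
		if delta == 0 {
			// HasChange() reports false: the series is skipped entirely.
			continue
		}
		fmt.Println(delta) // prints 10, then 15
	}
}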
+func (p *statefulAsyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence, output *[]data.Instrument) { + var methods Methods + + p.lock.Lock() + defer p.lock.Unlock() + + ioutput := p.appendInstrument(output) + + for set, storage := range p.data { + pval, has := p.prior[set] + if has { + // This does `*pval := *storage - *pval` + methods.SubtractSwap(storage, pval) + + // Skip the series if it has not changed. + if !methods.HasChange(pval) { + continue + } + // Output the difference except for Gauge, in + // which case output the new value. + if p.desc.Kind.HasTemporality() { + storage = pval + } + } + p.appendPoint(ioutput, set, methods.ToAggregation(storage), aggregation.DeltaTemporality, seq.Last, seq.Now) + } + // Copy the current to the prior and reset. + p.prior = p.data + p.data = map[attribute.Set]*Storage{} +} diff --git a/lightstep/sdk/metric/internal/viewstate/errors.go b/lightstep/sdk/metric/internal/viewstate/errors.go new file mode 100644 index 00000000..8a8b367e --- /dev/null +++ b/lightstep/sdk/metric/internal/viewstate/errors.go @@ -0,0 +1,235 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package viewstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" + +import ( + "fmt" + "strings" + + "go.opentelemetry.io/otel/attribute" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" +) + +// ViewConflicts represents a per-reader summary of conflicts caused +// by creating an instrument after applying the view configuration. +// ViewConflicts may contain either or both of a list of duplicates +// and a semantic error. Typically these conflicts will be the same +// for all readers, however since readers influence the defaults for +// aggregation kind and aggregator configuration, it is possible for +// different conflicts to arise. +// +// Full information about every conflict or error is always returned +// to the caller that registered the instrument along with a valid +// (potentially in-conflict) instrument. +type ViewConflictsBuilder map[string][]Conflict +type ViewConflictsError map[string][]Conflict + +var _ error = ViewConflictsError{} + +const noConflictsString = "no conflicts" + +// Error shows one example Conflict with a summary of how many +// conflicts and readers experienced conflicts, in case of multiple +// readers and/or conflicts. +func (vc ViewConflictsError) Error() string { + total := 0 + for _, l := range vc { + total += len(l) + } + // These are almost always duplicative, so we print only examples for one Config. + for rd, conflictsByReader := range vc { + if len(conflictsByReader) == 0 { + break + } + if len(vc) == 1 { + if len(conflictsByReader) == 1 { + return fmt.Sprintf("%v: %v", rd, conflictsByReader[0].Error()) + } + return fmt.Sprintf("%d conflicts, e.g. 
%v: %v", total, rd, conflictsByReader[0]) + } + return fmt.Sprintf("%d conflicts in %d readers, e.g. %v: %v", total, len(vc), rd, conflictsByReader[0]) + } + return noConflictsString +} + +func (ViewConflictsError) Is(err error) bool { + _, ok := err.(ViewConflictsError) + return ok +} + +func (vc *ViewConflictsBuilder) Add(name string, c Conflict) { + if *vc == nil { + *vc = ViewConflictsBuilder{} + } + + (*vc)[name] = append((*vc)[name], c) +} + +func (vc *ViewConflictsBuilder) Combine(other ViewConflictsBuilder) { + if *vc == nil { + if len(other) == 0 { + return + } + *vc = ViewConflictsBuilder{} + } + for k, v := range other { + (*vc)[k] = v + } +} + +func (vc *ViewConflictsBuilder) AsError() error { + if vc == nil || *vc == nil { + return nil + } + return ViewConflictsError(*vc) +} + +// Conflict describes both the duplicates instruments found and +// semantic errors caused when registering a new instrument. +type Conflict struct { + // Duplicates + Duplicates []Duplicate + // Semantic will be a SemanticError if there was an instrument + // vs. aggregation conflict or nil otherwise. + Semantic error +} + +var _ error = Conflict{} + +// Duplicate is one of the other matching instruments when duplicate +// instrument registration conflicts arise. +type Duplicate interface { + // Aggregation is the requested aggregation kind. (If the + // original aggregation caused a semantic error, this will + // have been corrected to the default aggregation.) + Aggregation() aggregation.Kind + // Descriptor describes the output of the View (Name, + // Description, Unit, Number Kind, InstrumentKind). + Descriptor() sdkinstrument.Descriptor + // Keys is non-nil with the distinct set of keys. This uses + // an attribute.Set where the Key is significant and the Value + // is a meaningless Int(0), for simplicity. + Keys() *attribute.Set + // Config is the aggregator configuration. + Config() aggregator.Config + // OriginalName is the original name of the Duplicate + // instrument before renaming. + OriginalName() string +} + +// Error shows the semantic error if non-nil and a summary of the +// duplicates if any were present. +func (c Conflict) Error() string { + se := c.semanticError() + de := c.duplicateError() + if se == "" { + return de + } + if de == "" { + return se + } + return fmt.Sprintf("%s; %s", se, de) +} + +func (c Conflict) semanticError() string { + if c.Semantic == nil { + return "" + } + return c.Semantic.Error() +} + +func (c Conflict) duplicateError() string { + if len(c.Duplicates) < 2 { + return "" + } + // Note: choose the first and last element of the current conflicts + // list because they are ordered, and if the conflict in question is + // new it will be the last item. + inst1 := c.Duplicates[0] + inst2 := c.Duplicates[len(c.Duplicates)-1] + name1 := fullNameString(inst1) + name2 := renameString(inst2) + conf1 := shortString(inst1) + conf2 := shortString(inst2) + + var s strings.Builder + s.WriteString(name1) + + if conf1 != conf2 { + s.WriteString(fmt.Sprintf(" conflicts %v, %v%v", conf1, conf2, name2)) + } else if !equalConfigs(inst1.Config(), inst2.Config()) { + s.WriteString(" has conflicts: different aggregator configuration") + } else { + s.WriteString(" has conflicts: different attribute filters") + } + + if len(c.Duplicates) > 2 { + s.WriteString(fmt.Sprintf(" and %d more", len(c.Duplicates)-2)) + } + return s.String() +} + +// SemanticError occurs when an instrument is paired with an +// incompatible aggregation. 
+type SemanticError struct { + Instrument sdkinstrument.Kind + Aggregation aggregation.Kind +} + +var _ error = SemanticError{} + +func (s SemanticError) Error() string { + return fmt.Sprintf("%v instrument incompatible with %v aggregation", + strings.TrimSuffix(s.Instrument.String(), "Kind"), + strings.TrimSuffix(s.Aggregation.String(), "Kind"), + ) +} + +func (SemanticError) Is(err error) bool { + _, ok := err.(SemanticError) + return ok +} + +// fullNameString helps rendering concise error descriptions by +// showing the original name only when it is different. +func fullNameString(d Duplicate) string { + return fmt.Sprintf("name %q%v", d.Descriptor().Name, renameString(d)) +} + +// renameString is the fragment used by fullNameString when the +// original name is different than the output name. +func renameString(d Duplicate) string { + if d.OriginalName() == d.Descriptor().Name { + return "" + } + return fmt.Sprintf(" (original %q)", d.OriginalName()) +} + +// shortString concatenates the instrument kind, number kind, +// aggregation kind, and unit to summarize most of the potentially +// conflicting characteristics of an instrument. +func shortString(d Duplicate) string { + s := fmt.Sprintf("%v-%v-%v", + strings.TrimSuffix(d.Descriptor().Kind.String(), "Kind"), + strings.TrimSuffix(d.Descriptor().NumberKind.String(), "Kind"), + strings.TrimSuffix(d.Aggregation().String(), "Kind"), + ) + if d.Descriptor().Unit != "" { + s = fmt.Sprintf("%v-%v", s, d.Descriptor().Unit) + } + return s +} diff --git a/lightstep/sdk/metric/internal/viewstate/errors_test.go b/lightstep/sdk/metric/internal/viewstate/errors_test.go new file mode 100644 index 00000000..a51a0a82 --- /dev/null +++ b/lightstep/sdk/metric/internal/viewstate/errors_test.go @@ -0,0 +1,146 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package viewstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" + +import ( + "errors" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/test" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/view" +) + +var oneConflict = Conflict{ + Semantic: SemanticError{ + Instrument: sdkinstrument.CounterKind, + Aggregation: aggregation.GaugeKind, + }, +} + +func TestViewConflictsError(t *testing.T) { + var err error + err = ViewConflictsError{} + require.Equal(t, noConflictsString, err.Error()) + require.True(t, errors.Is(err, ViewConflictsError{})) + + require.True(t, errors.Is(oneConflict.Semantic, SemanticError{})) +} + +// TestViewConflictsError exercises the code paths that construct example +// error messages from duplicate instrument conditions. 
+func TestViewConflictsBuilder(t *testing.T) { + // Note: These all use "no conflicts" strings, which happens + // under artificial conditions such as conflicts w/ < 2 examples + // and allows testing the code that avoids lengthy messages + // when there is only one conflict or only one reader. + rd0 := "test0" + rd1 := "test1" + + // This is a synthetic case, for the sake of coverage. + builder := ViewConflictsBuilder{ + rd0: []Conflict{}, + } + err := builder.AsError() + require.Equal(t, noConflictsString, err.Error()) + + // Note: This test ignores duplicates, one semantic error is + // enough to test the ViewConflicts logic. + oneError := oneConflict.Semantic.Error() + + builder = ViewConflictsBuilder{} + builder.Add(rd0, oneConflict) + err = builder.AsError() + require.True(t, strings.HasSuffix(err.Error(), oneError), err) + + builder = ViewConflictsBuilder{} + builder.Add(rd0, oneConflict) + builder.Add(rd0, oneConflict) + err = builder.AsError() + require.True(t, strings.HasSuffix(err.Error(), oneError), err) + require.True(t, strings.HasPrefix(err.Error(), "2 conflicts, e.g. "), err) + + builder = ViewConflictsBuilder{} + builder.Add(rd0, oneConflict) + builder.Add(rd1, oneConflict) + err = builder.AsError() + require.True(t, strings.HasSuffix(err.Error(), oneError), err) + require.True(t, strings.HasPrefix(err.Error(), "2 conflicts in 2 readers, e.g. "), err) +} + +func TestConflictCombine(t *testing.T) { + rd0 := "test0" + rd1 := "test1" + + builder1 := ViewConflictsBuilder{} + builder1.Add(rd0, oneConflict) + + builder2 := ViewConflictsBuilder{} + builder2.Add(rd1, oneConflict) + + builder1.Combine(builder2) + err1 := builder1.AsError() + require.True(t, strings.HasSuffix(err1.Error(), oneConflict.Semantic.Error()), err1) + require.True(t, strings.HasPrefix(err1.Error(), "2 conflicts in 2 readers, e.g. "), err1) + + var builder3 ViewConflictsBuilder + builder3.Combine(ViewConflictsBuilder{}) // empty builder has no effect + builder3.Combine(builder1) + err3 := builder3.AsError() + + require.Equal(t, err1, err3) +} + +// TestConflictError tests that both semantic errors and duplicate +// conflicts are printed. Note this uses the real library to generate +// the conflict, to avoid creating a relatively large test-only type. 
+func TestConflictError(t *testing.T) { + views := view.New( + "problem", + view.WithDefaultAggregationKindSelector(func(k sdkinstrument.Kind) aggregation.Kind { + return aggregation.GaugeKind + }), + view.WithClause( + // "bar" is renamed "foo" w/ histogram + view.MatchInstrumentName("bar"), + view.WithName("foo"), + view.WithAggregation(aggregation.HistogramKind), + ), + ) + + vc := New(testLib, views) + + // Sync counter named bar becomes histogram + inst2, conf2 := vc.Compile(test.Descriptor("bar", sdkinstrument.CounterKind, number.Int64Kind)) + require.NoError(t, conf2.AsError()) + require.NotNil(t, inst2) + + // Async counter named foo becomes gauge + inst1, conf1 := vc.Compile(test.Descriptor("foo", sdkinstrument.CounterObserverKind, number.Int64Kind)) + require.Error(t, conf1.AsError()) + require.NotNil(t, inst1) + require.Equal(t, + "problem: CounterObserver instrument incompatible with Gauge aggregation; "+ + "name \"foo\" (original \"bar\") conflicts Counter-Int64-Histogram, CounterObserver-Int64-MonotonicSum", + conf1.AsError().Error(), + ) + + require.NotEqual(t, inst1, inst2) +} diff --git a/lightstep/sdk/metric/internal/viewstate/viewstate.go b/lightstep/sdk/metric/internal/viewstate/viewstate.go new file mode 100644 index 00000000..bea0843e --- /dev/null +++ b/lightstep/sdk/metric/internal/viewstate/viewstate.go @@ -0,0 +1,591 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package viewstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" + +import ( + "sync" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/gauge" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/histogram" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/sum" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/data" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/view" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/instrumentation" +) + +// Compiler implements Views for a single Meter. A single Compiler +// controls the namespace of metric instruments output and reports +// conflicting definitions for the same name. +// +// Information flows through the Compiler as follows: +// +// When new instruments are created: +// - The Compiler.Compile() method returns an Instrument value and possible +// duplicate or semantic conflict error. +// +// When instruments are used: +// - The Instrument.NewAccumulator() method returns an Accumulator value for each attribute.Set used +// - The Accumulator.Update() aggregates one value for each measurement. 
+//
+// During collection:
+// - The Accumulator.SnapshotAndProcess() method captures the current value
+//   and conveys it to the output storage
+// - The Compiler.Collectors() interface returns one Collector per output
+//   Metric in the Meter (duplicate definitions included).
+// - The Collector.Collect() method outputs one Point for each attribute.Set
+//   in the result.
+type Compiler struct {
+	// views is the configuration of this compiler.
+	views *view.Views
+
+	// library is the value used for matching
+	// instrumentation library information.
+	library instrumentation.Library
+
+	// lock protects collectors and names.
+	lock sync.Mutex
+
+	// collectors is the de-duplicated list of metric outputs, which may
+	// contain conflicting identities.
+	collectors []data.Collector
+
+	// names is the map of output names for metrics
+	// produced by this compiler.
+	names map[string][]leafInstrument
+}
+
+// Instrument is a compiled implementation of an instrument
+// corresponding with one or more instrument-view behaviors.
+type Instrument interface {
+	// NewAccumulator returns an Accumulator and an Updater[N]
+	// matching the number type of the API-level instrument.
+	//
+	// Callers are expected to type-assert Updater[int64] or
+	// Updater[float64] before calling Update().
+	//
+	// The caller's primary responsibility is to maintain
+	// the collection of Accumulators that had Update()
+	// called since the last collection and to ensure that each
+	// of them has SnapshotAndProcess() called.
+	NewAccumulator(kvs attribute.Set) Accumulator
+}
+
+// Updater captures single measurements, for N an int64 or float64.
+type Updater[N number.Any] interface {
+	// Update captures a single measurement. For synchronous
+	// instruments, this passes directly through to the
+	// aggregator. For asynchronous instruments, the last value
+	// is captured by the accumulator snapshot.
+	Update(value N)
+}
+
+// Accumulator is an intermediate interface used for short-term
+// aggregation. Every Accumulator is also an Updater. The owner of
+// an Accumulator is responsible for maintaining the current set
+// of Accumulators, defined as those which have been Updated and not
+// yet had SnapshotAndProcess() called.
+type Accumulator interface {
+	// SnapshotAndProcess() takes a snapshot of data aggregated
+	// through Update() and simultaneously resets the current
+	// aggregator. The attribute.Set is possibly filtered, after
+	// which the snapshot is merged into the output.
+	//
+	// There is no return value from this method; the caller can
+	// safely forget an Accumulator after this method is called,
+	// provided Update is not used again.
+	SnapshotAndProcess()
+}
+
+// leafInstrument is one of the (synchronous or asynchronous),
+// (cumulative or delta) instrument implementations. This is used in
+// duplicate conflict detection and resolution.
+type leafInstrument interface {
+	// Instrument is the form returned by Compile().
+	Instrument
+	// Collector is the form returned in Collectors().
+	data.Collector
+	// Duplicate is how this instrument appears to other
+	// instruments when they are in conflict.
+	Duplicate
+
+	// mergeDescription handles the special case allowing
+	// descriptions to be merged instead of conflicting.
+	mergeDescription(string)
+}
+
+// singleBehavior is one instrument-view behavior, including the
+// original instrument details, the aggregation kind and temporality,
+// aggregator configuration, and optional keys to filter.
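Tying these interfaces together, the documented flow looks roughly like the following from the meter implementation's side. This is a hedged sketch: the descriptor, view name, and sequence values are illustrative, and it assumes data.Collector exposes the Collect(seq, output) method implemented by the instruments in this package.

func lifecycleSketch(seq data.Sequence) {
	vc := New(instrumentation.Library{Name: "example"}, view.New("reader"))

	desc := sdkinstrument.NewDescriptor("requests", sdkinstrument.CounterKind, number.Int64Kind, "", "")
	inst, conflicts := vc.Compile(desc)
	if err := conflicts.AsError(); err != nil {
		fmt.Println(err) // conflicts are reported, but the instrument stays valid
	}

	acc := inst.NewAccumulator(attribute.NewSet())
	acc.(Updater[int64]).Update(1)

	// Collection: snapshot every active accumulator, then collect each output.
	acc.SnapshotAndProcess()
	var output []data.Instrument
	for _, coll := range vc.Collectors() {
		coll.Collect(seq, &output)
	}
}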
+type singleBehavior struct { + // fromName is the original instrument name + fromName string + + // desc is the output of the view, including naming, + // description and unit. This includes the original + // instrument's instrument kind and number kind. + desc sdkinstrument.Descriptor + + // kind is the aggregation indicated by this view behavior. + kind aggregation.Kind + + // tempo is the configured aggregation temporality. + tempo aggregation.Temporality + + // acfg is the aggregator configuration. + acfg aggregator.Config + + // keysSet (if non-nil) is an attribute set containing each + // key being filtered with a zero value. This is used to + // compare against potential duplicates for having the + // same/different filter. + keysSet *attribute.Set // With Int(0) + + // keysFilter (if non-nil) is the constructed keys filter. + keysFilter *attribute.Filter +} + +// New returns a compiler for library given configured views. +func New(library instrumentation.Library, views *view.Views) *Compiler { + return &Compiler{ + library: library, + views: views, + names: map[string][]leafInstrument{}, + } +} + +func (v *Compiler) Collectors() []data.Collector { + v.lock.Lock() + defer v.lock.Unlock() + return v.collectors +} + +// Compile is called during NewInstrument by the Meter +// implementation, the result saved in the instrument and used to +// construct new Accumulators throughout its lifetime. +func (v *Compiler) Compile(instrument sdkinstrument.Descriptor) (Instrument, ViewConflictsBuilder) { + var behaviors []singleBehavior + var matches []view.ClauseConfig + + for _, view := range v.views.Clauses { + if !view.Matches(v.library, instrument) { + continue + } + matches = append(matches, view) + } + + for _, view := range matches { + akind := view.Aggregation() + if akind == aggregation.DropKind { + continue + } + if akind == aggregation.UndefinedKind { + akind = v.views.Defaults.Aggregation(instrument.Kind) + } + + cf := singleBehavior{ + fromName: instrument.Name, + desc: viewDescriptor(instrument, view), + kind: akind, + acfg: viewAggConfig(&v.views.Defaults, akind, instrument.Kind, instrument.NumberKind, view.AggregatorConfig()), + tempo: v.views.Defaults.Temporality(instrument.Kind), + } + + keys := view.Keys() + if keys != nil { + cf.keysSet = keysToSet(view.Keys()) + cf.keysFilter = keysToFilter(view.Keys()) + } + behaviors = append(behaviors, cf) + } + + // If there were no matching views, set the default aggregation. + if len(matches) == 0 { + akind := v.views.Defaults.Aggregation(instrument.Kind) + if akind != aggregation.DropKind { + behaviors = append(behaviors, singleBehavior{ + fromName: instrument.Name, + desc: instrument, + kind: akind, + acfg: viewAggConfig(&v.views.Defaults, akind, instrument.Kind, instrument.NumberKind, aggregator.Config{}), + tempo: v.views.Defaults.Temporality(instrument.Kind), + }) + } + } + + v.lock.Lock() + defer v.lock.Unlock() + + var conflicts ViewConflictsBuilder + var compiled []Instrument + + for _, behavior := range behaviors { + // the following checks semantic compatibility + // and if necessary fixes the aggregation kind + // to the default, via in place update. + semanticErr := checkSemanticCompatibility(instrument.Kind, &behavior.kind) + + existingInsts := v.names[behavior.desc.Name] + var leaf leafInstrument + + // Scan the existing instruments for a match. 
+ for _, inst := range existingInsts { + // Test for equivalence among the fields that we + // cannot merge or will not convert, means the + // testing everything except the description for + // equality. + + if inst.Aggregation() != behavior.kind { + continue + } + if inst.Descriptor().Kind.Synchronous() != behavior.desc.Kind.Synchronous() { + continue + } + + if inst.Descriptor().Unit != behavior.desc.Unit { + continue + } + if inst.Descriptor().NumberKind != behavior.desc.NumberKind { + continue + } + if !equalConfigs(inst.Config(), behavior.acfg) { + continue + } + + // For attribute keys, test for equal nil-ness or equal value. + instKeys := inst.Keys() + confKeys := behavior.keysSet + if (instKeys == nil) != (confKeys == nil) { + continue + } + if instKeys != nil && *instKeys != *confKeys { + continue + } + // We can return the previously-compiled instrument, + // we may have different descriptions and that is + // specified to choose the longer one. + inst.mergeDescription(behavior.desc.Description) + leaf = inst + break + } + if leaf == nil { + switch behavior.desc.NumberKind { + case number.Int64Kind: + leaf = buildView[int64, number.Int64Traits](behavior) + case number.Float64Kind: + leaf = buildView[float64, number.Float64Traits](behavior) + } + + v.collectors = append(v.collectors, leaf) + existingInsts = append(existingInsts, leaf) + v.names[behavior.desc.Name] = existingInsts + + } + if len(existingInsts) > 1 || semanticErr != nil { + c := Conflict{ + Semantic: semanticErr, + Duplicates: make([]Duplicate, len(existingInsts)), + } + for i := range existingInsts { + c.Duplicates[i] = existingInsts[i] + } + conflicts.Add(v.views.Name, c) + } + compiled = append(compiled, leaf) + } + return Combine(instrument, compiled...), conflicts +} + +// buildView compiles either a synchronous or asynchronous instrument +// given its behavior and generic number type/traits. +func buildView[N number.Any, Traits number.Traits[N]](behavior singleBehavior) leafInstrument { + if behavior.desc.Kind.Synchronous() { + return compileSync[N, Traits](behavior) + } + return compileAsync[N, Traits](behavior) +} + +// newSyncView returns a compiled synchronous instrument. If the view +// calls for delta temporality, a stateless instrument is returned, +// otherwise for cumulative temporality a stateful instrument will be +// used. I.e., Delta->Stateless, Cumulative->Stateful. +func newSyncView[ + N number.Any, + Storage any, + Methods aggregator.Methods[N, Storage], +](behavior singleBehavior) leafInstrument { + // Note: nolint:govet below is to avoid copylocks. The lock + // is being copied before the new object is returned to the + // user, and the extra allocation cost here would be + // noticeable. + metric := instrumentBase[N, Storage, Methods]{ + fromName: behavior.fromName, + desc: behavior.desc, + acfg: behavior.acfg, + data: map[attribute.Set]*Storage{}, + keysSet: behavior.keysSet, + keysFilter: behavior.keysFilter, + } + instrument := compiledSyncBase[N, Storage, Methods]{ + instrumentBase: metric, //nolint:govet + } + if behavior.tempo == aggregation.DeltaTemporality { + return &statelessSyncInstrument[N, Storage, Methods]{ + compiledSyncBase: instrument, //nolint:govet + } + } + + return &statefulSyncInstrument[N, Storage, Methods]{ + compiledSyncBase: instrument, //nolint:govet + } +} + +// compileSync calls newSyncView to compile a synchronous +// instrument with specific aggregator storage and methods. 
+func compileSync[N number.Any, Traits number.Traits[N]](behavior singleBehavior) leafInstrument { + switch behavior.kind { + case aggregation.HistogramKind: + return newSyncView[ + N, + histogram.State[N, Traits], + histogram.Methods[N, Traits, histogram.State[N, Traits]], + ](behavior) + case aggregation.NonMonotonicSumKind: + return newSyncView[ + N, + sum.State[N, Traits, sum.NonMonotonic], + sum.Methods[N, Traits, sum.NonMonotonic, sum.State[N, Traits, sum.NonMonotonic]], + ](behavior) + default: // e.g., aggregation.MonotonicSumKind + return newSyncView[ + N, + sum.State[N, Traits, sum.Monotonic], + sum.Methods[N, Traits, sum.Monotonic, sum.State[N, Traits, sum.Monotonic]], + ](behavior) + } +} + +// newAsyncView returns a compiled asynchronous instrument. If the +// view calls for delta temporality, a stateful instrument is +// returned, otherwise for cumulative temporality a stateless +// instrument will be used. I.e., Cumulative->Stateless, +// Delta->Stateful. +func newAsyncView[ + N number.Any, + Storage any, + Methods aggregator.Methods[N, Storage], +](behavior singleBehavior) leafInstrument { + // Note: nolint:govet below is to avoid copylocks. The lock + // is being copied before the new object is returned to the + // user, and the extra allocation cost here would be + // noticeable. + metric := instrumentBase[N, Storage, Methods]{ + fromName: behavior.fromName, + desc: behavior.desc, + acfg: behavior.acfg, + data: map[attribute.Set]*Storage{}, + keysSet: behavior.keysSet, + keysFilter: behavior.keysFilter, + } + instrument := compiledAsyncBase[N, Storage, Methods]{ + instrumentBase: metric, //nolint:govet + } + if behavior.tempo == aggregation.DeltaTemporality { + return &statefulAsyncInstrument[N, Storage, Methods]{ + compiledAsyncBase: instrument, //nolint:govet + } + } + + return &statelessAsyncInstrument[N, Storage, Methods]{ + compiledAsyncBase: instrument, //nolint:govet + } +} + +// compileAsync calls newAsyncView to compile an asynchronous +// instrument with specific aggregator storage and methods. +func compileAsync[N number.Any, Traits number.Traits[N]](behavior singleBehavior) leafInstrument { + switch behavior.kind { + case aggregation.MonotonicSumKind: + return newAsyncView[ + N, + sum.State[N, Traits, sum.Monotonic], + sum.Methods[N, Traits, sum.Monotonic, sum.State[N, Traits, sum.Monotonic]], + ](behavior) + case aggregation.NonMonotonicSumKind: + return newAsyncView[ + N, + sum.State[N, Traits, sum.NonMonotonic], + sum.Methods[N, Traits, sum.NonMonotonic, sum.State[N, Traits, sum.NonMonotonic]], + ](behavior) + default: // e.g., aggregation.GaugeKind + return newAsyncView[ + N, + gauge.State[N, Traits], + gauge.Methods[N, Traits, gauge.State[N, Traits]], + ](behavior) + } +} + +// Combine accepts a variable number of Instruments to combine. If 0 +// items, nil is returned. If 1 item, the item itself is return. +// otherwise, a multiInstrument of the appropriate number kind is returned. +func Combine(desc sdkinstrument.Descriptor, insts ...Instrument) Instrument { + if len(insts) == 0 { + return nil + } + if len(insts) == 1 { + return insts[0] + } + if desc.NumberKind == number.Float64Kind { + return multiInstrument[float64](insts) + } + return multiInstrument[int64](insts) +} + +// multiInstrument is used by Combine() to combine the effects of +// multiple instrument-view behaviors. These instruments produce +// multiAccumulators in NewAccumulator. 
+type multiInstrument[N number.Any] []Instrument + +// NewAccumulator returns a Accumulator for multiple views of the same instrument. +func (mi multiInstrument[N]) NewAccumulator(kvs attribute.Set) Accumulator { + accs := make([]Accumulator, 0, len(mi)) + + for _, inst := range mi { + accs = append(accs, inst.NewAccumulator(kvs)) + } + return multiAccumulator[N](accs) +} + +// Uses a int(0)-value attribute to identify distinct key sets. +func keysToSet(keys []attribute.Key) *attribute.Set { + attrs := make([]attribute.KeyValue, len(keys)) + for i, key := range keys { + attrs[i] = key.Int(0) + } + ns := attribute.NewSet(attrs...) + return &ns +} + +// keyFilter provides an attribute.Filter implementation based on a +// map[attribute.Key]. +type keyFilter map[attribute.Key]struct{} + +// filter is an attribute.Filter. +func (ks keyFilter) filter(kv attribute.KeyValue) bool { + _, has := ks[kv.Key] + return has +} + +// keysToFilter constructs a keyFilter. +func keysToFilter(keys []attribute.Key) *attribute.Filter { + kf := keyFilter{} + for _, k := range keys { + kf[k] = struct{}{} + } + var af attribute.Filter = kf.filter + return &af +} + +// equalConfigs compares two aggregator configurations. +func equalConfigs(a, b aggregator.Config) bool { + return a == b +} + +// viewAggConfig returns the aggregator configuration prescribed by a view clause. +func viewAggConfig(r *view.DefaultConfig, ak aggregation.Kind, ik sdkinstrument.Kind, nk number.Kind, vcfg aggregator.Config) aggregator.Config { + if ak != aggregation.HistogramKind { + return aggregator.Config{} + } + if vcfg != (aggregator.Config{}) { + return vcfg + } + return r.AggregationConfig(ik, nk) +} + +// checkSemanticCompatibility checks whether an instrument / +// aggregator pairing is well defined. +// +// TODO(jmacd): There are a couple of specification questions about +// this worth raising. +func checkSemanticCompatibility(ik sdkinstrument.Kind, aggPtr *aggregation.Kind) error { + agg := *aggPtr + cat := agg.Category(ik) + + if agg == aggregation.AnySumKind { + switch cat { + case aggregation.MonotonicSumCategory, aggregation.HistogramCategory: + agg = aggregation.MonotonicSumKind + case aggregation.NonMonotonicSumCategory: + agg = aggregation.NonMonotonicSumKind + default: + agg = aggregation.UndefinedKind + } + *aggPtr = agg + } + + switch ik { + case sdkinstrument.CounterKind, sdkinstrument.HistogramKind: + switch cat { + case aggregation.MonotonicSumCategory, aggregation.NonMonotonicSumCategory, aggregation.HistogramCategory: + return nil + } + + case sdkinstrument.UpDownCounterKind, sdkinstrument.UpDownCounterObserverKind: + switch cat { + case aggregation.NonMonotonicSumCategory: + return nil + } + + case sdkinstrument.CounterObserverKind: + switch cat { + case aggregation.NonMonotonicSumCategory, aggregation.MonotonicSumCategory: + return nil + } + + case sdkinstrument.GaugeObserverKind: + switch cat { + case aggregation.GaugeCategory: + return nil + } + } + + *aggPtr = view.StandardAggregationKind(ik) + return SemanticError{ + Instrument: ik, + Aggregation: agg, + } +} + +// viewDescriptor returns the modified sdkinstrument.Descriptor of a +// view. It retains the original instrument kind, numebr kind, and +// unit, while allowing the name and description to change. 
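keysToSet and keysToFilter above produce the two representations of a view's key list: a comparable Set used only for duplicate detection, and an attribute.Filter applied by storageFinder. A short sketch of their effect (attribute values are arbitrary, and fmt is assumed to be imported):

func keysExample() {
	keys := []attribute.Key{"a", "b"}

	set := keysToSet(keys)       // {a: 0, b: 0}, compared during duplicate detection
	filter := keysToFilter(keys) // keeps only keys "a" and "b"

	full := attribute.NewSet(attribute.String("a", "1"), attribute.String("c", "3"))
	reduced, dropped := attribute.NewSetWithFiltered(full.ToSlice(), *filter)

	fmt.Println(set.Len(), reduced.Len(), len(dropped)) // prints 2 1 1: "c" was filtered out
}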
+func viewDescriptor(instrument sdkinstrument.Descriptor, v view.ClauseConfig) sdkinstrument.Descriptor { + ikind := instrument.Kind + nkind := instrument.NumberKind + name := instrument.Name + description := instrument.Description + unit := instrument.Unit + if v.HasName() { + name = v.Name() + } + if v.Description() != "" { + description = v.Description() + } + return sdkinstrument.NewDescriptor(name, ikind, nkind, description, unit) +} diff --git a/lightstep/sdk/metric/internal/viewstate/viewstate_test.go b/lightstep/sdk/metric/internal/viewstate/viewstate_test.go new file mode 100644 index 00000000..b9c43780 --- /dev/null +++ b/lightstep/sdk/metric/internal/viewstate/viewstate_test.go @@ -0,0 +1,1082 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package viewstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/gauge" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/histogram" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/sum" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/data" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/test" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/view" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/sdk/instrumentation" +) + +var ( + testLib = instrumentation.Library{ + Name: "test", + } + + fooToBarView = view.WithClause( + view.MatchInstrumentName("foo"), + view.WithName("bar"), + ) + + testHistBoundaries = []float64{1, 2, 3} + + defaultAggregatorConfig = aggregator.Config{} + + altHistogramConfig = aggregator.Config{ + Histogram: aggregator.HistogramConfig{ + MaxSize: 15, + }, + } + + fooToBarAltHistView = view.WithClause( + view.MatchInstrumentName("foo"), + view.WithName("bar"), + view.WithAggregatorConfig(altHistogramConfig), + ) + + fooToBarFilteredView = view.WithClause( + view.MatchInstrumentName("foo"), + view.WithName("bar"), + view.WithKeys([]attribute.Key{"a", "b"}), + ) + + fooToBarDifferentFiltersViews = []view.Option{ + fooToBarFilteredView, + view.WithClause( + view.MatchInstrumentName("bar"), + view.WithKeys([]attribute.Key{"a"}), + ), + } + + fooToBarSameFiltersViews = []view.Option{ + fooToBarFilteredView, + view.WithClause( + view.MatchInstrumentName("bar"), + view.WithKeys([]attribute.Key{"a", "b"}), + ), + } + + dropHistInstView = view.WithClause( + view.MatchInstrumentKind(sdkinstrument.HistogramKind), 
+ view.WithAggregation(aggregation.DropKind), + ) + + instrumentKinds = []sdkinstrument.Kind{ + sdkinstrument.HistogramKind, + sdkinstrument.GaugeObserverKind, + sdkinstrument.CounterKind, + sdkinstrument.UpDownCounterKind, + sdkinstrument.CounterObserverKind, + sdkinstrument.UpDownCounterObserverKind, + } + + numberKinds = []number.Kind{ + number.Int64Kind, + number.Float64Kind, + } + + endTime = time.Now() + middleTime = endTime.Add(-time.Millisecond) + startTime = endTime.Add(-2 * time.Millisecond) + + testSequence = data.Sequence{ + Start: startTime, + Last: middleTime, + Now: endTime, + } +) + +const ( + cumulative = aggregation.CumulativeTemporality + delta = aggregation.DeltaTemporality +) + +func testCompile(vc *Compiler, name string, ik sdkinstrument.Kind, nk number.Kind, opts ...instrument.Option) (Instrument, error) { + inst, conflicts := vc.Compile(test.Descriptor(name, ik, nk, opts...)) + return inst, conflicts.AsError() +} + +func testCollect(t *testing.T, vc *Compiler) []data.Instrument { + return test.CollectScope(t, vc.Collectors(), testSequence) +} + +func testCollectSequence(t *testing.T, vc *Compiler, seq data.Sequence) []data.Instrument { + return test.CollectScope(t, vc.Collectors(), seq) +} + +func testCollectSequenceReuse(t *testing.T, vc *Compiler, seq data.Sequence, output *data.Scope) []data.Instrument { + return test.CollectScopeReuse(t, vc.Collectors(), seq, output) +} + +// TestDeduplicateNoConflict verifies that two identical instruments +// have the same collector. +func TestDeduplicateNoConflict(t *testing.T) { + vc := New(testLib, view.New("test")) + + inst1, err1 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind) + require.NoError(t, err1) + require.NotNil(t, inst1) + + inst2, err2 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind) + require.NoError(t, err2) + require.NotNil(t, inst2) + + require.Equal(t, inst1, inst2) +} + +// TestDeduplicateRenameNoConflict verifies that one instrument can be renamed +// such that it becomes identical to another, so no conflict. +func TestDeduplicateRenameNoConflict(t *testing.T) { + vc := New(testLib, view.New("test", fooToBarView)) + + inst1, err1 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind) + require.NoError(t, err1) + require.NotNil(t, inst1) + + inst2, err2 := testCompile(vc, "bar", sdkinstrument.CounterKind, number.Int64Kind) + require.NoError(t, err2) + require.NotNil(t, inst2) + + require.Equal(t, inst1, inst2) +} + +// TestNoRenameNoConflict verifies that one instrument does not +// conflict with another differently-named instrument. +func TestNoRenameNoConflict(t *testing.T) { + vc := New(testLib, view.New("test")) + + inst1, err1 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind) + require.NoError(t, err1) + require.NotNil(t, inst1) + + inst2, err2 := testCompile(vc, "bar", sdkinstrument.CounterKind, number.Int64Kind) + require.NoError(t, err2) + require.NotNil(t, inst2) + + require.NotEqual(t, inst1, inst2) +} + +// TestDuplicateNumberConflict verifies that two same instruments +// except different number kind conflict. 
+func TestDuplicateNumberConflict(t *testing.T) { + vc := New(testLib, view.New("test")) + + inst1, err1 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind) + require.NoError(t, err1) + require.NotNil(t, inst1) + + inst2, err2 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Float64Kind) + require.Error(t, err2) + require.NotNil(t, inst2) + require.True(t, errors.Is(err2, ViewConflictsError{})) + require.Equal(t, 1, len(err2.(ViewConflictsError))) + require.Equal(t, 1, len(err2.(ViewConflictsError)["test"])) + require.Equal(t, 2, len(err2.(ViewConflictsError)["test"][0].Duplicates)) + + require.NotEqual(t, inst1, inst2) +} + +// TestDuplicateSyncAsyncConflict verifies that two same instruments +// except one synchonous, one asynchronous conflict. +func TestDuplicateSyncAsyncConflict(t *testing.T) { + vc := New(testLib, view.New("test")) + + inst1, err1 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Float64Kind) + require.NoError(t, err1) + require.NotNil(t, inst1) + + inst2, err2 := testCompile(vc, "foo", sdkinstrument.CounterObserverKind, number.Float64Kind) + require.Error(t, err2) + require.NotNil(t, inst2) + require.True(t, errors.Is(err2, ViewConflictsError{})) + + require.NotEqual(t, inst1, inst2) +} + +// TestDuplicateUnitConflict verifies that two same instruments +// except different units conflict. +func TestDuplicateUnitConflict(t *testing.T) { + vc := New(testLib, view.New("test")) + + inst1, err1 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Float64Kind, instrument.WithUnit("gal_us")) + require.NoError(t, err1) + require.NotNil(t, inst1) + + inst2, err2 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Float64Kind, instrument.WithUnit("cft_i")) + require.Error(t, err2) + require.NotNil(t, inst2) + require.True(t, errors.Is(err2, ViewConflictsError{})) + require.Contains(t, err2.Error(), "test: name \"foo\" conflicts Counter-Float64-MonotonicSum-gal_us") + + require.NotEqual(t, inst1, inst2) +} + +// TestDuplicateMonotonicConflict verifies that two same instruments +// except different monotonic values. +func TestDuplicateMonotonicConflict(t *testing.T) { + vc := New(testLib, view.New("test")) + + inst1, err1 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Float64Kind) + require.NoError(t, err1) + require.NotNil(t, inst1) + + inst2, err2 := testCompile(vc, "foo", sdkinstrument.UpDownCounterKind, number.Float64Kind) + require.Error(t, err2) + require.NotNil(t, inst2) + require.True(t, errors.Is(err2, ViewConflictsError{})) + require.Contains(t, err2.Error(), "UpDownCounter-Float64-NonMonotonicSum") + + require.NotEqual(t, inst1, inst2) +} + +// TestDuplicateAggregatorConfigConflict verifies that two same instruments +// except different aggregator.Config values. +func TestDuplicateAggregatorConfigConflict(t *testing.T) { + vc := New(testLib, view.New("test", fooToBarAltHistView)) + + inst1, err1 := testCompile(vc, "foo", sdkinstrument.HistogramKind, number.Float64Kind) + require.NoError(t, err1) + require.NotNil(t, inst1) + + inst2, err2 := testCompile(vc, "bar", sdkinstrument.HistogramKind, number.Float64Kind) + require.Error(t, err2) + require.NotNil(t, inst2) + require.True(t, errors.Is(err2, ViewConflictsError{})) + require.Contains(t, err2.Error(), "different aggregator configuration") + + require.NotEqual(t, inst1, inst2) +} + +// TestDuplicateAggregatorConfigNoConflict verifies that two same instruments +// with same aggregator.Config values configured in different ways. 
+func TestDuplicateAggregatorConfigNoConflict(t *testing.T) { + for _, nk := range numberKinds { + t.Run(nk.String(), func(t *testing.T) { + views := view.New( + "test", + view.WithDefaultAggregationConfigSelector( + func(_ sdkinstrument.Kind) (int64Config, float64Config aggregator.Config) { + if nk == number.Int64Kind { + return altHistogramConfig, aggregator.Config{} + } + return aggregator.Config{}, altHistogramConfig + }, + ), + fooToBarAltHistView, + ) + + vc := New(testLib, views) + + inst1, err1 := testCompile(vc, "foo", sdkinstrument.HistogramKind, nk) + require.NoError(t, err1) + require.NotNil(t, inst1) + + inst2, err2 := testCompile(vc, "bar", sdkinstrument.HistogramKind, nk) + require.NoError(t, err2) + require.NotNil(t, inst2) + + require.Equal(t, inst1, inst2) + }) + } +} + +// TestDuplicateAggregationKindConflict verifies that two instruments +// with different aggregation kinds conflict. +func TestDuplicateAggregationKindConflict(t *testing.T) { + vc := New(testLib, view.New("test", fooToBarView)) + + inst1, err1 := testCompile(vc, "foo", sdkinstrument.HistogramKind, number.Int64Kind) + require.NoError(t, err1) + require.NotNil(t, inst1) + + inst2, err2 := testCompile(vc, "bar", sdkinstrument.CounterKind, number.Int64Kind) + require.Error(t, err2) + require.NotNil(t, inst2) + require.True(t, errors.Is(err2, ViewConflictsError{})) + require.Contains(t, err2.Error(), "name \"bar\" (original \"foo\") conflicts Histogram-Int64-Histogram, Counter-Int64-MonotonicSum") + + require.NotEqual(t, inst1, inst2) +} + +// TestDuplicateAggregationKindNoConflict verifies that two +// instruments with different aggregation kinds do not conflict when +// the view drops one of the instruments. +func TestDuplicateAggregationKindNoConflict(t *testing.T) { + vc := New(testLib, view.New("test", dropHistInstView)) + + inst1, err1 := testCompile(vc, "foo", sdkinstrument.HistogramKind, number.Int64Kind) + require.NoError(t, err1) + require.Nil(t, inst1) // The viewstate.Instrument is nil, instruments become no-ops. + + inst2, err2 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind) + require.NoError(t, err2) + require.NotNil(t, inst2) +} + +// TestDuplicateMultipleConflicts verifies that multiple duplicate +// instrument conflicts include sufficient explanatory information. +func TestDuplicateMultipleConflicts(t *testing.T) { + vc := New(testLib, view.New("test")) + + inst1, err1 := testCompile(vc, "foo", instrumentKinds[0], number.Float64Kind) + require.NoError(t, err1) + require.NotNil(t, inst1) + + for num, ik := range instrumentKinds[1:] { + inst2, err2 := testCompile(vc, "foo", ik, number.Float64Kind) + require.Error(t, err2) + require.NotNil(t, inst2) + require.True(t, errors.Is(err2, ViewConflictsError{})) + // The total number of conflicting definitions is 1 in + // the first place and num+1 for the iterations of this loop. + require.Equal(t, num+2, len(err2.(ViewConflictsError)["test"][0].Duplicates)) + + if num > 0 { + require.Contains(t, err2.Error(), fmt.Sprintf("and %d more", num)) + } + } +} + +// TestDuplicateFilterConflicts verifies several cases where +// instruments output the same metric w/ different filters create conflicts. +func TestDuplicateFilterConflicts(t *testing.T) { + for idx, vws := range [][]view.Option{ + // In the first case, foo has two attribute filters bar has 0. + {fooToBarFilteredView}, + // In the second case, foo has two attribute filters bar has 1. 
+		fooToBarDifferentFiltersViews,
+	} {
+		t.Run(fmt.Sprint(idx), func(t *testing.T) {
+			vc := New(testLib, view.New("test", vws...))
+
+			inst1, err1 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind)
+			require.NoError(t, err1)
+			require.NotNil(t, inst1)
+
+			inst2, err2 := testCompile(vc, "bar", sdkinstrument.CounterKind, number.Int64Kind)
+			require.Error(t, err2)
+			require.NotNil(t, inst2)
+
+			require.True(t, errors.Is(err2, ViewConflictsError{}))
+			require.Contains(t, err2.Error(), "name \"bar\" (original \"foo\") has conflicts: different attribute filters")
+		})
+	}
+}
+
+// TestDeduplicateSameFilters tests that when one instrument is
+// renamed to match another exactly, including filters, they are not
+// in conflict.
+func TestDeduplicateSameFilters(t *testing.T) {
+	vc := New(testLib, view.New("test", fooToBarSameFiltersViews...))
+
+	inst1, err1 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind)
+	require.NoError(t, err1)
+	require.NotNil(t, inst1)
+
+	inst2, err2 := testCompile(vc, "bar", sdkinstrument.CounterKind, number.Int64Kind)
+	require.NoError(t, err2)
+	require.NotNil(t, inst2)
+
+	require.Equal(t, inst1, inst2)
+}
+
+// TestDuplicatesMergeDescriptor ensures that the longest description string is used.
+func TestDuplicatesMergeDescriptor(t *testing.T) {
+	vc := New(testLib, view.New("test", fooToBarSameFiltersViews...))
+
+	inst1, err1 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind)
+	require.NoError(t, err1)
+	require.NotNil(t, inst1)
+
+	// This is the winning description:
+	inst2, err2 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind, instrument.WithDescription("very long"))
+	require.NoError(t, err2)
+	require.NotNil(t, inst2)
+
+	inst3, err3 := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind, instrument.WithDescription("shorter"))
+	require.NoError(t, err3)
+	require.NotNil(t, inst3)
+
+	require.Equal(t, inst1, inst2)
+	require.Equal(t, inst1, inst3)
+
+	accUpp := inst1.NewAccumulator(attribute.NewSet())
+	accUpp.(Updater[int64]).Update(1)
+
+	accUpp.SnapshotAndProcess()
+
+	output := testCollect(t, vc)
+
+	require.Equal(t, 1, len(output))
+	require.Equal(t, test.Instrument(
+		test.Descriptor("bar", sdkinstrument.CounterKind, number.Int64Kind, instrument.WithDescription("very long")),
+		test.Point(startTime, endTime, sum.NewMonotonicInt64(1), cumulative)), output[0],
+	)
+}
+
+// TestViewDescription ensures that a View can override the description.
+func TestViewDescription(t *testing.T) {
+	views := view.New(
+		"test",
+		view.WithClause(
+			view.MatchInstrumentName("foo"),
+			view.WithDescription("something helpful"),
+		),
+	)
+
+	vc := New(testLib, views)
+
+	inst1, err1 := testCompile(vc,
+		"foo", sdkinstrument.CounterKind, number.Int64Kind,
+		instrument.WithDescription("other description"),
+	)
+	require.NoError(t, err1)
+	require.NotNil(t, inst1)
+
+	attrs := []attribute.KeyValue{
+		attribute.String("K", "V"),
+	}
+	accUpp := inst1.NewAccumulator(attribute.NewSet(attrs...))
+	accUpp.(Updater[int64]).Update(1)
+
+	accUpp.SnapshotAndProcess()
+
+	output := testCollect(t, vc)
+
+	require.Equal(t, 1, len(output))
+	require.Equal(t,
+		test.Instrument(
+			test.Descriptor(
+				"foo", sdkinstrument.CounterKind, number.Int64Kind,
+				instrument.WithDescription("something helpful"),
+			),
+			test.Point(startTime, endTime, sum.NewMonotonicInt64(1), cumulative, attribute.String("K", "V")),
+		),
+		output[0],
+	)
+}
+
+// TestKeyFilters verifies that keys are filtered and metrics are
+// correctly aggregated.
+func TestKeyFilters(t *testing.T) {
+	views := view.New("test",
+		view.WithClause(view.WithKeys([]attribute.Key{"a", "b"})),
+	)
+
+	vc := New(testLib, views)
+
+	inst, err := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind)
+	require.NoError(t, err)
+	require.NotNil(t, inst)
+
+	accUpp1 := inst.NewAccumulator(
+		attribute.NewSet(attribute.String("a", "1"), attribute.String("b", "2"), attribute.String("c", "3")),
+	)
+	accUpp2 := inst.NewAccumulator(
+		attribute.NewSet(attribute.String("a", "1"), attribute.String("b", "2"), attribute.String("d", "4")),
+	)
+
+	accUpp1.(Updater[int64]).Update(1)
+	accUpp2.(Updater[int64]).Update(1)
+	accUpp1.SnapshotAndProcess()
+	accUpp2.SnapshotAndProcess()
+
+	output := testCollect(t, vc)
+
+	require.Equal(t, 1, len(output))
+	require.Equal(t, test.Instrument(
+		test.Descriptor("foo", sdkinstrument.CounterKind, number.Int64Kind),
+		test.Point(
+			startTime, endTime, sum.NewMonotonicInt64(2), cumulative,
+			attribute.String("a", "1"), attribute.String("b", "2"),
+		)), output[0],
+	)
+}
+
+// TestTwoViewsOneInt64Instrument verifies that multiple int64
+// instrument behaviors work; in this case, viewing a Sum in each
+// of three independent dimensions.
+func TestTwoViewsOneInt64Instrument(t *testing.T) { + views := view.New( + "test", + view.WithClause( + view.MatchInstrumentName("foo"), + view.WithName("foo_a"), + view.WithKeys([]attribute.Key{"a"}), + ), + view.WithClause( + view.MatchInstrumentName("foo"), + view.WithName("foo_b"), + view.WithKeys([]attribute.Key{"b"}), + ), + view.WithClause( + view.MatchInstrumentName("foo"), + view.WithName("foo_c"), + view.WithKeys([]attribute.Key{"c"}), + ), + ) + + vc := New(testLib, views) + + inst, err := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Int64Kind) + require.NoError(t, err) + + for _, acc := range []Accumulator{ + inst.NewAccumulator(attribute.NewSet(attribute.String("a", "1"), attribute.String("b", "1"))), + inst.NewAccumulator(attribute.NewSet(attribute.String("a", "1"), attribute.String("b", "2"))), + inst.NewAccumulator(attribute.NewSet(attribute.String("a", "2"), attribute.String("b", "1"))), + inst.NewAccumulator(attribute.NewSet(attribute.String("a", "2"), attribute.String("b", "2"))), + } { + acc.(Updater[int64]).Update(1) + acc.SnapshotAndProcess() + } + + output := testCollect(t, vc) + + test.RequireEqualMetrics(t, + output, + test.Instrument( + test.Descriptor("foo_a", sdkinstrument.CounterKind, number.Int64Kind), + test.Point( + startTime, endTime, sum.NewMonotonicInt64(2), cumulative, attribute.String("a", "1"), + ), + test.Point( + startTime, endTime, sum.NewMonotonicInt64(2), cumulative, attribute.String("a", "2"), + ), + ), + test.Instrument( + test.Descriptor("foo_b", sdkinstrument.CounterKind, number.Int64Kind), + test.Point( + startTime, endTime, sum.NewMonotonicInt64(2), cumulative, attribute.String("b", "1"), + ), + test.Point( + startTime, endTime, sum.NewMonotonicInt64(2), cumulative, attribute.String("b", "2"), + ), + ), + test.Instrument( + test.Descriptor("foo_c", sdkinstrument.CounterKind, number.Int64Kind), + test.Point( + startTime, endTime, sum.NewMonotonicInt64(4), cumulative, + ), + ), + ) +} + +// TestHistogramTwoAggregations verifies that two float64 instrument +// behaviors are correctly combined, in this case one sum and one histogram. +func TestHistogramTwoAggregations(t *testing.T) { + views := view.New( + "test", + view.WithClause( + view.MatchInstrumentName("foo"), + view.WithName("foo_sum"), + view.WithAggregation(aggregation.MonotonicSumKind), + view.WithKeys([]attribute.Key{}), + ), + view.WithClause( + view.MatchInstrumentName("foo"), + view.WithName("foo_hist"), + view.WithAggregation(aggregation.HistogramKind), + ), + ) + + vc := New(testLib, views) + + inst, err := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Float64Kind) + require.NoError(t, err) + + acc := inst.NewAccumulator(attribute.NewSet()) + acc.(Updater[float64]).Update(1) + acc.(Updater[float64]).Update(2) + acc.(Updater[float64]).Update(3) + acc.(Updater[float64]).Update(4) + acc.SnapshotAndProcess() + + output := testCollect(t, vc) + + test.RequireEqualMetrics(t, output, + test.Instrument( + test.Descriptor("foo_sum", sdkinstrument.CounterKind, number.Float64Kind), + test.Point( + startTime, endTime, sum.NewMonotonicFloat64(10), cumulative, + ), + ), + test.Instrument( + test.Descriptor("foo_hist", sdkinstrument.CounterKind, number.Float64Kind), + test.Point( + startTime, endTime, histogram.NewFloat64(defaultAggregatorConfig.Histogram, 1, 2, 3, 4), cumulative, + ), + ), + ) +} + +// TestAllKeysFilter tests that view.WithKeys([]attribute.Key{}) +// correctly erases all keys. 
+func TestAllKeysFilter(t *testing.T) {
+	views := view.New(
+		"test",
+		view.WithClause(view.WithKeys([]attribute.Key{})),
+	)
+
+	vc := New(testLib, views)
+
+	inst, err := testCompile(vc, "foo", sdkinstrument.CounterKind, number.Float64Kind)
+	require.NoError(t, err)
+
+	acc1 := inst.NewAccumulator(attribute.NewSet(attribute.String("a", "1")))
+	acc1.(Updater[float64]).Update(1)
+	acc1.SnapshotAndProcess()
+
+	acc2 := inst.NewAccumulator(attribute.NewSet(attribute.String("b", "2")))
+	acc2.(Updater[float64]).Update(1)
+	acc2.SnapshotAndProcess()
+
+	output := testCollect(t, vc)
+
+	test.RequireEqualMetrics(t, output,
+		test.Instrument(
+			test.Descriptor("foo", sdkinstrument.CounterKind, number.Float64Kind),
+			test.Point(
+				startTime, endTime, sum.NewMonotonicFloat64(2), cumulative,
+			),
+		),
+	)
+}
+
+// TestAnySumAggregation checks that the proper aggregation inference
+// is performed for each of the instrument types when
+// aggregation.AnySumKind is configured.
+func TestAnySumAggregation(t *testing.T) {
+	views := view.New(
+		"test",
+		view.WithClause(view.WithAggregation(aggregation.AnySumKind)),
+	)
+
+	vc := New(testLib, views)
+
+	for _, ik := range []sdkinstrument.Kind{
+		sdkinstrument.CounterKind,
+		sdkinstrument.CounterObserverKind,
+		sdkinstrument.UpDownCounterKind,
+		sdkinstrument.UpDownCounterObserverKind,
+		sdkinstrument.HistogramKind,
+		sdkinstrument.GaugeObserverKind,
+	} {
+		inst, err := testCompile(vc, ik.String(), ik, number.Float64Kind)
+		if ik == sdkinstrument.GaugeObserverKind {
+			// semantic conflict, Gauge can't handle AnySum aggregation!
+			require.Error(t, err)
+			require.Contains(t,
+				err.Error(),
+				"GaugeObserver instrument incompatible with Undefined aggregation",
+			)
+		} else {
+			require.NoError(t, err)
+		}
+
+		acc := inst.NewAccumulator(attribute.NewSet())
+		acc.(Updater[float64]).Update(1)
+		acc.SnapshotAndProcess()
+	}
+
+	output := testCollect(t, vc)
+
+	test.RequireEqualMetrics(t, output,
+		test.Instrument(
+			test.Descriptor("CounterKind", sdkinstrument.CounterKind, number.Float64Kind),
+			test.Point(startTime, endTime, sum.NewMonotonicFloat64(1), cumulative), // AnySum -> Monotonic
+		),
+		test.Instrument(
+			test.Descriptor("CounterObserverKind", sdkinstrument.CounterObserverKind, number.Float64Kind),
+			test.Point(startTime, endTime, sum.NewMonotonicFloat64(1), cumulative), // AnySum -> Monotonic
+		),
+		test.Instrument(
+			test.Descriptor("UpDownCounterKind", sdkinstrument.UpDownCounterKind, number.Float64Kind),
+			test.Point(startTime, endTime, sum.NewNonMonotonicFloat64(1), cumulative), // AnySum -> Non-Monotonic
+		),
+		test.Instrument(
+			test.Descriptor("UpDownCounterObserverKind", sdkinstrument.UpDownCounterObserverKind, number.Float64Kind),
+			test.Point(startTime, endTime, sum.NewNonMonotonicFloat64(1), cumulative), // AnySum -> Non-Monotonic
+		),
+		test.Instrument(
+			test.Descriptor("HistogramKind", sdkinstrument.HistogramKind, number.Float64Kind),
+			test.Point(startTime, endTime, sum.NewMonotonicFloat64(1), cumulative), // Histogram to Monotonic Sum
+		),
+		test.Instrument(
+			test.Descriptor("GaugeObserverKind", sdkinstrument.GaugeObserverKind, number.Float64Kind),
+			test.Point(startTime, endTime, gauge.NewFloat64(1), cumulative), // This stays a Gauge!
+		),
+	)
+}
+
+// TestDuplicateAsyncMeasurementsIgnored tests that asynchronous
+// instrument accumulators keep only the last observed value, while
+// synchronous instruments correctly snapshotAndProcess them all.
+func TestDuplicateAsyncMeasurementsIgnored(t *testing.T) {
+	vc := New(testLib, view.New("test"))
+
+	inst1, err := testCompile(vc, "async", sdkinstrument.CounterObserverKind, number.Float64Kind)
+	require.NoError(t, err)
+
+	inst2, err := testCompile(vc, "sync", sdkinstrument.CounterKind, number.Float64Kind)
+	require.NoError(t, err)
+
+	for _, inst := range []Instrument{inst1, inst2} {
+		acc := inst.NewAccumulator(attribute.NewSet())
+		acc.(Updater[float64]).Update(1)
+		acc.(Updater[float64]).Update(10)
+		acc.(Updater[float64]).Update(100)
+		acc.(Updater[float64]).Update(1000)
+		acc.(Updater[float64]).Update(10000)
+		acc.(Updater[float64]).Update(100000)
+		acc.SnapshotAndProcess()
+	}
+
+	output := testCollect(t, vc)
+
+	test.RequireEqualMetrics(t, output,
+		test.Instrument(
+			test.Descriptor("async", sdkinstrument.CounterObserverKind, number.Float64Kind),
+			test.Point(
+				startTime, endTime, sum.NewMonotonicFloat64(100000), cumulative,
+			),
+		),
+		test.Instrument(
+			test.Descriptor("sync", sdkinstrument.CounterKind, number.Float64Kind),
+			test.Point(
+				startTime, endTime, sum.NewMonotonicFloat64(111111), cumulative,
+			),
+		),
+	)
+}
+
+// TestCumulativeTemporality ensures that, under cumulative temporality,
+// synchronous instruments accumulate data across collections, whereas
+// asynchronous instruments do not.
+func TestCumulativeTemporality(t *testing.T) {
+	views := view.New(
+		"test",
+		view.WithClause(
+			// Dropping all keys
+			view.WithKeys([]attribute.Key{}),
+		),
+		view.WithDefaultAggregationTemporalitySelector(view.StandardTemporality),
+	)
+
+	vc := New(testLib, views)
+
+	inst1, err := testCompile(vc, "sync", sdkinstrument.CounterKind, number.Float64Kind)
+	require.NoError(t, err)
+
+	inst2, err := testCompile(vc, "async", sdkinstrument.CounterObserverKind, number.Float64Kind)
+	require.NoError(t, err)
+
+	setA := attribute.NewSet(attribute.String("A", "1"))
+	setB := attribute.NewSet(attribute.String("B", "1"))
+
+	for rounds := 1; rounds <= 2; rounds++ {
+		for _, acc := range []Accumulator{
+			inst1.NewAccumulator(setA),
+			inst1.NewAccumulator(setB),
+			inst2.NewAccumulator(setA),
+			inst2.NewAccumulator(setB),
+		} {
+			acc.(Updater[float64]).Update(1)
+			acc.SnapshotAndProcess()
+		}
+
+		test.RequireEqualMetrics(t, testCollect(t, vc),
+			test.Instrument(
+				test.Descriptor("sync", sdkinstrument.CounterKind, number.Float64Kind),
+				test.Point(
+					// Because synchronous instruments accumulate, the
+					// rounds multiplier is used here but not in the case below.
+					startTime, endTime, sum.NewMonotonicFloat64(float64(rounds)*2), cumulative,
+				),
+			),
+			test.Instrument(
+				test.Descriptor("async", sdkinstrument.CounterObserverKind, number.Float64Kind),
+				test.Point(
+					startTime, endTime, sum.NewMonotonicFloat64(2), cumulative,
+				),
+			),
+		)
+	}
+}
+
+// TestDeltaTemporalityCounter ensures that, under delta temporality,
+// both synchronous and asynchronous counters report only the change
+// since the previous collection.
+func TestDeltaTemporalityCounter(t *testing.T) {
+	views := view.New(
+		"test",
+		view.WithClause(
+			// Dropping all keys
+			view.WithKeys([]attribute.Key{}),
+		),
+		view.WithDefaultAggregationTemporalitySelector(view.DeltaPreferredTemporality),
+	)
+
+	vc := New(testLib, views)
+
+	inst1, err := testCompile(vc, "sync", sdkinstrument.CounterKind, number.Float64Kind)
+	require.NoError(t, err)
+
+	inst2, err := testCompile(vc, "async", sdkinstrument.CounterObserverKind, number.Float64Kind)
+	require.NoError(t, err)
+
+	setA := attribute.NewSet(attribute.String("A", "1"))
+	setB := attribute.NewSet(attribute.String("B", "1"))
+
+	seq := testSequence
+
+	for rounds := 1; rounds <= 3; rounds++ {
+		for _, acc := range []Accumulator{
+			inst1.NewAccumulator(setA),
+			inst1.NewAccumulator(setB),
+			inst2.NewAccumulator(setA),
+			inst2.NewAccumulator(setB),
+		} {
+			acc.(Updater[float64]).Update(float64(rounds))
+			acc.SnapshotAndProcess()
+		}
+
+		test.RequireEqualMetrics(t, testCollectSequence(t, vc, seq),
+			test.Instrument(
+				test.Descriptor("sync", sdkinstrument.CounterKind, number.Float64Kind),
+				test.Point(
+					// By construction, the change is rounds per attribute set == 2*rounds
+					seq.Last, seq.Now, sum.NewMonotonicFloat64(2*float64(rounds)), delta,
+				),
+			),
+			test.Instrument(
+				test.Descriptor("async", sdkinstrument.CounterObserverKind, number.Float64Kind),
+				test.Point(
+					// By construction, the change is 1 per attribute set == 2
+					seq.Last, seq.Now, sum.NewMonotonicFloat64(2), delta,
+				),
+			),
+		)
+
+		// Update the test sequence
+		seq.Last = seq.Now
+		seq.Now = time.Now()
+	}
+}
+
+// TestDeltaTemporalityGauge ensures that the asynchronous gauge,
+// when used with delta temporality, only reports changed values.
+func TestDeltaTemporalityGauge(t *testing.T) {
+	views := view.New(
+		"test",
+		view.WithDefaultAggregationTemporalitySelector(view.DeltaPreferredTemporality),
+	)
+
+	vc := New(testLib, views)
+
+	instF, err := testCompile(vc, "gaugeF", sdkinstrument.GaugeObserverKind, number.Float64Kind)
+	require.NoError(t, err)
+
+	instI, err := testCompile(vc, "gaugeI", sdkinstrument.GaugeObserverKind, number.Int64Kind)
+	require.NoError(t, err)
+
+	set := attribute.NewSet()
+
+	observe := func(x int) {
+		accI := instI.NewAccumulator(set)
+		accI.(Updater[int64]).Update(int64(x))
+		accI.SnapshotAndProcess()
+
+		accF := instF.NewAccumulator(set)
+		accF.(Updater[float64]).Update(float64(x))
+		accF.SnapshotAndProcess()
+	}
+
+	expectValues := func(x int, seq data.Sequence) {
+		test.RequireEqualMetrics(t,
+			testCollectSequence(t, vc, seq),
+			test.Instrument(
+				test.Descriptor("gaugeF", sdkinstrument.GaugeObserverKind, number.Float64Kind),
+				test.Point(seq.Last, seq.Now, gauge.NewFloat64(float64(x)), delta),
+			),
+			test.Instrument(
+				test.Descriptor("gaugeI", sdkinstrument.GaugeObserverKind, number.Int64Kind),
+				test.Point(seq.Last, seq.Now, gauge.NewInt64(int64(x)), delta),
+			),
+		)
+	}
+	expectNone := func(seq data.Sequence) {
+		test.RequireEqualMetrics(t,
+			testCollectSequence(t, vc, seq),
+			test.Instrument(
+				test.Descriptor("gaugeF", sdkinstrument.GaugeObserverKind, number.Float64Kind),
+			),
+			test.Instrument(
+				test.Descriptor("gaugeI", sdkinstrument.GaugeObserverKind, number.Int64Kind),
+			),
+		)
+	}
+	seq := testSequence
+	tick := func() {
+		// Update the test sequence
+		seq.Last = seq.Now
+		seq.Now = time.Now()
+	}
+
+	observe(10)
+	expectValues(10, seq)
+	tick()
+
+	observe(10)
+	expectNone(seq)
+	tick()
+
+	observe(10)
+	expectNone(seq)
+	tick()
+
+	observe(11)
+	expectValues(11, seq)
+	tick()
+
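+	// The same value is observed again in the following interval; with
+	// delta temporality an unchanged gauge produces no point at all.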
+	observe(11)
+	expectNone(seq)
+	tick()
+
+	observe(10)
+	expectValues(10, seq)
+	tick()
+}
+
+// TestSyncDeltaTemporalityCounter ensures that counters and updowncounters
+// skip points under delta temporality when there is no change.
+func TestSyncDeltaTemporalityCounter(t *testing.T) {
+	views := view.New(
+		"test",
+		view.WithDefaultAggregationTemporalitySelector(
+			func(ik sdkinstrument.Kind) aggregation.Temporality {
+				return aggregation.DeltaTemporality // Always delta
+			}),
+	)
+
+	vc := New(testLib, views)
+
+	instCF, err := testCompile(vc, "counterF", sdkinstrument.CounterKind, number.Float64Kind)
+	require.NoError(t, err)
+
+	instCI, err := testCompile(vc, "counterI", sdkinstrument.CounterKind, number.Int64Kind)
+	require.NoError(t, err)
+
+	instUF, err := testCompile(vc, "updowncounterF", sdkinstrument.UpDownCounterKind, number.Float64Kind)
+	require.NoError(t, err)
+
+	instUI, err := testCompile(vc, "updowncounterI", sdkinstrument.UpDownCounterKind, number.Int64Kind)
+	require.NoError(t, err)
+
+	set := attribute.NewSet()
+
+	var output data.Scope
+
+	observe := func(mono, nonMono int) {
+		accCI := instCI.NewAccumulator(set)
+		accCI.(Updater[int64]).Update(int64(mono))
+		accCI.SnapshotAndProcess()
+
+		accCF := instCF.NewAccumulator(set)
+		accCF.(Updater[float64]).Update(float64(mono))
+		accCF.SnapshotAndProcess()
+
+		accUI := instUI.NewAccumulator(set)
+		accUI.(Updater[int64]).Update(int64(nonMono))
+		accUI.SnapshotAndProcess()
+
+		accUF := instUF.NewAccumulator(set)
+		accUF.(Updater[float64]).Update(float64(nonMono))
+		accUF.SnapshotAndProcess()
+	}
+
+	expectValues := func(mono, nonMono int, seq data.Sequence) {
+		test.RequireEqualMetrics(t,
+			testCollectSequenceReuse(t, vc, seq, &output),
+			test.Instrument(
+				test.Descriptor("counterF", sdkinstrument.CounterKind, number.Float64Kind),
+				test.Point(seq.Last, seq.Now, sum.NewMonotonicFloat64(float64(mono)), delta),
+			),
+			test.Instrument(
+				test.Descriptor("counterI", sdkinstrument.CounterKind, number.Int64Kind),
+				test.Point(seq.Last, seq.Now, sum.NewMonotonicInt64(int64(mono)), delta),
+			),
+			test.Instrument(
+				test.Descriptor("updowncounterF", sdkinstrument.UpDownCounterKind, number.Float64Kind),
+				test.Point(seq.Last, seq.Now, sum.NewNonMonotonicFloat64(float64(nonMono)), delta),
+			),
+			test.Instrument(
+				test.Descriptor("updowncounterI", sdkinstrument.UpDownCounterKind, number.Int64Kind),
+				test.Point(seq.Last, seq.Now, sum.NewNonMonotonicInt64(int64(nonMono)), delta),
+			),
+		)
+	}
+	expectNone := func(seq data.Sequence) {
+		test.RequireEqualMetrics(t,
+			testCollectSequenceReuse(t, vc, seq, &output),
+			test.Instrument(
+				test.Descriptor("counterF", sdkinstrument.CounterKind, number.Float64Kind),
+			),
+			test.Instrument(
+				test.Descriptor("counterI", sdkinstrument.CounterKind, number.Int64Kind),
+			),
+			test.Instrument(
+				test.Descriptor("updowncounterF", sdkinstrument.UpDownCounterKind, number.Float64Kind),
+			),
+			test.Instrument(
+				test.Descriptor("updowncounterI", sdkinstrument.UpDownCounterKind, number.Int64Kind),
+			),
+		)
+	}
+	seq := testSequence
+	tick := func() {
+		// Update the test sequence
+		seq.Last = seq.Now
+		seq.Now = time.Now()
+	}
+
+	observe(10, 10)
+	expectValues(10, 10, seq)
+	tick()
+
+	observe(0, 100)
+	observe(0, -100)
+	expectNone(seq)
+	tick()
+
+	observe(100, 100)
+	expectValues(100, 100, seq)
+	tick()
+}