Skip to content

Commit

Permalink
Use inst ID for agg cache key
Browse files Browse the repository at this point in the history
Resolve open-telemetry#4201

The specification requires the duplicate instrument conflicts to be
identified based on the instrument identifying fields:

- name
- instrument kind
- unit
- description
- language-level features such as the number type (int64 and float64)

Currently, the conflict detection and aggregation caching are done based
on the stream IDs which include an aggregation name, monotonicity, and
temporality instead of the instrument kind.

This changes the conflict detection and aggregation caching to use the
OpenTelemetry specified fields. This is effectively a no-op given there
is a 1-to-1 mapping of aggregation-name/monotonicity/temporality to
instrument kind (they are all resolved based on the instrument kind).

Additionally, this adds a stringer representation of the
`InstrumentKind`. This is needed for the logging of duplicate instrument
conflicts.
  • Loading branch information
MrAlias committed Jul 18, 2023
1 parent 9b0c4d2 commit 36ecb4b
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 47 deletions.
18 changes: 6 additions & 12 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate stringer -type=InstrumentKind -trimprefix=InstrumentKind

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
Expand All @@ -25,7 +27,6 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

var (
Expand Down Expand Up @@ -172,23 +173,16 @@ func (s Stream) attributeFilter() attribute.Filter {
}
}

// streamID are the identifying properties of a stream.
type streamID struct {
// instID are the identifying properties of a instrument.
type instID struct {
// Name is the name of the stream.
Name string
// Description is the description of the stream.
Description string
// Kind defines the functional group of the instrument.
Kind InstrumentKind
// Unit is the unit of the stream.
Unit string
// Aggregation is the aggregation data type of the stream.
Aggregation string
// Monotonic is the monotonicity of an instruments data type. This field is
// not used for all data types, so a zero value needs to be understood in the
// context of Aggregation.
Monotonic bool
// Temporality is the temporality of a stream's data type. This field is
// not used by some data types.
Temporality metricdata.Temporality
// Number is the number type of the stream.
Number string
}
Expand Down
29 changes: 29 additions & 0 deletions sdk/metric/instrumentkind_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type meter struct {
func newMeter(s instrumentation.Scope, p pipelines) *meter {
// viewCache ensures instrument conflicts, including number conflicts, this
// meter is asked to create are logged to the user.
var viewCache cache[string, streamID]
var viewCache cache[string, instID]

return &meter{
scope: s,
Expand Down Expand Up @@ -447,7 +447,7 @@ type int64InstProvider struct {
resolve resolver[int64]
}

func newInt64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *int64InstProvider {
func newInt64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, instID]) *int64InstProvider {
return &int64InstProvider{scope: s, pipes: p, resolve: newResolver[int64](p, c)}
}

Expand Down Expand Up @@ -475,7 +475,7 @@ type float64InstProvider struct {
resolve resolver[float64]
}

func newFloat64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *float64InstProvider {
func newFloat64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, instID]) *float64InstProvider {
return &float64InstProvider{scope: s, pipes: p, resolve: newResolver[float64](p, c)}
}

Expand Down
40 changes: 16 additions & 24 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,24 +187,24 @@ type inserter[N int64 | float64] struct {
// cache ensures no duplicate aggregate functions are inserted into the
// reader pipeline and if a new request during an instrument creation asks
// for the same aggregate function input the same instance is returned.
aggregators *cache[streamID, aggVal[N]]
aggregators *cache[instID, aggVal[N]]

// views is a cache that holds instrument identifiers for all the
// instruments a Meter has created, it is provided from the Meter that owns
// this inserter. This cache ensures during the creation of instruments
// with the same name but different options (e.g. description, unit) a
// warning message is logged.
views *cache[string, streamID]
views *cache[string, instID]

pipeline *pipeline
}

func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *inserter[N] {
func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *inserter[N] {
if vc == nil {
vc = &cache[string, streamID]{}
vc = &cache[string, instID]{}
}
return &inserter[N]{
aggregators: &cache[streamID, aggVal[N]]{},
aggregators: &cache[instID, aggVal[N]]{},
views: vc,
pipeline: p,
}
Expand Down Expand Up @@ -320,12 +320,14 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
)
}

id := i.streamID(kind, stream)
id := i.instID(kind, stream)
// If there is a conflict, the specification says the view should
// still be applied and a warning should be logged.
i.logConflict(id)
cv := i.aggregators.Lookup(id, func() aggVal[N] {
b := aggregate.Builder[N]{Temporality: id.Temporality}
b := aggregate.Builder[N]{
Temporality: i.pipeline.reader.temporality(kind),
}
if len(stream.AllowAttributeKeys) > 0 {
b.Filter = stream.attributeFilter()
}
Expand All @@ -350,8 +352,8 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum

// logConflict validates if an instrument with the same name as id has already
// been created. If that instrument conflicts with id, a warning is logged.
func (i *inserter[N]) logConflict(id streamID) {
existing := i.views.Lookup(id.Name, func() streamID { return id })
func (i *inserter[N]) logConflict(id instID) {
existing := i.views.Lookup(id.Name, func() instID { return id })
if id == existing {
return
}
Expand All @@ -360,31 +362,21 @@ func (i *inserter[N]) logConflict(id streamID) {
"duplicate metric stream definitions",
"names", fmt.Sprintf("%q, %q", existing.Name, id.Name),
"descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description),
"kinds", fmt.Sprintf("%s, %s", existing.Kind, id.Kind),
"units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit),
"numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number),
"aggregations", fmt.Sprintf("%s, %s", existing.Aggregation, id.Aggregation),
"monotonics", fmt.Sprintf("%t, %t", existing.Monotonic, id.Monotonic),
"temporalities", fmt.Sprintf("%s, %s", existing.Temporality.String(), id.Temporality.String()),
)
}

func (i *inserter[N]) streamID(kind InstrumentKind, stream Stream) streamID {
func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID {
var zero N
id := streamID{
return instID{
Name: stream.Name,
Description: stream.Description,
Unit: stream.Unit,
Aggregation: fmt.Sprintf("%T", stream.Aggregation),
Temporality: i.pipeline.reader.temporality(kind),
Kind: kind,
Number: fmt.Sprintf("%T", zero),
}

switch kind {
case InstrumentKindObservableCounter, InstrumentKindCounter, InstrumentKindHistogram:
id.Monotonic = true
}

return id
}

// aggregateFunc returns new aggregate functions matching agg, kind, and
Expand Down Expand Up @@ -513,7 +505,7 @@ type resolver[N int64 | float64] struct {
inserters []*inserter[N]
}

func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) resolver[N] {
func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) resolver[N] {
in := make([]*inserter[N], len(p))
for i := range in {
in[i] = newInserter[N](p[i], vc)
Expand Down
14 changes: 7 additions & 7 deletions sdk/metric/pipeline_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
var c cache[string, streamID]
var c cache[string, instID]
p := newPipeline(nil, tt.reader, tt.views)
i := newInserter[N](p, &c)
input, err := i.Instrument(tt.inst)
Expand All @@ -371,7 +371,7 @@ func TestCreateAggregators(t *testing.T) {
}

func testInvalidInstrumentShouldPanic[N int64 | float64]() {
var c cache[string, streamID]
var c cache[string, instID]
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c)
inst := Instrument{
Name: "foo",
Expand All @@ -391,7 +391,7 @@ func TestPipelinesAggregatorForEachReader(t *testing.T) {
require.Len(t, pipes, 2, "created pipelines")

inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
var c cache[string, instID]
r := newResolver[int64](pipes, &c)
aggs, err := r.Aggregators(inst)
require.NoError(t, err, "resolved Aggregators error")
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {

func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
var c cache[string, instID]
r := newResolver[int64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)
Expand All @@ -478,7 +478,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo

func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
var c cache[string, instID]
r := newResolver[float64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)
Expand All @@ -505,7 +505,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
p := newPipelines(resource.Empty(), readers, views)
inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}

var vc cache[string, streamID]
var vc cache[string, instID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(inst)
assert.Error(t, err)
Expand Down Expand Up @@ -556,7 +556,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {

p := newPipelines(resource.Empty(), readers, views)

var vc cache[string, streamID]
var vc cache[string, instID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(fooInst)
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var c cache[string, streamID]
var c cache[string, instID]
i := newInserter[N](test.pipe, &c)
got, err := i.Instrument(inst)
require.NoError(t, err)
Expand Down

0 comments on commit 36ecb4b

Please sign in to comment.