Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transient descriptor support in OTel Metric SDK #59

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/global/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ func (inst *instrument) Descriptor() metric.Descriptor {
return inst.descriptor
}

func (inst *instrument) Unref() {
}

// MeterProvider interface and delegation

func newMeterProvider() *meterProvider {
Expand Down
3 changes: 3 additions & 0 deletions metric/metric_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func (noopInstrument) Implementation() interface{} {
return nil
}

func (noopInstrument) Unref() {
}

func (noopInstrument) Descriptor() Descriptor {
return Descriptor{}
}
Expand Down
3 changes: 3 additions & 0 deletions metric/metric_sdkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type InstrumentImpl interface {

// Descriptor returns a copy of the instrument's Descriptor.
Descriptor() Descriptor

// @@@
Unref()
}

// SyncImpl is the implementation-level interface to a generic
Expand Down
3 changes: 3 additions & 0 deletions oteltest/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func (i Instrument) Descriptor() metric.Descriptor {
return i.descriptor
}

func (i Instrument) Unref() {
}

func (a *Async) Implementation() interface{} {
return a
}
Expand Down
20 changes: 12 additions & 8 deletions sdk/metric/refcount_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,36 @@ import (
"sync/atomic"
)

// refcountMapped atomically counts the number of references (usages) of an entry
// RefcountMapped atomically counts the number of references (usages) of an entry
// while also keeping a state of mapped/unmapped into a different data structure
// (an external map or list for example).
//
// refcountMapped uses an atomic value where the least significant bit is used to
// RefcountMapped uses an atomic value where the least significant bit is used to
// keep the state of mapping ('1' is used for unmapped and '0' is for mapped) and
// the rest of the bits are used for refcounting.
type refcountMapped struct {
type RefcountMapped struct {
// refcount has to be aligned for 64-bit atomic operations.
value int64
}

// ref returns true if the entry is still mapped and increases the
// Ref returns true if the entry is still mapped and increases the
// reference usages, if unmapped returns false.
func (rm *refcountMapped) ref() bool {
func (rm *RefcountMapped) Ref() bool {
// Check if this entry was marked as unmapped between the moment
// we got a reference to it (or will be removed very soon) and here.
return atomic.AddInt64(&rm.value, 2)&1 == 0
}

func (rm *refcountMapped) unref() {
func (rm *RefcountMapped) Unref() {
atomic.AddInt64(&rm.value, -2)
}

// tryUnmap flips the mapped bit to "unmapped" state and returns true if both of the
// TryUnmap flips the mapped bit to "unmapped" state and returns true if both of the
// following conditions are true upon entry to this function:
// * There are no active references;
// * The mapped bit is in "mapped" state.
// Otherwise no changes are done to mapped bit and false is returned.
func (rm *refcountMapped) tryUnmap() bool {
func (rm *RefcountMapped) TryUnmap() bool {
if atomic.LoadInt64(&rm.value) != 0 {
return false
}
Expand All @@ -57,3 +57,7 @@ func (rm *refcountMapped) tryUnmap() bool {
1,
)
}

func InitRefcountMapped() RefcountMapped {
return RefcountMapped{value: 2}
}
18 changes: 11 additions & 7 deletions sdk/metric/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type (
record struct {
// refMapped keeps track of refcounts and the mapping state to the
// Accumulator.current map.
refMapped refcountMapped
refMapped RefcountMapped

// updateCount is incremented on every Update.
updateCount int64
Expand Down Expand Up @@ -154,6 +154,10 @@ func (inst *instrument) Descriptor() metric.Descriptor {
return inst.descriptor
}

func (inst *instrument) Unref() {
// This is a no-op for the standard Accumulator.
}

func (a *asyncInstrument) Implementation() interface{} {
return a
}
Expand Down Expand Up @@ -235,7 +239,7 @@ func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set
if actual, ok := s.meter.current.Load(mk); ok {
// Existing record case.
existingRec := actual.(*record)
if existingRec.refMapped.ref() {
if existingRec.refMapped.Ref() {
// At this moment it is guaranteed that the entry is in
// the map and will not be removed.
return existingRec
Expand All @@ -247,7 +251,7 @@ func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set
rec = &record{}
rec.labels = labelPtr
}
rec.refMapped = refcountMapped{value: 2}
rec.refMapped = InitRefcountMapped()
rec.inst = s

s.meter.processor.AggregatorFor(&s.descriptor, &rec.current, &rec.checkpoint)
Expand All @@ -259,7 +263,7 @@ func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set
// Existing record case. Cannot change rec here because if fail
// will try to add rec again to avoid new allocations.
oldRec := actual.(*record)
if oldRec.refMapped.ref() {
if oldRec.refMapped.Ref() {
// At this moment it is guaranteed that the entry is in
// the map and will not be removed.
return oldRec
Expand Down Expand Up @@ -372,7 +376,7 @@ func (m *Accumulator) collectSyncInstruments() int {
}

// Having no updates since last collection, try to unmap:
if unmapped := inuse.refMapped.tryUnmap(); !unmapped {
if unmapped := inuse.refMapped.TryUnmap(); !unmapped {
// The record is referenced by a binding, continue.
return true
}
Expand All @@ -383,7 +387,7 @@ func (m *Accumulator) collectSyncInstruments() int {
m.current.Delete(inuse.mapkey())

// There's a potential race between `LoadInt64` and
// `tryUnmap` in this function. Since this is the
// `TryUnmap` in this function. Since this is the
// last we'll see of this record, checkpoint
mods = atomic.LoadInt64(&inuse.updateCount)
if mods != coll {
Expand Down Expand Up @@ -516,7 +520,7 @@ func (r *record) RecordOne(ctx context.Context, num number.Number) {

// Unbind implements metric.SyncImpl.
func (r *record) Unbind() {
r.refMapped.unref()
r.refMapped.Unref()
}

func (r *record) mapkey() mapkey {
Expand Down
183 changes: 183 additions & 0 deletions sdk/metric/transient/transient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transient

import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"

"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/label"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
)

type (
Accumulator struct {
standard *sdk.Accumulator
lock sync.Mutex
instruments sync.Map
}

syncImpl struct {
standard metric.SyncImpl
refMapped sdk.RefcountMapped

updateCount int64
collectedCount int64
}

boundSyncImpl struct {
standard metric.BoundSyncImpl
impl *syncImpl
}
)

var (
ErrAsyncNotSupported = fmt.Errorf("transient asynchronous instruments not supported")
ErrInvalidBindInstrument = fmt.Errorf("invalid bind instrument")

_ metric.MeterImpl = &Accumulator{}
_ metric.SyncImpl = &syncImpl{}
_ metric.BoundSyncImpl = &boundSyncImpl{}
)

func NewAccumulator(processor export.Processor, opts ...sdk.Option) *Accumulator {
return &Accumulator{
standard: sdk.NewAccumulator(processor, opts...),
}
}

func (a *Accumulator) NewSyncInstrument(descriptor metric.Descriptor) (metric.SyncImpl, error) {
if actual, ok := a.instruments.Load(descriptor); ok {
existing := actual.(*syncImpl)
if existing.refMapped.Ref() {
return existing, nil
}
}

s, err := a.standard.NewSyncInstrument(descriptor)
if err != nil {
return nil, err
}

impl := &syncImpl{
standard: s,
refMapped: sdk.InitRefcountMapped(),
}

for {
if actual, loaded := a.instruments.LoadOrStore(descriptor, impl); loaded {
existing := actual.(*syncImpl)
if existing.refMapped.Ref() {
return existing, nil
}
runtime.Gosched()
continue
}

return impl, nil
}
}

func (a *Accumulator) NewAsyncInstrument(metric.Descriptor, metric.AsyncRunner) (metric.AsyncImpl, error) {
return metric.NoopAsync{}, ErrAsyncNotSupported
}

func (a *Accumulator) RecordBatch(ctx context.Context, labels []label.KeyValue, measurements ...metric.Measurement) {
for _, m := range measurements {
impl := m.SyncImpl().(*syncImpl)
atomic.AddInt64(&impl.updateCount, 1)
}
a.standard.RecordBatch(ctx, labels, measurements...)
}

func (a *Accumulator) Collect(ctx context.Context) int {
a.lock.Lock()
defer a.lock.Unlock()
defer a.purgeInstruments()
return a.standard.Collect(ctx)
}

func (a *Accumulator) purgeInstruments() {
a.instruments.Range(func(key, value interface{}) bool {
impl := value.(*syncImpl)

uses := atomic.LoadInt64(&impl.updateCount)
coll := impl.collectedCount

if uses != coll {
impl.collectedCount = uses
return true
}

if unmapped := impl.refMapped.TryUnmap(); !unmapped {
return true
}

a.instruments.Delete(key)
return true
})
}

func (s *syncImpl) Implementation() interface{} {
return s.standard.Implementation()
}

func (s *syncImpl) Descriptor() metric.Descriptor {
return s.standard.Descriptor()
}

func (s *syncImpl) Unref() {
s.refMapped.Unref()
}

func (s *syncImpl) Bind(labels []label.KeyValue) metric.BoundSyncImpl {
valid := s.refMapped.Ref()
if !valid {
panic(ErrInvalidBindInstrument)
}
return &boundSyncImpl{
standard: s.standard.Bind(labels),
impl: s,
}
}

func (s *syncImpl) RecordOne(ctx context.Context, number metric.Number, labels []label.KeyValue) {
atomic.AddInt64(&s.updateCount, 1)
s.standard.RecordOne(ctx, number, labels)
}

func (b *boundSyncImpl) RecordOne(ctx context.Context, number metric.Number) {
atomic.AddInt64(&b.impl.updateCount, 1)
b.standard.RecordOne(ctx, number)
}

func (b *boundSyncImpl) Unbind() {
b.standard.Unbind()
b.impl.refMapped.Unref()
}

func (a *Accumulator) Size() int {
sz := 0
a.instruments.Range(func(_, _ interface{}) bool {
sz++
return true
})
return sz
}
Loading