Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 44 additions & 55 deletions metrics/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,17 @@ type Registry interface {

// Unregister the metric with the given name.
Unregister(string)

// Unregister all metrics. (Mostly for testing.)
UnregisterAll()
}

// The standard implementation of a Registry is a mutex-protected map
// The standard implementation of a Registry uses sync.map
// of names to metrics.
type StandardRegistry struct {
metrics map[string]interface{}
mutex sync.Mutex
metrics sync.Map
}

// Create a new registry.
func NewRegistry() Registry {
return &StandardRegistry{metrics: make(map[string]interface{})}
return &StandardRegistry{}
}

// Call the given function for each registered metric.
Expand All @@ -71,45 +67,57 @@ func (r *StandardRegistry) Each(f func(string, interface{})) {

// Get the metric by the given name or nil if none is registered.
func (r *StandardRegistry) Get(name string) interface{} {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.metrics[name]
item, _ := r.metrics.Load(name)
return item
}

// Gets an existing metric or creates and registers a new one. Threadsafe
// alternative to calling Get and Register on failure.
// The interface can be the metric to register if not found in registry,
// or a function returning the metric for lazy instantiation.
func (r *StandardRegistry) GetOrRegister(name string, i interface{}) interface{} {
r.mutex.Lock()
defer r.mutex.Unlock()
if metric, ok := r.metrics[name]; ok {
return metric
// fast path
cached, ok := r.metrics.Load(name)
if ok {
return cached
}
if v := reflect.ValueOf(i); v.Kind() == reflect.Func {
i = v.Call(nil)[0].Interface()
}
r.register(name, i)
return i
item, _, ok := r.loadOrRegister(name, i)
if !ok {
return i
}
return item
}

// Register the given metric under the given name. Returns a DuplicateMetric
// if a metric by the given name is already registered.
func (r *StandardRegistry) Register(name string, i interface{}) error {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.register(name, i)
// fast path
_, ok := r.metrics.Load(name)
if ok {
return DuplicateMetric(name)
}

if v := reflect.ValueOf(i); v.Kind() == reflect.Func {
i = v.Call(nil)[0].Interface()
}
_, loaded, _ := r.loadOrRegister(name, i)
if loaded {
return DuplicateMetric(name)
}
return nil
}

// Run all registered healthchecks.
func (r *StandardRegistry) RunHealthchecks() {
r.mutex.Lock()
defer r.mutex.Unlock()
for _, i := range r.metrics {
if h, ok := i.(Healthcheck); ok {
r.metrics.Range(func(key, value any) bool {
if h, ok := value.(Healthcheck); ok {
h.Check()
}
}
return true
})
}

// GetAll metrics in the Registry
Expand Down Expand Up @@ -177,45 +185,31 @@ func (r *StandardRegistry) GetAll() map[string]map[string]interface{} {

// Unregister the metric with the given name.
func (r *StandardRegistry) Unregister(name string) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.stop(name)
delete(r.metrics, name)
}

// Unregister all metrics. (Mostly for testing.)
func (r *StandardRegistry) UnregisterAll() {
r.mutex.Lock()
defer r.mutex.Unlock()
for name := range r.metrics {
r.stop(name)
delete(r.metrics, name)
}
r.metrics.LoadAndDelete(name)
}

func (r *StandardRegistry) register(name string, i interface{}) error {
if _, ok := r.metrics[name]; ok {
return DuplicateMetric(name)
}
func (r *StandardRegistry) loadOrRegister(name string, i interface{}) (interface{}, bool, bool) {
switch i.(type) {
case Counter, CounterFloat64, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer, ResettingTimer:
r.metrics[name] = i
default:
return nil, false, false
}
return nil
item, loaded := r.metrics.LoadOrStore(name, i)
return item, loaded, true
}

func (r *StandardRegistry) registered() map[string]interface{} {
r.mutex.Lock()
defer r.mutex.Unlock()
metrics := make(map[string]interface{}, len(r.metrics))
for name, i := range r.metrics {
metrics[name] = i
}
metrics := make(map[string]interface{})
r.metrics.Range(func(key, value any) bool {
metrics[key.(string)] = value
return true
})
return metrics
}

func (r *StandardRegistry) stop(name string) {
if i, ok := r.metrics[name]; ok {
if i, ok := r.metrics.Load(name); ok {
if s, ok := i.(Stoppable); ok {
s.Stop()
}
Expand Down Expand Up @@ -308,11 +302,6 @@ func (r *PrefixedRegistry) Unregister(name string) {
r.underlying.Unregister(realName)
}

// Unregister all metrics. (Mostly for testing.)
func (r *PrefixedRegistry) UnregisterAll() {
r.underlying.UnregisterAll()
}

var (
DefaultRegistry = NewRegistry()
EphemeralRegistry = NewRegistry()
Expand Down
25 changes: 25 additions & 0 deletions metrics/registry_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"sync"
"testing"
)

Expand All @@ -13,6 +14,30 @@ func BenchmarkRegistry(b *testing.B) {
}
}

func BenchmarkRegistryGetOrRegisterParallel_8(b *testing.B) {
benchmarkRegistryGetOrRegisterParallel(b, 8)
}

func BenchmarkRegistryGetOrRegisterParallel_32(b *testing.B) {
benchmarkRegistryGetOrRegisterParallel(b, 32)
}

func benchmarkRegistryGetOrRegisterParallel(b *testing.B, amount int) {
r := NewRegistry()
b.ResetTimer()
var wg sync.WaitGroup
for i := 0; i < amount; i++ {
wg.Add(1)
go func() {
for i := 0; i < b.N; i++ {
r.GetOrRegister("foo", NewMeter)
}
wg.Done()
}()
}
wg.Wait()
}

func TestRegistry(t *testing.T) {
r := NewRegistry()
r.Register("foo", NewCounter())
Expand Down