Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types.
If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442)
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427)
- Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 10x. (#7443)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
11 changes: 9 additions & 2 deletions sdk/metric/exemplar/fixed_size_reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,15 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) {
}

var sum float64
for _, m := range r.measurements {
sum += m.Value.Float64()
for _, val := range r.measurements {
loaded := val.Load()
if loaded == nil {
continue
}
m := loaded.(*measurement)
if m != nil {
sum += m.Value.Float64()
}
}
mean := sum / float64(sampleSize)

Expand Down
7 changes: 1 addition & 6 deletions sdk/metric/exemplar/histogram_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,7 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
panic("unknown value type")
}

idx := sort.SearchFloat64s(r.bounds, n)
m := newMeasurement(ctx, t, v, a)

r.mu.Lock()
defer r.mu.Unlock()
r.store(idx, m)
r.store(sort.SearchFloat64s(r.bounds, n), newMeasurement(ctx, t, v, a))
}

// Collect returns all the held exemplars.
Expand Down
61 changes: 42 additions & 19 deletions sdk/metric/exemplar/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel/attribute"
Expand All @@ -17,15 +19,18 @@ type storage struct {
//
// This does not use []metricdata.Exemplar because it potentially would
// require an allocation for trace and span IDs in the hot path of Offer.
measurements []measurement
measurements []atomic.Value
}

func newStorage(n int) *storage {
return &storage{measurements: make([]measurement, n)}
return &storage{measurements: make([]atomic.Value, n)}
}

func (r *storage) store(idx int, m measurement) {
r.measurements[idx] = m
func (r *storage) store(idx int, m *measurement) {
old := r.measurements[idx].Swap(m)
if old != nil {
mPool.Put(old)
}
}

// Collect returns all the held exemplars.
Expand All @@ -34,7 +39,18 @@ func (r *storage) store(idx int, m measurement) {
func (r *storage) Collect(dest *[]Exemplar) {
*dest = reset(*dest, len(r.measurements), len(r.measurements))
var n int
for _, m := range r.measurements {
for _, val := range r.measurements {
// For performance reasons, this iterates over measurements
// concurrently with new measurements being written. This means we do
// not get a point-in-time snapshot of the state of the reservoir.
// This means that for sequential Offer calls, a later Offer call may
// be collected and an earlier call not collected if they are written
// to different indices.
loaded := val.Load()
if loaded == nil {
continue
}
m := loaded.(*measurement)
if !m.valid {
continue
}
Expand All @@ -53,21 +69,27 @@ type measurement struct {
Time time.Time
// Value is the value of the measurement.
Value Value
// SpanContext is the SpanContext active when a measurement was made.
SpanContext trace.SpanContext
// Ctx is the active context when a measurement was made.
Ctx context.Context

valid bool
}

var mPool = sync.Pool{
New: func() any {
return &measurement{}
},
}

// newMeasurement returns a new non-empty Measurement.
func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) measurement {
return measurement{
FilteredAttributes: droppedAttr,
Time: ts,
Value: v,
SpanContext: trace.SpanContextFromContext(ctx),
valid: true,
}
func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) *measurement {
m := mPool.Get().(*measurement)
m.FilteredAttributes = droppedAttr
m.Time = ts
m.Value = v
m.Ctx = ctx
m.valid = true
return m
}

// exemplar returns m as an [Exemplar].
Expand All @@ -76,15 +98,16 @@ func (m measurement) exemplar(dest *Exemplar) {
dest.Time = m.Time
dest.Value = m.Value

if m.SpanContext.HasTraceID() {
traceID := m.SpanContext.TraceID()
sc := trace.SpanContextFromContext(m.Ctx)
if sc.HasTraceID() {
traceID := sc.TraceID()
dest.TraceID = traceID[:]
} else {
dest.TraceID = dest.TraceID[:0]
}

if m.SpanContext.HasSpanID() {
spanID := m.SpanContext.SpanID()
if sc.HasSpanID() {
spanID := sc.SpanID()
dest.SpanID = spanID[:]
} else {
dest.SpanID = dest.SpanID[:0]
Expand Down