Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The default `TranslationStrategy` in `go.opentelemetry.io/exporters/prometheus` is changed from `otlptranslator.NoUTF8EscapingWithSuffixes` to `otlptranslator.UnderscoreEscapingWithSuffixes`. (#7421)
- The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types.
If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442)
- Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 10x. (#7443)
- Improve the concurrent performance of `FixedSizeReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 3x. (#7447)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
14 changes: 11 additions & 3 deletions sdk/metric/exemplar/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ func BenchmarkFixedSizeReservoirOffer(b *testing.B) {
i := 0
for pb.Next() {
reservoir.Offer(ctx, ts, val, nil)
// Periodically trigger a reset, because the algorithm for fixed-size
// reservoirs records exemplars very infrequently after a large
// number of collect calls.
// Periodically trigger a reset, because the algorithm records
// exemplars very infrequently after a large number of collect
// calls.
if i%100 == 99 {
reservoir.mu.Lock()
reservoir.reset()
Expand All @@ -44,6 +44,14 @@ func BenchmarkHistogramReservoirOffer(b *testing.B) {
i := 0
for pb.Next() {
res.Offer(ctx, ts, values[i%len(values)], nil)
// Periodically trigger a reset, because the algorithm records
// exemplars very infrequently after a large number of collect
// calls.
if i%100 == 99 {
for i := range res.trackers {
res.trackers[i].reset()
}
}
i++
}
})
Expand Down
145 changes: 87 additions & 58 deletions sdk/metric/exemplar/fixed_size_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"
"math"
"math/rand/v2"
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel/attribute"
Expand All @@ -25,7 +27,13 @@ func FixedSizeReservoirProvider(k int) ReservoirProvider {
// sample each one. If there are more than k, the Reservoir will then randomly
// sample all additional measurement with a decreasing probability.
func NewFixedSizeReservoir(k int) *FixedSizeReservoir {
return newFixedSizeReservoir(newStorage(k))
if k < 0 {
k = 0
}
return &FixedSizeReservoir{
storage: newStorage(k),
nextTracker: newNextTracker(k),
}
}

var _ Reservoir = &FixedSizeReservoir{}
Expand All @@ -37,39 +45,7 @@ var _ Reservoir = &FixedSizeReservoir{}
type FixedSizeReservoir struct {
reservoir.ConcurrentSafe
*storage

// count is the number of measurement seen.
count int64
// next is the next count that will store a measurement at a random index
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next next.
w float64
}

func newFixedSizeReservoir(s *storage) *FixedSizeReservoir {
r := &FixedSizeReservoir{
storage: s,
}
r.reset()
return r
}

// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
// open interval (0.0,1.0).
func (*FixedSizeReservoir) randomFloat64() float64 {
// TODO: Use an algorithm that avoids rejection sampling. For example:
//
// const precision = 1 << 53 // 2^53
// // Generate an integer in [1, 2^53 - 1]
// v := rand.Uint64() % (precision - 1) + 1
// return float64(v) / float64(precision)
f := rand.Float64()
for f == 0 {
f = rand.Float64()
}
return f
*nextTracker
}

// Offer accepts the parameters associated with a measurement. The
Expand Down Expand Up @@ -125,25 +101,58 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a
// https://github.com/MrAlias/reservoir-sampling for a performance
// comparison of reservoir sampling algorithms.

r.mu.Lock()
defer r.mu.Unlock()
if int(r.count) < cap(r.measurements) {
r.store(int(r.count), newMeasurement(ctx, t, n, a))
} else if r.count == r.next {
count, next := r.incrementCount()
intCount := int(count) // nolint:gosec // count is at most 32 bits in length
if intCount < r.k {
r.store(intCount, newMeasurement(ctx, t, n, a))
} else if count == next {
// Overwrite a random existing measurement with the one offered.
idx := int(rand.Int64N(int64(cap(r.measurements))))
idx := rand.IntN(r.k)
r.store(idx, newMeasurement(ctx, t, n, a))
r.wMu.Lock()
defer r.wMu.Unlock()
r.advance()
}
r.count++
}

// Collect returns all the held exemplars by handing dest to the embedded
// storage's Collect.
//
// The sampling state is reset after every call: the measurement count and the
// random number series start over. Per the note in the body, exemplars held
// from the previous cycle are expected to persist until newly offered
// measurements replace them.
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
r.storage.Collect(dest)
// Call reset here even though it will reset the measurement count and
// restart the random number series. This will persist any old exemplars as
// long as no new measurements are offered, but it will also prioritize
// those new measurements that are made over the older collection cycle
// ones.
r.reset()
}

// newNextTracker returns a nextTracker for a reservoir that can store k
// measurements. The tracker is reset before it is handed back.
func newNextTracker(k int) *nextTracker {
	tracker := new(nextTracker)
	tracker.k = k
	tracker.reset()
	return tracker
}

// nextTracker tracks how many measurements have been seen and the count at
// which the next measurement will be stored.
type nextTracker struct {
// countAndNext packs two 32-bit values into one atomic word:
//   - count is the number of measurements seen, and is in the lower 32
//     bits.
//   - next is the next count that will store a measurement at a random
//     index once the reservoir has been filled, and is in the upper 32
//     bits.
countAndNext atomic.Uint64
// w is the largest random number in a distribution that is used to compute
// the next next.
w float64
// wMu ensures w is kept consistent with next during advance and reset.
wMu sync.Mutex
// k is the number of measurements that can be stored in the reservoir.
k int
}

// reset resets r to the initial state.
func (r *FixedSizeReservoir) reset() {
func (r *nextTracker) reset() {
r.wMu.Lock()
defer r.wMu.Unlock()
// This resets the number of exemplars known.
r.count = 0
// Random index inserts should only happen after the storage is full.
r.next = int64(cap(r.measurements))
r.setCountAndNext(0, uint64(r.k)) // nolint:gosec // we ensure k is 1 or greater.

// Initial random number in the series used to generate r.next.
//
Expand All @@ -154,14 +163,30 @@ func (r *FixedSizeReservoir) reset() {
// This maps the uniform random number in (0,1) to a geometric distribution
// over the same interval. The mean of the distribution is inversely
// proportional to the storage capacity.
r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements)))
r.w = math.Exp(math.Log(randomFloat64()) / float64(r.k))

r.advance()
}

// incrementCount atomically adds one to the count held in the lower 32 bits of
// countAndNext. It returns the count as it was before the increment, followed
// by the next value taken from the upper 32 bits of the same atomic result.
func (r *nextTracker) incrementCount() (uint64, uint64) {
	// countMask selects the lower 32 bits, where the count lives.
	const countMask = 1<<32 - 1

	packed := r.countAndNext.Add(1)
	// Add reports the value after the increment, so step back by one to get
	// the count this caller observed.
	previous := packed&countMask - 1
	return previous, packed >> 32
}

// incrementNext atomically adds inc to the next value held in the upper 32
// bits of countAndNext. The count in the lower 32 bits is not touched by the
// addend, because inc is shifted past it first.
func (r *nextTracker) incrementNext(inc uint64) {
	delta := inc << 32
	r.countAndNext.Add(delta)
}

// setCountAndNext atomically replaces both values packed in countAndNext:
// next is placed in the upper 32 bits and count is added into the lower bits.
func (r *nextTracker) setCountAndNext(count, next uint64) {
	packed := next << 32
	packed += count
	r.countAndNext.Store(packed)
}

// advance updates the count at which the offered measurement will overwrite an
// existing exemplar.
func (r *FixedSizeReservoir) advance() {
func (r *nextTracker) advance() {
// Calculate the next value in the random number series.
//
// The current value of r.w is based on the max of a distribution of random
Expand All @@ -174,7 +199,7 @@ func (r *FixedSizeReservoir) advance() {
// therefore the next r.w will be based on the same distribution (i.e.
// `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
// computing the next random number `u` and take r.w as `w * u^(1/k)`.
r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements)))
r.w *= math.Exp(math.Log(randomFloat64()) / float64(r.k))
// Use the new random number in the series to calculate the count of the
// next measurement that will be stored.
//
Expand All @@ -185,17 +210,21 @@ func (r *FixedSizeReservoir) advance() {
//
// Important to note, the new r.next will always be at least 1 more than
// the last r.next.
r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
r.incrementNext(uint64(math.Log(randomFloat64())/math.Log(1-r.w)) + 1)
}

// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new
// measurements are offered, but it will also prioritize those new
// measurements that are made over the older collection cycle ones.
r.reset()
// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
// open interval (0.0,1.0).
func randomFloat64() float64 {
// TODO: Use an algorithm that avoids rejection sampling. For example:
//
// const precision = 1 << 53 // 2^53
// // Generate an integer in [1, 2^53 - 1]
// v := rand.Uint64() % (precision - 1) + 1
// return float64(v) / float64(precision)
f := rand.Float64()
for f == 0 {
f = rand.Float64()
}
return f
}
27 changes: 25 additions & 2 deletions sdk/metric/exemplar/fixed_size_reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,35 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) {
}

var sum float64
for _, m := range r.measurements {
sum += m.Value.Float64()
for _, val := range r.measurements {
loaded := val.Load()
if loaded == nil {
continue
}
m := loaded.(*measurement)
if m != nil {
sum += m.Value.Float64()
}
}
mean := sum / float64(sampleSize)

// Check the intensity/rate of the sampled distribution is preserved
// ensuring no bias in our random sampling algorithm.
assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ.
}

// TestNextTrackerAtomics checks that incrementCount reports the count before
// the increment together with an unchanged next value, both right after
// setCountAndNext and on a following increment.
func TestNextTrackerAtomics(t *testing.T) {
	const capacity = 10
	nt := newNextTracker(capacity)

	// expect performs one increment and compares what it reports.
	expect := func(wantCount, wantNext uint64) {
		t.Helper()
		gotCount, gotNext := nt.incrementCount()
		assert.Equal(t, wantCount, gotCount)
		assert.Equal(t, wantNext, gotNext)
	}

	nt.setCountAndNext(0, 11)
	expect(0, 11)
	// A second increment advances the count but leaves next where it was.
	expect(1, 11)

	nt.setCountAndNext(50, 100)
	expect(50, 100)
}
33 changes: 26 additions & 7 deletions sdk/metric/exemplar/histogram_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ func HistogramReservoirProvider(bounds []float64) ReservoirProvider {
// The passed bounds must be sorted before calling this function.
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
return &HistogramReservoir{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
bounds: bounds,
storage: newStorage(len(bounds) + 1),
trackers: make([]nextTracker, len(bounds)+1),
}
}

Expand All @@ -43,6 +44,8 @@ type HistogramReservoir struct {
reservoir.ConcurrentSafe
*storage

trackers []nextTracker

// bounds are bucket bounds in ascending order.
bounds []float64
}
Expand All @@ -68,11 +71,27 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
default:
panic("unknown value type")
}

idx := sort.SearchFloat64s(r.bounds, n)
m := newMeasurement(ctx, t, v, a)

r.mu.Lock()
defer r.mu.Unlock()
r.store(idx, m)
count, next := r.trackers[idx].incrementCount()
if count == 0 || count == next {
r.store(idx, newMeasurement(ctx, t, v, a))
r.trackers[idx].wMu.Lock()
defer r.trackers[idx].wMu.Unlock()
r.trackers[idx].advance()
}
}

// Collect returns all the held exemplars by handing dest to the embedded
// storage's Collect, and then resets every tracker.
func (r *HistogramReservoir) Collect(dest *[]Exemplar) {
	r.storage.Collect(dest)

	// Reset each tracker here even though that restarts its count and its
	// random number series. This will persist any old exemplars as long as no
	// new measurements are offered, but it will also prioritize those new
	// measurements that are made over the older collection cycle ones.
	for idx := 0; idx < len(r.trackers); idx++ {
		r.trackers[idx].reset()
	}
}
Loading