Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add reservoir timers #171

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
72bc838
prototype a simple reservoir for timers
maciuszek Jan 8, 2025
f67aca4
clarify comment
maciuszek Jan 8, 2025
5cc34d9
Move s.timerCount inside timer.Range
bshramin Jan 8, 2025
f7fc55e
fix test messages
maciuszek Jan 8, 2025
4a0662a
fix spelling
maciuszek Jan 8, 2025
f35471d
Merge remote-tracking branch 'origin/master' into mattkuzminski/add-t…
maciuszek Jan 10, 2025
57ef42b
re-design with timer reservoirs correctly independent per mertic
maciuszek Jan 10, 2025
4610f55
add some more todos
maciuszek Jan 10, 2025
cc908b5
clean up redundant code
maciuszek Jan 10, 2025
b1a2def
some more clean up
maciuszek Jan 11, 2025
8eb942d
address todos
maciuszek Jan 13, 2025
5dd8757
fix comment
maciuszek Jan 13, 2025
0d3fb45
ensure memory and flush management for timers
maciuszek Jan 13, 2025
ea5ae6a
optimize reservoirTimer by utilizing a ring buffer
maciuszek Jan 14, 2025
e81d603
correct how we flush reusable timer entries
maciuszek Jan 15, 2025
74a26a1
add test for reused timer map after flushing
maciuszek Jan 15, 2025
6d2687c
correct the ring buffer implementation to utilize bitwise benefits
maciuszek Jan 15, 2025
d067744
improve reservoirTimer property access
maciuszek Jan 15, 2025
7e5a451
make reservoir tests more dynamic
maciuszek Jan 16, 2025
a54db1a
improve comments
maciuszek Jan 16, 2025
18c0e57
optimize reservoir timer flush
maciuszek Jan 16, 2025
bf0ef63
block never flush edge cases when stores are constructed outside of N…
maciuszek Jan 16, 2025
8dad5ed
fix typo in comment
maciuszek Jan 16, 2025
9762152
add test for reservoir automatic flushing
maciuszek Jan 16, 2025
2641924
add test for concurrent reservoir writes and flushing
maciuszek Jan 16, 2025
858a3fd
fix typo in comment
maciuszek Jan 16, 2025
4e9611d
protect writes while flushing
maciuszek Jan 16, 2025
70cc61c
dont export controls that can result in a deadlock or datarace
maciuszek Jan 16, 2025
b352a4f
add critical optimization todo
maciuszek Jan 16, 2025
ef8cf0b
simplify reservoir processing
maciuszek Jan 17, 2025
8cceead
unexport RingSize and document immutability
maciuszek Jan 17, 2025
4cae9ac
print to stdout for testing
maciuszek Jan 17, 2025
7fe2893
improve test logging
maciuszek Jan 17, 2025
c2738fa
temporarily make logging a bit better at the sacrifice of performance
maciuszek Jan 20, 2025
236e2cc
remove test logging
maciuszek Jan 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions logging_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (s *loggingSink) FlushGauge(name string, value uint64) { s.log(name, "gauge

func (s *loggingSink) FlushTimer(name string, value float64) { s.log(name, "timer", value) }

// FlushSampledTimer logs a sampled timer value the same way FlushTimer logs a
// regular one. The sample rate is not logged.
func (s *loggingSink) FlushSampledTimer(name string, value, _ float64) {
	s.log(name, "timer", value)
}

func (s *loggingSink) Flush() { s.log("", "all stats", 0) }

// Logger
Expand Down
6 changes: 6 additions & 0 deletions mock/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ func (s *Sink) FlushTimer(name string, val float64) {
atomic.AddInt64(&p.count, 1)
}

// FlushSampledTimer implements the stats.Sink.FlushSampledTimer method by
// forwarding val for stat name to FlushTimer. The sample rate argument is
// ignored.
func (s *Sink) FlushSampledTimer(name string, val, _ float64) {
s.FlushTimer(name, val)
}

// LoadCounter returns the value for stat name and if it was found.
func (s *Sink) LoadCounter(name string) (uint64, bool) {
v, ok := s.counters().Load(name)
Expand Down
36 changes: 35 additions & 1 deletion net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,36 @@ func (s *netSink) FlushTimer(name string, value float64) {
}
}

// FlushSampledTimer writes a single timer sample together with the rate at
// which it was sampled, using the line format
//
//	<name>:<value>|ms|@<sampleRate>\n
//
// value is written as an integer when it is a whole, non-negative number that
// fits in a uint64, and as a float otherwise.
func (s *netSink) FlushSampledTimer(name string, value, sampleRate float64) {
	b := pbFree.Get().(*buffer)

	b.WriteString(name)
	b.WriteChar(':')
	if 0 <= value && value < math.MaxUint64 && math.Trunc(value) == value {
		b.WriteUnit64(uint64(value))
	} else {
		b.WriteFloat64(value)
	}
	b.WriteString("|ms|@")

	// Precision -1 emits the fewest digits that round-trip the value exactly.
	// A fixed number of decimals would round small rates down to zero (with
	// two decimals anything below 0.005 becomes "0.00") and would distort the
	// remaining ones, so the receiver could not scale the samples correctly.
	b.writeFloat64WithPrecision(sampleRate, -1)
	b.WriteString("\n")

	s.writeBuffer(b)

	// Return the buffer to the pool empty so the next user starts clean.
	b.Reset()
	pbFree.Put(b)
}

func (s *netSink) run() {
addr := net.JoinHostPort(s.conf.StatsdHost, strconv.Itoa(s.conf.StatsdPort))

Expand Down Expand Up @@ -417,5 +447,9 @@ func (b *buffer) WriteUnit64(val uint64) {
}

// WriteFloat64 appends val in fixed-point notation with six digits after the
// decimal point. The value is appended exactly once, via the shared
// precision-aware helper.
func (b *buffer) WriteFloat64(val float64) {
	b.writeFloat64WithPrecision(val, 6)
}

// writeFloat64WithPrecision appends val to the buffer in fixed-point ('f')
// notation, formatted as a 64-bit float with the given precision.
func (b *buffer) writeFloat64WithPrecision(val float64, precision int) {
	grown := strconv.AppendFloat(*b, val, 'f', precision, 64)
	*b = grown
}
6 changes: 6 additions & 0 deletions net_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func (s *testStatSink) FlushTimer(name string, value float64) {
s.Unlock()
}

// FlushSampledTimer appends one "<name>:<value>|ms|@<sampleRate>" line to the
// sink's record while holding the sink's lock.
func (s *testStatSink) FlushSampledTimer(name string, value, sampleRate float64) {
	// Format before taking the lock; only the append needs protection.
	line := fmt.Sprintf("%s:%f|ms|@%f\n", name, value, sampleRate)

	s.Lock()
	defer s.Unlock()
	s.record += line
}

func TestCreateTimer(t *testing.T) {
sink := &testStatSink{}
store := NewStore(sink, true)
Expand Down
8 changes: 8 additions & 0 deletions net_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,14 @@ func (s *netTestSink) String() string {
return str
}

// Pull returns everything buffered so far and resets the buffer, so each call
// only yields what arrived since the previous one.
func (s *netTestSink) Pull() string {
	s.mu.Lock()
	defer s.mu.Unlock()

	drained := s.buf.String()
	s.buf.Reset()
	return drained
}

func (s *netTestSink) Host(t testing.TB) string {
t.Helper()
host, _, err := net.SplitHostPort(s.conn.Address().String())
Expand Down
2 changes: 2 additions & 0 deletions null_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ func (s nullSink) FlushGauge(name string, value uint64) {} //nolint:revive

func (s nullSink) FlushTimer(name string, value float64) {} //nolint:revive

// FlushSampledTimer does nothing: the sampled timer value and its sample rate are discarded.
func (s nullSink) FlushSampledTimer(name string, value, sampleRate float64) {} //nolint:revive

func (s nullSink) Flush() {}
17 changes: 17 additions & 0 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ const (
DefaultFlushIntervalS = 5
// DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false.
DefaultLoggingSinkDisabled = false
// DefaultUseReservoirTimer defines if all timers should be reservoir timers by default.
DefaultUseReservoirTimer = false
// FixedTimerReservoirSize is the maximum number of values a reservoir timer retains between flushes.
// note: must be a power of two because the write index is computed by ANDing the counter with size-1; round other sizes up, e.g. 1 << bits.Len(uint(100)) = 128
// todo: see if it's worth an efficiency trade-off to reduce the tech debt of this magic number by allowing any size.
// we could determine the difference between the defined size and the next power of two
// and use that to offset the counter when ANDing it against the mask;
// once the result is 0 we just increment the offset by the "original offset"
FixedTimerReservoirSize = 128
)

// The Settings type is used to configure gostats. gostats uses environment
Expand All @@ -38,6 +47,8 @@ type Settings struct {
// Disable the LoggingSink when USE_STATSD is false and use the NullSink instead.
// This will cause all stats to be silently dropped.
LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"`
// Make all timers reservoir timers with implied sampling under flush interval of FlushIntervalS
UseReservoirTimer bool `envconfig:"GOSTATS_USE_RESERVOIR_TIMER" default:"false"`
}

// An envError is an error that occurred parsing an environment variable
Expand Down Expand Up @@ -84,6 +95,7 @@ func envBool(key string, def bool) (bool, error) {
}

// GetSettings returns the Settings gostats will run with.
// todo: can we optimize this by storing the result for subsequent calls
func GetSettings() Settings {
useStatsd, err := envBool("USE_STATSD", DefaultUseStatsd)
if err != nil {
Expand All @@ -101,13 +113,18 @@ func GetSettings() Settings {
if err != nil {
panic(err)
}
useReservoirTimer, err := envBool("GOSTATS_USE_RESERVOIR_TIMER", DefaultUseReservoirTimer)
if err != nil {
panic(err)
}
return Settings{
UseStatsd: useStatsd,
StatsdHost: envOr("STATSD_HOST", DefaultStatsdHost),
StatsdProtocol: envOr("STATSD_PROTOCOL", DefaultStatsdProtocol),
StatsdPort: statsdPort,
FlushIntervalS: flushIntervalS,
LoggingSinkDisabled: loggingSinkDisabled,
UseReservoirTimer: useReservoirTimer,
}
}

Expand Down
1 change: 1 addition & 0 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type Sink interface {
FlushCounter(name string, value uint64)
FlushGauge(name string, value uint64)
FlushTimer(name string, value float64)
FlushSampledTimer(name string, value, sampleRate float64)
}

// FlushableSink is an extension of Sink that provides a Flush() function that
Expand Down
163 changes: 146 additions & 17 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,22 +214,34 @@ type StatGenerator interface {
// NewStore returns an empty Store that flushes to the Sink passed as an argument.
// Timers created by the returned Store are standard timers.
// Note: the export argument is unused.
func NewStore(sink Sink, _ bool) Store {
	return &statStore{
		sink:      sink,
		timerType: standard,
	}
}

// newStatStore builds a store via NewStore and switches it to reservoir
// timers when conf.UseReservoirTimer is set; otherwise the store is returned
// as NewStore configured it.
func newStatStore(sink Sink, export bool, conf Settings) *statStore {
	st := NewStore(sink, export).(*statStore)
	if !conf.UseReservoirTimer {
		return st
	}
	st.timerType = reservoir
	return st
}

// NewDefaultStore returns a Store with a TCP statsd sink, and a running flush timer.
// note: this is the only way to use reservoir timers as they rely on the store flush loop
func NewDefaultStore() Store {
var newStore Store
settings := GetSettings()
if !settings.UseStatsd {
if settings.LoggingSinkDisabled {
newStore = NewStore(NewNullSink(), false)
newStore = newStatStore(NewNullSink(), false, settings)
} else {
newStore = NewStore(NewLoggingSink(), false)
newStore = newStatStore(NewLoggingSink(), false, settings)
}
go newStore.Start(time.NewTicker(10 * time.Second))
} else {
newStore = NewStore(NewTCPStatsdSink(), false)
newStore = newStatStore(NewTCPStatsdSink(), false, settings)
go newStore.Start(time.NewTicker(time.Duration(settings.FlushIntervalS) * time.Second))
}
return newStore
Expand Down Expand Up @@ -298,30 +310,104 @@ func (c *gauge) Value() uint64 {
return atomic.LoadUint64(&c.value)
}

// timerType selects which timer implementation a statStore creates.
type timerType int

const (
	// standard timers hand every recorded value to the sink immediately.
	// It is the zero value of timerType.
	standard timerType = iota
	// reservoir timers buffer recorded values in a fixed-size ring until the
	// store flushes them.
	reservoir
)

// timer is the behaviour shared by the standard and reservoir timer
// implementations.
type timer interface {
	// AddValue records a raw value.
	AddValue(float64)
	// AddDuration records a duration expressed in the timer's base unit.
	AddDuration(time.Duration)
	// time records a duration; both implementations forward it to AddDuration.
	time(time.Duration)
	// AllocateSpan starts a Timespan bound to this timer.
	AllocateSpan() Timespan
	// Empty returns the values collected since the previous call together
	// with the total number of values recorded in that period.
	Empty() ([]float64, uint64)
}

type standardTimer struct {
base time.Duration
name string
sink Sink
}

func (t *timer) time(dur time.Duration) {
func (t *standardTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

func (t *timer) AddDuration(dur time.Duration) {
func (t *standardTimer) AddDuration(dur time.Duration) {
t.AddValue(float64(dur / t.base))
}

func (t *timer) AddValue(value float64) {
func (t *standardTimer) AddValue(value float64) {
t.sink.FlushTimer(t.name, value)
}

func (t *timer) AllocateSpan() Timespan {
func (t *standardTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

// values are not collected for this timer
func (t *standardTimer) Empty() ([]float64, uint64) {
return nil, 0
}

// reservoirTimer keeps recorded values in a fixed-size ring buffer instead of
// sending them anywhere as they arrive. Once the ring is full, new values
// overwrite the slots written earliest.
type reservoirTimer struct {
	mu       sync.Mutex // guards values and count
	base     time.Duration
	name     string
	ringSize uint64 // immutable
	ringMask uint64 // immutable
	values   []float64
	count    uint64 // values recorded so far; also drives the write position
}

// time records dur as a whole number of base units.
func (t *reservoirTimer) time(dur time.Duration) {
	t.AddValue(float64(dur / t.base))
}

// AddDuration records dur as a whole number of base units. The division is
// performed on integer durations, so any fractional remainder is dropped.
func (t *reservoirTimer) AddDuration(dur time.Duration) {
	units := dur / t.base
	t.AddValue(float64(units))
}

// AddValue stores value in the next ring slot and advances the counter.
func (t *reservoirTimer) AddValue(value float64) {
	t.mu.Lock()
	defer t.mu.Unlock()

	// Masking the counter wraps the write position around the ring.
	slot := t.count & t.ringMask
	t.values[slot] = value
	t.count++
}

// AllocateSpan returns a Timespan bound to this timer that starts now.
func (t *reservoirTimer) AllocateSpan() Timespan {
	span := &timespan{timer: t, start: time.Now()}
	return span
}

// Empty resets the reservoir. It returns a copy of the values currently held
// and the total number of values recorded since the previous reset, including
// those that exceeded the ring size and were overwritten.
func (t *reservoirTimer) Empty() ([]float64, uint64) {
	t.mu.Lock()
	defer t.mu.Unlock()

	total := t.count

	// The ring never holds more than ringSize values.
	held := total
	if held > t.ringSize {
		held = t.ringSize
	}

	// The backing slice is reused for the next round, so hand out a copy of
	// only the slots written during this round.
	snapshot := make([]float64, held)
	copy(snapshot, t.values[:held])

	// Restart the counter so new values are written from the first slot.
	t.count = 0

	return snapshot, total
}

// timespan pairs a timer with the instant the span was allocated. The timer
// field holds the timer interface, so it can hold either timer implementation.
type timespan struct {
	timer timer
	start time.Time
}

Expand All @@ -336,9 +422,11 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) {
}

type statStore struct {
counters sync.Map
gauges sync.Map
timers sync.Map
// These maps may grow indefinitely; however, slots in these maps are reused because stat names are stable over the lifetime of the process.
counters sync.Map
gauges sync.Map
timers sync.Map
timerType timerType

mu sync.RWMutex
statGenerators []StatGenerator
Expand Down Expand Up @@ -393,6 +481,26 @@ func (s *statStore) Flush() {
return true
})

s.timers.Range(func(key, v interface{}) bool {
if timer, ok := v.(*reservoirTimer); ok {
values, count := timer.Empty()
reservoirSize := timer.ringSize // assuming this is immutable

var sampleRate float64
if count <= reservoirSize {
sampleRate = 1.0
} else {
sampleRate = float64(reservoirSize) / float64(count)
}

for _, value := range values {
s.sink.FlushSampledTimer(key.(string), value, sampleRate)
}
}

return true
})

flushableSink, ok := s.sink.(FlushableSink)
if ok {
flushableSink.Flush()
Expand Down Expand Up @@ -490,14 +598,35 @@ func (s *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gau
return s.newGaugeWithTagSet(name, tagspkg.TagSet(nil).MergePerInstanceTags(tags))
}

func (s *statStore) newTimer(serializedName string, base time.Duration) *timer {
func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
if v, ok := s.timers.Load(serializedName); ok {
return v.(*timer)
return v.(timer)
}
t := &timer{name: serializedName, sink: s.sink, base: base}

var t timer
switch s.timerType {
case reservoir:
t = &reservoirTimer{
name: serializedName,
base: base,
ringSize: FixedTimerReservoirSize,
ringMask: FixedTimerReservoirSize - 1,
values: make([]float64, FixedTimerReservoirSize),
}
case standard: // this should allow backward compatible a backwards compatible fallback as standard is the zero value of s.timerType
fallthrough
default:
t = &standardTimer{
name: serializedName,
sink: s.sink,
base: base,
}
}

if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded {
return v.(*timer)
return v.(timer)
}

return t
}

Expand Down
Loading
Loading