Skip to content

Commit

Permalink
Optimizes ewma to reduce lock contention
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralph Caraveo committed Apr 5, 2018
1 parent 0201454 commit b08b742
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 13 deletions.
46 changes: 33 additions & 13 deletions ewma.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,15 @@ func (NilEWMA) Update(n int64) {}
type StandardEWMA struct {
uncounted int64 // /!\ this should be the first member to ensure 64-bit alignment
alpha float64
rate float64
init bool
rate uint64
init uint32
mutex sync.Mutex
}

// Rate returns the moving average rate of events per second.
func (a *StandardEWMA) Rate() float64 {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.rate * float64(1e9)
currentRate := math.Float64frombits(atomic.LoadUint64(&a.rate)) * float64(1e9)
return currentRate
}

// Snapshot returns a read-only copy of the EWMA.
Expand All @@ -99,17 +98,38 @@ func (a *StandardEWMA) Snapshot() EWMA {
// Tick ticks the clock to update the moving average. It assumes it is called
// every five seconds.
func (a *StandardEWMA) Tick() {
// Optimization to avoid mutex locking in the hot-path.
if atomic.LoadUint32(&a.init) == 1 {
a.updateRate(a.fetchInstantRate())
} else {
// Slow-path: this is only needed on the first Tick() and preserves transactional updating
// of init and rate in the else block. The first conditional is needed below because
// a different thread could have set a.init = 1 between the time of the first atomic load and when
// the lock was acquired.
a.mutex.Lock()
if atomic.LoadUint32(&a.init) == 1 {
// The fetchInstantRate() uses atomic loading, which is unecessary in this critical section
// but again, this section is only invoked on the first successful Tick() operation.
a.updateRate(a.fetchInstantRate())
} else {
atomic.StoreUint32(&a.init, 1)
atomic.StoreUint64(&a.rate, math.Float64bits(a.fetchInstantRate()))
}
a.mutex.Unlock()
}
}

func (a *StandardEWMA) fetchInstantRate() float64 {
count := atomic.LoadInt64(&a.uncounted)
atomic.AddInt64(&a.uncounted, -count)
instantRate := float64(count) / float64(5e9)
a.mutex.Lock()
defer a.mutex.Unlock()
if a.init {
a.rate += a.alpha * (instantRate - a.rate)
} else {
a.init = true
a.rate = instantRate
}
return instantRate
}

func (a *StandardEWMA) updateRate(instantRate float64) {
currentRate := math.Float64frombits(atomic.LoadUint64(&a.rate))
currentRate += a.alpha * (instantRate - currentRate)
atomic.StoreUint64(&a.rate, math.Float64bits(currentRate))
}

// Update adds n uncounted events.
Expand Down
12 changes: 12 additions & 0 deletions ewma_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ func BenchmarkEWMA(b *testing.B) {
}
}

func BenchmarkEWMAParallel(b *testing.B) {
a := NewEWMA1()
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
a.Update(1)
a.Tick()
}
})
}

func TestEWMA1(t *testing.T) {
a := NewEWMA1()
a.Update(3)
Expand Down

0 comments on commit b08b742

Please sign in to comment.