Skip to content

Commit

Permalink
optimize: improve breaker algorithm on recovery time
Browse files Browse the repository at this point in the history
  • Loading branch information
kevwan committed Apr 16, 2024
1 parent 164f5aa commit f9775b6
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 118 deletions.
48 changes: 48 additions & 0 deletions core/breaker/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package breaker

const (
success = iota
fail
drop
)

// bucket defines the bucket that holds sum and num of additions.
type bucket struct {
Sum int64
Success int64
Failure int64
Drop int64
}

func (b *bucket) Add(v int64) {
switch v {
case fail:
b.fail()
case drop:
b.drop()
default:
b.succeed()
}
}

func (b *bucket) Reset() {
b.Sum = 0
b.Success = 0
b.Failure = 0
b.Drop = 0
}

func (b *bucket) drop() {
b.Sum++
b.Drop++
}

func (b *bucket) fail() {
b.Sum++
b.Failure++
}

func (b *bucket) succeed() {
b.Sum++
b.Success++
}
43 changes: 43 additions & 0 deletions core/breaker/bucket_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package breaker

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestBucketAdd(t *testing.T) {
b := &bucket{}

// Test succeed
b.Add(0) // Using 0 for success
assert.Equal(t, int64(1), b.Sum, "Sum should be incremented")
assert.Equal(t, int64(1), b.Success, "Success should be incremented")
assert.Equal(t, int64(0), b.Failure, "Failure should not be incremented")
assert.Equal(t, int64(0), b.Drop, "Drop should not be incremented")

// Test failure
b.Add(fail)
assert.Equal(t, int64(2), b.Sum, "Sum should be incremented")
assert.Equal(t, int64(1), b.Failure, "Failure should be incremented")
assert.Equal(t, int64(0), b.Drop, "Drop should not be incremented")

// Test drop
b.Add(drop)
assert.Equal(t, int64(3), b.Sum, "Sum should be incremented")
assert.Equal(t, int64(1), b.Drop, "Drop should be incremented")
}

func TestBucketReset(t *testing.T) {
b := &bucket{
Sum: 3,
Success: 1,
Failure: 1,
Drop: 1,
}
b.Reset()
assert.Equal(t, int64(0), b.Sum, "Sum should be reset to 0")
assert.Equal(t, int64(0), b.Success, "Success should be reset to 0")
assert.Equal(t, int64(0), b.Failure, "Failure should be reset to 0")
assert.Equal(t, int64(0), b.Drop, "Drop should be reset to 0")
}
89 changes: 62 additions & 27 deletions core/breaker/googlebreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,38 @@ import (

const (
// 250ms for bucket duration
window = time.Second * 10
buckets = 40
k = 1.5
protection = 5
window = time.Second * 10
buckets = 40
maxFailBucketsToDecreaseK = 30
minSuccessBucketsToRecover = 3
k = 1.5
minK = 1.1
recoveryK = 3 - k
protection = 5
)

// googleBreaker is a netflixBreaker pattern from google.
// see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
type googleBreaker struct {
k float64
stat *collection.RollingWindow
proba *mathx.Proba
}
type (
googleBreaker struct {
k float64
stat *collection.RollingWindow[int64, *bucket]
proba *mathx.Proba
}

windowResult struct {
accepts int64
total int64
failingBuckets int64
workingBuckets int64
}
)

func newGoogleBreaker() *googleBreaker {
bucketDuration := time.Duration(int64(window) / int64(buckets))
st := collection.NewRollingWindow(buckets, bucketDuration)
st := collection.NewRollingWindow[int64, *bucket](func() *bucket {
return new(bucket)
}, buckets, bucketDuration)
return &googleBreaker{
stat: st,
k: k,
Expand All @@ -34,15 +49,22 @@ func newGoogleBreaker() *googleBreaker {
}

func (b *googleBreaker) accept() error {
accepts, total := b.history()
weightedAccepts := b.k * float64(accepts)
history := b.history()
w := mathx.AtLeast(b.k-float64(history.failingBuckets-1)*(b.k-minK)/maxFailBucketsToDecreaseK, minK)
weightedAccepts := w * float64(history.accepts)
// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
// for better performance, no need to care about negative ratio
dropRatio := (float64(total-protection) - weightedAccepts) / float64(total+1)
// for better performance, no need to care about the negative ratio
dropRatio := (float64(history.total-protection) - weightedAccepts) / float64(history.total+1)
if dropRatio <= 0 {
return nil
}

// If we have more than 2 working buckets, we are in recovery mode,
// the latest bucket is the current one, so we ignore it.
if history.workingBuckets >= minSuccessBucketsToRecover {
dropRatio /= recoveryK

Check warning on line 65 in core/breaker/googlebreaker.go

View check run for this annotation

Codecov / codecov/patch

core/breaker/googlebreaker.go#L65

Added line #L65 was not covered by tests
}

if b.proba.TrueOnProba(dropRatio) {
return ErrServiceUnavailable
}
Expand All @@ -52,7 +74,7 @@ func (b *googleBreaker) accept() error {

func (b *googleBreaker) allow() (internalPromise, error) {
if err := b.accept(); err != nil {
b.markFailure()
b.markDrop()
return nil, err
}

Expand All @@ -63,18 +85,18 @@ func (b *googleBreaker) allow() (internalPromise, error) {

func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Acceptable) error {
if err := b.accept(); err != nil {
b.markFailure()
b.markDrop()
if fallback != nil {
return fallback(err)
}

return err
}

var success bool
var succ bool
defer func() {
// if req() panic, success is false, mark as failure
if success {
if succ {
b.markSuccess()
} else {
b.markFailure()
Expand All @@ -83,27 +105,40 @@ func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Ac

err := req()
if acceptable(err) {
success = true
succ = true
}

return err
}

func (b *googleBreaker) markSuccess() {
b.stat.Add(1)
func (b *googleBreaker) markDrop() {
b.stat.Add(drop)
}

func (b *googleBreaker) markFailure() {
b.stat.Add(0)
b.stat.Add(fail)
}

func (b *googleBreaker) markSuccess() {
b.stat.Add(success)
}

func (b *googleBreaker) history() (accepts, total int64) {
b.stat.Reduce(func(b *collection.Bucket) {
accepts += int64(b.Sum)
total += b.Count
func (b *googleBreaker) history() windowResult {
var result windowResult

b.stat.Reduce(func(b *bucket) {
result.accepts += b.Success
result.total += b.Sum
if b.Failure > 0 {
result.workingBuckets = 0
result.failingBuckets++
} else if b.Success > 0 {
result.workingBuckets++
result.failingBuckets = 0
}
})

return
return result
}

type googlePromise struct {
Expand Down
39 changes: 19 additions & 20 deletions core/breaker/googlebreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ func init() {
}

func getGoogleBreaker() *googleBreaker {
st := collection.NewRollingWindow(testBuckets, testInterval)
st := collection.NewRollingWindow[int64, *bucket](func() *bucket {
return new(bucket)
}, testBuckets, testInterval)
return &googleBreaker{
stat: st,
k: 5,
Expand Down Expand Up @@ -164,41 +166,38 @@ func TestGoogleBreakerSelfProtection(t *testing.T) {
}

func TestGoogleBreakerHistory(t *testing.T) {
var b *googleBreaker
var accepts, total int64

sleep := testInterval
t.Run("accepts == total", func(t *testing.T) {
b = getGoogleBreaker()
b := getGoogleBreaker()
markSuccessWithDuration(b, 10, sleep/2)
accepts, total = b.history()
assert.Equal(t, int64(10), accepts)
assert.Equal(t, int64(10), total)
result := b.history()
assert.Equal(t, int64(10), result.accepts)
assert.Equal(t, int64(10), result.total)
})

t.Run("fail == total", func(t *testing.T) {
b = getGoogleBreaker()
b := getGoogleBreaker()
markFailedWithDuration(b, 10, sleep/2)
accepts, total = b.history()
assert.Equal(t, int64(0), accepts)
assert.Equal(t, int64(10), total)
result := b.history()
assert.Equal(t, int64(0), result.accepts)
assert.Equal(t, int64(10), result.total)
})

t.Run("accepts = 1/2 * total, fail = 1/2 * total", func(t *testing.T) {
b = getGoogleBreaker()
b := getGoogleBreaker()
markFailedWithDuration(b, 5, sleep/2)
markSuccessWithDuration(b, 5, sleep/2)
accepts, total = b.history()
assert.Equal(t, int64(5), accepts)
assert.Equal(t, int64(10), total)
result := b.history()
assert.Equal(t, int64(5), result.accepts)
assert.Equal(t, int64(10), result.total)
})

t.Run("auto reset rolling counter", func(t *testing.T) {
b = getGoogleBreaker()
b := getGoogleBreaker()
time.Sleep(testInterval * testBuckets)
accepts, total = b.history()
assert.Equal(t, int64(0), accepts)
assert.Equal(t, int64(0), total)
result := b.history()
assert.Equal(t, int64(0), result.accepts)
assert.Equal(t, int64(0), result.total)
})
}

Expand Down
Loading

0 comments on commit f9775b6

Please sign in to comment.