Skip to content

Commit

Permalink
admissionControl is a load shedding filter, that works by observing b…
Browse files Browse the repository at this point in the history
…ackend errors

    admissionControl(metricsSuffix, mode, d, windowSize, minRPS, successThreshold, maxRejectProbability, agression)
    admissionControl("myapp", "active", "1s", 5, 10, 0.9, 0.95, 2.0)

metricSuffix is the suffix key to expose reject counter, should be unique by filter instance
mode is one of "active", "inactive", "log"
windowSize is within [minWindowSize, maxWindowSize]
minRPS
successThreshold is within (0,1] and sets the lowest request success rate at which the filter will not reject requests.
maxRejectProbability is within (0,1] and sets the upper bound of reject probability.
aggression >= 1, 1: linear, 2: qudratic, .., see also https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#aggression

similar to https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter

Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
  • Loading branch information
szuecs committed Jun 30, 2022
1 parent ec1f324 commit 5ab2f3a
Show file tree
Hide file tree
Showing 4 changed files with 568 additions and 1 deletion.
4 changes: 3 additions & 1 deletion filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package filters

import (
"errors"
log "github.com/sirupsen/logrus"
"net/http"
"time"

log "github.com/sirupsen/logrus"

"github.com/opentracing/opentracing-go"
)

Expand Down Expand Up @@ -264,6 +265,7 @@ const (
ConsecutiveBreakerName = "consecutiveBreaker"
RateBreakerName = "rateBreaker"
DisableBreakerName = "disableBreaker"
AdmissionControlName = "admissionControl"
ClientRatelimitName = "clientRatelimit"
RatelimitName = "ratelimit"
ClusterClientRatelimitName = "clusterClientRatelimit"
Expand Down
362 changes: 362 additions & 0 deletions filters/shedder/admission.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,362 @@
package shedder

import (
"context"
"math"
"math/rand"
"net/http"
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/sirupsen/logrus"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/metrics"
"go.uber.org/atomic"
)

func getIntArg(a interface{}) (int, error) {
if i, ok := a.(int); ok {
return i, nil
}

if f, ok := a.(float64); ok {
return int(f), nil
}

return 0, filters.ErrInvalidFilterParameters
}

func getDurationArg(a interface{}) (time.Duration, error) {
if s, ok := a.(string); ok {
return time.ParseDuration(s)
}

i, err := getIntArg(a)
return time.Duration(i) * time.Millisecond, err
}

func getFloat64Arg(a interface{}) (float64, error) {
if f, ok := a.(float64); ok {
return f, nil
}

return 0, filters.ErrInvalidFilterParameters
}

func getModeArg(a interface{}) (mode, error) {
s, ok := a.(string)
if !ok {
return 0, filters.ErrInvalidFilterParameters
}
switch s {
case "active":
return active, nil
case "inactive":
return inactive, nil
case "log":
return log, nil
}

return 0, filters.ErrInvalidFilterParameters
}

type mode int

const (
inactive mode = iota
log
active
)

func (m mode) String() string {
switch m {
case active:
return "active"
case inactive:
return "inactive"
case log:
return "log"
}
return "unknown"
}

const (
counterPrefix = "shedder.admission_control."
admissionControlSpanName = "admission_control"
defaultDuration = time.Second
defaultMinRps = 10
defaultWindowSize = 10
minWindowSize = 1
maxWindowSize = 100
defaultThreshold = float64(1)
defaultMaxRejectProbability = float64(0.95)
defaultAgression = float64(1)
)

type Options struct {
Tracer opentracing.Tracer
}

type admissionControlSpec struct {
tracer opentracing.Tracer
}

type admissionControl struct {
mu sync.Mutex

metrics metrics.Metrics
metricSuffix string
tracer opentracing.Tracer

mode mode
windowSize int
minRps int
d time.Duration // example: 1s
successThreshold float64 // (0,1]
maxRejectProbability float64 // (0,1]
agression float64 // >= 1

totals []int64 // example: rps counters
success []int64 // example: rps counters
counter *atomic.Int64
successCounter *atomic.Int64
}

func NewAdmissionControl(o Options) filters.Spec {
tracer := o.Tracer
if tracer == nil {
tracer = &opentracing.NoopTracer{}
}
return &admissionControlSpec{
tracer: tracer,
}
}

func (*admissionControlSpec) Name() string { return filters.AdmissionControlName }

// CreateFilter creates a new admissionControl filter with passed configuration:
//
// admissionControl(metricSuffix, mode, d, windowSize, minRPS, successThreshold, maxRejectProbability, agression)
// admissionControl("$app", "active", "1s", 5, 10, 0.1, 0.95, 2.0)
//
// metricSuffix is the suffix key to expose reject counter, should be unique by filter instance
// mode is one of "active", "inactive", "log"
// active will reject traffic
// inactive will never reject traffic
// log will not reject traffic, but log to debug filter settings
// windowSize is within [minWindowSize, maxWindowSize]
// minRPS
// successThreshold is within (0,1] and sets the lowest request success rate at which the filter will not reject requests.
// maxRejectProbability is within (0,1] and sets the upper bound of reject probability.
// aggression >= 1, 1: linear, 2: qudratic, .., see also https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#aggression
//
// see also https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#admission-control
func (spec *admissionControlSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
var err error

metricSuffix := ""
if len(args) > 0 {
ok := false
metricSuffix, ok = args[0].(string)
if !ok {
return nil, filters.ErrInvalidFilterParameters
}
}

mode := inactive
if len(args) > 1 {
mode, err = getModeArg(args[1])
if err != nil {
logrus.Warnf("mode failed: %v", err)
return nil, filters.ErrInvalidFilterParameters
}
}

d := defaultDuration
if len(args) > 2 {
d, err = getDurationArg(args[2])
if err != nil {
logrus.Warnf("d failed: %v", err)
return nil, filters.ErrInvalidFilterParameters
}
}

windowSize := defaultWindowSize
if len(args) > 3 {
windowSize, err = getIntArg(args[3])
if err != nil {
logrus.Warnf("windowsize failed: %v", err)
return nil, filters.ErrInvalidFilterParameters
}
if windowSize < minWindowSize {
windowSize = minWindowSize
}
if windowSize > maxWindowSize {
windowSize = maxWindowSize
}
}

minRps := defaultMinRps
if len(args) > 4 {
minRps, err = getIntArg(args[4])
if err != nil {
logrus.Warnf("minRps failed: %v", err)
return nil, filters.ErrInvalidFilterParameters
}
}

threshold := defaultThreshold
if len(args) > 5 {
threshold, err = getFloat64Arg(args[5])
if err != nil {
logrus.Warnf("threshold failed %v", err)
return nil, filters.ErrInvalidFilterParameters
}
}

maxRejectProbability := defaultMaxRejectProbability
if len(args) > 6 {
maxRejectProbability, err = getFloat64Arg(args[6])
if err != nil {
logrus.Warnf("maxRejectProbability failed: %v", err)
return nil, filters.ErrInvalidFilterParameters
}
}

agression := defaultAgression
if len(args) > 7 {
agression, err = getFloat64Arg(args[7])
if err != nil {
logrus.Warnf("agression failed: %v", err)
return nil, filters.ErrInvalidFilterParameters
}
}

ac := &admissionControl{
metrics: metrics.Default,
metricSuffix: metricSuffix,
tracer: spec.tracer,

mode: mode,
d: d,
windowSize: windowSize,
minRps: minRps,
successThreshold: threshold,
maxRejectProbability: maxRejectProbability,
agression: agression,

totals: make([]int64, windowSize),
success: make([]int64, windowSize),
counter: atomic.NewInt64(0),
successCounter: atomic.NewInt64(0),
}
go ac.tickWindows()
return ac, nil
}

func (ac *admissionControl) tickWindows() {
t := time.NewTicker(ac.d)
i := 0
for range t.C {
val := ac.counter.Swap(0)
ok := ac.successCounter.Swap(0)

ac.mu.Lock()
ac.totals[i] = val
ac.success[i] = ok
ac.mu.Unlock()

i = (i + 1) % ac.windowSize
}
}

func (ac *admissionControl) count() (float64, float64) {
ac.mu.Lock()
defer ac.mu.Unlock()
return float64(sum(ac.totals)), float64(sum(ac.success))
}

func sum(a []int64) int64 {
var result int64
for _, v := range a {
result += v
}
return result
}

// calculates P_{reject} see https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#overview
func (ac *admissionControl) pReject() float64 {
var rejectP float64

total, success := ac.count()
factor := float64(ac.d) / float64(time.Second)
if total < float64(ac.minRps)*factor {
return -1
}

s := success / ac.successThreshold
if ac.mode == log {
logrus.Infof("rejectP = (%0.2f - %0.2f) / (%0.2f + 1) --- success: %0.2f and threshold: %0.2f", total, s, total, success, ac.successThreshold)
}
if total < s {
return -1
}
rejectP = (total - s) / (total + 1)
rejectP = math.Pow(rejectP, 1/ac.agression)

rejectP = math.Min(rejectP, ac.maxRejectProbability)
return math.Max(rejectP, 0.0)
}

func (ac *admissionControl) Request(ctx filters.FilterContext) {
span := ac.startSpan(ctx.Request().Context())
defer span.Finish()

p := ac.pReject() // [0, ac.maxRejectProbability] and -1 to disable
/* #nosec */
r := rand.Float64() // [0,1)

if ac.mode == log {
logrus.Infof("p: %0.2f, r: %0.2f", p, r)
}

if p > r {
ac.metrics.IncCounter(counterPrefix + ac.metricSuffix)
ext.Error.Set(span, true)

ctx.StateBag()["Admission-Control"] = "reject"

// shadow mode to measure data
if ac.mode != active {
return
}
ctx.Serve(&http.Response{
StatusCode: http.StatusServiceUnavailable,
})
}
}

func (ac *admissionControl) Response(ctx filters.FilterContext) {
// we don't want to count ourselves
if ctx.StateBag()["Admission-Control"] == "reject" {
return
}

code := ctx.Response().StatusCode
if code < 499 {
ac.successCounter.Inc()
}
ac.counter.Inc()
}

func (ac *admissionControl) startSpan(ctx context.Context) (span opentracing.Span) {
parent := opentracing.SpanFromContext(ctx)
if parent != nil {
span = ac.tracer.StartSpan(admissionControlSpanName, opentracing.ChildOf(parent.Context()))
ext.Component.Set(span, "skipper")
ext.SpanKind.Set(span, "shedder")
}
return
}
Loading

0 comments on commit 5ab2f3a

Please sign in to comment.