From 3edd5a7e2d0bc618911192fcad209e8cc80de497 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Wed, 22 Jun 2022 17:01:19 +0200 Subject: [PATCH] admissionControl is a load shedding filter, that works by observing backend errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit admissionControl(metricsSuffix, mode, d, windowSize, minRPS, successThreshold, maxRejectProbability, exponent) admissionControl("myapp", "active", "1s", 5, 10, 0.9, 0.95, 2.0) metricSuffix is the suffix key to expose reject counter, should be unique by filter instance mode is one of "active", "inactive", "log" windowSize is within [minWindowSize, maxWindowSize] minRPS successThreshold is within (0,1] and sets the lowest request success rate at which the filter will not reject requests. maxRejectProbability is within (0,1] and sets the upper bound of reject probability. exponent >0, 1: linear, 1/2: qudratic, .. see also https://opensource.zalando.com/skipper/reference/filters/#admissioncontrol Signed-off-by: Sandor Szücs --- docs/reference/filters.md | 82 ++++++- filters/filters.go | 4 +- filters/shedder/admission.go | 384 ++++++++++++++++++++++++++++++ filters/shedder/admission_test.go | 196 +++++++++++++++ skipper.go | 4 + 5 files changed, 665 insertions(+), 5 deletions(-) create mode 100644 filters/shedder/admission.go create mode 100644 filters/shedder/admission_test.go diff --git a/docs/reference/filters.md b/docs/reference/filters.md index a1cb4a34ea..9cd1065820 100644 --- a/docs/reference/filters.md +++ b/docs/reference/filters.md @@ -372,8 +372,8 @@ no-compression, 1 means best-speed and 11 means best-compression. Example: ``` The filter also checks the incoming request, if it accepts the supported encodings, -explicitly stated in the Accept-Encoding header. -The filter currently supports by default `gzip`, `deflate` and `br` (can be overridden with flag `compress-encodings`). +explicitly stated in the Accept-Encoding header. +The filter currently supports by default `gzip`, `deflate` and `br` (can be overridden with flag `compress-encodings`). It does not assume that the client accepts any encoding if the Accept-Encoding header is not set. It ignores * in the Accept-Encoding header. @@ -1848,6 +1848,80 @@ Path("/cheap") -> clusterLeakyBucketRatelimit("user-${request.cookie.Authori Path("/expensive") -> clusterLeakyBucketRatelimit("user-${request.cookie.Authorization}", 1, "1s", 5, 2) -> ... ``` +## shedder + +The basic idea of load shedding is to reduce errors by early stopping +partial of the ingress requests that create too much load and serve +the maximum throughput the system can do. + +There is a great talk by [Acacio Cruz from +Google](https://www.youtube.com/watch?v=XNEIkivvaV4&feature=youtu.be) +that explains the basic principles. + +### admissionControl + +Implements an admission control filter, that rejects traffic by +observed error rate and probability. + +The probability of rejection is calculated by the following equation: + +$$ P_{reject} = ( { n_{total} - { n_{success} \over threshold } \over n_{total} + 1} )^{ exponent } $$ + +Examples: + + admissionControl(metricSuffix, mode, d, windowSize, minRPS, successThreshold, maxRejectProbability, exponent) + admissionControl("myapp", "active", "1s", 5, 10, 0.95, 0.9, 0.5) + +Parameters: + +* metric suffix (string) +* mode (enum) +* d (time.Duration) +* window size (int) +* min requests (int) +* success threshold (float64) +* max reject probability (float64) +* exponent (float64) + +Metric suffix is the chosen suffix key to expose reject counter, +should be unique by filter instance + +Mode has 3 different possible values and defaults to "inactive": + +* "active" will reject traffic +* "inactive" will never reject traffic (default) +* "logInactive" will not reject traffic, but log to debug filter settings + +D the time duration of a single slot for required counters in our +circular buffer of window size and default to 1s. + +Window size is the size of the circular buffer. It is used to snapshot +counters to calculate total requests and number of success. It is +within $[1, 100]$ and defaults to 10. + +Min requests is the minimum requests per $windowSize * d$ that have to pass this filter +otherwise it will not reject traffic. + +Success threshold sets the lowest request success rate at which the +filter will not reject requests. It is within $(0,1]$ and defaults to +0.95, which means an error rate of lower than 5% will not trigger +rejects. + +Max reject probability sets the upper bound of reject probability. It +is within (0,1] and defaults to 0.95, which means if backend errors +with 100% it will only reject up to 95%. + +exponent is used to dictate the rejection probability. The +calculation is done by $p = p^{exponent}$ +The exponent value is within $(0,\infty]$, to increase rejection +probability you have to use values lower than 1: + +* 1: linear (default) +* 1/2: quadratic +* 1/3: cubic + +!!! This filter is experimental and defaults are likely to change + ## lua See [the scripts page](scripts.md) @@ -2557,7 +2631,7 @@ fadeIn("3m", 1.5) #### Warning on fadeIn and Rolling Restarts -Traffic fade-in has the potential to skew the traffic to your backend pods in case of a rolling restart +Traffic fade-in has the potential to skew the traffic to your backend pods in case of a rolling restart (`kubectl rollout restart`), because it is very likely that the rolling restart is going faster than the fade-in duration. The image below shows an example of a rolling restart for a four-pod deployment (A, B, C, D) into (E, F, G, H), and the traffic share of each pod over time. While the ramp-up of the new pods is ongoing, @@ -2565,7 +2639,7 @@ the remaining old pods will receive a largely increased traffic share (especiall example), as well as an over-propotional traffic share for the first pod in the rollout (E). To make rolling restarts safe, you need to slow them down by setting `spec.minReadySeconds` on the pod spec -of your deployment or stackset, according to your fadeIn duration. +of your deployment or stackset, according to your fadeIn duration. ![Rolling Restart and Fade-In](../img/fadein_traffic_skew.png) diff --git a/filters/filters.go b/filters/filters.go index ef805b59f2..52f92163b7 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -2,10 +2,11 @@ package filters import ( "errors" - log "github.com/sirupsen/logrus" "net/http" "time" + log "github.com/sirupsen/logrus" + "github.com/opentracing/opentracing-go" ) @@ -264,6 +265,7 @@ const ( ConsecutiveBreakerName = "consecutiveBreaker" RateBreakerName = "rateBreaker" DisableBreakerName = "disableBreaker" + AdmissionControlName = "admissionControl" ClientRatelimitName = "clientRatelimit" RatelimitName = "ratelimit" ClusterClientRatelimitName = "clusterClientRatelimit" diff --git a/filters/shedder/admission.go b/filters/shedder/admission.go new file mode 100644 index 0000000000..80f2f176cf --- /dev/null +++ b/filters/shedder/admission.go @@ -0,0 +1,384 @@ +package shedder + +import ( + "context" + "math" + "math/rand" + "net/http" + "sync" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + log "github.com/sirupsen/logrus" + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/metrics" + "go.uber.org/atomic" +) + +func getIntArg(a interface{}) (int, error) { + if i, ok := a.(int); ok { + return i, nil + } + + if f, ok := a.(float64); ok { + return int(f), nil + } + + return 0, filters.ErrInvalidFilterParameters +} + +func getDurationArg(a interface{}) (time.Duration, error) { + if s, ok := a.(string); ok { + return time.ParseDuration(s) + } + return 0, filters.ErrInvalidFilterParameters +} + +func getFloat64Arg(a interface{}) (float64, error) { + if f, ok := a.(float64); ok { + return f, nil + } + + return 0, filters.ErrInvalidFilterParameters +} + +func getModeArg(a interface{}) (mode, error) { + s, ok := a.(string) + if !ok { + return 0, filters.ErrInvalidFilterParameters + } + switch s { + case "active": + return active, nil + case "inactive": + return inactive, nil + case "logInactive": + return logInactive, nil + } + + return 0, filters.ErrInvalidFilterParameters +} + +type mode int + +const ( + inactive mode = iota + logInactive + active +) + +func (m mode) String() string { + switch m { + case active: + return "active" + case inactive: + return "inactive" + case logInactive: + return "logInactive" + } + return "unknown" +} + +const ( + counterPrefix = "shedder.admission_control." + admissionControlSpanName = "admission_control" + admissionControlKey = "shedder:admission_control" + defaultDuration = time.Second + defaultMinRps = 10 + defaultWindowSize = 10 + minWindowSize = 1 + maxWindowSize = 100 + defaultThreshold = float64(0.95) + defaultMaxRejectProbability = float64(0.95) + defaultExponent = float64(1) +) + +type Options struct { + Tracer opentracing.Tracer +} + +type admissionControlSpec struct { + tracer opentracing.Tracer +} + +type admissionControl struct { + mu sync.Mutex + quit chan struct{} + closed bool + + metrics metrics.Metrics + metricSuffix string + tracer opentracing.Tracer + + mode mode + windowSize int + minRequests int + d time.Duration + successThreshold float64 // (0,1] + maxRejectProbability float64 // (0,1] + exponent float64 // >0 + + totals []int64 + success []int64 + counter *atomic.Int64 + successCounter *atomic.Int64 +} + +func NewAdmissionControl(o Options) filters.Spec { + tracer := o.Tracer + if tracer == nil { + tracer = &opentracing.NoopTracer{} + } + return &admissionControlSpec{ + tracer: tracer, + } +} + +func (*admissionControlSpec) Name() string { return filters.AdmissionControlName } + +// CreateFilter creates a new admissionControl filter with passed configuration: +// +// admissionControl(metricSuffix, mode, d, windowSize, minRPS, successThreshold, maxRejectProbability, exponent) +// admissionControl("$app", "active", "1s", 5, 10, 0.1, 0.95, 0.5) +// +// metricSuffix is the suffix key to expose reject counter, should be unique by filter instance +// mode is one of "active", "inactive", "logInactive" +// active will reject traffic +// inactive will never reject traffic +// logInactive will not reject traffic, but log to debug filter settings +// windowSize is within [minWindowSize, maxWindowSize] +// minRPS +// successThreshold is within (0,1] and sets the lowest request success rate at which the filter will not reject requests. +// maxRejectProbability is within (0,1] and sets the upper bound of reject probability. +// exponent >0, 1: linear, 1/2: qudratic, 1/3: cubic, .. +// +// see also https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#admission-control +func (spec *admissionControlSpec) CreateFilter(args []interface{}) (filters.Filter, error) { + var err error + + metricSuffix := "" + if len(args) > 0 { + ok := false + metricSuffix, ok = args[0].(string) + if !ok { + return nil, filters.ErrInvalidFilterParameters + } + } + + mode := inactive + if len(args) > 1 { + mode, err = getModeArg(args[1]) + if err != nil { + log.Warnf("mode failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + } + + d := defaultDuration + if len(args) > 2 { + d, err = getDurationArg(args[2]) + if err != nil { + log.Warnf("d failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + } + + windowSize := defaultWindowSize + if len(args) > 3 { + windowSize, err = getIntArg(args[3]) + if err != nil { + log.Warnf("windowsize failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + if windowSize < minWindowSize { + windowSize = minWindowSize + } + if windowSize > maxWindowSize { + windowSize = maxWindowSize + } + } + + minRequests := defaultMinRps + if len(args) > 4 { + minRequests, err = getIntArg(args[4]) + if err != nil { + log.Warnf("minRequests failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + } + + threshold := defaultThreshold + if len(args) > 5 { + threshold, err = getFloat64Arg(args[5]) + if err != nil { + log.Warnf("threshold failed %v", err) + return nil, filters.ErrInvalidFilterParameters + } + } + + maxRejectProbability := defaultMaxRejectProbability + if len(args) > 6 { + maxRejectProbability, err = getFloat64Arg(args[6]) + if err != nil { + log.Warnf("maxRejectProbability failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + } + + exponent := defaultExponent + if len(args) > 7 { + exponent, err = getFloat64Arg(args[7]) + if err != nil { + log.Warnf("exponent failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + if exponent <= 0.0 { + log.Warn("exponent should be >0") + return nil, filters.ErrInvalidFilterParameters + } + } + + ac := &admissionControl{ + quit: make(chan struct{}), + metrics: metrics.Default, + metricSuffix: metricSuffix, + tracer: spec.tracer, + + mode: mode, + d: d, + windowSize: windowSize, + minRequests: minRequests, + successThreshold: threshold, + maxRejectProbability: maxRejectProbability, + exponent: exponent, + + totals: make([]int64, windowSize), + success: make([]int64, windowSize), + counter: atomic.NewInt64(0), + successCounter: atomic.NewInt64(0), + } + go ac.tickWindows() + return ac, nil +} + +func (ac *admissionControl) Close() { + if !ac.closed { + close(ac.quit) + } +} + +func (ac *admissionControl) tickWindows() { + t := time.NewTicker(ac.d) + i := 0 + for range t.C { + select { + case <-ac.quit: + return + default: + } + val := ac.counter.Swap(0) + ok := ac.successCounter.Swap(0) + + ac.mu.Lock() + ac.totals[i] = val + ac.success[i] = ok + ac.mu.Unlock() + + i = (i + 1) % ac.windowSize + } +} + +func (ac *admissionControl) count() (float64, float64) { + ac.mu.Lock() + defer ac.mu.Unlock() + return float64(sum(ac.totals)), float64(sum(ac.success)) +} + +func sum(a []int64) int64 { + var result int64 + for _, v := range a { + result += v + } + return result +} + +// calculates P_{reject} see https://opensource.zalando.com/skipper/reference/filters/#admissioncontrol +func (ac *admissionControl) pReject() float64 { + var rejectP float64 + + total, success := ac.count() + factor := float64(ac.d) / float64(time.Second) + if total < float64(ac.minRequests)*factor { + return -1 + } + + s := success / ac.successThreshold + if ac.mode == logInactive { + log.Infof("%s: rejectP = (%0.2f - %0.2f) / (%0.2f + 1) --- success: %0.2f and threshold: %0.2f", filters.AdmissionControlName, total, s, total, success, ac.successThreshold) + } + if total < s { + return -1 + } + rejectP = (total - s) / (total + 1) + rejectP = math.Pow(rejectP, ac.exponent) + + rejectP = math.Min(rejectP, ac.maxRejectProbability) + return math.Max(rejectP, 0.0) +} + +func (ac *admissionControl) shouldReject() bool { + p := ac.pReject() // [0, ac.maxRejectProbability] and -1 to disable + /* #nosec */ + r := rand.Float64() // [0,1) + + if ac.mode == logInactive { + log.Infof("%s: p: %0.2f, r: %0.2f", filters.AdmissionControlName, p, r) + } + + return p > r +} + +func (ac *admissionControl) Request(ctx filters.FilterContext) { + span := ac.startSpan(ctx.Request().Context()) + defer span.Finish() + + if ac.shouldReject() { + ac.metrics.IncCounter(counterPrefix + ac.metricSuffix) + ext.Error.Set(span, true) + + ctx.StateBag()[admissionControlKey] = "reject" + + // shadow mode to measure data + if ac.mode != active { + return + } + ctx.Serve(&http.Response{ + StatusCode: http.StatusServiceUnavailable, + }) + } +} + +func (ac *admissionControl) Response(ctx filters.FilterContext) { + // we don't want to count ourselves + if ctx.StateBag()[admissionControlKey] == "reject" { + return + } + + code := ctx.Response().StatusCode + if code < 499 { + ac.successCounter.Inc() + } + ac.counter.Inc() +} + +func (ac *admissionControl) startSpan(ctx context.Context) (span opentracing.Span) { + parent := opentracing.SpanFromContext(ctx) + if parent != nil { + span = ac.tracer.StartSpan(admissionControlSpanName, opentracing.ChildOf(parent.Context())) + ext.Component.Set(span, "skipper") + ext.SpanKind.Set(span, "shedder") + span.SetTag("mode", ac.mode.String()) + } + return +} diff --git a/filters/shedder/admission_test.go b/filters/shedder/admission_test.go new file mode 100644 index 0000000000..742aa2b3b8 --- /dev/null +++ b/filters/shedder/admission_test.go @@ -0,0 +1,196 @@ +package shedder + +import ( + "math/rand" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/net" + "github.com/zalando/skipper/proxy/proxytest" +) + +func TestAdmissionControl(t *testing.T) { + for _, ti := range []struct { + msg string + mode string + d time.Duration + windowsize int + minRequests int + successThreshold float64 + maxrejectprobability float64 + exponent float64 + N float64 // iterations + pBackendErr float64 // [0,1] + pExpectedAdmissionShedding float64 // [0,1] + }{{ + msg: "no error", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRequests: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + exponent: 1.0, + N: 10000, + pBackendErr: 0.0, + pExpectedAdmissionShedding: 0.0, + }, { + msg: "only errors", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRequests: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + exponent: 1.0, // 1000.0 + N: 10000, + pBackendErr: 1.0, + pExpectedAdmissionShedding: 0.95, + }, { + msg: "smaller error rate, than threshold won't block", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRequests: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + exponent: 1.0, + N: 10000, + pBackendErr: 0.01, + pExpectedAdmissionShedding: 0.0, + }, { + msg: "tiny error rate and bigger than threshold will block some traffic", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRequests: 10, + successThreshold: 0.99, + maxrejectprobability: 0.95, + exponent: 1.0, + N: 10000, + pBackendErr: 0.1, + pExpectedAdmissionShedding: 0.1, + }, { + msg: "small error rate and bigger than threshold will block some traffic", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRequests: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + exponent: 1.0, + N: 10000, + pBackendErr: 0.2, + pExpectedAdmissionShedding: 0.1, + }, { + msg: "medium error rate and bigger than threshold will block some traffic", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRequests: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + exponent: 1.0, + N: 10000, + pBackendErr: 0.5, + pExpectedAdmissionShedding: 0.615, + }, { + msg: "large error rate and bigger than threshold will block some traffic", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRequests: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + exponent: 1.0, + N: 10000, + pBackendErr: 0.8, + pExpectedAdmissionShedding: 0.91, + }} { + t.Run(ti.msg, func(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + p := rand.Float64() + if p < ti.pBackendErr { + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + })) + + spec := NewAdmissionControl(Options{}) + args := make([]interface{}, 0, 6) + args = append(args, "testmetric", ti.mode, ti.d.String(), ti.windowsize, ti.minRequests, ti.successThreshold, ti.maxrejectprobability, ti.exponent) + _, err := spec.CreateFilter(args) + if err != nil { + t.Logf("args: %+v", args...) + t.Fatalf("error creating filter: %v", err) + return + } + + fr := make(filters.Registry) + fr.Register(spec) + r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend.URL} + + proxy := proxytest.New(fr, r) + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Fatalf("Failed to parse url %s: %v", proxy.URL, err) + } + + client := net.NewClient(net.Options{}) + req, err := http.NewRequest("GET", reqURL.String(), nil) + if err != nil { + t.Error(err) + return + } + + var failBackend, fail, ok, N float64 + // iterations to make sure we have enough traffic + for N < ti.N { + for i := 0; i < 1+(2*ti.windowsize); i++ { + until := time.Now().Add(ti.d) + for { + if time.Now().After(until) { + break + } + N++ + rsp, err := client.Do(req) + if err != nil { + t.Error(err) + } + switch rsp.StatusCode { + case http.StatusInternalServerError: + failBackend += 1 + case http.StatusServiceUnavailable: + fail += 1 + case http.StatusOK: + ok += 1 + default: + t.Logf("Unexpected status code %d %s", rsp.StatusCode, rsp.Status) + } + rsp.Body.Close() + } + } + } + t.Logf("ok: %0.2f, fail: %0.2f, failBackend: %0.2f", ok, fail, failBackend) + + epsilon := 0.05 * N // maybe 5% instead of 0.1% + expectedFails := (N - failBackend) * ti.pExpectedAdmissionShedding + + if expectedFails-epsilon > fail || fail > expectedFails+epsilon { + t.Errorf("Failed to get expected fails should be in: %0.2f < %0.2f < %0.2f", expectedFails-epsilon, fail, expectedFails+epsilon) + } + + // TODO(sszuecs) how to calculate expected oks? + // expectedOKs := N - (N-failBackend)*ti.pExpectedAdmissionShedding + // if ok < expectedOKs-epsilon || expectedOKs+epsilon < ok { + // t.Errorf("Failed to get expected ok should be in: %0.2f < %0.2f < %0.2f", expectedOKs-epsilon, ok, expectedOKs+epsilon) + // } + }) + } +} diff --git a/skipper.go b/skipper.go index e1e0d14cef..059926bfe3 100644 --- a/skipper.go +++ b/skipper.go @@ -35,6 +35,7 @@ import ( "github.com/zalando/skipper/filters/fadein" logfilter "github.com/zalando/skipper/filters/log" ratelimitfilters "github.com/zalando/skipper/filters/ratelimit" + "github.com/zalando/skipper/filters/shedder" "github.com/zalando/skipper/innkeeper" "github.com/zalando/skipper/loadbalancer" "github.com/zalando/skipper/logging" @@ -1365,6 +1366,9 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { o.ApiUsageMonitoringClientKeys, o.ApiUsageMonitoringRealmsTrackingPattern, ), + shedder.NewAdmissionControl(shedder.Options{ + Tracer: tracer, + }), ) var swarmer ratelimit.Swarmer