From ce6eeea146810a96776a3156b2e7d14420432945 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Wed, 22 Jun 2022 17:01:19 +0200 Subject: [PATCH] admissionControl is a load shedding filter, that works by observing backend errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit admissionControl(metricsSuffix, mode, d, windowSize, minRPS, successThreshold, maxRejectProbability, agression) admissionControl("myapp", "active", "1s", 5, 10, 0.9, 0.95, 2.0) metricSuffix is the suffix key to expose reject counter, should be unique by filter instance mode is one of "active", "inactive", "log" windowSize is within [minWindowSize, maxWindowSize] minRPS successThreshold is within (0,1] and sets the lowest request success rate at which the filter will not reject requests. maxRejectProbability is within (0,1] and sets the upper bound of reject probability. aggression >= 1, 1: linear, 2: qudratic, .. see also https://opensource.zalando.com/skipper/reference/filters/#admissioncontrol Signed-off-by: Sandor Szücs --- docs/reference/filters.md | 78 ++++++- filters/filters.go | 4 +- filters/shedder/admission.go | 366 ++++++++++++++++++++++++++++++ filters/shedder/admission_test.go | 199 ++++++++++++++++ skipper.go | 4 + 5 files changed, 646 insertions(+), 5 deletions(-) create mode 100644 filters/shedder/admission.go create mode 100644 filters/shedder/admission_test.go diff --git a/docs/reference/filters.md b/docs/reference/filters.md index a1cb4a34ea..dfe5e3474d 100644 --- a/docs/reference/filters.md +++ b/docs/reference/filters.md @@ -372,8 +372,8 @@ no-compression, 1 means best-speed and 11 means best-compression. Example: ``` The filter also checks the incoming request, if it accepts the supported encodings, -explicitly stated in the Accept-Encoding header. -The filter currently supports by default `gzip`, `deflate` and `br` (can be overridden with flag `compress-encodings`). +explicitly stated in the Accept-Encoding header. +The filter currently supports by default `gzip`, `deflate` and `br` (can be overridden with flag `compress-encodings`). It does not assume that the client accepts any encoding if the Accept-Encoding header is not set. It ignores * in the Accept-Encoding header. @@ -1848,6 +1848,76 @@ Path("/cheap") -> clusterLeakyBucketRatelimit("user-${request.cookie.Authori Path("/expensive") -> clusterLeakyBucketRatelimit("user-${request.cookie.Authorization}", 1, "1s", 5, 2) -> ... ``` +## shedder + +The basic idea of load shedding is to reduce errors by early stopping +partial of the ingress requests that create too much load and serve +the maximum throughput the system can do. + +There is a great talk by [Acacio Cruz from +Google](https://www.youtube.com/watch?v=XNEIkivvaV4&feature=youtu.be) +that explains the basic principles. + +### admissionControl + +Implements admission control filter similar to [envoy admission control](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#admission-control). + +The probability of rejection is calculated by the following equation: + +$$ P_{reject} = ( { n_{total} - { n_{success} \over threshold } \over n_{total} + 1} )^{1 \over aggression } $$ + +Examples: + + admissionControl(metricSuffix, mode, d, windowSize, minRPS, successThreshold, maxRejectProbability, aggression) + admissionControl("myapp", "active", "1s", 5, 10, 0.95, 0.9, 2.0) + +Parameters: + +* metric suffix (string) +* mode (enum) +* d (time.Duration) +* window size (int) +* min RPS (int) +* success threshold (float64) +* max reject probability (float64) +* aggression (float64) + +Metric suffix is the chosen suffix key to expose reject counter, +should be unique by filter instance + +Mode has 3 different possible values and defaults to "inactive": + +* "active" will reject traffic +* "inactive" will never reject traffic (default) +* "log" will not reject traffic, but log to debug filter settings + +D the time duration of a single slot for required counters in our +circular buffer of window size and default to 1s. + +Window size is used to snapshot counters to calculate total requests +and number of success. It is within [1, 100] and defaults to 10. + +minRPS is the minimum requests per second that has to pass this filter +otherwise it will not reject traffic. + +Success threshold sets the lowest request success rate at which the +filter will not reject requests. It is within (0,1] and defaults to +0.95, which means an error rate of lower than 5% will not trigger +rejects. + +Max reject probability sets the upper bound of reject probability. It +is within (0,1] and defaults to 0.95, which means if backend errors +with 100% it will only reject up to 95%. + +aggression is used to dictate the rejection probability. The +calculation is done by $$ p = p^{1 \over aggression} $$ +The aggression value is within (0,inf]: + +* 1: linear (default) +* 2: quadratic +* 3: cubic +* see also [envoy aggression](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#aggression) + ## lua See [the scripts page](scripts.md) @@ -2557,7 +2627,7 @@ fadeIn("3m", 1.5) #### Warning on fadeIn and Rolling Restarts -Traffic fade-in has the potential to skew the traffic to your backend pods in case of a rolling restart +Traffic fade-in has the potential to skew the traffic to your backend pods in case of a rolling restart (`kubectl rollout restart`), because it is very likely that the rolling restart is going faster than the fade-in duration. The image below shows an example of a rolling restart for a four-pod deployment (A, B, C, D) into (E, F, G, H), and the traffic share of each pod over time. While the ramp-up of the new pods is ongoing, @@ -2565,7 +2635,7 @@ the remaining old pods will receive a largely increased traffic share (especiall example), as well as an over-propotional traffic share for the first pod in the rollout (E). To make rolling restarts safe, you need to slow them down by setting `spec.minReadySeconds` on the pod spec -of your deployment or stackset, according to your fadeIn duration. +of your deployment or stackset, according to your fadeIn duration. ![Rolling Restart and Fade-In](../img/fadein_traffic_skew.png) diff --git a/filters/filters.go b/filters/filters.go index ef805b59f2..52f92163b7 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -2,10 +2,11 @@ package filters import ( "errors" - log "github.com/sirupsen/logrus" "net/http" "time" + log "github.com/sirupsen/logrus" + "github.com/opentracing/opentracing-go" ) @@ -264,6 +265,7 @@ const ( ConsecutiveBreakerName = "consecutiveBreaker" RateBreakerName = "rateBreaker" DisableBreakerName = "disableBreaker" + AdmissionControlName = "admissionControl" ClientRatelimitName = "clientRatelimit" RatelimitName = "ratelimit" ClusterClientRatelimitName = "clusterClientRatelimit" diff --git a/filters/shedder/admission.go b/filters/shedder/admission.go new file mode 100644 index 0000000000..f4d75f8b8e --- /dev/null +++ b/filters/shedder/admission.go @@ -0,0 +1,366 @@ +package shedder + +import ( + "context" + "math" + "math/rand" + "net/http" + "sync" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/sirupsen/logrus" + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/metrics" + "go.uber.org/atomic" +) + +func getIntArg(a interface{}) (int, error) { + if i, ok := a.(int); ok { + return i, nil + } + + if f, ok := a.(float64); ok { + return int(f), nil + } + + return 0, filters.ErrInvalidFilterParameters +} + +func getDurationArg(a interface{}) (time.Duration, error) { + if s, ok := a.(string); ok { + return time.ParseDuration(s) + } + + i, err := getIntArg(a) + return time.Duration(i) * time.Millisecond, err +} + +func getFloat64Arg(a interface{}) (float64, error) { + if f, ok := a.(float64); ok { + return f, nil + } + + return 0, filters.ErrInvalidFilterParameters +} + +func getModeArg(a interface{}) (mode, error) { + s, ok := a.(string) + if !ok { + return 0, filters.ErrInvalidFilterParameters + } + switch s { + case "active": + return active, nil + case "inactive": + return inactive, nil + case "log": + return log, nil + } + + return 0, filters.ErrInvalidFilterParameters +} + +type mode int + +const ( + inactive mode = iota + log + active +) + +func (m mode) String() string { + switch m { + case active: + return "active" + case inactive: + return "inactive" + case log: + return "log" + } + return "unknown" +} + +const ( + counterPrefix = "shedder.admission_control." + admissionControlSpanName = "admission_control" + defaultDuration = time.Second + defaultMinRps = 10 + defaultWindowSize = 10 + minWindowSize = 1 + maxWindowSize = 100 + defaultThreshold = float64(0.95) + defaultMaxRejectProbability = float64(0.95) + defaultAggression = float64(1) +) + +type Options struct { + Tracer opentracing.Tracer +} + +type admissionControlSpec struct { + tracer opentracing.Tracer +} + +type admissionControl struct { + mu sync.Mutex + + metrics metrics.Metrics + metricSuffix string + tracer opentracing.Tracer + + mode mode + windowSize int + minRps int + d time.Duration // example: 1s + successThreshold float64 // (0,1] + maxRejectProbability float64 // (0,1] + aggression float64 // >0 + + totals []int64 // example: rps counters + success []int64 // example: rps counters + counter *atomic.Int64 + successCounter *atomic.Int64 +} + +func NewAdmissionControl(o Options) filters.Spec { + tracer := o.Tracer + if tracer == nil { + tracer = &opentracing.NoopTracer{} + } + return &admissionControlSpec{ + tracer: tracer, + } +} + +func (*admissionControlSpec) Name() string { return filters.AdmissionControlName } + +// CreateFilter creates a new admissionControl filter with passed configuration: +// +// admissionControl(metricSuffix, mode, d, windowSize, minRPS, successThreshold, maxRejectProbability, aggression) +// admissionControl("$app", "active", "1s", 5, 10, 0.1, 0.95, 2.0) +// +// metricSuffix is the suffix key to expose reject counter, should be unique by filter instance +// mode is one of "active", "inactive", "log" +// active will reject traffic +// inactive will never reject traffic +// log will not reject traffic, but log to debug filter settings +// windowSize is within [minWindowSize, maxWindowSize] +// minRPS +// successThreshold is within (0,1] and sets the lowest request success rate at which the filter will not reject requests. +// maxRejectProbability is within (0,1] and sets the upper bound of reject probability. +// aggression >= 1, 1: linear, 2: qudratic, .., see also https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#aggression +// +// see also https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#admission-control +func (spec *admissionControlSpec) CreateFilter(args []interface{}) (filters.Filter, error) { + var err error + + metricSuffix := "" + if len(args) > 0 { + ok := false + metricSuffix, ok = args[0].(string) + if !ok { + return nil, filters.ErrInvalidFilterParameters + } + } + + mode := inactive + if len(args) > 1 { + mode, err = getModeArg(args[1]) + if err != nil { + logrus.Warnf("mode failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + } + + d := defaultDuration + if len(args) > 2 { + d, err = getDurationArg(args[2]) + if err != nil { + logrus.Warnf("d failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + } + + windowSize := defaultWindowSize + if len(args) > 3 { + windowSize, err = getIntArg(args[3]) + if err != nil { + logrus.Warnf("windowsize failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + if windowSize < minWindowSize { + windowSize = minWindowSize + } + if windowSize > maxWindowSize { + windowSize = maxWindowSize + } + } + + minRps := defaultMinRps + if len(args) > 4 { + minRps, err = getIntArg(args[4]) + if err != nil { + logrus.Warnf("minRps failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + } + + threshold := defaultThreshold + if len(args) > 5 { + threshold, err = getFloat64Arg(args[5]) + if err != nil { + logrus.Warnf("threshold failed %v", err) + return nil, filters.ErrInvalidFilterParameters + } + } + + maxRejectProbability := defaultMaxRejectProbability + if len(args) > 6 { + maxRejectProbability, err = getFloat64Arg(args[6]) + if err != nil { + logrus.Warnf("maxRejectProbability failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + } + + aggression := defaultAggression + if len(args) > 7 { + aggression, err = getFloat64Arg(args[7]) + if err != nil { + logrus.Warnf("aggression failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + if aggression <= 0.0 { + logrus.Warn("aggression should be >0") + return nil, filters.ErrInvalidFilterParameters + } + } + + ac := &admissionControl{ + metrics: metrics.Default, + metricSuffix: metricSuffix, + tracer: spec.tracer, + + mode: mode, + d: d, + windowSize: windowSize, + minRps: minRps, + successThreshold: threshold, + maxRejectProbability: maxRejectProbability, + aggression: aggression, + + totals: make([]int64, windowSize), + success: make([]int64, windowSize), + counter: atomic.NewInt64(0), + successCounter: atomic.NewInt64(0), + } + go ac.tickWindows() + return ac, nil +} + +func (ac *admissionControl) tickWindows() { + t := time.NewTicker(ac.d) + i := 0 + for range t.C { + val := ac.counter.Swap(0) + ok := ac.successCounter.Swap(0) + + ac.mu.Lock() + ac.totals[i] = val + ac.success[i] = ok + ac.mu.Unlock() + + i = (i + 1) % ac.windowSize + } +} + +func (ac *admissionControl) count() (float64, float64) { + ac.mu.Lock() + defer ac.mu.Unlock() + return float64(sum(ac.totals)), float64(sum(ac.success)) +} + +func sum(a []int64) int64 { + var result int64 + for _, v := range a { + result += v + } + return result +} + +// calculates P_{reject} see https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#overview +func (ac *admissionControl) pReject() float64 { + var rejectP float64 + + total, success := ac.count() + factor := float64(ac.d) / float64(time.Second) + if total < float64(ac.minRps)*factor { + return -1 + } + + s := success / ac.successThreshold + if ac.mode == log { + logrus.Infof("rejectP = (%0.2f - %0.2f) / (%0.2f + 1) --- success: %0.2f and threshold: %0.2f", total, s, total, success, ac.successThreshold) + } + if total < s { + return -1 + } + rejectP = (total - s) / (total + 1) + rejectP = math.Pow(rejectP, 1/ac.aggression) + + rejectP = math.Min(rejectP, ac.maxRejectProbability) + return math.Max(rejectP, 0.0) +} + +func (ac *admissionControl) Request(ctx filters.FilterContext) { + span := ac.startSpan(ctx.Request().Context()) + defer span.Finish() + + p := ac.pReject() // [0, ac.maxRejectProbability] and -1 to disable + /* #nosec */ + r := rand.Float64() // [0,1) + + if ac.mode == log { + logrus.Infof("p: %0.2f, r: %0.2f", p, r) + } + + if p > r { + ac.metrics.IncCounter(counterPrefix + ac.metricSuffix) + ext.Error.Set(span, true) + + ctx.StateBag()["Admission-Control"] = "reject" + + // shadow mode to measure data + if ac.mode != active { + return + } + ctx.Serve(&http.Response{ + StatusCode: http.StatusServiceUnavailable, + }) + } +} + +func (ac *admissionControl) Response(ctx filters.FilterContext) { + // we don't want to count ourselves + if ctx.StateBag()["Admission-Control"] == "reject" { + return + } + + code := ctx.Response().StatusCode + if code < 499 { + ac.successCounter.Inc() + } + ac.counter.Inc() +} + +func (ac *admissionControl) startSpan(ctx context.Context) (span opentracing.Span) { + parent := opentracing.SpanFromContext(ctx) + if parent != nil { + span = ac.tracer.StartSpan(admissionControlSpanName, opentracing.ChildOf(parent.Context())) + ext.Component.Set(span, "skipper") + ext.SpanKind.Set(span, "shedder") + } + return +} diff --git a/filters/shedder/admission_test.go b/filters/shedder/admission_test.go new file mode 100644 index 0000000000..f22e4fb59b --- /dev/null +++ b/filters/shedder/admission_test.go @@ -0,0 +1,199 @@ +package shedder + +import ( + "math/rand" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/net" + "github.com/zalando/skipper/proxy/proxytest" +) + +func TestAdmissionControl(t *testing.T) { + for _, ti := range []struct { + msg string + mode string + d time.Duration + windowsize int + minRPS int + successThreshold float64 + maxrejectprobability float64 + aggression float64 + N int // iterations + pBackendErr float64 // [0,1] + pExpectedAdmissionShedding float64 // [0,1] + }{{ + msg: "no error", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRPS: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + aggression: 1.0, + N: 10000, + pBackendErr: 0.0, + pExpectedAdmissionShedding: 0.0, + }, { + msg: "only errors", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRPS: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + aggression: 1.0, // 1000.0 + N: 10000, + pBackendErr: 1.0, + pExpectedAdmissionShedding: 0.95, + }, { + msg: "smaller error rate, than threshold won't block", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRPS: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + aggression: 1.0, + N: 10000, + pBackendErr: 0.01, + pExpectedAdmissionShedding: 0.0, + }, { + msg: "tiny error rate and bigger than threshold will block some traffic", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRPS: 10, + successThreshold: 0.99, + maxrejectprobability: 0.95, + aggression: 1.0, + N: 10000, + pBackendErr: 0.1, + pExpectedAdmissionShedding: 0.1, + }, { + msg: "small error rate and bigger than threshold will block some traffic", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRPS: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + aggression: 1.0, + N: 10000, + pBackendErr: 0.2, + pExpectedAdmissionShedding: 0.1, + }, { + msg: "medium error rate and bigger than threshold will block some traffic", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRPS: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + aggression: 1.0, + N: 10000, + pBackendErr: 0.5, + pExpectedAdmissionShedding: 0.615, + }, { + msg: "large error rate and bigger than threshold will block some traffic", + mode: "active", + d: 10 * time.Millisecond, + windowsize: 5, + minRPS: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + aggression: 1.0, + N: 10000, + pBackendErr: 0.8, + pExpectedAdmissionShedding: 0.91, + }} { + t.Run(ti.msg, func(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + p := rand.Float64() + if p < ti.pBackendErr { + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + })) + + spec := NewAdmissionControl(Options{}) + args := make([]interface{}, 0, 6) + args = append(args, "testmetric", ti.mode, ti.d.String(), ti.windowsize, ti.minRPS, ti.successThreshold, ti.maxrejectprobability, ti.aggression) + _, err := spec.CreateFilter(args) + if err != nil { + t.Logf("args: %+v", args...) + t.Fatalf("error creating filter: %v", err) + return + } + + fr := make(filters.Registry) + fr.Register(spec) + r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend.URL} + + proxy := proxytest.New(fr, r) + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Fatalf("Failed to parse url %s: %v", proxy.URL, err) + } + + client := net.NewClient(net.Options{}) + req, err := http.NewRequest("GET", reqURL.String(), nil) + if err != nil { + t.Error(err) + return + } + + failBackend := float64(0) + fail := float64(0) + ok := float64(0) + var N float64 + // iterations to make sure we have enough traffic + for j := 0; j < 10; j++ { + for i := 0; i < 1+(2*ti.windowsize); i++ { + until := time.Now().Add(ti.d) + for { + if time.Now().After(until) { + break + } + N++ + rsp, err := client.Do(req) + if err != nil { + t.Error(err) + } + switch rsp.StatusCode { + case http.StatusInternalServerError: + failBackend += 1 + case http.StatusServiceUnavailable: + fail += 1 + case http.StatusOK: + ok += 1 + default: + t.Logf("Unexpected status code %d %s", rsp.StatusCode, rsp.Status) + } + rsp.Body.Close() + } + } + } + t.Logf("ok: %0.2f, fail: %0.2f, failBackend: %0.2f", ok, fail, failBackend) + + epsilon := 0.05 * N // maybe 5% instead of 0.1% + expectedFails := (N - failBackend) * ti.pExpectedAdmissionShedding + + if expectedFails-epsilon > fail || fail > expectedFails+epsilon { + t.Errorf("Failed to get expected fails should be in: %0.2f < %0.2f < %0.2f", expectedFails-epsilon, fail, expectedFails+epsilon) + } + + // TODO(sszuecs) how to calculate expected oks? + // expectedOKs := N - (N-failBackend)*ti.pExpectedAdmissionShedding + // if ok < expectedOKs-epsilon || expectedOKs+epsilon < ok { + // t.Errorf("Failed to get expected ok should be in: %0.2f < %0.2f < %0.2f", expectedOKs-epsilon, ok, expectedOKs+epsilon) + // } + }) + } +} diff --git a/skipper.go b/skipper.go index e1e0d14cef..059926bfe3 100644 --- a/skipper.go +++ b/skipper.go @@ -35,6 +35,7 @@ import ( "github.com/zalando/skipper/filters/fadein" logfilter "github.com/zalando/skipper/filters/log" ratelimitfilters "github.com/zalando/skipper/filters/ratelimit" + "github.com/zalando/skipper/filters/shedder" "github.com/zalando/skipper/innkeeper" "github.com/zalando/skipper/loadbalancer" "github.com/zalando/skipper/logging" @@ -1365,6 +1366,9 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { o.ApiUsageMonitoringClientKeys, o.ApiUsageMonitoringRealmsTrackingPattern, ), + shedder.NewAdmissionControl(shedder.Options{ + Tracer: tracer, + }), ) var swarmer ratelimit.Swarmer