diff --git a/docs/reference/filters.md b/docs/reference/filters.md index a1cb4a34ea..9ff96bf4e0 100644 --- a/docs/reference/filters.md +++ b/docs/reference/filters.md @@ -372,8 +372,8 @@ no-compression, 1 means best-speed and 11 means best-compression. Example: ``` The filter also checks the incoming request, if it accepts the supported encodings, -explicitly stated in the Accept-Encoding header. -The filter currently supports by default `gzip`, `deflate` and `br` (can be overridden with flag `compress-encodings`). +explicitly stated in the Accept-Encoding header. +The filter currently supports by default `gzip`, `deflate` and `br` (can be overridden with flag `compress-encodings`). It does not assume that the client accepts any encoding if the Accept-Encoding header is not set. It ignores * in the Accept-Encoding header. @@ -1848,6 +1848,79 @@ Path("/cheap") -> clusterLeakyBucketRatelimit("user-${request.cookie.Authori Path("/expensive") -> clusterLeakyBucketRatelimit("user-${request.cookie.Authorization}", 1, "1s", 5, 2) -> ... ``` +## shedder + +The basic idea of load shedding is to reduce errors by rejecting +part of the ingress requests early when they create too much load, and to serve +the maximum throughput the system can handle. + +There is a great talk by [Acacio Cruz from +Google](https://www.youtube.com/watch?v=XNEIkivvaV4&feature=youtu.be) +that explains the basic principles. + +### admissionControl + +Implements an admission control filter similar to [envoy admission control](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#admission-control). 
+ +The probability of rejection is calculated by the following equation: + +$$ P_{reject} = ( { n_{total} - { n_{success} \over threshold } \over n_{total} + 1} )^{1 \over aggression } $$ + +Examples: + + admissionControl(metricSuffix, mode, d, windowSize, minRPS, successThreshold, maxRejectProbability, aggression) + admissionControl("myapp", "active", "1s", 5, 10, 0.95, 0.9, 2.0) + +Parameters: + +* metric suffix (string) +* mode (enum) +* d (time.Duration) +* window size (int) +* min requests (int) +* success threshold (float64) +* max reject probability (float64) +* aggression (float64) + +Metric suffix is the suffix of the key that exposes the reject counter. +It should be unique per filter instance. + +Mode has 3 possible values and defaults to "inactive": + +* "active" will reject traffic +* "inactive" will never reject traffic (default) +* "log" will not reject traffic, but log to debug filter settings + +D is the time duration of a single slot of the counters in our +circular buffer of window size. It defaults to 1s. + +Window size is the size of the circular buffer. It is used to snapshot +counters to calculate total requests and number of successes. It is +within $[1, 100]$ and defaults to 10. + +Min requests multiplied by d in seconds is the minimum number of requests per $windowSize * d$ that have to pass this filter, +otherwise it will not reject traffic. It defaults to 10. + +Success threshold sets the lowest request success rate at which the +filter will not reject requests. It is within $(0,1]$ and defaults to +0.95, which means an error rate lower than 5% will not trigger +rejects. + +Max reject probability sets the upper bound of reject probability. It +is within $(0,1]$ and defaults to 0.95, which means that even if the backend fails +100% of the requests, at most 95% of the traffic is rejected. + +Aggression is used to dictate the rejection probability. 
The +calculation is done by $p = p^{1 \over aggression}$. +The aggression value is within $(0,\infty)$: + +* 1: linear (default) +* 2: quadratic +* 3: cubic +* see also [envoy aggression](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#aggression) + +!!! This filter is experimental and defaults are likely to change + ## lua See [the scripts page](scripts.md) @@ -2557,7 +2630,7 @@ fadeIn("3m", 1.5) #### Warning on fadeIn and Rolling Restarts -Traffic fade-in has the potential to skew the traffic to your backend pods in case of a rolling restart +Traffic fade-in has the potential to skew the traffic to your backend pods in case of a rolling restart (`kubectl rollout restart`), because it is very likely that the rolling restart is going faster than the fade-in duration. The image below shows an example of a rolling restart for a four-pod deployment (A, B, C, D) into (E, F, G, H), and the traffic share of each pod over time. While the ramp-up of the new pods is ongoing, @@ -2565,7 +2638,7 @@ the remaining old pods will receive a largely increased traffic share (especiall example), as well as an over-propotional traffic share for the first pod in the rollout (E). To make rolling restarts safe, you need to slow them down by setting `spec.minReadySeconds` on the pod spec -of your deployment or stackset, according to your fadeIn duration. +of your deployment or stackset, according to your fadeIn duration. 
![Rolling Restart and Fade-In](../img/fadein_traffic_skew.png)
 
diff --git a/filters/filters.go b/filters/filters.go
index ef805b59f2..52f92163b7 100644
--- a/filters/filters.go
+++ b/filters/filters.go
@@ -2,10 +2,11 @@ package filters
 
 import (
 	"errors"
-	log "github.com/sirupsen/logrus"
 	"net/http"
 	"time"
 
+	log "github.com/sirupsen/logrus"
+
 	"github.com/opentracing/opentracing-go"
 )
 
@@ -264,6 +265,7 @@ const (
 	ConsecutiveBreakerName     = "consecutiveBreaker"
 	RateBreakerName            = "rateBreaker"
 	DisableBreakerName         = "disableBreaker"
+	AdmissionControlName       = "admissionControl"
 	ClientRatelimitName        = "clientRatelimit"
 	RatelimitName              = "ratelimit"
 	ClusterClientRatelimitName = "clusterClientRatelimit"
diff --git a/filters/shedder/admission.go b/filters/shedder/admission.go
new file mode 100644
index 0000000000..70cd39a1c5
--- /dev/null
+++ b/filters/shedder/admission.go
@@ -0,0 +1,442 @@
+package shedder
+
+import (
+	"context"
+	"math"
+	"math/rand"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go/ext"
+	"github.com/sirupsen/logrus"
+	"github.com/zalando/skipper/filters"
+	"github.com/zalando/skipper/metrics"
+	"go.uber.org/atomic"
+)
+
+// getIntArg converts a filter argument to int. It accepts int and
+// float64 values; a float64 is truncated towards zero.
+func getIntArg(a interface{}) (int, error) {
+	if i, ok := a.(int); ok {
+		return i, nil
+	}
+
+	if f, ok := a.(float64); ok {
+		return int(f), nil
+	}
+
+	return 0, filters.ErrInvalidFilterParameters
+}
+
+// getDurationArg converts a filter argument to a time.Duration. A string
+// is parsed with time.ParseDuration, a number is taken as milliseconds.
+func getDurationArg(a interface{}) (time.Duration, error) {
+	if s, ok := a.(string); ok {
+		return time.ParseDuration(s)
+	}
+
+	i, err := getIntArg(a)
+	return time.Duration(i) * time.Millisecond, err
+}
+
+// getFloat64Arg converts a filter argument to float64. Only float64
+// values are accepted.
+func getFloat64Arg(a interface{}) (float64, error) {
+	if f, ok := a.(float64); ok {
+		return f, nil
+	}
+
+	return 0, filters.ErrInvalidFilterParameters
+}
+
+// getModeArg converts a filter argument to a mode. Valid values are the
+// strings "active", "inactive" and "log".
+func getModeArg(a interface{}) (mode, error) {
+	s, ok := a.(string)
+	if !ok {
+		return 0, filters.ErrInvalidFilterParameters
+	}
+	switch s {
+	case "active":
+		return active, nil
+	case "inactive":
+		return inactive, nil
+	case "log":
+		return log, nil
+	}
+
+	return 0, filters.ErrInvalidFilterParameters
+}
+
+// mode controls whether the filter rejects traffic (active), never
+// rejects (inactive) or never rejects but logs its calculation (log).
+type mode int
+
+const (
+	inactive mode = iota
+	log
+	active
+)
+
+// String returns the configuration name of the mode.
+func (m mode) String() string {
+	switch m {
+	case active:
+		return "active"
+	case inactive:
+		return "inactive"
+	case log:
+		return "log"
+	}
+	return "unknown"
+}
+
+const (
+	counterPrefix               = "shedder.admission_control."
+	admissionControlSpanName    = "admission_control"
+	defaultDuration             = time.Second
+	defaultMinRps               = 10
+	defaultWindowSize           = 10
+	minWindowSize               = 1
+	maxWindowSize               = 100
+	defaultThreshold            = float64(0.95)
+	defaultMaxRejectProbability = float64(0.95)
+	defaultAggression           = float64(1)
+)
+
+// Options configures the admissionControl filter spec.
+type Options struct {
+	// Tracer creates the spans of the filter, nil selects a no-op tracer.
+	Tracer opentracing.Tracer
+}
+
+type admissionControlSpec struct {
+	tracer opentracing.Tracer
+}
+
+type admissionControl struct {
+	mu sync.Mutex // guards totals and success
+
+	metrics      metrics.Metrics
+	metricSuffix string
+	tracer       opentracing.Tracer
+
+	mode                 mode
+	windowSize           int
+	minRequests          int
+	d                    time.Duration
+	successThreshold     float64 // (0,1]
+	maxRejectProbability float64 // (0,1]
+	aggression           float64 // >0
+
+	totals         []int64
+	success        []int64
+	counter        *atomic.Int64
+	successCounter *atomic.Int64
+
+	quit      chan struct{} // closed by Close to stop tickWindows
+	closeOnce sync.Once
+}
+
+// NewAdmissionControl creates the filter spec of the admissionControl
+// filter. A nil Tracer in o is replaced by an opentracing.NoopTracer.
+func NewAdmissionControl(o Options) filters.Spec {
+	tracer := o.Tracer
+	if tracer == nil {
+		tracer = &opentracing.NoopTracer{}
+	}
+	return &admissionControlSpec{
+		tracer: tracer,
+	}
+}
+
+// Name returns filters.AdmissionControlName.
+func (*admissionControlSpec) Name() string { return filters.AdmissionControlName }
+
+// CreateFilter creates a new admissionControl filter with passed configuration:
+//
+//	admissionControl(metricSuffix, mode, d, windowSize, minRPS, successThreshold, maxRejectProbability, aggression)
+//	admissionControl("$app", "active", "1s", 5, 10, 0.1, 0.95, 2.0)
+//
+// metricSuffix is the suffix key to expose reject counter, should be unique by filter instance
+// mode is one of "active", "inactive", "log"
+//
+//	active will reject traffic
+//	inactive will never reject traffic
+//	log will not reject traffic, but log to debug filter settings
+//
+// d is the duration of a single slot of the window and has to be >0
+// windowSize is clamped to [minWindowSize, maxWindowSize]
+// minRPS multiplied by d in seconds is the minimum number of requests the window has to hold before requests are rejected
+// successThreshold is within (0,1] and sets the lowest request success rate at which the filter will not reject requests.
+// maxRejectProbability is within (0,1] and sets the upper bound of reject probability.
+// aggression >0, 1: linear, 2: quadratic, .., see also https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#aggression
+//
+// All arguments are optional. An argument of the wrong type or outside of
+// its range results in filters.ErrInvalidFilterParameters.
+//
+// Every created filter starts a goroutine, which is stopped by Close.
+//
+// see also https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#admission-control
+func (spec *admissionControlSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
+	var err error
+
+	metricSuffix := ""
+	if len(args) > 0 {
+		ok := false
+		metricSuffix, ok = args[0].(string)
+		if !ok {
+			return nil, filters.ErrInvalidFilterParameters
+		}
+	}
+
+	mode := inactive
+	if len(args) > 1 {
+		mode, err = getModeArg(args[1])
+		if err != nil {
+			logrus.Warnf("mode failed: %v", err)
+			return nil, filters.ErrInvalidFilterParameters
+		}
+	}
+
+	d := defaultDuration
+	if len(args) > 2 {
+		d, err = getDurationArg(args[2])
+		if err != nil {
+			logrus.Warnf("d failed: %v", err)
+			return nil, filters.ErrInvalidFilterParameters
+		}
+		// time.NewTicker panics on a non-positive duration
+		if d <= 0 {
+			logrus.Warn("d should be >0")
+			return nil, filters.ErrInvalidFilterParameters
+		}
+	}
+
+	windowSize := defaultWindowSize
+	if len(args) > 3 {
+		windowSize, err = getIntArg(args[3])
+		if err != nil {
+			logrus.Warnf("windowsize failed: %v", err)
+			return nil, filters.ErrInvalidFilterParameters
+		}
+		if windowSize < minWindowSize {
+			windowSize = minWindowSize
+		}
+		if windowSize > maxWindowSize {
+			windowSize = maxWindowSize
+		}
+	}
+
+	minRequests := defaultMinRps
+	if len(args) > 4 {
+		minRequests, err = getIntArg(args[4])
+		if err != nil {
+			logrus.Warnf("minRequests failed: %v", err)
+			return nil, filters.ErrInvalidFilterParameters
+		}
+	}
+
+	threshold := defaultThreshold
+	if len(args) > 5 {
+		threshold, err = getFloat64Arg(args[5])
+		if err != nil {
+			logrus.Warnf("threshold failed %v", err)
+			return nil, filters.ErrInvalidFilterParameters
+		}
+		// pReject divides by the threshold; the negated form also rejects NaN
+		if !(threshold > 0.0 && threshold <= 1.0) {
+			logrus.Warn("successThreshold should be within (0,1]")
+			return nil, filters.ErrInvalidFilterParameters
+		}
+	}
+
+	maxRejectProbability := defaultMaxRejectProbability
+	if len(args) > 6 {
+		maxRejectProbability, err = getFloat64Arg(args[6])
+		if err != nil {
+			logrus.Warnf("maxRejectProbability failed: %v", err)
+			return nil, filters.ErrInvalidFilterParameters
+		}
+		if !(maxRejectProbability > 0.0 && maxRejectProbability <= 1.0) {
+			logrus.Warn("maxRejectProbability should be within (0,1]")
+			return nil, filters.ErrInvalidFilterParameters
+		}
+	}
+
+	aggression := defaultAggression
+	if len(args) > 7 {
+		aggression, err = getFloat64Arg(args[7])
+		if err != nil {
+			logrus.Warnf("aggression failed: %v", err)
+			return nil, filters.ErrInvalidFilterParameters
+		}
+		if aggression <= 0.0 {
+			logrus.Warn("aggression should be >0")
+			return nil, filters.ErrInvalidFilterParameters
+		}
+	}
+
+	ac := &admissionControl{
+		metrics:      metrics.Default,
+		metricSuffix: metricSuffix,
+		tracer:       spec.tracer,
+
+		mode:                 mode,
+		d:                    d,
+		windowSize:           windowSize,
+		minRequests:          minRequests,
+		successThreshold:     threshold,
+		maxRejectProbability: maxRejectProbability,
+		aggression:           aggression,
+
+		totals:         make([]int64, windowSize),
+		success:        make([]int64, windowSize),
+		counter:        atomic.NewInt64(0),
+		successCounter: atomic.NewInt64(0),
+
+		quit: make(chan struct{}),
+	}
+	go ac.tickWindows()
+	return ac, nil
+}
+
+// tickWindows moves the counters collected during the last d into the
+// next slot of the circular buffers. It runs until Close is called.
+func (ac *admissionControl) tickWindows() {
+	t := time.NewTicker(ac.d)
+	defer t.Stop()
+
+	i := 0
+	for {
+		select {
+		case <-ac.quit:
+			return
+		case <-t.C:
+		}
+
+		val := ac.counter.Swap(0)
+		ok := ac.successCounter.Swap(0)
+
+		ac.mu.Lock()
+		ac.totals[i] = val
+		ac.success[i] = ok
+		ac.mu.Unlock()
+
+		i = (i + 1) % ac.windowSize
+	}
+}
+
+// Close stops the goroutine started by CreateFilter. It is safe to call
+// Close more than once.
+func (ac *admissionControl) Close() error {
+	ac.closeOnce.Do(func() { close(ac.quit) })
+	return nil
+}
+
+// count returns the total and the successful requests of the whole window.
+func (ac *admissionControl) count() (float64, float64) {
+	ac.mu.Lock()
+	defer ac.mu.Unlock()
+	return float64(sum(ac.totals)), float64(sum(ac.success))
+}
+
+// sum returns the sum of all values in a.
+func sum(a []int64) int64 {
+	var result int64
+	for _, v := range a {
+		result += v
+	}
+	return result
+}
+
+// pReject calculates P_{reject}, see https://opensource.zalando.com/skipper/reference/filters/#admissioncontrol
+//
+// It returns -1 if the window holds too few requests or the success rate
+// is above the threshold, otherwise a value in [0, maxRejectProbability].
+func (ac *admissionControl) pReject() float64 {
+	var rejectP float64
+
+	total, success := ac.count()
+	factor := float64(ac.d) / float64(time.Second)
+	if total < float64(ac.minRequests)*factor {
+		return -1
+	}
+
+	s := success / ac.successThreshold
+	if ac.mode == log {
+		logrus.Infof("rejectP = (%0.2f - %0.2f) / (%0.2f + 1) --- success: %0.2f and threshold: %0.2f", total, s, total, success, ac.successThreshold)
+	}
+	if total < s {
+		return -1
+	}
+	rejectP = (total - s) / (total + 1)
+	rejectP = math.Pow(rejectP, 1/ac.aggression)
+
+	rejectP = math.Min(rejectP, ac.maxRejectProbability)
+	return math.Max(rejectP, 0.0)
+}
+
+// Request marks the request as rejected with the probability of pReject
+// and, in active mode, answers it with 503 Service Unavailable.
+func (ac *admissionControl) Request(ctx filters.FilterContext) {
+	span := ac.startSpan(ctx.Request().Context())
+	// startSpan returns nil if the request context carries no parent span
+	if span != nil {
+		defer span.Finish()
+	}
+
+	p := ac.pReject() // [0, ac.maxRejectProbability] and -1 to disable
+	/* #nosec */
+	r := rand.Float64() // [0,1)
+
+	if ac.mode == log {
+		logrus.Infof("p: %0.2f, r: %0.2f", p, r)
+	}
+
+	if p > r {
+		ac.metrics.IncCounter(counterPrefix + ac.metricSuffix)
+		if span != nil {
+			ext.Error.Set(span, true)
+		}
+
+		ctx.StateBag()["Admission-Control"] = "reject"
+
+		// shadow mode to measure data
+		if ac.mode != active {
+			return
+		}
+		ctx.Serve(&http.Response{
+			StatusCode: http.StatusServiceUnavailable,
+		})
+	}
+}
+
+// Response counts the response as total and, for status codes below 499,
+// as success. Requests marked as rejected by Request are not counted.
+func (ac *admissionControl) Response(ctx filters.FilterContext) {
+	// we don't want to count ourselves
+	if ctx.StateBag()["Admission-Control"] == "reject" {
+		return
+	}
+
+	code := ctx.Response().StatusCode
+	if code < 499 {
+		ac.successCounter.Inc()
+	}
+	ac.counter.Inc()
+}
+
+// startSpan starts a child span of the span found in ctx. It returns nil
+// if ctx carries no span.
+func (ac *admissionControl) startSpan(ctx context.Context) (span opentracing.Span) {
+	parent := opentracing.SpanFromContext(ctx)
+	if parent != nil {
+		span = ac.tracer.StartSpan(admissionControlSpanName, opentracing.ChildOf(parent.Context()))
+		ext.Component.Set(span, "skipper")
+		ext.SpanKind.Set(span, "shedder")
+	}
+	return
+}
diff --git a/filters/shedder/admission_test.go b/filters/shedder/admission_test.go
new file mode 100644
index 0000000000..9a6a1a28dc
--- /dev/null
+++ b/filters/shedder/admission_test.go
@@ -0,0 +1,200 @@
+package shedder
+
+import (
+	"math/rand"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"testing"
+	"time"
+
+	"github.com/zalando/skipper/eskip"
+	"github.com/zalando/skipper/filters"
+	"github.com/zalando/skipper/net"
+	"github.com/zalando/skipper/proxy/proxytest"
+)
+
+// TestAdmissionControl sends requests through a proxy with the
+// admissionControl filter to a backend failing with pBackendErr and
+// checks the share of requests rejected by the filter.
+func TestAdmissionControl(t *testing.T) {
+	for _, ti := range []struct {
+		msg                        string
+		mode                       string
+		d                          time.Duration
+		windowsize                 int
+		minRequests                int
+		successThreshold           float64
+		maxrejectprobability       float64
+		aggression                 float64
+		N                          int     // iterations
+		pBackendErr                float64 // [0,1]
+		pExpectedAdmissionShedding float64 // [0,1]
+	}{{
+		msg:                        "no error",
+		mode:                       "active",
+		d:                          10 * time.Millisecond,
+		windowsize:                 5,
+		minRequests:                10,
+		successThreshold:           0.9,
+		maxrejectprobability:       0.95,
+		aggression:                 1.0,
+		N:                          10000,
+		pBackendErr:                0.0,
+		pExpectedAdmissionShedding: 0.0,
+	}, {
+		msg:                        "only errors",
+		mode:                       "active",
+		d:                          10 * time.Millisecond,
+		windowsize:                 5,
+		minRequests:                10,
+		successThreshold:           0.9,
+		maxrejectprobability:       0.95,
+		aggression:                 1.0, // 1000.0
+		N:                          10000,
+		pBackendErr:                1.0,
+		pExpectedAdmissionShedding: 0.95,
+	}, {
+		msg:                        "smaller error rate, than threshold won't block",
+		mode:                       "active",
+		d:                          10 * time.Millisecond,
+		windowsize:                 5,
+		minRequests:                10,
+		successThreshold:           0.9,
+		maxrejectprobability:       0.95,
+		aggression:                 1.0,
+		N:                          10000,
+		pBackendErr:                0.01,
+		pExpectedAdmissionShedding: 0.0,
+	}, {
+		msg:                        "tiny error rate and bigger than threshold will block some traffic",
+		mode:                       "active",
+		d:                          10 * time.Millisecond,
+		windowsize:                 5,
+		minRequests:                10,
+		successThreshold:           0.99,
+		maxrejectprobability:       0.95,
+		aggression:                 1.0,
+		N:                          10000,
+		pBackendErr:                0.1,
+		pExpectedAdmissionShedding: 0.1,
+	}, {
+		msg:                        "small error rate and bigger than threshold will block some traffic",
+		mode:                       "active",
+		d:                          10 * time.Millisecond,
+		windowsize:                 5,
+		minRequests:                10,
+		successThreshold:           0.9,
+		maxrejectprobability:       0.95,
+		aggression:                 1.0,
+		N:                          10000,
+		pBackendErr:                0.2,
+		pExpectedAdmissionShedding: 0.1,
+	}, {
+		msg:                        "medium error rate and bigger than threshold will block some traffic",
+		mode:                       "active",
+		d:                          10 * time.Millisecond,
+		windowsize:                 5,
+		minRequests:                10,
+		successThreshold:           0.9,
+		maxrejectprobability:       0.95,
+		aggression:                 1.0,
+		N:                          10000,
+		pBackendErr:                0.5,
+		pExpectedAdmissionShedding: 0.615,
+	}, {
+		msg:                        "large error rate and bigger than threshold will block some traffic",
+		mode:                       "active",
+		d:                          10 * time.Millisecond,
+		windowsize:                 5,
+		minRequests:                10,
+		successThreshold:           0.9,
+		maxrejectprobability:       0.95,
+		aggression:                 1.0,
+		N:                          10000,
+		pBackendErr:                0.8,
+		pExpectedAdmissionShedding: 0.91,
+	}} {
+		t.Run(ti.msg, func(t *testing.T) {
+			backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				p := rand.Float64()
+				if p < ti.pBackendErr {
+					w.WriteHeader(http.StatusInternalServerError)
+				} else {
+					w.WriteHeader(http.StatusOK)
+				}
+			}))
+			defer backend.Close()
+
+			spec := NewAdmissionControl(Options{})
+			args := make([]interface{}, 0, 8)
+			args = append(args, "testmetric", ti.mode, ti.d.String(), ti.windowsize, ti.minRequests, ti.successThreshold, ti.maxrejectprobability, ti.aggression)
+			f, err := spec.CreateFilter(args)
+			if err != nil {
+				t.Logf("args: %+v", args)
+				t.Fatalf("error creating filter: %v", err)
+			}
+			defer f.(*admissionControl).Close()
+
+			fr := make(filters.Registry)
+			fr.Register(spec)
+			r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend.URL}
+
+			proxy := proxytest.New(fr, r)
+			defer proxy.Close()
+			reqURL, err := url.Parse(proxy.URL)
+			if err != nil {
+				t.Fatalf("Failed to parse url %s: %v", proxy.URL, err)
+			}
+
+			client := net.NewClient(net.Options{})
+			req, err := http.NewRequest("GET", reqURL.String(), nil)
+			if err != nil {
+				t.Error(err)
+				return
+			}
+
+			var N, ok, fail, failBackend float64
+			// iterations to make sure we have enough traffic
+			for j := 0; j < 10; j++ {
+				for i := 0; i < 1+(2*ti.windowsize); i++ {
+					until := time.Now().Add(ti.d)
+					for !time.Now().After(until) {
+						N++
+						rsp, err := client.Do(req)
+						if err != nil {
+							// rsp is nil if the request failed
+							t.Error(err)
+							continue
+						}
+						switch rsp.StatusCode {
+						case http.StatusInternalServerError:
+							failBackend += 1
+						case http.StatusServiceUnavailable:
+							fail += 1
+						case http.StatusOK:
+							ok += 1
+						default:
+							t.Logf("Unexpected status code %d %s", rsp.StatusCode, rsp.Status)
+						}
+						rsp.Body.Close()
+					}
+				}
+			}
+			t.Logf("ok: %0.2f, fail: %0.2f, failBackend: %0.2f", ok, fail, failBackend)
+
+			epsilon := 0.05 * N // maybe 5% instead of 0.1%
+			expectedFails := (N - failBackend) * ti.pExpectedAdmissionShedding
+
+			if expectedFails-epsilon > fail || fail > expectedFails+epsilon {
+				t.Errorf("Failed to get expected fails should be in: %0.2f < %0.2f < %0.2f", expectedFails-epsilon, fail, expectedFails+epsilon)
+			}
+
+			// TODO(sszuecs) how to calculate expected oks?
+			// expectedOKs := N - (N-failBackend)*ti.pExpectedAdmissionShedding
+			// if ok < expectedOKs-epsilon || expectedOKs+epsilon < ok {
+			// 	t.Errorf("Failed to get expected ok should be in: %0.2f < %0.2f < %0.2f", expectedOKs-epsilon, ok, expectedOKs+epsilon)
+			// }
+		})
+	}
+}
diff --git a/skipper.go b/skipper.go
index e1e0d14cef..059926bfe3 100644
--- a/skipper.go
+++ b/skipper.go
@@ -35,6 +35,7 @@ import (
 	"github.com/zalando/skipper/filters/fadein"
 	logfilter "github.com/zalando/skipper/filters/log"
 	ratelimitfilters "github.com/zalando/skipper/filters/ratelimit"
+	"github.com/zalando/skipper/filters/shedder"
 	"github.com/zalando/skipper/innkeeper"
 	"github.com/zalando/skipper/loadbalancer"
 	"github.com/zalando/skipper/logging"
@@ -1365,6 +1366,9 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
 			o.ApiUsageMonitoringClientKeys,
 			o.ApiUsageMonitoringRealmsTrackingPattern,
 		),
+		shedder.NewAdmissionControl(shedder.Options{
+			Tracer: tracer,
+		}),
 	)
 
 	var swarmer ratelimit.Swarmer