diff --git a/filters/shedder/admission.go b/filters/shedder/admission.go index 2668dd188b..1ba1ba3d45 100644 --- a/filters/shedder/admission.go +++ b/filters/shedder/admission.go @@ -11,6 +11,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" log "github.com/sirupsen/logrus" + "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/filters" "github.com/zalando/skipper/metrics" "github.com/zalando/skipper/routing" @@ -94,13 +95,12 @@ type Options struct { Tracer opentracing.Tracer } -type admissionControlSpec struct { - mu sync.Mutex - tracer opentracing.Tracer - filters map[string]*admissionControl +type AdmissionControlSpec struct { + tracer opentracing.Tracer } type admissionControl struct { + once sync.Once mu sync.Mutex quit chan struct{} closed bool @@ -129,23 +129,49 @@ func NewAdmissionControl(o Options) filters.Spec { if tracer == nil { tracer = &opentracing.NoopTracer{} } - return &admissionControlSpec{ - tracer: tracer, - filters: make(map[string]*admissionControl), + return &AdmissionControlSpec{ + tracer: tracer, + } +} + +// Do removes duplcate filters, because we can only handle one in a +// chain. The last one will override the others. +func (spec *admissionControlPre) Do(routes []*eskip.Route) []*eskip.Route { + for _, r := range routes { + foundAt := -1 + toDelete := make([]int, 0) + for i, f := range r.Filters { + if f.Name == filters.AdmissionControlName { + if foundAt != -1 { + toDelete = append(toDelete, foundAt) + } + foundAt = i + } + } + + // loop from last to be safe with indexes + for i := len(toDelete) - 1; i >= 0; i-- { + // drop filter at position i + filters := append(r.Filters[i+1 : len(r.Filters)]) // maybe off by one if i is last + r.Filters = append(r.Filters[0:i], filters...) + } } + + return routes } // Do implements routing.PostProcessor and makes it possible to close goroutines. -func (spec *admissionControlSpec) Do(routes []*routing.Route) []*routing.Route { +func (spec *admissionControlPost) Do(routes []*routing.Route) []*routing.Route { inUse := make(map[string]struct{}) spec.mu.Lock() defer spec.mu.Unlock() for _, r := range routes { - oldAc, okOld := spec.filters[r.Id] + for _, f := range r.Filters { if ac, ok := f.Filter.(*admissionControl); ok { + oldAc, okOld := spec.filters[r.Id] if okOld { // replace: close the old one oldAc.Close() @@ -165,7 +191,23 @@ func (spec *admissionControlSpec) Do(routes []*routing.Route) []*routing.Route { return routes } -func (*admissionControlSpec) Name() string { return filters.AdmissionControlName } +type admissionControlPre struct{} +type admissionControlPost struct { + mu sync.Mutex + filters map[string]*admissionControl +} + +func (*AdmissionControlSpec) PreProcessor() *admissionControlPre { + return &admissionControlPre{} +} + +func (*AdmissionControlSpec) PostProcessor() *admissionControlPost { + return &admissionControlPost{ + filters: make(map[string]*admissionControl), + } +} + +func (*AdmissionControlSpec) Name() string { return filters.AdmissionControlName } // CreateFilter creates a new admissionControl filter with passed configuration: // @@ -184,7 +226,7 @@ func (*admissionControlSpec) Name() string { return filters.AdmissionControlName // exponent >0, 1: linear, 1/2: qudratic, 1/3: cubic, .. // // see also https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#admission-control -func (spec *admissionControlSpec) CreateFilter(args []interface{}) (filters.Filter, error) { +func (spec *AdmissionControlSpec) CreateFilter(args []interface{}) (filters.Filter, error) { var err error if len(args) != 8 { @@ -250,6 +292,8 @@ func (spec *admissionControlSpec) CreateFilter(args []interface{}) (filters.Filt averageRpsFactor := float64(time.Second) / (float64(d) * float64(windowSize)) ac := &admissionControl{ + once: sync.Once{}, + quit: make(chan struct{}), metrics: metrics.Default, metricSuffix: metricSuffix, @@ -275,10 +319,10 @@ func (spec *admissionControlSpec) CreateFilter(args []interface{}) (filters.Filt // Close stops the background goroutine. The filter keeps working on stale data. func (ac *admissionControl) Close() { - if !ac.closed { + ac.once.Do(func() { ac.closed = true close(ac.quit) - } + }) } func (ac *admissionControl) tickWindows(d time.Duration) { diff --git a/filters/shedder/admission_test.go b/filters/shedder/admission_test.go index dfcd5a5bcd..9e78bca10c 100644 --- a/filters/shedder/admission_test.go +++ b/filters/shedder/admission_test.go @@ -1,7 +1,6 @@ package shedder import ( - "fmt" "math/rand" "net/http" "net/http/httptest" @@ -212,6 +211,43 @@ func TestAdmissionControlCleanup(t *testing.T) { mode: logInactive.String(), }} { t.Run(ti.msg, func(t *testing.T) { + // spec := NewAdmissionControl(Options{}) + // postProcessor, ok := spec.(routing.PostProcessor) + // if !ok { + // t.Fatal("AdmissionControl is not a PostProcessor") + // } + // args := make([]interface{}, 0, 6) + // args = append(args, "testmetric", ti.mode, "10ms", 5, 1, 0.1, 0.95, 0.5) + // _, err := spec.CreateFilter(args) + // if err != nil { + // t.Fatalf("error creating filter: %v", err) + // return + // } + + // fr := make(filters.Registry) + // fr.Register(spec) + // r := &eskip.Route{ + // Id: "r", + // Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, + // } + // r.BackendType = eskip.ShuntBackend + + // acs, ok := spec.(*admissionControlSpec) + // if ok { + // acs.mu.Lock() + + // deleteIDs := []string{} + + // for _, id := range deleteIDs { + // if ac, ok := acs.filters[id]; ok { + // if !ac.closed { + // } + // } + // } + // acs.mu.Unlock() + // } + //postProcessor.Do([]*routing.Route{r}) + backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) })) @@ -219,11 +255,14 @@ func TestAdmissionControlCleanup(t *testing.T) { w.WriteHeader(http.StatusOK) })) - spec := NewAdmissionControl(Options{}) - postProcessor, ok := spec.(routing.PostProcessor) + fspec := NewAdmissionControl(Options{}) + spec, ok := fspec.(*AdmissionControlSpec) if !ok { - t.Fatal("AdmissionControl is not a PostProcessor") + t.Fatal("FilterSpec is not a AdmissionControlSpec") } + preProcessor := spec.PreProcessor() + postProcessor := spec.PostProcessor() + args := make([]interface{}, 0, 6) args = append(args, "testmetric", ti.mode, "10ms", 5, 1, 0.1, 0.95, 0.5) _, err := spec.CreateFilter(args) @@ -234,107 +273,57 @@ func TestAdmissionControlCleanup(t *testing.T) { fr := make(filters.Registry) fr.Register(spec) - r := &eskip.Route{ - Id: "r", + + r1 := &eskip.Route{ + Id: "r1", Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend1.URL, } - dc := testdataclient.New([]*eskip.Route{r}) + r2 := &eskip.Route{ + Id: "r2", + Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, + Backend: backend2.URL, + } + + dc := testdataclient.New([]*eskip.Route{r1}) proxy := proxytest.WithRoutingOptions(fr, routing.Options{ DataClients: []routing.DataClient{dc}, PostProcessors: []routing.PostProcessor{postProcessor}, - }, r) - - reqURL, err := url.Parse(proxy.URL) - if err != nil { - t.Fatalf("Failed to parse url %s: %v", proxy.URL, err) - } - - client := net.NewClient(net.Options{ - Timeout: 100 * time.Millisecond, - ResponseHeaderTimeout: 100 * time.Millisecond, - TLSHandshakeTimeout: 100 * time.Millisecond, - }) - - req, err := http.NewRequest("GET", reqURL.String(), nil) - if err != nil { - t.Error(err) - return - } + PreProcessors: []routing.PreProcessor{preProcessor}, + }, r1) + defer proxy.Close() - quit := make(chan struct{}) - errCH := make(chan error) - go func() { - ticker := time.NewTicker(time.Millisecond) - i := 0 - for range ticker.C { - select { - case <-quit: - return - default: - } - i++ - // flapping backend - deleteIDs := []string{} - switch { - case i%5 == 0: - deleteIDs = append(deleteIDs, r.Id) - r.Id = r.Id + "1" - case i%2 == 1: - r.Backend = backend2.URL - case i%2 == 0: - r.Backend = backend1.URL - } - dc.Update([]*eskip.Route{r}, deleteIDs) + dc.Update([]*eskip.Route{r1}, nil) - if i%5 == 0 { - go func() { - time.Sleep(10 * time.Millisecond) - - acs, ok := spec.(*admissionControlSpec) - if ok { - acs.mu.Lock() - defer acs.mu.Unlock() - - for _, id := range deleteIDs { - if ac, ok := acs.filters[id]; ok { - if !ac.closed { - errCH <- fmt.Errorf("filter should be closed routeID: %s", id) - } - } - } - } - }() + deletedIDs := []string{r1.Id} + dc.Update([]*eskip.Route{r2}, deletedIDs) + time.Sleep(time.Second) + postProcessor.mu.Lock() + for _, id := range deletedIDs { + if ac, ok := postProcessor.filters[id]; ok { + if !ac.closed { + t.Errorf("filter should be closed routeID: %s", id) } } - }() + } + postProcessor.mu.Unlock() - go func() { - i := 0 - for { - select { - case <-quit: - return - default: - } + // preProcessor will only apply r2 (last wins) + dc.Update([]*eskip.Route{r1, r2}, nil) - i++ - rsp, err := client.Do(req) - if err != nil { - errCH <- err - } else { - rsp.Body.Close() + deletedIDs = []string{r2.Id} + dc.Update([]*eskip.Route{}, deletedIDs) + time.Sleep(time.Second) + postProcessor.mu.Lock() + for _, id := range deletedIDs { + if ac, ok := postProcessor.filters[id]; ok { + if !ac.closed { + t.Errorf("filter should be closed routeID: %s", id) } } - }() - - time.Sleep(time.Second) - select { - case err := <-errCH: - t.Fatalf("Failed to execute test: %v", err) - default: } - close(quit) + postProcessor.mu.Unlock() + }) } } diff --git a/skipper.go b/skipper.go index 9698899924..5ee129caf4 100644 --- a/skipper.go +++ b/skipper.go @@ -1343,9 +1343,13 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { Tracer: tracer, } - admissionControlSpec := shedder.NewAdmissionControl(shedder.Options{ + admissionControlFilter := shedder.NewAdmissionControl(shedder.Options{ Tracer: tracer, }) + admissionControlSpec, ok := admissionControlFilter.(*shedder.AdmissionControlSpec) + if !ok { + log.Fatal("Failed to cast admission control filter to spec") + } o.CustomFilters = append(o.CustomFilters, logfilter.NewAuditLog(o.MaxAuditBody), @@ -1370,7 +1374,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { o.ApiUsageMonitoringClientKeys, o.ApiUsageMonitoringRealmsTrackingPattern, ), - admissionControlSpec, + admissionControlFilter, ) var swarmer ratelimit.Swarmer @@ -1598,7 +1602,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { schedulerRegistry, builtin.NewRouteCreationMetrics(mtr), fadein.NewPostProcessor(), - admissionControlSpec.(routing.PostProcessor), + admissionControlSpec.PostProcessor(), }, SignalFirstLoad: o.WaitFirstRouteLoad, } @@ -1625,6 +1629,8 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { ro.PreProcessors = append(ro.PreProcessors, o.CustomRoutingPreProcessors...) } + ro.PreProcessors = append(ro.PreProcessors, admissionControlSpec.PreProcessor()) + routing := routing.New(ro) defer routing.Close()