Skip to content

Commit

Permalink
feature: add preprocessor to remove filter duplicates, last wins
Browse files Browse the repository at this point in the history
fix: cleanup and cleanup test case

Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
  • Loading branch information
szuecs committed Jul 5, 2022
1 parent 2165e1c commit ec20ccc
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 106 deletions.
70 changes: 57 additions & 13 deletions filters/shedder/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/metrics"
"github.com/zalando/skipper/routing"
Expand Down Expand Up @@ -94,13 +95,12 @@ type Options struct {
Tracer opentracing.Tracer
}

type admissionControlSpec struct {
mu sync.Mutex
tracer opentracing.Tracer
filters map[string]*admissionControl
type AdmissionControlSpec struct {
tracer opentracing.Tracer
}

type admissionControl struct {
once sync.Once
mu sync.Mutex
quit chan struct{}
closed bool
Expand Down Expand Up @@ -129,23 +129,49 @@ func NewAdmissionControl(o Options) filters.Spec {
if tracer == nil {
tracer = &opentracing.NoopTracer{}
}
return &admissionControlSpec{
tracer: tracer,
filters: make(map[string]*admissionControl),
return &AdmissionControlSpec{
tracer: tracer,
}
}

// Do removes duplcate filters, because we can only handle one in a
// chain. The last one will override the others.
func (spec *admissionControlPre) Do(routes []*eskip.Route) []*eskip.Route {
for _, r := range routes {
foundAt := -1
toDelete := make([]int, 0)
for i, f := range r.Filters {
if f.Name == filters.AdmissionControlName {
if foundAt != -1 {
toDelete = append(toDelete, foundAt)
}
foundAt = i
}
}

// loop from last to be safe with indexes
for i := len(toDelete) - 1; i >= 0; i-- {
// drop filter at position i
filters := append(r.Filters[i+1 : len(r.Filters)]) // maybe off by one if i is last
r.Filters = append(r.Filters[0:i], filters...)
}
}

return routes
}

// Do implements routing.PostProcessor and makes it possible to close goroutines.
func (spec *admissionControlSpec) Do(routes []*routing.Route) []*routing.Route {
func (spec *admissionControlPost) Do(routes []*routing.Route) []*routing.Route {
inUse := make(map[string]struct{})

spec.mu.Lock()
defer spec.mu.Unlock()

for _, r := range routes {
oldAc, okOld := spec.filters[r.Id]

for _, f := range r.Filters {
if ac, ok := f.Filter.(*admissionControl); ok {
oldAc, okOld := spec.filters[r.Id]
if okOld {
// replace: close the old one
oldAc.Close()
Expand All @@ -165,7 +191,23 @@ func (spec *admissionControlSpec) Do(routes []*routing.Route) []*routing.Route {
return routes
}

func (*admissionControlSpec) Name() string { return filters.AdmissionControlName }
type admissionControlPre struct{}
type admissionControlPost struct {
mu sync.Mutex
filters map[string]*admissionControl
}

func (*AdmissionControlSpec) PreProcessor() *admissionControlPre {
return &admissionControlPre{}
}

func (*AdmissionControlSpec) PostProcessor() *admissionControlPost {
return &admissionControlPost{
filters: make(map[string]*admissionControl),
}
}

func (*AdmissionControlSpec) Name() string { return filters.AdmissionControlName }

// CreateFilter creates a new admissionControl filter with passed configuration:
//
Expand All @@ -184,7 +226,7 @@ func (*admissionControlSpec) Name() string { return filters.AdmissionControlName
// exponent >0, 1: linear, 1/2: qudratic, 1/3: cubic, ..
//
// see also https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/admission_control_filter#admission-control
func (spec *admissionControlSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
func (spec *AdmissionControlSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
var err error

if len(args) != 8 {
Expand Down Expand Up @@ -250,6 +292,8 @@ func (spec *admissionControlSpec) CreateFilter(args []interface{}) (filters.Filt
averageRpsFactor := float64(time.Second) / (float64(d) * float64(windowSize))

ac := &admissionControl{
once: sync.Once{},

quit: make(chan struct{}),
metrics: metrics.Default,
metricSuffix: metricSuffix,
Expand All @@ -275,10 +319,10 @@ func (spec *admissionControlSpec) CreateFilter(args []interface{}) (filters.Filt

// Close stops the background goroutine. The filter keeps working on stale data.
func (ac *admissionControl) Close() {
if !ac.closed {
ac.once.Do(func() {
ac.closed = true
close(ac.quit)
}
})
}

func (ac *admissionControl) tickWindows(d time.Duration) {
Expand Down
169 changes: 79 additions & 90 deletions filters/shedder/admission_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package shedder

import (
"fmt"
"math/rand"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -212,18 +211,58 @@ func TestAdmissionControlCleanup(t *testing.T) {
mode: logInactive.String(),
}} {
t.Run(ti.msg, func(t *testing.T) {
// spec := NewAdmissionControl(Options{})
// postProcessor, ok := spec.(routing.PostProcessor)
// if !ok {
// t.Fatal("AdmissionControl is not a PostProcessor")
// }
// args := make([]interface{}, 0, 6)
// args = append(args, "testmetric", ti.mode, "10ms", 5, 1, 0.1, 0.95, 0.5)
// _, err := spec.CreateFilter(args)
// if err != nil {
// t.Fatalf("error creating filter: %v", err)
// return
// }

// fr := make(filters.Registry)
// fr.Register(spec)
// r := &eskip.Route{
// Id: "r",
// Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}},
// }
// r.BackendType = eskip.ShuntBackend

// acs, ok := spec.(*admissionControlSpec)
// if ok {
// acs.mu.Lock()

// deleteIDs := []string{}

// for _, id := range deleteIDs {
// if ac, ok := acs.filters[id]; ok {
// if !ac.closed {
// }
// }
// }
// acs.mu.Unlock()
// }
//postProcessor.Do([]*routing.Route{r})

backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))

spec := NewAdmissionControl(Options{})
postProcessor, ok := spec.(routing.PostProcessor)
fspec := NewAdmissionControl(Options{})
spec, ok := fspec.(*AdmissionControlSpec)
if !ok {
t.Fatal("AdmissionControl is not a PostProcessor")
t.Fatal("FilterSpec is not a AdmissionControlSpec")
}
preProcessor := spec.PreProcessor()
postProcessor := spec.PostProcessor()

args := make([]interface{}, 0, 6)
args = append(args, "testmetric", ti.mode, "10ms", 5, 1, 0.1, 0.95, 0.5)
_, err := spec.CreateFilter(args)
Expand All @@ -234,107 +273,57 @@ func TestAdmissionControlCleanup(t *testing.T) {

fr := make(filters.Registry)
fr.Register(spec)
r := &eskip.Route{
Id: "r",

r1 := &eskip.Route{
Id: "r1",
Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}},
Backend: backend1.URL,
}
dc := testdataclient.New([]*eskip.Route{r})
r2 := &eskip.Route{
Id: "r2",
Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}},
Backend: backend2.URL,
}

dc := testdataclient.New([]*eskip.Route{r1})
proxy := proxytest.WithRoutingOptions(fr, routing.Options{
DataClients: []routing.DataClient{dc},
PostProcessors: []routing.PostProcessor{postProcessor},
}, r)

reqURL, err := url.Parse(proxy.URL)
if err != nil {
t.Fatalf("Failed to parse url %s: %v", proxy.URL, err)
}

client := net.NewClient(net.Options{
Timeout: 100 * time.Millisecond,
ResponseHeaderTimeout: 100 * time.Millisecond,
TLSHandshakeTimeout: 100 * time.Millisecond,
})

req, err := http.NewRequest("GET", reqURL.String(), nil)
if err != nil {
t.Error(err)
return
}
PreProcessors: []routing.PreProcessor{preProcessor},
}, r1)
defer proxy.Close()

quit := make(chan struct{})
errCH := make(chan error)
go func() {
ticker := time.NewTicker(time.Millisecond)
i := 0
for range ticker.C {
select {
case <-quit:
return
default:
}
i++
// flapping backend
deleteIDs := []string{}
switch {
case i%5 == 0:
deleteIDs = append(deleteIDs, r.Id)
r.Id = r.Id + "1"
case i%2 == 1:
r.Backend = backend2.URL
case i%2 == 0:
r.Backend = backend1.URL
}
dc.Update([]*eskip.Route{r}, deleteIDs)
dc.Update([]*eskip.Route{r1}, nil)

if i%5 == 0 {
go func() {
time.Sleep(10 * time.Millisecond)

acs, ok := spec.(*admissionControlSpec)
if ok {
acs.mu.Lock()
defer acs.mu.Unlock()

for _, id := range deleteIDs {
if ac, ok := acs.filters[id]; ok {
if !ac.closed {
errCH <- fmt.Errorf("filter should be closed routeID: %s", id)
}
}
}
}
}()
deletedIDs := []string{r1.Id}
dc.Update([]*eskip.Route{r2}, deletedIDs)
time.Sleep(time.Second)
postProcessor.mu.Lock()
for _, id := range deletedIDs {
if ac, ok := postProcessor.filters[id]; ok {
if !ac.closed {
t.Errorf("filter should be closed routeID: %s", id)
}
}
}()
}
postProcessor.mu.Unlock()

go func() {
i := 0
for {
select {
case <-quit:
return
default:
}
// preProcessor will only apply r2 (last wins)
dc.Update([]*eskip.Route{r1, r2}, nil)

i++
rsp, err := client.Do(req)
if err != nil {
errCH <- err
} else {
rsp.Body.Close()
deletedIDs = []string{r2.Id}
dc.Update([]*eskip.Route{}, deletedIDs)
time.Sleep(time.Second)
postProcessor.mu.Lock()
for _, id := range deletedIDs {
if ac, ok := postProcessor.filters[id]; ok {
if !ac.closed {
t.Errorf("filter should be closed routeID: %s", id)
}
}
}()

time.Sleep(time.Second)
select {
case err := <-errCH:
t.Fatalf("Failed to execute test: %v", err)
default:
}
close(quit)
postProcessor.mu.Unlock()

})
}
}
12 changes: 9 additions & 3 deletions skipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,9 +1343,13 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
Tracer: tracer,
}

admissionControlSpec := shedder.NewAdmissionControl(shedder.Options{
admissionControlFilter := shedder.NewAdmissionControl(shedder.Options{
Tracer: tracer,
})
admissionControlSpec, ok := admissionControlFilter.(*shedder.AdmissionControlSpec)
if !ok {
log.Fatal("Failed to cast admission control filter to spec")
}

o.CustomFilters = append(o.CustomFilters,
logfilter.NewAuditLog(o.MaxAuditBody),
Expand All @@ -1370,7 +1374,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
o.ApiUsageMonitoringClientKeys,
o.ApiUsageMonitoringRealmsTrackingPattern,
),
admissionControlSpec,
admissionControlFilter,
)

var swarmer ratelimit.Swarmer
Expand Down Expand Up @@ -1598,7 +1602,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
schedulerRegistry,
builtin.NewRouteCreationMetrics(mtr),
fadein.NewPostProcessor(),
admissionControlSpec.(routing.PostProcessor),
admissionControlSpec.PostProcessor(),
},
SignalFirstLoad: o.WaitFirstRouteLoad,
}
Expand All @@ -1625,6 +1629,8 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
ro.PreProcessors = append(ro.PreProcessors, o.CustomRoutingPreProcessors...)
}

ro.PreProcessors = append(ro.PreProcessors, admissionControlSpec.PreProcessor())

routing := routing.New(ro)
defer routing.Close()

Expand Down

0 comments on commit ec20ccc

Please sign in to comment.