Skip to content

Commit

Permalink
add lifo filters that can be used in a per route style grouping also …
Browse files Browse the repository at this point in the history
…from -default-filters-prepend and -default-filters-append

Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
  • Loading branch information
szuecs committed Apr 25, 2019
1 parent 530a791 commit 32ad2d6
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 10 deletions.
1 change: 1 addition & 0 deletions filters/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func MakeRegistry() filters.Registry {
accesslog.NewDisableAccessLog(),
accesslog.NewEnableAccessLog(),
auth.NewForwardToken(),
scheduler.NewLIFO(),
scheduler.NewLIFOGroup(),
} {
r.Register(s)
Expand Down
148 changes: 139 additions & 9 deletions filters/scheduler/lifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@ import (
// that it's incompatible with MarkServed().

type (
lifoSpec struct{}
lifoGroupSpec struct{}

lifoFilter struct {
key string
config scheduler.Config
stack *scheduler.Stack
}
lifoGroupFilter struct {
key string
config scheduler.Config
Expand All @@ -30,17 +36,24 @@ type (
)

const (
LIFOName = "lifo"
LIFOGroupName = "lifoGroup"

lifoKey = "lifo-done"
lifoGroupKey = "lifo-group-done"

defaultMaxConcurreny = 100
defaultMaxStackSize = 100
defaultTimeout = 10 * time.Second
)

// TODO: get rid of global
var configStore groupConfig

func NewLIFO() filters.Spec {
return &lifoSpec{}
}

func NewLIFOGroup() filters.Spec {
return &lifoGroupSpec{}
}
Expand All @@ -65,6 +78,71 @@ func durationArg(a interface{}) (time.Duration, error) {
}
}

func (s *lifoSpec) Name() string { return LIFOName }

// CreateFilter creates a lifoFilter, that will use a stack based
// queue for handling requests instead of the fifo queue. The first
// parameter is MaxConcurrency the second MaxStackSize and the third
// Timeout.
//
// The implementation is based on
// https://godoc.org/github.com/aryszka/jobstack, which provides more
// detailed documentation.
//
// All parameters are optional and defaults to
// MaxConcurrency 100, MaxStackSize 100, Timeout 10s.
//
// The total maximum number of requests has to be computed by adding
// MaxConcurrency and MaxStackSize: total max = MaxConcurrency + MaxStackSize
//
// Min values are 1 for MaxConcurrency and MaxStackSize, and 1ms for
// Timeout. All configration that is below will be set to these min
// values.
func (s *lifoSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
var l lifoFilter

// set defaults
l.config.MaxConcurrency = defaultMaxConcurreny
l.config.MaxStackSize = defaultMaxStackSize
l.config.Timeout = defaultTimeout

if len(args) > 0 {
c, err := intArg(args[0])
if err != nil {
return nil, err
}
if c >= 1 {
l.config.MaxConcurrency = c
}
}

if len(args) > 1 {
c, err := intArg(args[1])
if err != nil {
return nil, err
}
if c >= 0 {
l.config.MaxStackSize = c
}
}

if len(args) > 2 {
d, err := durationArg(args[2])
if err != nil {
return nil, err
}
if d >= 1*time.Millisecond {
l.config.Timeout = d
}
}

if len(args) > 3 {
return nil, filters.ErrInvalidFilterParameters
}

return &l, nil
}

func (s *lifoGroupSpec) Name() string { return LIFOGroupName }

// CreateFilter creates a lifoGroupFilter, that will use a stack based
Expand Down Expand Up @@ -146,6 +224,48 @@ func (s *lifoGroupSpec) CreateFilter(args []interface{}) (filters.Filter, error)
return l, nil
}

// Config returns the scheduler configuration for the given filter
func (l *lifoFilter) Config() scheduler.Config {
return l.config
}

// SetStack binds the stack to the current filter context
func (l *lifoFilter) SetStack(s *scheduler.Stack) {
l.stack = s
}

// GetStack is only used in tests
func (l *lifoFilter) GetStack() *scheduler.Stack {
return l.stack
}

// Key returns the scheduler string
func (l *lifoFilter) Key() string {
return l.key
}

// SetKey sets the scheduler string, which should be called from the
// PostProcessor
func (l *lifoFilter) SetKey(k string) {
l.key = k
}

// Request is the filter.Filter interface implementation. Request will
// increase the number of inflight requests and respond to the caller,
// if the bounded stack returns an error. Status code by Error:
//
// - 503 if jobstack.ErrStackFull
// - 502 if jobstack.ErrTimeout
func (l *lifoFilter) Request(ctx filters.FilterContext) {
request(l.GetStack(), lifoKey, ctx)
}

// Response is the filter.Filter interface implementation. Response
// will decrease the number of inflight requests.
func (l *lifoFilter) Response(ctx filters.FilterContext) {
response(lifoKey, ctx)
}

// Config returns the scheduler configuration for the given filter
func (l *lifoGroupFilter) Config() scheduler.Config {
cfg, _ := l.getConfig()
Expand Down Expand Up @@ -174,19 +294,32 @@ func (l *lifoGroupFilter) Key() string {
return l.key
}

// SetKey is a noop to implement the LIFOFilter interface
func (*lifoGroupFilter) SetKey(string) {}

// Request is the filter.Filter interface implementation. Request will
// increase the number of inflight requests and respond to the caller,
// if the bounded stack returns an error. Status code by Error:
//
// - 503 if jobstack.ErrStackFull
// - 502 if jobstack.ErrTimeout
func (l *lifoGroupFilter) Request(ctx filters.FilterContext) {
if l.stack == nil {
log.Warningf("Unexpected scheduler.Stack is nil for key %s", lifoGroupKey)
request(l.GetStack(), lifoGroupKey, ctx)
}

// Response is the filter.Filter interface implementation. Response
// will decrease the number of inflight requests.
func (l *lifoGroupFilter) Response(ctx filters.FilterContext) {
response(lifoGroupKey, ctx)
}

func request(s *scheduler.Stack, key string, ctx filters.FilterContext) {
if s == nil {
log.Warningf("Unexpected scheduler.Stack is nil for key %s", key)
return
}

done, err := l.stack.Ready()
done, err := s.Ready()
if err != nil {
// TODO:
// - replace the log with metrics
Expand All @@ -207,14 +340,11 @@ func (l *lifoGroupFilter) Request(ctx filters.FilterContext) {
return
}

ctx.StateBag()[lifoGroupKey] = done
ctx.StateBag()[key] = done

}

// Response is the filter.Filter interface implementation. Response
// will decrease the number of inflight requests.
func (l *lifoGroupFilter) Response(ctx filters.FilterContext) {
done := ctx.StateBag()[lifoGroupKey]
func response(key string, ctx filters.FilterContext) {
done := ctx.StateBag()[key]
if done == nil {
return
}
Expand Down
2 changes: 2 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type LIFOFilter interface {
GetStack() *Stack
Config() Config
Key() string
SetKey(string)
}

func newStack(c Config) *Stack {
Expand Down Expand Up @@ -83,6 +84,7 @@ func (r *Registry) Do(routes []*routing.Route) []*routing.Route {
cf, ok := fi.Filter.(LIFOFilter)
if ok {
c := cf.Config()
cf.SetKey(ri.Id)
key := cf.Key()
s, ok := r.getStack(key)
if !ok {
Expand Down
18 changes: 17 additions & 1 deletion scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,34 @@ func TestScheduler(t *testing.T) {
doc: `r1: * -> setPath("/bar") -> "http://www.example.org"`,
wantErr: false,
},
{
name: "one scheduler filter lifo",
doc: `l2: * -> lifo(10, 12, "10s") -> "http://www.example.org"`,
wantErr: false,
},
{
name: "one scheduler filter lifoGroup",
doc: `r2: * -> lifoGroup("r2", 10, 12, "10s") -> "http://www.example.org"`,
wantErr: false,
},
{
name: "multiple filters with one scheduler filter lifo",
doc: `l3: * -> setPath("/bar") -> lifo(10, 12, "10s") -> setRequestHeader("X-Foo", "bar") -> "http://www.example.org"`,
wantErr: false,
},
{
name: "multiple filters with one scheduler filter lifoGroup",
doc: `r3: * -> setPath("/bar") -> lifoGroup("r3", 10, 12, "10s") -> setRequestHeader("X-Foo", "bar") -> "http://www.example.org"`,
wantErr: false,
},
{
name: "multiple routes with different grouping do not interfeere",
name: "multiple routes with lifo filters do not interfere",
doc: `l4: Path("/l4") -> setPath("/bar") -> lifo(10, 12, "10s") -> "http://www.example.org"; l5: Path("/l5") -> setPath("/foo") -> lifo(15, 2, "11s") -> setRequestHeader("X-Foo", "bar")-> "http://www.example.org";`,
paths: [][]string{[]string{"l4"}, []string{"l5"}},
wantErr: false,
},
{
name: "multiple routes with different grouping do not interfere",
doc: `r4: Path("/r4") -> setPath("/bar") -> lifoGroup("r4", 10, 12, "10s") -> "http://www.example.org"; r5: Path("/r5") -> setPath("/foo") -> lifoGroup("r5", 15, 2, "11s") -> setRequestHeader("X-Foo", "bar")-> "http://www.example.org";`,
paths: [][]string{[]string{"r4"}, []string{"r5"}},
wantErr: false,
Expand Down

0 comments on commit 32ad2d6

Please sign in to comment.