From b1fe9c46e71a518ef55175c637b1b51956cb2428 Mon Sep 17 00:00:00 2001 From: Arko Dasgupta Date: Thu, 10 Nov 2022 16:23:14 -0800 Subject: [PATCH] Global rate limit components * `RateLimitInfraIR` - holds the config to manage the ratelimit service. * `global-ratelimit` runner that subscribes to `XdsIR` messages and translates it to `RateLimitInfraIR`. * enhance the `infrastructure` runner to subscribe to `RateLimitInfraIR` messages and translate it into platform specific ratelimit resources. Relates to https://github.com/envoyproxy/gateway/issues/670 Signed-off-by: Arko Dasgupta --- internal/cmd/server.go | 20 +++++- internal/globalratelimit/runner/runner.go | 68 ++++++++++++++++++ internal/globalratelimit/translator.go | 18 +++++ internal/infrastructure/kubernetes/infra.go | 12 ++++ internal/infrastructure/manager.go | 6 +- internal/infrastructure/runner/runner.go | 31 ++++++-- internal/ir/ratelimit_infra.go | 40 +++++++++++ internal/ir/zz_generated.deepcopy.go | 80 +++++++++++++++++++++ internal/message/types.go | 5 ++ 9 files changed, 273 insertions(+), 7 deletions(-) create mode 100644 internal/globalratelimit/runner/runner.go create mode 100644 internal/globalratelimit/translator.go create mode 100644 internal/ir/ratelimit_infra.go diff --git a/internal/cmd/server.go b/internal/cmd/server.go index 87aaf21fa0fb..54162d25832e 100644 --- a/internal/cmd/server.go +++ b/internal/cmd/server.go @@ -11,6 +11,7 @@ import ( "github.com/envoyproxy/gateway/internal/envoygateway/config" gatewayapirunner "github.com/envoyproxy/gateway/internal/gatewayapi/runner" + ratelimitrunner "github.com/envoyproxy/gateway/internal/globalratelimit/runner" infrarunner "github.com/envoyproxy/gateway/internal/infrastructure/runner" "github.com/envoyproxy/gateway/internal/message" providerrunner "github.com/envoyproxy/gateway/internal/provider/runner" @@ -126,12 +127,14 @@ func setupRunners(cfg *config.Server) error { return err } + rateLimitInfraIR := new(message.RateLimitInfraIR) // Start the Infra Manager Runner // It subscribes to the infraIR, translates it into Envoy Proxy infrastructure // resources such as K8s deployment and services. infraRunner := infrarunner.New(&infrarunner.Config{ - Server: *cfg, - InfraIR: infraIR, + Server: *cfg, + InfraIR: infraIR, + RateLimitInfraIR: rateLimitInfraIR, }) if err := infraRunner.Start(ctx); err != nil { return err @@ -148,6 +151,18 @@ func setupRunners(cfg *config.Server) error { return err } + // Start the Global RateLimit Runner + // It subscribes to the xds Resources and translates it to Envoy Ratelimit Service + // infrastructure and configuration. + rateLimitRunner := ratelimitrunner.New(&ratelimitrunner.Config{ + Server: *cfg, + XdsIR: xdsIR, + RateLimitInfraIR: rateLimitInfraIR, + }) + if err := rateLimitRunner.Start(ctx); err != nil { + return err + } + // Wait until done <-ctx.Done() // Close messages @@ -164,6 +179,7 @@ func setupRunners(cfg *config.Server) error { pResources.TLSRouteStatuses.Close() xdsIR.Close() infraIR.Close() + rateLimitInfraIR.Close() xds.Close() cfg.Logger.Info("shutting down") diff --git a/internal/globalratelimit/runner/runner.go b/internal/globalratelimit/runner/runner.go new file mode 100644 index 000000000000..c69d48e0ecad --- /dev/null +++ b/internal/globalratelimit/runner/runner.go @@ -0,0 +1,68 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package runner + +import ( + "context" + + "github.com/envoyproxy/gateway/internal/envoygateway/config" + "github.com/envoyproxy/gateway/internal/globalratelimit" + "github.com/envoyproxy/gateway/internal/message" +) + +type Config struct { + config.Server + XdsIR *message.XdsIR + RateLimitInfraIR *message.RateLimitInfraIR +} + +type Runner struct { + Config +} + +func (r *Runner) Name() string { + return "global-ratelimit" +} + +func New(cfg *Config) *Runner { + return &Runner{Config: *cfg} +} + +// Start starts the infrastructure runner +func (r *Runner) Start(ctx context.Context) error { + r.Logger = r.Logger.WithValues("runner", r.Name()) + go r.subscribeAndTranslate(ctx) + r.Logger.Info("started") + return nil +} + +func (r *Runner) subscribeAndTranslate(ctx context.Context) { + // Subscribe + xdsIRCh := r.XdsIR.Subscribe(ctx) + for ctx.Err() == nil { + select { + case snapshot := <-xdsIRCh: + r.Logger.Info("received a notification") + xdsIRMap := make(globalratelimit.XdsIRMap) + for key, value := range snapshot.State { + xdsIRMap[key] = value + } + // Translate to ratelimit infra IR + result, err := globalratelimit.Translate(xdsIRMap) + if err != nil { + r.Logger.Error(err, "failed to translate xds ir") + } else { + if result == nil { + r.RateLimitInfraIR.Delete(r.Name()) + } else { + // Publish ratelimit infra IR + r.RateLimitInfraIR.Store(r.Name(), result) + } + } + } + } + r.Logger.Info("subscriber shutting down") +} diff --git a/internal/globalratelimit/translator.go b/internal/globalratelimit/translator.go new file mode 100644 index 000000000000..b4ec3871bcdd --- /dev/null +++ b/internal/globalratelimit/translator.go @@ -0,0 +1,18 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package globalratelimit + +import ( + "github.com/envoyproxy/gateway/internal/ir" +) + +type XdsIRMap map[string]*ir.Xds + +// Translate converts the xdsIR into the rate limit infra IR. +func Translate(xdsIRMap XdsIRMap) (*ir.RateLimitInfra, error) { + // TODO + return nil, nil +} diff --git a/internal/infrastructure/kubernetes/infra.go b/internal/infrastructure/kubernetes/infra.go index b52094118b6d..76fb2562ec8b 100644 --- a/internal/infrastructure/kubernetes/infra.go +++ b/internal/infrastructure/kubernetes/infra.go @@ -86,3 +86,15 @@ func (i *Infra) DeleteInfra(ctx context.Context, infra *ir.Infra) error { return nil } + +// CreateOrUpdateRateLimitInfra creates the managed kube rate limit infra, if it doesn't exist. +func (i *Infra) CreateOrUpdateRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error { + // TODO + return nil +} + +// DeleteRateLimitInfra removes the managed kube infra, if it doesn't exist. +func (i *Infra) DeleteRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error { + // TODO + return nil +} diff --git a/internal/infrastructure/manager.go b/internal/infrastructure/manager.go index a263dc1566c0..d42e41f0150e 100644 --- a/internal/infrastructure/manager.go +++ b/internal/infrastructure/manager.go @@ -25,8 +25,12 @@ var _ Manager = (*kubernetes.Infra)(nil) type Manager interface { // CreateOrUpdateInfra creates or updates infra. CreateOrUpdateInfra(ctx context.Context, infra *ir.Infra) error - // DeleteInfra deletes infra + // DeleteInfra deletes infra. DeleteInfra(ctx context.Context, infra *ir.Infra) error + // CreateOrUpdateRateLimitInfra creates or updates rate limit infra. + CreateOrUpdateRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error + // DeleteRateLimitInfra deletes rate limit infra. + DeleteRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error } // NewManager returns a new infrastructure Manager. diff --git a/internal/infrastructure/runner/runner.go b/internal/infrastructure/runner/runner.go index cb9ec0d551cc..10c122e19f9c 100644 --- a/internal/infrastructure/runner/runner.go +++ b/internal/infrastructure/runner/runner.go @@ -16,7 +16,8 @@ import ( type Config struct { config.Server - InfraIR *message.InfraIR + InfraIR *message.InfraIR + RateLimitInfraIR *message.RateLimitInfraIR } type Runner struct { @@ -40,12 +41,13 @@ func (r *Runner) Start(ctx context.Context) error { if err != nil { r.Logger.Error(err, "failed to create new manager") } - go r.subscribeAndTranslate(ctx) + go r.subscribeToInfraIR(ctx) + go r.subscribeToRateLimitInfraIR(ctx) r.Logger.Info("started") return nil } -func (r *Runner) subscribeAndTranslate(ctx context.Context) { +func (r *Runner) subscribeToInfraIR(ctx context.Context) { // Subscribe to resources message.HandleSubscription(r.InfraIR.Subscribe(ctx), func(update message.Update[string, *ir.Infra]) { @@ -63,5 +65,26 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { } }, ) - r.Logger.Info("subscriber shutting down") + r.Logger.Info("infra subscriber shutting down") +} + +func (r *Runner) subscribeToRateLimitInfraIR(ctx context.Context) { + // Subscribe to resources + message.HandleSubscription(r.RateLimitInfraIR.Subscribe(ctx), + func(update message.Update[string, *ir.RateLimitInfra]) { + val := update.Value + + if update.Delete { + if err := r.mgr.DeleteRateLimitInfra(ctx, val); err != nil { + r.Logger.Error(err, "failed to delete rate limit infra") + } + } else { + // Manage the rate limit infra. + if err := r.mgr.CreateOrUpdateRateLimitInfra(ctx, val); err != nil { + r.Logger.Error(err, "failed to create new rate limit infra") + } + } + }, + ) + r.Logger.Info("ratelimit infra subscriber shutting down") } diff --git a/internal/ir/ratelimit_infra.go b/internal/ir/ratelimit_infra.go new file mode 100644 index 000000000000..908441988d83 --- /dev/null +++ b/internal/ir/ratelimit_infra.go @@ -0,0 +1,40 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package ir + +// RateLimitInfra defines the managed ratelimit infrastructure. +// +k8s:deepcopy-gen=true +type RateLimitInfra struct { + // Service holds the envoy rate limit service + // configuration + Service *RateLimitService + +} + +// RateLimitInfra defines managed rate limit service infrastructure. +// +k8s:deepcopy-gen=true +type RateLimitService struct { + // Config holds the YAML string representing the + // Rate limit service configuration + // https://github.com/envoyproxy/ratelimit#configuration-1 + Config *string + // Backend holds configuration associated with the backend database. + // If nil, the ratelimit runner deploys a demo redis instance (should not be used in production). + Backend *RateLimitDBBackend +} + +// RateLimitDBBackend defines the database backend properties +// associated with the rate limit service. +// +k8s:deepcopy-gen=true +type RateLimitDBBackend struct { + Redis *RateLimitRedis +} + +// RateLimitRedis defines the redis database configuration. +// +k8s:deepcopy-gen=true +type RateLimitRedis struct { +// TODO https://github.com/envoyproxy/ratelimit#redis +} diff --git a/internal/ir/zz_generated.deepcopy.go b/internal/ir/zz_generated.deepcopy.go index 6458241fdc0f..046710d73ea2 100644 --- a/internal/ir/zz_generated.deepcopy.go +++ b/internal/ir/zz_generated.deepcopy.go @@ -293,6 +293,86 @@ func (in *ProxyListener) DeepCopy() *ProxyListener { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RateLimitDBBackend) DeepCopyInto(out *RateLimitDBBackend) { + *out = *in + if in.Redis != nil { + in, out := &in.Redis, &out.Redis + *out = new(RateLimitRedis) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RateLimitDBBackend. +func (in *RateLimitDBBackend) DeepCopy() *RateLimitDBBackend { + if in == nil { + return nil + } + out := new(RateLimitDBBackend) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RateLimitInfra) DeepCopyInto(out *RateLimitInfra) { + *out = *in + if in.Service != nil { + in, out := &in.Service, &out.Service + *out = new(RateLimitService) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RateLimitInfra. +func (in *RateLimitInfra) DeepCopy() *RateLimitInfra { + if in == nil { + return nil + } + out := new(RateLimitInfra) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RateLimitRedis) DeepCopyInto(out *RateLimitRedis) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RateLimitRedis. +func (in *RateLimitRedis) DeepCopy() *RateLimitRedis { + if in == nil { + return nil + } + out := new(RateLimitRedis) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RateLimitService) DeepCopyInto(out *RateLimitService) { + *out = *in + if in.Config != nil { + in, out := &in.Config, &out.Config + *out = new(string) + **out = **in + } + if in.Backend != nil { + in, out := &in.Backend, &out.Backend + *out = new(RateLimitDBBackend) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RateLimitService. +func (in *RateLimitService) DeepCopy() *RateLimitService { + if in == nil { + return nil + } + out := new(RateLimitService) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Redirect) DeepCopyInto(out *Redirect) { *out = *in diff --git a/internal/message/types.go b/internal/message/types.go index e986d5762611..7c92762fe8b3 100644 --- a/internal/message/types.go +++ b/internal/message/types.go @@ -133,6 +133,11 @@ type InfraIR struct { watchable.Map[string, *ir.Infra] } +// RateLimitInfraIR message +type RateLimitInfraIR struct { + watchable.Map[string, *ir.RateLimitInfra] +} + // Xds message type Xds struct { watchable.Map[string, *xdstypes.ResourceVersionTable]