From 84044dae7b4869991c3beb3b07c114ffba274227 Mon Sep 17 00:00:00 2001 From: Arko Dasgupta Date: Tue, 24 Jan 2023 09:45:12 -0800 Subject: [PATCH] Global rate limit infra components (#723) * Global rate limit components * `RateLimitInfraIR` - holds the config to manage the ratelimit service. * `global-ratelimit` runner that subscribes to `XdsIR` messages and translates it to `RateLimitInfraIR`. * enhance the `infrastructure` runner to subscribe to `RateLimitInfraIR` messages and translate it into platform specific ratelimit resources. Relates to https://github.com/envoyproxy/gateway/issues/670 Signed-off-by: Arko Dasgupta --- internal/cmd/server.go | 20 +++- internal/envoygateway/config/config.go | 22 +++-- internal/envoygateway/config/config_test.go | 75 ++++++++++++++ internal/globalratelimit/runner/runner.go | 102 ++++++++++++++++++++ internal/infrastructure/kubernetes/infra.go | 12 +++ internal/infrastructure/manager.go | 6 +- internal/infrastructure/runner/runner.go | 31 +++++- internal/ir/ratelimit_infra.go | 40 ++++++++ internal/ir/zz_generated.deepcopy.go | 81 ++++++++++++++++ internal/message/types.go | 5 + internal/xds/translator/ratelimit.go | 11 +++ internal/xds/translator/translator_test.go | 8 +- 12 files changed, 394 insertions(+), 19 deletions(-) create mode 100644 internal/globalratelimit/runner/runner.go create mode 100644 internal/ir/ratelimit_infra.go diff --git a/internal/cmd/server.go b/internal/cmd/server.go index 3361c9acfbb..27d3c447205 100644 --- a/internal/cmd/server.go +++ b/internal/cmd/server.go @@ -11,6 +11,7 @@ import ( "github.com/envoyproxy/gateway/internal/envoygateway/config" gatewayapirunner "github.com/envoyproxy/gateway/internal/gatewayapi/runner" + ratelimitrunner "github.com/envoyproxy/gateway/internal/globalratelimit/runner" infrarunner "github.com/envoyproxy/gateway/internal/infrastructure/runner" "github.com/envoyproxy/gateway/internal/message" providerrunner "github.com/envoyproxy/gateway/internal/provider/runner" @@ -130,12 +131,14 @@ func setupRunners(cfg *config.Server) error { return err } + rateLimitInfraIR := new(message.RateLimitInfraIR) // Start the Infra Manager Runner // It subscribes to the infraIR, translates it into Envoy Proxy infrastructure // resources such as K8s deployment and services. infraRunner := infrarunner.New(&infrarunner.Config{ - Server: *cfg, - InfraIR: infraIR, + Server: *cfg, + InfraIR: infraIR, + RateLimitInfraIR: rateLimitInfraIR, }) if err := infraRunner.Start(ctx); err != nil { return err @@ -152,12 +155,25 @@ func setupRunners(cfg *config.Server) error { return err } + // Start the Global RateLimit Runner + // It subscribes to the xds Resources and translates it to Envoy Ratelimit Service + // infrastructure and configuration. + rateLimitRunner := ratelimitrunner.New(&ratelimitrunner.Config{ + Server: *cfg, + XdsIR: xdsIR, + RateLimitInfraIR: rateLimitInfraIR, + }) + if err := rateLimitRunner.Start(ctx); err != nil { + return err + } + // Wait until done <-ctx.Done() // Close messages pResources.Close() xdsIR.Close() infraIR.Close() + rateLimitInfraIR.Close() xds.Close() cfg.Logger.Info("shutting down") diff --git a/internal/envoygateway/config/config.go b/internal/envoygateway/config/config.go index 9f1859d55bb..df85d26a107 100644 --- a/internal/envoygateway/config/config.go +++ b/internal/envoygateway/config/config.go @@ -8,6 +8,7 @@ package config import ( "errors" "fmt" + "net/url" "github.com/go-logr/logr" @@ -56,17 +57,26 @@ func (s *Server) Validate() error { return errors.New("server config is unspecified") case s.EnvoyGateway == nil: return errors.New("envoy gateway config is unspecified") - case s.EnvoyGateway.EnvoyGatewaySpec.Gateway == nil: + case s.EnvoyGateway.Gateway == nil: return errors.New("gateway is unspecified") - case len(s.EnvoyGateway.EnvoyGatewaySpec.Gateway.ControllerName) == 0: + case len(s.EnvoyGateway.Gateway.ControllerName) == 0: return errors.New("gateway controllerName is unspecified") - case s.EnvoyGateway.EnvoyGatewaySpec.Provider == nil: + case s.EnvoyGateway.Provider == nil: return errors.New("provider is unspecified") - case s.EnvoyGateway.EnvoyGatewaySpec.Provider.Type != v1alpha1.ProviderTypeKubernetes: - return fmt.Errorf("unsupported provider %v", s.EnvoyGateway.EnvoyGatewaySpec.Provider.Type) + case s.EnvoyGateway.Provider.Type != v1alpha1.ProviderTypeKubernetes: + return fmt.Errorf("unsupported provider %v", s.EnvoyGateway.Provider.Type) case len(s.Namespace) == 0: return errors.New("namespace is empty string") + case s.EnvoyGateway.RateLimit != nil: + if s.EnvoyGateway.RateLimit.Backend.Type != v1alpha1.RedisBackendType { + return fmt.Errorf("unsupported ratelimit backend %v", s.EnvoyGateway.RateLimit.Backend.Type) + } + if s.EnvoyGateway.RateLimit.Backend.Redis == nil || s.EnvoyGateway.RateLimit.Backend.Redis.URL == "" { + return fmt.Errorf("empty ratelimit redis settings") + } + if _, err := url.Parse(s.EnvoyGateway.RateLimit.Backend.Redis.URL); err != nil { + return fmt.Errorf("unknown ratelimit redis url format: %w", err) + } } - return nil } diff --git a/internal/envoygateway/config/config_test.go b/internal/envoygateway/config/config_test.go index e0561f4402b..f1a5a1eb862 100644 --- a/internal/envoygateway/config/config_test.go +++ b/internal/envoygateway/config/config_test.go @@ -99,6 +99,81 @@ func TestValidate(t *testing.T) { }, expect: false, }, + { + name: "empty ratelimit", + cfg: &Server{ + EnvoyGateway: &v1alpha1.EnvoyGateway{ + EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{ + Gateway: v1alpha1.DefaultGateway(), + Provider: v1alpha1.DefaultProvider(), + RateLimit: &v1alpha1.RateLimit{}, + }, + }, + Namespace: "test-ns", + }, + expect: false, + }, + { + name: "empty ratelimit redis setting", + cfg: &Server{ + EnvoyGateway: &v1alpha1.EnvoyGateway{ + EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{ + Gateway: v1alpha1.DefaultGateway(), + Provider: v1alpha1.DefaultProvider(), + RateLimit: &v1alpha1.RateLimit{ + Backend: v1alpha1.RateLimitDatabaseBackend{ + Type: v1alpha1.RedisBackendType, + Redis: &v1alpha1.RateLimitRedisSettings{}, + }, + }, + }, + }, + Namespace: "test-ns", + }, + expect: false, + }, + { + name: "unknown ratelimit redis url format", + cfg: &Server{ + EnvoyGateway: &v1alpha1.EnvoyGateway{ + EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{ + Gateway: v1alpha1.DefaultGateway(), + Provider: v1alpha1.DefaultProvider(), + RateLimit: &v1alpha1.RateLimit{ + Backend: v1alpha1.RateLimitDatabaseBackend{ + Type: v1alpha1.RedisBackendType, + Redis: &v1alpha1.RateLimitRedisSettings{ + URL: ":foo", + }, + }, + }, + }, + }, + Namespace: "test-ns", + }, + expect: false, + }, + { + name: "happy ratelimit redis settings", + cfg: &Server{ + EnvoyGateway: &v1alpha1.EnvoyGateway{ + EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{ + Gateway: v1alpha1.DefaultGateway(), + Provider: v1alpha1.DefaultProvider(), + RateLimit: &v1alpha1.RateLimit{ + Backend: v1alpha1.RateLimitDatabaseBackend{ + Type: v1alpha1.RedisBackendType, + Redis: &v1alpha1.RateLimitRedisSettings{ + URL: "localhost:6376", + }, + }, + }, + }, + }, + Namespace: "test-ns", + }, + expect: true, + }, } for _, tc := range testCases { diff --git a/internal/globalratelimit/runner/runner.go b/internal/globalratelimit/runner/runner.go new file mode 100644 index 00000000000..46ccd96e229 --- /dev/null +++ b/internal/globalratelimit/runner/runner.go @@ -0,0 +1,102 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package runner + +import ( + "context" + "fmt" + + "github.com/envoyproxy/gateway/internal/envoygateway/config" + "github.com/envoyproxy/gateway/internal/ir" + "github.com/envoyproxy/gateway/internal/message" + "github.com/envoyproxy/gateway/internal/xds/translator" +) + +type Config struct { + config.Server + XdsIR *message.XdsIR + RateLimitInfraIR *message.RateLimitInfraIR +} + +type Runner struct { + Config +} + +func (r *Runner) Name() string { + return "global-ratelimit" +} + +func New(cfg *Config) *Runner { + return &Runner{Config: *cfg} +} + +// Start starts the infrastructure runner +func (r *Runner) Start(ctx context.Context) error { + r.Logger = r.Logger.WithValues("runner", r.Name()) + go r.subscribeAndTranslate(ctx) + r.Logger.Info("started") + return nil +} + +func (r *Runner) subscribeAndTranslate(ctx context.Context) { + // Subscribe + xdsIRCh := r.XdsIR.Subscribe(ctx) + for ctx.Err() == nil { + var xdsIRs []*ir.Xds + snapshot := <-xdsIRCh + r.Logger.Info("received a notification") + for _, value := range snapshot.State { + xdsIRs = append(xdsIRs, value) + } + // Translate to ratelimit infra IR + result, err := r.translate(xdsIRs) + if err != nil { + r.Logger.Error(err, "failed to translate xds ir") + } else { + if result == nil { + r.RateLimitInfraIR.Delete(r.Name()) + } else { + // Publish ratelimit infra IR + r.RateLimitInfraIR.Store(r.Name(), result) + } + } + } + r.Logger.Info("subscriber shutting down") +} + +func (r *Runner) translate(xdsIRs []*ir.Xds) (*ir.RateLimitInfra, error) { + rlInfra := new(ir.RateLimitInfra) + + // Return empty IR if ratelimit has not been enabled yet in the EnvoyGateway API + if r.EnvoyGateway.RateLimit == nil { + return nil, nil + } + + for _, xdsIR := range xdsIRs { + for _, listener := range xdsIR.HTTP { + config := translator.BuildRateLimitServiceConfig(listener) + if config != nil { + str, err := translator.GetRateLimitServiceConfigStr(config) + if err != nil { + return nil, fmt.Errorf("failed to get rate limit config string: %w", err) + } + c := &ir.RateLimitServiceConfig{ + Name: listener.Name, + Config: str, + } + rlInfra.Configs = append(rlInfra.Configs, c) + } + } + } + + rlInfra.Backend = &ir.RateLimitDBBackend{ + Redis: &ir.RateLimitRedis{ + URL: r.EnvoyGateway.RateLimit.Backend.Redis.URL, + }, + } + + return rlInfra, nil +} diff --git a/internal/infrastructure/kubernetes/infra.go b/internal/infrastructure/kubernetes/infra.go index e23c7fcf1ff..efce6a7779d 100644 --- a/internal/infrastructure/kubernetes/infra.go +++ b/internal/infrastructure/kubernetes/infra.go @@ -85,3 +85,15 @@ func (i *Infra) DeleteInfra(ctx context.Context, infra *ir.Infra) error { return nil } + +// CreateOrUpdateRateLimitInfra creates the managed kube rate limit infra, if it doesn't exist. +func (i *Infra) CreateOrUpdateRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error { + // TODO + return nil +} + +// DeleteRateLimitInfra removes the managed kube infra, if it doesn't exist. +func (i *Infra) DeleteRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error { + // TODO + return nil +} diff --git a/internal/infrastructure/manager.go b/internal/infrastructure/manager.go index 50533d26f5d..94cf8c5531d 100644 --- a/internal/infrastructure/manager.go +++ b/internal/infrastructure/manager.go @@ -25,8 +25,12 @@ var _ Manager = (*kubernetes.Infra)(nil) type Manager interface { // CreateOrUpdateInfra creates or updates infra. CreateOrUpdateInfra(ctx context.Context, infra *ir.Infra) error - // DeleteInfra deletes infra + // DeleteInfra deletes infra. DeleteInfra(ctx context.Context, infra *ir.Infra) error + // CreateOrUpdateRateLimitInfra creates or updates rate limit infra. + CreateOrUpdateRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error + // DeleteRateLimitInfra deletes rate limit infra. + DeleteRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error } // NewManager returns a new infrastructure Manager. diff --git a/internal/infrastructure/runner/runner.go b/internal/infrastructure/runner/runner.go index cb9ec0d551c..10c122e19f9 100644 --- a/internal/infrastructure/runner/runner.go +++ b/internal/infrastructure/runner/runner.go @@ -16,7 +16,8 @@ import ( type Config struct { config.Server - InfraIR *message.InfraIR + InfraIR *message.InfraIR + RateLimitInfraIR *message.RateLimitInfraIR } type Runner struct { @@ -40,12 +41,13 @@ func (r *Runner) Start(ctx context.Context) error { if err != nil { r.Logger.Error(err, "failed to create new manager") } - go r.subscribeAndTranslate(ctx) + go r.subscribeToInfraIR(ctx) + go r.subscribeToRateLimitInfraIR(ctx) r.Logger.Info("started") return nil } -func (r *Runner) subscribeAndTranslate(ctx context.Context) { +func (r *Runner) subscribeToInfraIR(ctx context.Context) { // Subscribe to resources message.HandleSubscription(r.InfraIR.Subscribe(ctx), func(update message.Update[string, *ir.Infra]) { @@ -63,5 +65,26 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { } }, ) - r.Logger.Info("subscriber shutting down") + r.Logger.Info("infra subscriber shutting down") +} + +func (r *Runner) subscribeToRateLimitInfraIR(ctx context.Context) { + // Subscribe to resources + message.HandleSubscription(r.RateLimitInfraIR.Subscribe(ctx), + func(update message.Update[string, *ir.RateLimitInfra]) { + val := update.Value + + if update.Delete { + if err := r.mgr.DeleteRateLimitInfra(ctx, val); err != nil { + r.Logger.Error(err, "failed to delete rate limit infra") + } + } else { + // Manage the rate limit infra. + if err := r.mgr.CreateOrUpdateRateLimitInfra(ctx, val); err != nil { + r.Logger.Error(err, "failed to create new rate limit infra") + } + } + }, + ) + r.Logger.Info("ratelimit infra subscriber shutting down") } diff --git a/internal/ir/ratelimit_infra.go b/internal/ir/ratelimit_infra.go new file mode 100644 index 00000000000..807fd768128 --- /dev/null +++ b/internal/ir/ratelimit_infra.go @@ -0,0 +1,40 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package ir + +// RateLimitInfra defines managed rate limit service infrastructure. +// +k8s:deepcopy-gen=true +type RateLimitInfra struct { + // Rate limit service configuration + Configs []*RateLimitServiceConfig + // Backend holds configuration associated with the backend database. + Backend *RateLimitDBBackend +} + +// RateLimitServiceConfig holds the rate limit service configurations +// defined here https://github.com/envoyproxy/ratelimit#configuration-1 +// +k8s:deepcopy-gen=true +type RateLimitServiceConfig struct { + // Name of the config file. + Name string + // Config contents saved as a YAML string. + Config string +} + +// RateLimitDBBackend defines the database backend properties +// associated with the rate limit service. +// +k8s:deepcopy-gen=true +type RateLimitDBBackend struct { + // Redis backend details. + Redis *RateLimitRedis +} + +// RateLimitRedis defines the redis database configuration. +// +k8s:deepcopy-gen=true +type RateLimitRedis struct { + // URL of the Redis Database. + URL string +} diff --git a/internal/ir/zz_generated.deepcopy.go b/internal/ir/zz_generated.deepcopy.go index 5bd23f1eb28..64778f94aa0 100644 --- a/internal/ir/zz_generated.deepcopy.go +++ b/internal/ir/zz_generated.deepcopy.go @@ -398,6 +398,72 @@ func (in *RateLimit) DeepCopy() *RateLimit { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RateLimitDBBackend) DeepCopyInto(out *RateLimitDBBackend) { + *out = *in + if in.Redis != nil { + in, out := &in.Redis, &out.Redis + *out = new(RateLimitRedis) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RateLimitDBBackend. +func (in *RateLimitDBBackend) DeepCopy() *RateLimitDBBackend { + if in == nil { + return nil + } + out := new(RateLimitDBBackend) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RateLimitInfra) DeepCopyInto(out *RateLimitInfra) { + *out = *in + if in.Configs != nil { + in, out := &in.Configs, &out.Configs + *out = make([]*RateLimitServiceConfig, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(RateLimitServiceConfig) + **out = **in + } + } + } + if in.Backend != nil { + in, out := &in.Backend, &out.Backend + *out = new(RateLimitDBBackend) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RateLimitInfra. +func (in *RateLimitInfra) DeepCopy() *RateLimitInfra { + if in == nil { + return nil + } + out := new(RateLimitInfra) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RateLimitRedis) DeepCopyInto(out *RateLimitRedis) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RateLimitRedis. +func (in *RateLimitRedis) DeepCopy() *RateLimitRedis { + if in == nil { + return nil + } + out := new(RateLimitRedis) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RateLimitRule) DeepCopyInto(out *RateLimitRule) { *out = *in @@ -429,6 +495,21 @@ func (in *RateLimitRule) DeepCopy() *RateLimitRule { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RateLimitServiceConfig) DeepCopyInto(out *RateLimitServiceConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RateLimitServiceConfig. +func (in *RateLimitServiceConfig) DeepCopy() *RateLimitServiceConfig { + if in == nil { + return nil + } + out := new(RateLimitServiceConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RateLimitValue) DeepCopyInto(out *RateLimitValue) { *out = *in diff --git a/internal/message/types.go b/internal/message/types.go index 4efef215ced..3cbf7170b56 100644 --- a/internal/message/types.go +++ b/internal/message/types.go @@ -69,6 +69,11 @@ type InfraIR struct { watchable.Map[string, *ir.Infra] } +// RateLimitInfraIR message +type RateLimitInfraIR struct { + watchable.Map[string, *ir.RateLimitInfra] +} + // Xds message type Xds struct { watchable.Map[string, *xdstypes.ResourceVersionTable] diff --git a/internal/xds/translator/ratelimit.go b/internal/xds/translator/ratelimit.go index dd00c200e81..4edf1e20065 100644 --- a/internal/xds/translator/ratelimit.go +++ b/internal/xds/translator/ratelimit.go @@ -6,6 +6,7 @@ package translator import ( + "bytes" "strconv" "time" @@ -21,6 +22,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" + goyaml "gopkg.in/yaml.v3" // nolint: depguard "github.com/envoyproxy/gateway/internal/ir" ) @@ -166,6 +168,15 @@ func buildRouteRateLimits(descriptorPrefix string, global *ir.GlobalRateLimit) [ return rateLimits } +// GetRateLimitServiceConfigStr returns the YAML string for the rate limit service configuration. +func GetRateLimitServiceConfigStr(yamlRoot *ratelimitserviceconfig.YamlRoot) (string, error) { + var buf bytes.Buffer + enc := goyaml.NewEncoder(&buf) + enc.SetIndent(2) + err := enc.Encode(*yamlRoot) + return buf.String(), err +} + // BuildRateLimitServiceConfig builds the rate limit service configuration based on // https://github.com/envoyproxy/ratelimit#the-configuration-format func BuildRateLimitServiceConfig(irListener *ir.HTTPListener) *ratelimitserviceconfig.YamlRoot { diff --git a/internal/xds/translator/translator_test.go b/internal/xds/translator/translator_test.go index 08ad2326abc..013db918545 100644 --- a/internal/xds/translator/translator_test.go +++ b/internal/xds/translator/translator_test.go @@ -17,7 +17,6 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" - goyaml "gopkg.in/yaml.v3" // nolint: depguard "sigs.k8s.io/yaml" "github.com/envoyproxy/gateway/internal/ir" @@ -196,12 +195,9 @@ func requireTestDataOutFile(t *testing.T, name ...string) string { } func requireYamlRootToYAMLString(t *testing.T, yamlRoot *ratelimitserviceconfig.YamlRoot) string { - var buf bytes.Buffer - enc := goyaml.NewEncoder(&buf) - enc.SetIndent(2) - err := enc.Encode(*yamlRoot) + str, err := GetRateLimitServiceConfigStr(yamlRoot) require.NoError(t, err) - return buf.String() + return str } func requireResourcesToYAMLString(t *testing.T, resources []types.Resource) string {