Skip to content

Commit

Permalink
Global rate limit infra components (#723)
Browse files Browse the repository at this point in the history
* Global rate limit components

* `RateLimitInfraIR` - holds the config to manage
the ratelimit service.
* `global-ratelimit` runner that subscribes to `XdsIR` messages
and translates it to `RateLimitInfraIR`.
* enhance the `infrastructure` runner to subscribe to `RateLimitInfraIR`
messages and translate it into platform specific ratelimit resources.

Relates to #670

Signed-off-by: Arko Dasgupta <arko@tetrate.io>
  • Loading branch information
arkodg authored Jan 24, 2023
1 parent 2a03c58 commit 84044da
Show file tree
Hide file tree
Showing 12 changed files with 394 additions and 19 deletions.
20 changes: 18 additions & 2 deletions internal/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/envoyproxy/gateway/internal/envoygateway/config"
gatewayapirunner "github.com/envoyproxy/gateway/internal/gatewayapi/runner"
ratelimitrunner "github.com/envoyproxy/gateway/internal/globalratelimit/runner"
infrarunner "github.com/envoyproxy/gateway/internal/infrastructure/runner"
"github.com/envoyproxy/gateway/internal/message"
providerrunner "github.com/envoyproxy/gateway/internal/provider/runner"
Expand Down Expand Up @@ -130,12 +131,14 @@ func setupRunners(cfg *config.Server) error {
return err
}

rateLimitInfraIR := new(message.RateLimitInfraIR)
// Start the Infra Manager Runner
// It subscribes to the infraIR, translates it into Envoy Proxy infrastructure
// resources such as K8s deployment and services.
infraRunner := infrarunner.New(&infrarunner.Config{
Server: *cfg,
InfraIR: infraIR,
Server: *cfg,
InfraIR: infraIR,
RateLimitInfraIR: rateLimitInfraIR,
})
if err := infraRunner.Start(ctx); err != nil {
return err
Expand All @@ -152,12 +155,25 @@ func setupRunners(cfg *config.Server) error {
return err
}

// Start the Global RateLimit Runner
// It subscribes to the xds Resources and translates it to Envoy Ratelimit Service
// infrastructure and configuration.
rateLimitRunner := ratelimitrunner.New(&ratelimitrunner.Config{
Server: *cfg,
XdsIR: xdsIR,
RateLimitInfraIR: rateLimitInfraIR,
})
if err := rateLimitRunner.Start(ctx); err != nil {
return err
}

// Wait until done
<-ctx.Done()
// Close messages
pResources.Close()
xdsIR.Close()
infraIR.Close()
rateLimitInfraIR.Close()
xds.Close()

cfg.Logger.Info("shutting down")
Expand Down
22 changes: 16 additions & 6 deletions internal/envoygateway/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package config
import (
"errors"
"fmt"
"net/url"

"github.com/go-logr/logr"

Expand Down Expand Up @@ -56,17 +57,26 @@ func (s *Server) Validate() error {
return errors.New("server config is unspecified")
case s.EnvoyGateway == nil:
return errors.New("envoy gateway config is unspecified")
case s.EnvoyGateway.EnvoyGatewaySpec.Gateway == nil:
case s.EnvoyGateway.Gateway == nil:
return errors.New("gateway is unspecified")
case len(s.EnvoyGateway.EnvoyGatewaySpec.Gateway.ControllerName) == 0:
case len(s.EnvoyGateway.Gateway.ControllerName) == 0:
return errors.New("gateway controllerName is unspecified")
case s.EnvoyGateway.EnvoyGatewaySpec.Provider == nil:
case s.EnvoyGateway.Provider == nil:
return errors.New("provider is unspecified")
case s.EnvoyGateway.EnvoyGatewaySpec.Provider.Type != v1alpha1.ProviderTypeKubernetes:
return fmt.Errorf("unsupported provider %v", s.EnvoyGateway.EnvoyGatewaySpec.Provider.Type)
case s.EnvoyGateway.Provider.Type != v1alpha1.ProviderTypeKubernetes:
return fmt.Errorf("unsupported provider %v", s.EnvoyGateway.Provider.Type)
case len(s.Namespace) == 0:
return errors.New("namespace is empty string")
case s.EnvoyGateway.RateLimit != nil:
if s.EnvoyGateway.RateLimit.Backend.Type != v1alpha1.RedisBackendType {
return fmt.Errorf("unsupported ratelimit backend %v", s.EnvoyGateway.RateLimit.Backend.Type)
}
if s.EnvoyGateway.RateLimit.Backend.Redis == nil || s.EnvoyGateway.RateLimit.Backend.Redis.URL == "" {
return fmt.Errorf("empty ratelimit redis settings")
}
if _, err := url.Parse(s.EnvoyGateway.RateLimit.Backend.Redis.URL); err != nil {
return fmt.Errorf("unknown ratelimit redis url format: %w", err)
}
}

return nil
}
75 changes: 75 additions & 0 deletions internal/envoygateway/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,81 @@ func TestValidate(t *testing.T) {
},
expect: false,
},
{
name: "empty ratelimit",
cfg: &Server{
EnvoyGateway: &v1alpha1.EnvoyGateway{
EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{
Gateway: v1alpha1.DefaultGateway(),
Provider: v1alpha1.DefaultProvider(),
RateLimit: &v1alpha1.RateLimit{},
},
},
Namespace: "test-ns",
},
expect: false,
},
{
name: "empty ratelimit redis setting",
cfg: &Server{
EnvoyGateway: &v1alpha1.EnvoyGateway{
EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{
Gateway: v1alpha1.DefaultGateway(),
Provider: v1alpha1.DefaultProvider(),
RateLimit: &v1alpha1.RateLimit{
Backend: v1alpha1.RateLimitDatabaseBackend{
Type: v1alpha1.RedisBackendType,
Redis: &v1alpha1.RateLimitRedisSettings{},
},
},
},
},
Namespace: "test-ns",
},
expect: false,
},
{
name: "unknown ratelimit redis url format",
cfg: &Server{
EnvoyGateway: &v1alpha1.EnvoyGateway{
EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{
Gateway: v1alpha1.DefaultGateway(),
Provider: v1alpha1.DefaultProvider(),
RateLimit: &v1alpha1.RateLimit{
Backend: v1alpha1.RateLimitDatabaseBackend{
Type: v1alpha1.RedisBackendType,
Redis: &v1alpha1.RateLimitRedisSettings{
URL: ":foo",
},
},
},
},
},
Namespace: "test-ns",
},
expect: false,
},
{
name: "happy ratelimit redis settings",
cfg: &Server{
EnvoyGateway: &v1alpha1.EnvoyGateway{
EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{
Gateway: v1alpha1.DefaultGateway(),
Provider: v1alpha1.DefaultProvider(),
RateLimit: &v1alpha1.RateLimit{
Backend: v1alpha1.RateLimitDatabaseBackend{
Type: v1alpha1.RedisBackendType,
Redis: &v1alpha1.RateLimitRedisSettings{
URL: "localhost:6376",
},
},
},
},
},
Namespace: "test-ns",
},
expect: true,
},
}

for _, tc := range testCases {
Expand Down
102 changes: 102 additions & 0 deletions internal/globalratelimit/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package runner

import (
"context"
"fmt"

"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/xds/translator"
)

type Config struct {
config.Server
XdsIR *message.XdsIR
RateLimitInfraIR *message.RateLimitInfraIR
}

type Runner struct {
Config
}

func (r *Runner) Name() string {
return "global-ratelimit"
}

func New(cfg *Config) *Runner {
return &Runner{Config: *cfg}
}

// Start starts the infrastructure runner
func (r *Runner) Start(ctx context.Context) error {
r.Logger = r.Logger.WithValues("runner", r.Name())
go r.subscribeAndTranslate(ctx)
r.Logger.Info("started")
return nil
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// Subscribe
xdsIRCh := r.XdsIR.Subscribe(ctx)
for ctx.Err() == nil {
var xdsIRs []*ir.Xds
snapshot := <-xdsIRCh
r.Logger.Info("received a notification")
for _, value := range snapshot.State {
xdsIRs = append(xdsIRs, value)
}
// Translate to ratelimit infra IR
result, err := r.translate(xdsIRs)
if err != nil {
r.Logger.Error(err, "failed to translate xds ir")
} else {
if result == nil {
r.RateLimitInfraIR.Delete(r.Name())
} else {
// Publish ratelimit infra IR
r.RateLimitInfraIR.Store(r.Name(), result)
}
}
}
r.Logger.Info("subscriber shutting down")
}

func (r *Runner) translate(xdsIRs []*ir.Xds) (*ir.RateLimitInfra, error) {
rlInfra := new(ir.RateLimitInfra)

// Return empty IR if ratelimit has not been enabled yet in the EnvoyGateway API
if r.EnvoyGateway.RateLimit == nil {
return nil, nil
}

for _, xdsIR := range xdsIRs {
for _, listener := range xdsIR.HTTP {
config := translator.BuildRateLimitServiceConfig(listener)
if config != nil {
str, err := translator.GetRateLimitServiceConfigStr(config)
if err != nil {
return nil, fmt.Errorf("failed to get rate limit config string: %w", err)
}
c := &ir.RateLimitServiceConfig{
Name: listener.Name,
Config: str,
}
rlInfra.Configs = append(rlInfra.Configs, c)
}
}
}

rlInfra.Backend = &ir.RateLimitDBBackend{
Redis: &ir.RateLimitRedis{
URL: r.EnvoyGateway.RateLimit.Backend.Redis.URL,
},
}

return rlInfra, nil
}
12 changes: 12 additions & 0 deletions internal/infrastructure/kubernetes/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,15 @@ func (i *Infra) DeleteInfra(ctx context.Context, infra *ir.Infra) error {

return nil
}

// CreateOrUpdateRateLimitInfra creates the managed kube rate limit infra, if it doesn't exist.
func (i *Infra) CreateOrUpdateRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error {
// TODO
return nil
}

// DeleteRateLimitInfra removes the managed kube infra, if it doesn't exist.
func (i *Infra) DeleteRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error {
// TODO
return nil
}
6 changes: 5 additions & 1 deletion internal/infrastructure/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ var _ Manager = (*kubernetes.Infra)(nil)
type Manager interface {
// CreateOrUpdateInfra creates or updates infra.
CreateOrUpdateInfra(ctx context.Context, infra *ir.Infra) error
// DeleteInfra deletes infra
// DeleteInfra deletes infra.
DeleteInfra(ctx context.Context, infra *ir.Infra) error
// CreateOrUpdateRateLimitInfra creates or updates rate limit infra.
CreateOrUpdateRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error
// DeleteRateLimitInfra deletes rate limit infra.
DeleteRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error
}

// NewManager returns a new infrastructure Manager.
Expand Down
31 changes: 27 additions & 4 deletions internal/infrastructure/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (

type Config struct {
config.Server
InfraIR *message.InfraIR
InfraIR *message.InfraIR
RateLimitInfraIR *message.RateLimitInfraIR
}

type Runner struct {
Expand All @@ -40,12 +41,13 @@ func (r *Runner) Start(ctx context.Context) error {
if err != nil {
r.Logger.Error(err, "failed to create new manager")
}
go r.subscribeAndTranslate(ctx)
go r.subscribeToInfraIR(ctx)
go r.subscribeToRateLimitInfraIR(ctx)
r.Logger.Info("started")
return nil
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
func (r *Runner) subscribeToInfraIR(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(r.InfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.Infra]) {
Expand All @@ -63,5 +65,26 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
}
},
)
r.Logger.Info("subscriber shutting down")
r.Logger.Info("infra subscriber shutting down")
}

func (r *Runner) subscribeToRateLimitInfraIR(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(r.RateLimitInfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.RateLimitInfra]) {
val := update.Value

if update.Delete {
if err := r.mgr.DeleteRateLimitInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to delete rate limit infra")
}
} else {
// Manage the rate limit infra.
if err := r.mgr.CreateOrUpdateRateLimitInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to create new rate limit infra")
}
}
},
)
r.Logger.Info("ratelimit infra subscriber shutting down")
}
Loading

0 comments on commit 84044da

Please sign in to comment.