Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global rate limit infra components #723

Merged
merged 2 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions internal/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/envoyproxy/gateway/internal/envoygateway/config"
gatewayapirunner "github.com/envoyproxy/gateway/internal/gatewayapi/runner"
ratelimitrunner "github.com/envoyproxy/gateway/internal/globalratelimit/runner"
infrarunner "github.com/envoyproxy/gateway/internal/infrastructure/runner"
"github.com/envoyproxy/gateway/internal/message"
providerrunner "github.com/envoyproxy/gateway/internal/provider/runner"
Expand Down Expand Up @@ -130,12 +131,14 @@ func setupRunners(cfg *config.Server) error {
return err
}

rateLimitInfraIR := new(message.RateLimitInfraIR)
// Start the Infra Manager Runner
// It subscribes to the infraIR, translates it into Envoy Proxy infrastructure
// resources such as K8s deployment and services.
infraRunner := infrarunner.New(&infrarunner.Config{
Server: *cfg,
InfraIR: infraIR,
Server: *cfg,
InfraIR: infraIR,
RateLimitInfraIR: rateLimitInfraIR,
})
if err := infraRunner.Start(ctx); err != nil {
return err
Expand All @@ -152,12 +155,25 @@ func setupRunners(cfg *config.Server) error {
return err
}

// Start the Global RateLimit Runner
// It subscribes to the xds Resources and translates it to Envoy Ratelimit Service
// infrastructure and configuration.
rateLimitRunner := ratelimitrunner.New(&ratelimitrunner.Config{
Server: *cfg,
XdsIR: xdsIR,
RateLimitInfraIR: rateLimitInfraIR,
})
if err := rateLimitRunner.Start(ctx); err != nil {
return err
}

// Wait until done
<-ctx.Done()
// Close messages
pResources.Close()
xdsIR.Close()
infraIR.Close()
rateLimitInfraIR.Close()
xds.Close()

cfg.Logger.Info("shutting down")
Expand Down
22 changes: 16 additions & 6 deletions internal/envoygateway/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package config
import (
"errors"
"fmt"
"net/url"

"github.com/go-logr/logr"

Expand Down Expand Up @@ -56,17 +57,26 @@ func (s *Server) Validate() error {
return errors.New("server config is unspecified")
case s.EnvoyGateway == nil:
return errors.New("envoy gateway config is unspecified")
case s.EnvoyGateway.EnvoyGatewaySpec.Gateway == nil:
case s.EnvoyGateway.Gateway == nil:
return errors.New("gateway is unspecified")
case len(s.EnvoyGateway.EnvoyGatewaySpec.Gateway.ControllerName) == 0:
case len(s.EnvoyGateway.Gateway.ControllerName) == 0:
return errors.New("gateway controllerName is unspecified")
case s.EnvoyGateway.EnvoyGatewaySpec.Provider == nil:
case s.EnvoyGateway.Provider == nil:
return errors.New("provider is unspecified")
case s.EnvoyGateway.EnvoyGatewaySpec.Provider.Type != v1alpha1.ProviderTypeKubernetes:
return fmt.Errorf("unsupported provider %v", s.EnvoyGateway.EnvoyGatewaySpec.Provider.Type)
case s.EnvoyGateway.Provider.Type != v1alpha1.ProviderTypeKubernetes:
return fmt.Errorf("unsupported provider %v", s.EnvoyGateway.Provider.Type)
case len(s.Namespace) == 0:
return errors.New("namespace is empty string")
case s.EnvoyGateway.RateLimit != nil:
danehans marked this conversation as resolved.
Show resolved Hide resolved
if s.EnvoyGateway.RateLimit.Backend.Type != v1alpha1.RedisBackendType {
return fmt.Errorf("unsupported ratelimit backend %v", s.EnvoyGateway.RateLimit.Backend.Type)
}
if s.EnvoyGateway.RateLimit.Backend.Redis == nil || s.EnvoyGateway.RateLimit.Backend.Redis.URL == "" {
return fmt.Errorf("empty ratelimit redis settings")
}
if _, err := url.Parse(s.EnvoyGateway.RateLimit.Backend.Redis.URL); err != nil {
return fmt.Errorf("unknown ratelimit redis url format: %w", err)
}
}

return nil
}
75 changes: 75 additions & 0 deletions internal/envoygateway/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,81 @@ func TestValidate(t *testing.T) {
},
expect: false,
},
{
name: "empty ratelimit",
cfg: &Server{
EnvoyGateway: &v1alpha1.EnvoyGateway{
EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{
Gateway: v1alpha1.DefaultGateway(),
Provider: v1alpha1.DefaultProvider(),
RateLimit: &v1alpha1.RateLimit{},
},
},
Namespace: "test-ns",
},
expect: false,
},
{
name: "empty ratelimit redis setting",
cfg: &Server{
EnvoyGateway: &v1alpha1.EnvoyGateway{
EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{
Gateway: v1alpha1.DefaultGateway(),
Provider: v1alpha1.DefaultProvider(),
RateLimit: &v1alpha1.RateLimit{
Backend: v1alpha1.RateLimitDatabaseBackend{
Type: v1alpha1.RedisBackendType,
Redis: &v1alpha1.RateLimitRedisSettings{},
},
},
},
},
Namespace: "test-ns",
},
expect: false,
},
{
name: "unknown ratelimit redis url format",
cfg: &Server{
EnvoyGateway: &v1alpha1.EnvoyGateway{
EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{
Gateway: v1alpha1.DefaultGateway(),
Provider: v1alpha1.DefaultProvider(),
RateLimit: &v1alpha1.RateLimit{
Backend: v1alpha1.RateLimitDatabaseBackend{
Type: v1alpha1.RedisBackendType,
Redis: &v1alpha1.RateLimitRedisSettings{
URL: ":foo",
},
},
},
},
},
Namespace: "test-ns",
},
expect: false,
},
{
name: "happy ratelimit redis settings",
cfg: &Server{
EnvoyGateway: &v1alpha1.EnvoyGateway{
EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{
Gateway: v1alpha1.DefaultGateway(),
Provider: v1alpha1.DefaultProvider(),
RateLimit: &v1alpha1.RateLimit{
Backend: v1alpha1.RateLimitDatabaseBackend{
Type: v1alpha1.RedisBackendType,
Redis: &v1alpha1.RateLimitRedisSettings{
URL: "localhost:6376",
},
},
},
},
},
Namespace: "test-ns",
},
expect: true,
},
}

for _, tc := range testCases {
Expand Down
102 changes: 102 additions & 0 deletions internal/globalratelimit/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package runner

import (
"context"
"fmt"

"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/xds/translator"
)

type Config struct {
config.Server
XdsIR *message.XdsIR
RateLimitInfraIR *message.RateLimitInfraIR
}

type Runner struct {
Config
}

func (r *Runner) Name() string {
return "global-ratelimit"
}

func New(cfg *Config) *Runner {
return &Runner{Config: *cfg}
}

// Start starts the infrastructure runner
func (r *Runner) Start(ctx context.Context) error {
r.Logger = r.Logger.WithValues("runner", r.Name())
go r.subscribeAndTranslate(ctx)
r.Logger.Info("started")
return nil
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// Subscribe
xdsIRCh := r.XdsIR.Subscribe(ctx)
for ctx.Err() == nil {
var xdsIRs []*ir.Xds
snapshot := <-xdsIRCh
r.Logger.Info("received a notification")
for _, value := range snapshot.State {
xdsIRs = append(xdsIRs, value)
}
// Translate to ratelimit infra IR
result, err := r.translate(xdsIRs)
if err != nil {
r.Logger.Error(err, "failed to translate xds ir")
} else {
if result == nil {
r.RateLimitInfraIR.Delete(r.Name())
} else {
// Publish ratelimit infra IR
r.RateLimitInfraIR.Store(r.Name(), result)
}
}
}
r.Logger.Info("subscriber shutting down")
}

func (r *Runner) translate(xdsIRs []*ir.Xds) (*ir.RateLimitInfra, error) {
danehans marked this conversation as resolved.
Show resolved Hide resolved
rlInfra := new(ir.RateLimitInfra)

// Return empty IR if ratelimit has not been enabled yet in the EnvoyGateway API
if r.EnvoyGateway.RateLimit == nil {
return nil, nil
}

for _, xdsIR := range xdsIRs {
for _, listener := range xdsIR.HTTP {
config := translator.BuildRateLimitServiceConfig(listener)
if config != nil {
str, err := translator.GetRateLimitServiceConfigStr(config)
if err != nil {
return nil, fmt.Errorf("failed to get rate limit config string: %w", err)
}
c := &ir.RateLimitServiceConfig{
Name: listener.Name,
Config: str,
}
rlInfra.Configs = append(rlInfra.Configs, c)
}
}
}

rlInfra.Backend = &ir.RateLimitDBBackend{
Redis: &ir.RateLimitRedis{
URL: r.EnvoyGateway.RateLimit.Backend.Redis.URL,
},
}

return rlInfra, nil
}
12 changes: 12 additions & 0 deletions internal/infrastructure/kubernetes/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,15 @@ func (i *Infra) DeleteInfra(ctx context.Context, infra *ir.Infra) error {

return nil
}

// CreateOrUpdateRateLimitInfra creates the managed kube rate limit infra, if it doesn't exist.
func (i *Infra) CreateOrUpdateRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error {
// TODO
return nil
}

// DeleteRateLimitInfra removes the managed kube infra, if it doesn't exist.
func (i *Infra) DeleteRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error {
// TODO
return nil
}
6 changes: 5 additions & 1 deletion internal/infrastructure/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ var _ Manager = (*kubernetes.Infra)(nil)
type Manager interface {
// CreateOrUpdateInfra creates or updates infra.
CreateOrUpdateInfra(ctx context.Context, infra *ir.Infra) error
// DeleteInfra deletes infra
// DeleteInfra deletes infra.
DeleteInfra(ctx context.Context, infra *ir.Infra) error
// CreateOrUpdateRateLimitInfra creates or updates rate limit infra.
CreateOrUpdateRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error
// DeleteRateLimitInfra deletes rate limit infra.
DeleteRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error
}

// NewManager returns a new infrastructure Manager.
Expand Down
31 changes: 27 additions & 4 deletions internal/infrastructure/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (

type Config struct {
config.Server
InfraIR *message.InfraIR
InfraIR *message.InfraIR
RateLimitInfraIR *message.RateLimitInfraIR
}

type Runner struct {
Expand All @@ -40,12 +41,13 @@ func (r *Runner) Start(ctx context.Context) error {
if err != nil {
r.Logger.Error(err, "failed to create new manager")
}
go r.subscribeAndTranslate(ctx)
go r.subscribeToInfraIR(ctx)
go r.subscribeToRateLimitInfraIR(ctx)
r.Logger.Info("started")
return nil
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
func (r *Runner) subscribeToInfraIR(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(r.InfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.Infra]) {
Expand All @@ -63,5 +65,26 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
}
},
)
r.Logger.Info("subscriber shutting down")
r.Logger.Info("infra subscriber shutting down")
}

func (r *Runner) subscribeToRateLimitInfraIR(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(r.RateLimitInfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.RateLimitInfra]) {
val := update.Value

if update.Delete {
if err := r.mgr.DeleteRateLimitInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to delete rate limit infra")
}
} else {
// Manage the rate limit infra.
if err := r.mgr.CreateOrUpdateRateLimitInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to create new rate limit infra")
}
}
},
)
r.Logger.Info("ratelimit infra subscriber shutting down")
}
Loading