Skip to content

Commit

Permalink
Global rate limit components
Browse files Browse the repository at this point in the history
* `RateLimitInfraIR` - holds the config to manage
the ratelimit service.
* `global-ratelimit` runner that subscribes to `XdsIR` messages
and translates it to `RateLimitInfraIR`.
* enhance the `infrastructure` runner to subscribe to `RateLimitInfraIR`
messages and translate it into platform specific ratelimit resources.

Relates to envoyproxy#670

Signed-off-by: Arko Dasgupta <arko@tetrate.io>
  • Loading branch information
arkodg committed Jan 18, 2023
1 parent 12eb305 commit ac910bd
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 13 deletions.
20 changes: 18 additions & 2 deletions internal/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/envoyproxy/gateway/internal/envoygateway/config"
gatewayapirunner "github.com/envoyproxy/gateway/internal/gatewayapi/runner"
ratelimitrunner "github.com/envoyproxy/gateway/internal/globalratelimit/runner"
infrarunner "github.com/envoyproxy/gateway/internal/infrastructure/runner"
"github.com/envoyproxy/gateway/internal/message"
providerrunner "github.com/envoyproxy/gateway/internal/provider/runner"
Expand Down Expand Up @@ -130,12 +131,14 @@ func setupRunners(cfg *config.Server) error {
return err
}

rateLimitInfraIR := new(message.RateLimitInfraIR)
// Start the Infra Manager Runner
// It subscribes to the infraIR, translates it into Envoy Proxy infrastructure
// resources such as K8s deployment and services.
infraRunner := infrarunner.New(&infrarunner.Config{
Server: *cfg,
InfraIR: infraIR,
Server: *cfg,
InfraIR: infraIR,
RateLimitInfraIR: rateLimitInfraIR,
})
if err := infraRunner.Start(ctx); err != nil {
return err
Expand All @@ -152,12 +155,25 @@ func setupRunners(cfg *config.Server) error {
return err
}

// Start the Global RateLimit Runner
// It subscribes to the xds Resources and translates it to Envoy Ratelimit Service
// infrastructure and configuration.
rateLimitRunner := ratelimitrunner.New(&ratelimitrunner.Config{
Server: *cfg,
XdsIR: xdsIR,
RateLimitInfraIR: rateLimitInfraIR,
})
if err := rateLimitRunner.Start(ctx); err != nil {
return err
}

// Wait until done
<-ctx.Done()
// Close messages
pResources.Close()
xdsIR.Close()
infraIR.Close()
rateLimitInfraIR.Close()
xds.Close()

cfg.Logger.Info("shutting down")
Expand Down
102 changes: 102 additions & 0 deletions internal/globalratelimit/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package runner

import (
"context"
"fmt"

"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/xds/translator"
)

type Config struct {
config.Server
XdsIR *message.XdsIR
RateLimitInfraIR *message.RateLimitInfraIR
}

type Runner struct {
Config
}

func (r *Runner) Name() string {
return "global-ratelimit"
}

func New(cfg *Config) *Runner {
return &Runner{Config: *cfg}
}

// Start starts the infrastructure runner
func (r *Runner) Start(ctx context.Context) error {
r.Logger = r.Logger.WithValues("runner", r.Name())
go r.subscribeAndTranslate(ctx)
r.Logger.Info("started")
return nil
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// Subscribe
xdsIRCh := r.XdsIR.Subscribe(ctx)
for ctx.Err() == nil {
var xdsIRs []*ir.Xds
snapshot := <-xdsIRCh
r.Logger.Info("received a notification")
for _, value := range snapshot.State {
xdsIRs = append(xdsIRs, value)
}
// Translate to ratelimit infra IR
result, err := r.translate(xdsIRs)
if err != nil {
r.Logger.Error(err, "failed to translate xds ir")
} else {
if result == nil {
r.RateLimitInfraIR.Delete(r.Name())
} else {
// Publish ratelimit infra IR
r.RateLimitInfraIR.Store(r.Name(), result)
}
}
}
r.Logger.Info("subscriber shutting down")
}

func (r *Runner) translate(xdsIRs []*ir.Xds) (*ir.RateLimitInfra, error) {
rlInfra := new(ir.RateLimitInfra)

// Return empty IR if ratelimit has not been enabled yet in the EnvoyGateway API
if r.EnvoyGateway.RateLimit == nil {
return nil, nil
}

for _, xdsIR := range xdsIRs {
for _, listener := range xdsIR.HTTP {
config := translator.BuildRateLimitServiceConfig(listener)
if config != nil {
str, err := translator.GetRateLimitServiceConfigStr(config)
if err != nil {
return nil, fmt.Errorf("failed to get rate limit config string: %w", err)
}
c := &ir.RateLimitServiceConfig{
Name: listener.Name,
Config: str,
}
rlInfra.Configs = append(rlInfra.Configs, c)
}
}
}

rlInfra.Backend = &ir.RateLimitDBBackend{
Redis: &ir.RateLimitRedis{
URL: r.EnvoyGateway.RateLimit.Backend.Redis.URL,
},
}

return rlInfra, nil
}
12 changes: 12 additions & 0 deletions internal/infrastructure/kubernetes/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,15 @@ func (i *Infra) DeleteInfra(ctx context.Context, infra *ir.Infra) error {

return nil
}

// CreateOrUpdateRateLimitInfra creates the managed kube rate limit infra, if it doesn't exist.
func (i *Infra) CreateOrUpdateRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error {
// TODO
return nil
}

// DeleteRateLimitInfra removes the managed kube infra, if it doesn't exist.
func (i *Infra) DeleteRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error {
// TODO
return nil
}
6 changes: 5 additions & 1 deletion internal/infrastructure/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ var _ Manager = (*kubernetes.Infra)(nil)
type Manager interface {
// CreateOrUpdateInfra creates or updates infra.
CreateOrUpdateInfra(ctx context.Context, infra *ir.Infra) error
// DeleteInfra deletes infra
// DeleteInfra deletes infra.
DeleteInfra(ctx context.Context, infra *ir.Infra) error
// CreateOrUpdateRateLimitInfra creates or updates rate limit infra.
CreateOrUpdateRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error
// DeleteRateLimitInfra deletes rate limit infra.
DeleteRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error
}

// NewManager returns a new infrastructure Manager.
Expand Down
31 changes: 27 additions & 4 deletions internal/infrastructure/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (

type Config struct {
config.Server
InfraIR *message.InfraIR
InfraIR *message.InfraIR
RateLimitInfraIR *message.RateLimitInfraIR
}

type Runner struct {
Expand All @@ -40,12 +41,13 @@ func (r *Runner) Start(ctx context.Context) error {
if err != nil {
r.Logger.Error(err, "failed to create new manager")
}
go r.subscribeAndTranslate(ctx)
go r.subscribeToInfraIR(ctx)
go r.subscribeToRateLimitInfraIR(ctx)
r.Logger.Info("started")
return nil
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
func (r *Runner) subscribeToInfraIR(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(r.InfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.Infra]) {
Expand All @@ -63,5 +65,26 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
}
},
)
r.Logger.Info("subscriber shutting down")
r.Logger.Info("infra subscriber shutting down")
}

func (r *Runner) subscribeToRateLimitInfraIR(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(r.RateLimitInfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.RateLimitInfra]) {
val := update.Value

if update.Delete {
if err := r.mgr.DeleteRateLimitInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to delete rate limit infra")
}
} else {
// Manage the rate limit infra.
if err := r.mgr.CreateOrUpdateRateLimitInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to create new rate limit infra")
}
}
},
)
r.Logger.Info("ratelimit infra subscriber shutting down")
}
40 changes: 40 additions & 0 deletions internal/ir/ratelimit_infra.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package ir

// RateLimitInfra defines managed rate limit service infrastructure.
// +k8s:deepcopy-gen=true
type RateLimitInfra struct {
// Rate limit service configuration
Configs []*RateLimitServiceConfig
// Backend holds configuration associated with the backend database.
Backend *RateLimitDBBackend
}

// RateLimitServiceConfig holds the rate limit service configurations
// defined here https://github.com/envoyproxy/ratelimit#configuration-1
// +k8s:deepcopy-gen=true
type RateLimitServiceConfig struct {
// Name of the config file.
Name string
// Config contents saved as a YAML string.
Config string
}

// RateLimitDBBackend defines the database backend properties
// associated with the rate limit service.
// +k8s:deepcopy-gen=true
type RateLimitDBBackend struct {
// Redis backend details.
Redis *RateLimitRedis
}

// RateLimitRedis defines the redis database configuration.
// +k8s:deepcopy-gen=true
type RateLimitRedis struct {
// URL of the Redis Database.
URL string
}
81 changes: 81 additions & 0 deletions internal/ir/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions internal/message/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ type InfraIR struct {
watchable.Map[string, *ir.Infra]
}

// RateLimitInfraIR message
type RateLimitInfraIR struct {
watchable.Map[string, *ir.RateLimitInfra]
}

// Xds message
type Xds struct {
watchable.Map[string, *xdstypes.ResourceVersionTable]
Expand Down
Loading

0 comments on commit ac910bd

Please sign in to comment.