Skip to content

Commit

Permalink
Global rate limit components
Browse files Browse the repository at this point in the history
* `RateLimitInfraIR` - holds the config to manage
the ratelimit service.
* `global-ratelimit` runner that subscribes to `XdsIR` messages
and translates it to `RateLimitInfraIR`.
* enhance the `infrastructure` runner to subscribe to `RateLimitInfraIR`
messages and translate it into platform specific ratelimit resources.

Relates to #670

Signed-off-by: Arko Dasgupta <arko@tetrate.io>
  • Loading branch information
arkodg committed Nov 11, 2022
1 parent ee75e3b commit b1fe9c4
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 7 deletions.
20 changes: 18 additions & 2 deletions internal/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/envoyproxy/gateway/internal/envoygateway/config"
gatewayapirunner "github.com/envoyproxy/gateway/internal/gatewayapi/runner"
ratelimitrunner "github.com/envoyproxy/gateway/internal/globalratelimit/runner"
infrarunner "github.com/envoyproxy/gateway/internal/infrastructure/runner"
"github.com/envoyproxy/gateway/internal/message"
providerrunner "github.com/envoyproxy/gateway/internal/provider/runner"
Expand Down Expand Up @@ -126,12 +127,14 @@ func setupRunners(cfg *config.Server) error {
return err
}

rateLimitInfraIR := new(message.RateLimitInfraIR)
// Start the Infra Manager Runner
// It subscribes to the infraIR, translates it into Envoy Proxy infrastructure
// resources such as K8s deployment and services.
infraRunner := infrarunner.New(&infrarunner.Config{
Server: *cfg,
InfraIR: infraIR,
Server: *cfg,
InfraIR: infraIR,
RateLimitInfraIR: rateLimitInfraIR,
})
if err := infraRunner.Start(ctx); err != nil {
return err
Expand All @@ -148,6 +151,18 @@ func setupRunners(cfg *config.Server) error {
return err
}

// Start the Global RateLimit Runner
// It subscribes to the xds Resources and translates it to Envoy Ratelimit Service
// infrastructure and configuration.
rateLimitRunner := ratelimitrunner.New(&ratelimitrunner.Config{
Server: *cfg,
XdsIR: xdsIR,
RateLimitInfraIR: rateLimitInfraIR,
})
if err := rateLimitRunner.Start(ctx); err != nil {
return err
}

// Wait until done
<-ctx.Done()
// Close messages
Expand All @@ -164,6 +179,7 @@ func setupRunners(cfg *config.Server) error {
pResources.TLSRouteStatuses.Close()
xdsIR.Close()
infraIR.Close()
rateLimitInfraIR.Close()
xds.Close()

cfg.Logger.Info("shutting down")
Expand Down
68 changes: 68 additions & 0 deletions internal/globalratelimit/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package runner

import (
"context"

"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/globalratelimit"
"github.com/envoyproxy/gateway/internal/message"
)

type Config struct {
config.Server
XdsIR *message.XdsIR
RateLimitInfraIR *message.RateLimitInfraIR
}

type Runner struct {
Config
}

func (r *Runner) Name() string {
return "global-ratelimit"
}

func New(cfg *Config) *Runner {
return &Runner{Config: *cfg}
}

// Start starts the infrastructure runner
func (r *Runner) Start(ctx context.Context) error {
r.Logger = r.Logger.WithValues("runner", r.Name())
go r.subscribeAndTranslate(ctx)
r.Logger.Info("started")
return nil
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// Subscribe
xdsIRCh := r.XdsIR.Subscribe(ctx)
for ctx.Err() == nil {
select {
case snapshot := <-xdsIRCh:
r.Logger.Info("received a notification")
xdsIRMap := make(globalratelimit.XdsIRMap)
for key, value := range snapshot.State {
xdsIRMap[key] = value
}
// Translate to ratelimit infra IR
result, err := globalratelimit.Translate(xdsIRMap)
if err != nil {
r.Logger.Error(err, "failed to translate xds ir")
} else {
if result == nil {
r.RateLimitInfraIR.Delete(r.Name())
} else {
// Publish ratelimit infra IR
r.RateLimitInfraIR.Store(r.Name(), result)
}
}
}
}
r.Logger.Info("subscriber shutting down")
}
18 changes: 18 additions & 0 deletions internal/globalratelimit/translator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package globalratelimit

import (
"github.com/envoyproxy/gateway/internal/ir"
)

type XdsIRMap map[string]*ir.Xds

// Translate converts the xdsIR into the rate limit infra IR.
func Translate(xdsIRMap XdsIRMap) (*ir.RateLimitInfra, error) {
// TODO
return nil, nil
}
12 changes: 12 additions & 0 deletions internal/infrastructure/kubernetes/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,15 @@ func (i *Infra) DeleteInfra(ctx context.Context, infra *ir.Infra) error {

return nil
}

// CreateOrUpdateRateLimitInfra creates the managed kube rate limit infra, if it doesn't exist.
func (i *Infra) CreateOrUpdateRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error {
// TODO
return nil
}

// DeleteRateLimitInfra removes the managed kube infra, if it doesn't exist.
func (i *Infra) DeleteRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error {
// TODO
return nil
}
6 changes: 5 additions & 1 deletion internal/infrastructure/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ var _ Manager = (*kubernetes.Infra)(nil)
type Manager interface {
// CreateOrUpdateInfra creates or updates infra.
CreateOrUpdateInfra(ctx context.Context, infra *ir.Infra) error
// DeleteInfra deletes infra
// DeleteInfra deletes infra.
DeleteInfra(ctx context.Context, infra *ir.Infra) error
// CreateOrUpdateRateLimitInfra creates or updates rate limit infra.
CreateOrUpdateRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error
// DeleteRateLimitInfra deletes rate limit infra.
DeleteRateLimitInfra(ctx context.Context, infra *ir.RateLimitInfra) error
}

// NewManager returns a new infrastructure Manager.
Expand Down
31 changes: 27 additions & 4 deletions internal/infrastructure/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (

type Config struct {
config.Server
InfraIR *message.InfraIR
InfraIR *message.InfraIR
RateLimitInfraIR *message.RateLimitInfraIR
}

type Runner struct {
Expand All @@ -40,12 +41,13 @@ func (r *Runner) Start(ctx context.Context) error {
if err != nil {
r.Logger.Error(err, "failed to create new manager")
}
go r.subscribeAndTranslate(ctx)
go r.subscribeToInfraIR(ctx)
go r.subscribeToRateLimitInfraIR(ctx)
r.Logger.Info("started")
return nil
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
func (r *Runner) subscribeToInfraIR(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(r.InfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.Infra]) {
Expand All @@ -63,5 +65,26 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
}
},
)
r.Logger.Info("subscriber shutting down")
r.Logger.Info("infra subscriber shutting down")
}

func (r *Runner) subscribeToRateLimitInfraIR(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(r.RateLimitInfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.RateLimitInfra]) {
val := update.Value

if update.Delete {
if err := r.mgr.DeleteRateLimitInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to delete rate limit infra")
}
} else {
// Manage the rate limit infra.
if err := r.mgr.CreateOrUpdateRateLimitInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to create new rate limit infra")
}
}
},
)
r.Logger.Info("ratelimit infra subscriber shutting down")
}
40 changes: 40 additions & 0 deletions internal/ir/ratelimit_infra.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package ir

// RateLimitInfra defines the managed ratelimit infrastructure.
// +k8s:deepcopy-gen=true
type RateLimitInfra struct {
// Service holds the envoy rate limit service
// configuration
Service *RateLimitService

}

// RateLimitInfra defines managed rate limit service infrastructure.
// +k8s:deepcopy-gen=true
type RateLimitService struct {
// Config holds the YAML string representing the
// Rate limit service configuration
// https://github.com/envoyproxy/ratelimit#configuration-1
Config *string
// Backend holds configuration associated with the backend database.
// If nil, the ratelimit runner deploys a demo redis instance (should not be used in production).
Backend *RateLimitDBBackend
}

// RateLimitDBBackend defines the database backend properties
// associated with the rate limit service.
// +k8s:deepcopy-gen=true
type RateLimitDBBackend struct {
Redis *RateLimitRedis
}

// RateLimitRedis defines the redis database configuration.
// +k8s:deepcopy-gen=true
type RateLimitRedis struct {
// TODO https://github.com/envoyproxy/ratelimit#redis
}
80 changes: 80 additions & 0 deletions internal/ir/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions internal/message/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ type InfraIR struct {
watchable.Map[string, *ir.Infra]
}

// RateLimitInfraIR message
type RateLimitInfraIR struct {
watchable.Map[string, *ir.RateLimitInfra]
}

// Xds message
type Xds struct {
watchable.Map[string, *xdstypes.ResourceVersionTable]
Expand Down

0 comments on commit b1fe9c4

Please sign in to comment.