Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
eguzki committed Jun 6, 2024
1 parent 3ed055b commit c026986
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 3 deletions.
1 change: 1 addition & 0 deletions controllers/limitador_cluster_envoyfilter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2"
"github.com/kuadrant/kuadrant-operator/pkg/common"
kuadrantistioutils "github.com/kuadrant/kuadrant-operator/pkg/istio"
kuadrantgatewayapi "github.com/kuadrant/kuadrant-operator/pkg/library/gatewayapi"
"github.com/kuadrant/kuadrant-operator/pkg/library/kuadrant"
"github.com/kuadrant/kuadrant-operator/pkg/library/mappers"
"github.com/kuadrant/kuadrant-operator/pkg/library/reconcilers"
Expand Down
57 changes: 54 additions & 3 deletions controllers/rate_limiting_limits_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ package controllers
import (
"context"
"encoding/json"
"fmt"
"slices"
"sort"

"github.com/go-logr/logr"
"github.com/samber/lo"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -97,13 +101,11 @@ func (r *RateLimitingLimitsReconciler) Reconcile(eventCtx context.Context, req c
return ctrl.Result{}, nil
}

rlps, err := r.readRLPs(ctx, limitadorCR.GetNamespace())
rateLimitIndexFromCluster, err := r.buildRateLimitIndexFromCluster(ctx)

Check failure on line 104 in controllers/rate_limiting_limits_controller.go

View workflow job for this annotation

GitHub Actions / Lint

not enough arguments in call to r.buildRateLimitIndexFromCluster
if err != nil {
return ctrl.Result{}, err
}

rateLimitIndex := rlptools.NewRateLimitIndex()

for _, rlp := range rlps {

Check failure on line 109 in controllers/rate_limiting_limits_controller.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: rlps (typecheck)
if _, ok := rateLimitIndex.Get(client.ObjectKeyFromObject(rlp)); ok {

Check failure on line 110 in controllers/rate_limiting_limits_controller.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: rateLimitIndex (typecheck)
continue
Expand All @@ -130,6 +132,55 @@ func (r *RateLimitingLimitsReconciler) Reconcile(eventCtx context.Context, req c
return ctrl.Result{}, nil
}

func (r *RateLimitingLimitsReconciler) buildRateLimitIndexFromCluster(ctx context.Context, topology *kuadrantgatewayapi.Topology) (*rlptools.RateLimitIndex, error) {
logger, err := logr.FromContext(ctx)
if err != nil {
return nil, err
}

gateways := lo.KeyBy(topology.Gateways(), func(gateway kuadrantgatewayapi.GatewayNode) string {
return client.ObjectKeyFromObject(gateway.Gateway).String()
})

// sort the gateways for deterministic output and consistent comparison against existing objects
gatewayNames := lo.Keys(gateways)
slices.Sort(gatewayNames)

rateLimitIndex := rlptools.NewRateLimitIndex()

for _, gatewayName := range gatewayNames {
gateway := gateways[gatewayName].Gateway
topologyWithOverrides, err := rlptools.ApplyOverrides(topology, gateway)
if err != nil {
logger.Error(err, "failed to apply overrides")
return nil
}

// sort the policies for deterministic output and consistent comparison against existing objects
indexes := kuadrantgatewayapi.NewTopologyIndexes(topologyWithOverrides)
policies := indexes.PoliciesFromGateway(gateway)
sort.Sort(kuadrantgatewayapi.PolicyByTargetRefKindAndCreationTimeStamp(policies))

logger.V(1).Info("new rate limit index", "gateway", client.ObjectKeyFromObject(gateway), "policies", lo.Map(policies, func(p kuadrantgatewayapi.Policy, _ int) string { return client.ObjectKeyFromObject(p).String() }))

for _, policy := range policies {
rlpKey := client.ObjectKeyFromObject(policy)
gatewayKey := client.ObjectKeyFromObject(gateway)
key := rlptools.RateLimitIndexKey{
RateLimitPolicyKey: rlpKey,
GatewayKey: gatewayKey,
}
if _, ok := rateLimitIndex.Get(key); ok { // should never happen
logger.Error(fmt.Errorf("unexpected duplicate rate limit policy key found"), "failed do add rate limit policy to index", "RateLimitPolicy", rlpKey.String(), "Gateway", gatewayKey)
continue
}
rlp := policy.(*kuadrantv1beta2.RateLimitPolicy)
rateLimitIndex.Set(key, rlptools.LimitadorRateLimitsFromRLP(rlp))
}
}

}

// Rate limit policies targeting programmed gateways or routes accepted by parent gateways.
func (r *RateLimitingLimitsReconciler) readRLPs(ctx context.Context, kuadrantNS string) ([]*kuadrantv1beta2.RateLimitPolicy, error) {
logger, err := logr.FromContext(ctx)
Expand Down
92 changes: 92 additions & 0 deletions pkg/rlptools/topology_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,98 @@ import (
"github.com/kuadrant/kuadrant-operator/pkg/library/utils"
)

func TopologyIndex(ctx context.Context, cl client.Client) (*kuadrantgatewayapi.TopologyIndexes, error) {
logger, err := logr.FromContext(ctx)
if err != nil {
return nil, err
}

// Get all the gateways
gwList := &gatewayapiv1.GatewayList{}
err = cl.List(ctx, gwList)
logger.V(1).Info("TopologyIndex: list gateways", "#Gateways", len(gwList.Items), "err", err)
if err != nil {
return nil, err
}

// Get all the routes
routeList := &gatewayapiv1.HTTPRouteList{}
err = cl.List(ctx, routeList)
logger.V(1).Info("TopologyIndex: list httproutes", "#HTTPRoutes", len(routeList.Items), "err", err)
if err != nil {
return nil, err
}

// Get all the rate limit policies
rlpList := &kuadrantv1beta2.RateLimitPolicyList{}
err = cl.List(ctx, rlpList)
logger.V(1).Info("TopologyIndex: list rate limit policies", "#RLPS", len(rlpList.Items), "err", err)
if err != nil {
return nil, err
}

policies := utils.Map(rlpList.Items, func(p kuadrantv1beta2.RateLimitPolicy) kuadrantgatewayapi.Policy { return &p })

t, err := kuadrantgatewayapi.NewTopology(
kuadrantgatewayapi.WithGateways(utils.Map(gwList.Items, ptr.To[gatewayapiv1.Gateway])),
kuadrantgatewayapi.WithRoutes(utils.Map(routeList.Items, ptr.To[gatewayapiv1.HTTPRoute])),
kuadrantgatewayapi.WithPolicies(policies),
kuadrantgatewayapi.WithLogger(logger),
)
if err != nil {
return nil, err
}

return kuadrantgatewayapi.NewTopologyIndexes(t), nil
}

func TopologyIndexesFromGateway(ctx context.Context, cl client.Client, gw *gatewayapiv1.Gateway) (*kuadrantgatewayapi.TopologyIndexes, error) {

Check failure on line 62 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Unit Tests (1.21.x, ubuntu-latest)

other declaration of TopologyIndexesFromGateway

Check failure on line 62 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Lint

other declaration of TopologyIndexesFromGateway (typecheck)

Check failure on line 62 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Lint

other declaration of TopologyIndexesFromGateway) (typecheck)

Check failure on line 62 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Integration Tests for github.com/kuadrant/kuadrant-operator/tests/istio

other declaration of TopologyIndexesFromGateway

Check failure on line 62 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Integration Tests for github.com/kuadrant/kuadrant-operator/tests/bare_k8s

other declaration of TopologyIndexesFromGateway

Check failure on line 62 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Integration Tests for github.com/kuadrant/kuadrant-operator/tests/gatewayapi

other declaration of TopologyIndexesFromGateway

Check failure on line 62 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Integration Tests for github.com/kuadrant/kuadrant-operator/controllers (istio)

other declaration of TopologyIndexesFromGateway
logger, err := logr.FromContext(ctx)
if err != nil {
return nil, err
}

routeList := &gatewayapiv1.HTTPRouteList{}
// Get all the routes having the gateway as parent
err = cl.List(
ctx,
routeList,
client.MatchingFields{
fieldindexers.HTTPRouteGatewayParentField: client.ObjectKeyFromObject(gw).String(),
})
logger.V(1).Info("topologyIndexesFromGateway: list httproutes from gateway",
"gateway", client.ObjectKeyFromObject(gw),
"#HTTPRoutes", len(routeList.Items),
"err", err)
if err != nil {
return nil, err
}

rlpList := &kuadrantv1beta2.RateLimitPolicyList{}
// Get all the rate limit policies
err = cl.List(ctx, rlpList)
logger.V(1).Info("topologyIndexesFromGateway: list rate limit policies",
"#RLPS", len(rlpList.Items),
"err", err)
if err != nil {
return nil, err
}

policies := utils.Map(rlpList.Items, func(p kuadrantv1beta2.RateLimitPolicy) kuadrantgatewayapi.Policy { return &p })

t, err := kuadrantgatewayapi.NewTopology(
kuadrantgatewayapi.WithGateways([]*gatewayapiv1.Gateway{gw}),
kuadrantgatewayapi.WithRoutes(utils.Map(routeList.Items, ptr.To)),
kuadrantgatewayapi.WithPolicies(policies),
kuadrantgatewayapi.WithLogger(logger),
)
if err != nil {
return nil, err
}

return kuadrantgatewayapi.NewTopologyIndexes(t), nil
}

func TopologyIndexesFromGateway(ctx context.Context, cl client.Client, gw *gatewayapiv1.Gateway) (*kuadrantgatewayapi.TopologyIndexes, error) {

Check failure on line 109 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Unit Tests (1.21.x, ubuntu-latest)

TopologyIndexesFromGateway redeclared in this block

Check failure on line 109 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Lint

TopologyIndexesFromGateway redeclared in this block

Check failure on line 109 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Lint

TopologyIndexesFromGateway redeclared in this block

Check failure on line 109 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Integration Tests for github.com/kuadrant/kuadrant-operator/tests/istio

TopologyIndexesFromGateway redeclared in this block

Check failure on line 109 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Integration Tests for github.com/kuadrant/kuadrant-operator/tests/bare_k8s

TopologyIndexesFromGateway redeclared in this block

Check failure on line 109 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Integration Tests for github.com/kuadrant/kuadrant-operator/tests/gatewayapi

TopologyIndexesFromGateway redeclared in this block

Check failure on line 109 in pkg/rlptools/topology_index.go

View workflow job for this annotation

GitHub Actions / Integration Tests for github.com/kuadrant/kuadrant-operator/controllers (istio)

TopologyIndexesFromGateway redeclared in this block
logger, err := logr.FromContext(ctx)
if err != nil {
Expand Down

0 comments on commit c026986

Please sign in to comment.