From c026986f81aa0cfcf3b6fd28df9fbbf2cc93109f Mon Sep 17 00:00:00 2001 From: Eguzki Astiz Lezaun Date: Thu, 6 Jun 2024 17:27:03 +0200 Subject: [PATCH] wip --- ...imitador_cluster_envoyfilter_controller.go | 1 + .../rate_limiting_limits_controller.go | 57 +++++++++++- pkg/rlptools/topology_index.go | 92 +++++++++++++++++++ 3 files changed, 147 insertions(+), 3 deletions(-) diff --git a/controllers/limitador_cluster_envoyfilter_controller.go b/controllers/limitador_cluster_envoyfilter_controller.go index 58de5da5e..dc142c24f 100644 --- a/controllers/limitador_cluster_envoyfilter_controller.go +++ b/controllers/limitador_cluster_envoyfilter_controller.go @@ -36,6 +36,7 @@ import ( kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2" "github.com/kuadrant/kuadrant-operator/pkg/common" kuadrantistioutils "github.com/kuadrant/kuadrant-operator/pkg/istio" + kuadrantgatewayapi "github.com/kuadrant/kuadrant-operator/pkg/library/gatewayapi" "github.com/kuadrant/kuadrant-operator/pkg/library/kuadrant" "github.com/kuadrant/kuadrant-operator/pkg/library/mappers" "github.com/kuadrant/kuadrant-operator/pkg/library/reconcilers" diff --git a/controllers/rate_limiting_limits_controller.go b/controllers/rate_limiting_limits_controller.go index 9d2b9920b..9c64f557e 100644 --- a/controllers/rate_limiting_limits_controller.go +++ b/controllers/rate_limiting_limits_controller.go @@ -19,8 +19,12 @@ package controllers import ( "context" "encoding/json" + "fmt" + "slices" + "sort" "github.com/go-logr/logr" + "github.com/samber/lo" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" @@ -97,13 +101,11 @@ func (r *RateLimitingLimitsReconciler) Reconcile(eventCtx context.Context, req c return ctrl.Result{}, nil } - rlps, err := r.readRLPs(ctx, limitadorCR.GetNamespace()) + rateLimitIndexFromCluster, err := r.buildRateLimitIndexFromCluster(ctx) if err != nil { return ctrl.Result{}, err } - rateLimitIndex := rlptools.NewRateLimitIndex() - for _, rlp := range rlps { if _, ok := rateLimitIndex.Get(client.ObjectKeyFromObject(rlp)); ok { continue @@ -130,6 +132,55 @@ func (r *RateLimitingLimitsReconciler) Reconcile(eventCtx context.Context, req c return ctrl.Result{}, nil } +func (r *RateLimitingLimitsReconciler) buildRateLimitIndexFromCluster(ctx context.Context, topology *kuadrantgatewayapi.Topology) (*rlptools.RateLimitIndex, error) { + logger, err := logr.FromContext(ctx) + if err != nil { + return nil, err + } + + gateways := lo.KeyBy(topology.Gateways(), func(gateway kuadrantgatewayapi.GatewayNode) string { + return client.ObjectKeyFromObject(gateway.Gateway).String() + }) + + // sort the gateways for deterministic output and consistent comparison against existing objects + gatewayNames := lo.Keys(gateways) + slices.Sort(gatewayNames) + + rateLimitIndex := rlptools.NewRateLimitIndex() + + for _, gatewayName := range gatewayNames { + gateway := gateways[gatewayName].Gateway + topologyWithOverrides, err := rlptools.ApplyOverrides(topology, gateway) + if err != nil { + logger.Error(err, "failed to apply overrides") + return nil + } + + // sort the policies for deterministic output and consistent comparison against existing objects + indexes := kuadrantgatewayapi.NewTopologyIndexes(topologyWithOverrides) + policies := indexes.PoliciesFromGateway(gateway) + sort.Sort(kuadrantgatewayapi.PolicyByTargetRefKindAndCreationTimeStamp(policies)) + + logger.V(1).Info("new rate limit index", "gateway", client.ObjectKeyFromObject(gateway), "policies", lo.Map(policies, func(p kuadrantgatewayapi.Policy, _ int) string { return client.ObjectKeyFromObject(p).String() })) + + for _, policy := range policies { + rlpKey := client.ObjectKeyFromObject(policy) + gatewayKey := client.ObjectKeyFromObject(gateway) + key := rlptools.RateLimitIndexKey{ + RateLimitPolicyKey: rlpKey, + GatewayKey: gatewayKey, + } + if _, ok := rateLimitIndex.Get(key); ok { // should never happen + logger.Error(fmt.Errorf("unexpected duplicate rate limit policy key found"), "failed do add rate limit policy to index", "RateLimitPolicy", rlpKey.String(), "Gateway", gatewayKey) + continue + } + rlp := policy.(*kuadrantv1beta2.RateLimitPolicy) + rateLimitIndex.Set(key, rlptools.LimitadorRateLimitsFromRLP(rlp)) + } + } + +} + // Rate limit policies targeting programmed gateways or routes accepted by parent gateways. func (r *RateLimitingLimitsReconciler) readRLPs(ctx context.Context, kuadrantNS string) ([]*kuadrantv1beta2.RateLimitPolicy, error) { logger, err := logr.FromContext(ctx) diff --git a/pkg/rlptools/topology_index.go b/pkg/rlptools/topology_index.go index 62bed78ac..5edfb2da1 100644 --- a/pkg/rlptools/topology_index.go +++ b/pkg/rlptools/topology_index.go @@ -14,6 +14,98 @@ import ( "github.com/kuadrant/kuadrant-operator/pkg/library/utils" ) +func TopologyIndex(ctx context.Context, cl client.Client) (*kuadrantgatewayapi.TopologyIndexes, error) { + logger, err := logr.FromContext(ctx) + if err != nil { + return nil, err + } + + // Get all the gateways + gwList := &gatewayapiv1.GatewayList{} + err = cl.List(ctx, gwList) + logger.V(1).Info("TopologyIndex: list gateways", "#Gateways", len(gwList.Items), "err", err) + if err != nil { + return nil, err + } + + // Get all the routes + routeList := &gatewayapiv1.HTTPRouteList{} + err = cl.List(ctx, routeList) + logger.V(1).Info("TopologyIndex: list httproutes", "#HTTPRoutes", len(routeList.Items), "err", err) + if err != nil { + return nil, err + } + + // Get all the rate limit policies + rlpList := &kuadrantv1beta2.RateLimitPolicyList{} + err = cl.List(ctx, rlpList) + logger.V(1).Info("TopologyIndex: list rate limit policies", "#RLPS", len(rlpList.Items), "err", err) + if err != nil { + return nil, err + } + + policies := utils.Map(rlpList.Items, func(p kuadrantv1beta2.RateLimitPolicy) kuadrantgatewayapi.Policy { return &p }) + + t, err := kuadrantgatewayapi.NewTopology( + kuadrantgatewayapi.WithGateways(utils.Map(gwList.Items, ptr.To[gatewayapiv1.Gateway])), + kuadrantgatewayapi.WithRoutes(utils.Map(routeList.Items, ptr.To[gatewayapiv1.HTTPRoute])), + kuadrantgatewayapi.WithPolicies(policies), + kuadrantgatewayapi.WithLogger(logger), + ) + if err != nil { + return nil, err + } + + return kuadrantgatewayapi.NewTopologyIndexes(t), nil +} + +func TopologyIndexesFromGateway(ctx context.Context, cl client.Client, gw *gatewayapiv1.Gateway) (*kuadrantgatewayapi.TopologyIndexes, error) { + logger, err := logr.FromContext(ctx) + if err != nil { + return nil, err + } + + routeList := &gatewayapiv1.HTTPRouteList{} + // Get all the routes having the gateway as parent + err = cl.List( + ctx, + routeList, + client.MatchingFields{ + fieldindexers.HTTPRouteGatewayParentField: client.ObjectKeyFromObject(gw).String(), + }) + logger.V(1).Info("topologyIndexesFromGateway: list httproutes from gateway", + "gateway", client.ObjectKeyFromObject(gw), + "#HTTPRoutes", len(routeList.Items), + "err", err) + if err != nil { + return nil, err + } + + rlpList := &kuadrantv1beta2.RateLimitPolicyList{} + // Get all the rate limit policies + err = cl.List(ctx, rlpList) + logger.V(1).Info("topologyIndexesFromGateway: list rate limit policies", + "#RLPS", len(rlpList.Items), + "err", err) + if err != nil { + return nil, err + } + + policies := utils.Map(rlpList.Items, func(p kuadrantv1beta2.RateLimitPolicy) kuadrantgatewayapi.Policy { return &p }) + + t, err := kuadrantgatewayapi.NewTopology( + kuadrantgatewayapi.WithGateways([]*gatewayapiv1.Gateway{gw}), + kuadrantgatewayapi.WithRoutes(utils.Map(routeList.Items, ptr.To)), + kuadrantgatewayapi.WithPolicies(policies), + kuadrantgatewayapi.WithLogger(logger), + ) + if err != nil { + return nil, err + } + + return kuadrantgatewayapi.NewTopologyIndexes(t), nil +} + func TopologyIndexesFromGateway(ctx context.Context, cl client.Client, gw *gatewayapiv1.Gateway) (*kuadrantgatewayapi.TopologyIndexes, error) { logger, err := logr.FromContext(ctx) if err != nil {