Skip to content

Commit

Permalink
limitador cluster envoy filter leverages DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
eguzki committed Apr 10, 2024
1 parent 36eaa1b commit db8fe5c
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 110 deletions.
6 changes: 0 additions & 6 deletions api/v1beta2/ratelimitpolicy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ const (
ExcludeOperator WhenConditionOperator = "excl"
MatchesOperator WhenConditionOperator = "matches"

RateLimitPolicyBackReferenceAnnotationName = "kuadrant.io/ratelimitpolicies"
RateLimitPolicyDirectReferenceAnnotationName = "kuadrant.io/ratelimitpolicy"
)

Expand Down Expand Up @@ -183,7 +182,6 @@ func (s *RateLimitPolicyStatus) Equals(other *RateLimitPolicyStatus, logger logr
}

var _ kuadrant.Policy = &RateLimitPolicy{}
var _ kuadrant.Referrer = &RateLimitPolicy{}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
Expand Down Expand Up @@ -269,10 +267,6 @@ func (r *RateLimitPolicy) Kind() string {
return r.TypeMeta.Kind
}

func (r *RateLimitPolicy) BackReferenceAnnotationName() string {
return RateLimitPolicyBackReferenceAnnotationName
}

func (r *RateLimitPolicy) DirectReferenceAnnotationName() string {
return RateLimitPolicyDirectReferenceAnnotationName
}
Expand Down
59 changes: 36 additions & 23 deletions controllers/limitador_cluster_envoyfilter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/handler"
gatewayapiv1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/kuadrant/kuadrant-operator/api/v1beta2"
kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2"
"github.com/kuadrant/kuadrant-operator/pkg/common"
kuadrantistioutils "github.com/kuadrant/kuadrant-operator/pkg/istio"
"github.com/kuadrant/kuadrant-operator/pkg/library/kuadrant"
"github.com/kuadrant/kuadrant-operator/pkg/library/mappers"
"github.com/kuadrant/kuadrant-operator/pkg/library/reconcilers"
"github.com/kuadrant/kuadrant-operator/pkg/library/utils"
"github.com/kuadrant/kuadrant-operator/pkg/rlptools"
)

// LimitadorClusterEnvoyFilterReconciler reconciles a EnvoyFilter object with limitador's cluster
Expand Down Expand Up @@ -76,28 +77,22 @@ func (r *LimitadorClusterEnvoyFilterReconciler) Reconcile(eventCtx context.Conte
logger.V(1).Info(string(jsonData))
}

err := r.reconcileRateLimitingClusterEnvoyFilter(ctx, gw)

desired, err := r.desiredRateLimitingClusterEnvoyFilter(ctx, gw)

Check warning on line 80 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L80

Added line #L80 was not covered by tests
if err != nil {
return ctrl.Result{}, err
}

logger.Info("EnvoyFilter reconciled successfully")
return ctrl.Result{}, nil
}

func (r *LimitadorClusterEnvoyFilterReconciler) reconcileRateLimitingClusterEnvoyFilter(ctx context.Context, gw *gatewayapiv1.Gateway) error {
desired, err := r.desiredRateLimitingClusterEnvoyFilter(ctx, gw)
err = r.ReconcileResource(ctx, &istioclientnetworkingv1alpha3.EnvoyFilter{}, desired, kuadrantistioutils.AlwaysUpdateEnvoyFilter)

Check warning on line 85 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L85

Added line #L85 was not covered by tests
if err != nil {
return err
return ctrl.Result{}, err

Check warning on line 87 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L87

Added line #L87 was not covered by tests
}

err = r.ReconcileResource(ctx, &istioclientnetworkingv1alpha3.EnvoyFilter{}, desired, kuadrantistioutils.AlwaysUpdateEnvoyFilter)
if err != nil {
return err
return ctrl.Result{}, err

Check warning on line 91 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L91

Added line #L91 was not covered by tests
}

return nil
logger.Info("EnvoyFilter reconciled successfully")
return ctrl.Result{}, nil

Check warning on line 95 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L94-L95

Added lines #L94 - L95 were not covered by tests
}

func (r *LimitadorClusterEnvoyFilterReconciler) desiredRateLimitingClusterEnvoyFilter(ctx context.Context, gw *gatewayapiv1.Gateway) (*istioclientnetworkingv1alpha3.EnvoyFilter, error) {
Expand All @@ -123,11 +118,15 @@ func (r *LimitadorClusterEnvoyFilterReconciler) desiredRateLimitingClusterEnvoyF
},
}

gateway := kuadrant.GatewayWrapper{Gateway: gw, Referrer: &v1beta2.RateLimitPolicy{}}
rlpRefs := gateway.PolicyRefs()
logger.V(1).Info("desiredRateLimitingClusterEnvoyFilter", "rlpRefs", rlpRefs)
t, err := rlptools.TopologyIndexesFromGateway(ctx, r.Client(), gw)
if err != nil {
return nil, err

Check warning on line 123 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L121-L123

Added lines #L121 - L123 were not covered by tests
}
rateLimitPolicies := t.PoliciesFromGateway(gw)

Check warning on line 125 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L125

Added line #L125 was not covered by tests

logger.V(1).Info("desiredRateLimitingClusterEnvoyFilter", "#RLPS", len(rateLimitPolicies))

Check warning on line 127 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L127

Added line #L127 was not covered by tests

if len(rlpRefs) < 1 {
if len(rateLimitPolicies) < 1 {

Check warning on line 129 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L129

Added line #L129 was not covered by tests
utils.TagObjectToDelete(ef)
return ef, nil
}
Expand Down Expand Up @@ -165,11 +164,25 @@ func (r *LimitadorClusterEnvoyFilterReconciler) desiredRateLimitingClusterEnvoyF

// SetupWithManager sets up the controller with the Manager.
func (r *LimitadorClusterEnvoyFilterReconciler) SetupWithManager(mgr ctrl.Manager) error {
httpRouteToParentGatewaysEventMapper := mappers.NewHTTPRouteToParentGatewaysEventMapper(
mappers.WithLogger(r.Logger().WithName("httpRouteToParentGatewaysEventMapper")),
)

Check warning on line 169 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L167-L169

Added lines #L167 - L169 were not covered by tests

rlpToParentGatewaysEventMapper := mappers.NewPolicyToParentGatewaysEventMapper(
mappers.WithLogger(r.Logger().WithName("ratelimitpolicyToParentGatewaysEventMapper")),
mappers.WithClient(r.Client()),
)

Check warning on line 174 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L171-L174

Added lines #L171 - L174 were not covered by tests

return ctrl.NewControllerManagedBy(mgr).
// Limitador cluster EnvoyFilter controller only cares about
// the annotation having references to RLP's
// kuadrant.io/ratelimitpolicies
For(&gatewayapiv1.Gateway{}, builder.WithPredicates(predicate.AnnotationChangedPredicate{})).
For(&gatewayapiv1.Gateway{}).

Check warning on line 177 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L177

Added line #L177 was not covered by tests
Owns(&istioclientnetworkingv1alpha3.EnvoyFilter{}).
Watches(
&gatewayapiv1.HTTPRoute{},
handler.EnqueueRequestsFromMapFunc(httpRouteToParentGatewaysEventMapper.Map),
).
Watches(
&kuadrantv1beta2.RateLimitPolicy{},
handler.EnqueueRequestsFromMapFunc(rlpToParentGatewaysEventMapper.Map),
).

Check warning on line 186 in controllers/limitador_cluster_envoyfilter_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/limitador_cluster_envoyfilter_controller.go#L179-L186

Added lines #L179 - L186 were not covered by tests
Complete(r)
}
82 changes: 1 addition & 81 deletions controllers/rate_limiting_wasmplugin_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ package controllers
import (
"context"
"encoding/json"
"fmt"
"sort"

"github.com/go-logr/logr"
istioextensionsv1alpha1 "istio.io/api/extensions/v1alpha1"
istioclientgoextensionv1alpha1 "istio.io/client-go/pkg/apis/extensions/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -44,10 +42,6 @@ import (
"github.com/kuadrant/kuadrant-operator/pkg/rlptools/wasm"
)

const (
HTTPRouteGatewayParentField = ".metadata.parentRefs.gateway"
)

// RateLimitingWASMPluginReconciler reconciles a WASMPlugin object for rate limiting
type RateLimitingWASMPluginReconciler struct {
*reconcilers.BaseReconciler
Expand Down Expand Up @@ -160,7 +154,7 @@ func (r *RateLimitingWASMPluginReconciler) wasmPluginConfig(ctx context.Context,
RateLimitPolicies: make([]wasm.RateLimitPolicy, 0),
}

t, err := r.topologyIndexesFromGateway(ctx, gw)
t, err := rlptools.TopologyIndexesFromGateway(ctx, r.Client(), gw)

Check warning on line 157 in controllers/rate_limiting_wasmplugin_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/rate_limiting_wasmplugin_controller.go#L157

Added line #L157 was not covered by tests
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -190,48 +184,6 @@ func (r *RateLimitingWASMPluginReconciler) wasmPluginConfig(ctx context.Context,
return wasmPlugin, nil
}

func (r *RateLimitingWASMPluginReconciler) topologyIndexesFromGateway(ctx context.Context, gw *gatewayapiv1.Gateway) (*kuadrantgatewayapi.TopologyIndexes, error) {
logger, err := logr.FromContext(ctx)
if err != nil {
return nil, err
}

routeList := &gatewayapiv1.HTTPRouteList{}
// Get all the routes having the gateway as parent
err = r.Client().List(ctx, routeList, client.MatchingFields{HTTPRouteGatewayParentField: client.ObjectKeyFromObject(gw).String()})
logger.V(1).Info("topologyIndexesFromGateway: list httproutes from gateway",
"gateway", client.ObjectKeyFromObject(gw),
"#HTTPRoutes", len(routeList.Items),
"err", err)
if err != nil {
return nil, err
}

rlpList := &kuadrantv1beta2.RateLimitPolicyList{}
// Get all the rate limit policies
err = r.Client().List(ctx, rlpList)
logger.V(1).Info("topologyIndexesFromGateway: list rate limit policies",
"#RLPS", len(rlpList.Items),
"err", err)
if err != nil {
return nil, err
}

policies := utils.Map(rlpList.Items, func(p kuadrantv1beta2.RateLimitPolicy) kuadrantgatewayapi.Policy { return &p })

t, err := kuadrantgatewayapi.NewTopology(
kuadrantgatewayapi.WithGateways([]*gatewayapiv1.Gateway{gw}),
kuadrantgatewayapi.WithRoutes(utils.Map(routeList.Items, ptr.To[gatewayapiv1.HTTPRoute])),
kuadrantgatewayapi.WithPolicies(policies),
kuadrantgatewayapi.WithLogger(logger),
)
if err != nil {
return nil, err
}

return kuadrantgatewayapi.NewTopologyIndexes(t), nil
}

func (r *RateLimitingWASMPluginReconciler) wasmRateLimitPolicy(ctx context.Context, t *kuadrantgatewayapi.TopologyIndexes, rlp *kuadrantv1beta2.RateLimitPolicy, gw *gatewayapiv1.Gateway) (*wasm.RateLimitPolicy, error) {
route, err := r.routeFromRLP(ctx, t, rlp, gw)
if err != nil {
Expand Down Expand Up @@ -320,40 +272,8 @@ func (r *RateLimitingWASMPluginReconciler) routeFromRLP(ctx context.Context, t *
return route, nil
}

// addHTTPRouteByGatewayIndexer declares an index key that we can later use with the client as a pseudo-field name,
// allowing to query all the routes parented by a given gateway
// to prevent creating the same index field multiple times, the function is declared private to be
// called only by this controller
func addHTTPRouteByGatewayIndexer(mgr ctrl.Manager, baseLogger logr.Logger) error {
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &gatewayapiv1.HTTPRoute{}, HTTPRouteGatewayParentField, func(rawObj client.Object) []string {
// grab the route object, extract the parents
route, assertionOk := rawObj.(*gatewayapiv1.HTTPRoute)
if !assertionOk {
baseLogger.V(1).Error(fmt.Errorf("%T is not a *gatewayapiv1.HTTPRoute", rawObj), "cannot map")
return nil
}

logger := baseLogger.WithValues("route", client.ObjectKeyFromObject(route).String())

return utils.Map(kuadrantgatewayapi.GetRouteAcceptedGatewayParentKeys(route), func(key client.ObjectKey) string {
logger.V(1).Info("new gateway added", "key", key.String())
return key.String()
})
}); err != nil {
return err
}

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *RateLimitingWASMPluginReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Add custom indexer
err := addHTTPRouteByGatewayIndexer(mgr, r.Logger().WithName("routeByGatewayIndexer"))
if err != nil {
return err
}

httpRouteToParentGatewaysEventMapper := mappers.NewHTTPRouteToParentGatewaysEventMapper(
mappers.WithLogger(r.Logger().WithName("httpRouteToParentGatewaysEventMapper")),
)
Expand Down
7 changes: 7 additions & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
kuadrantv1alpha1 "github.com/kuadrant/kuadrant-operator/api/v1alpha1"
kuadrantv1beta1 "github.com/kuadrant/kuadrant-operator/api/v1beta1"
kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2"
"github.com/kuadrant/kuadrant-operator/pkg/library/fieldindexers"
"github.com/kuadrant/kuadrant-operator/pkg/library/kuadrant"
"github.com/kuadrant/kuadrant-operator/pkg/library/reconcilers"
"github.com/kuadrant/kuadrant-operator/pkg/log"
Expand Down Expand Up @@ -138,6 +139,12 @@ var _ = BeforeSuite(func() {
})
Expect(err).ToNot(HaveOccurred())

err = fieldindexers.HTTPRouteIndexByGateway(
mgr,
log.Log.WithName("kuadrant").WithName("indexer").WithName("routeIndexByGateway"),
)
Expect(err).ToNot(HaveOccurred())

authPolicyBaseReconciler := reconcilers.NewBaseReconciler(
mgr.GetClient(), mgr.GetScheme(), mgr.GetAPIReader(),
log.Log.WithName("authpolicy"),
Expand Down
9 changes: 9 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
kuadrantv1beta1 "github.com/kuadrant/kuadrant-operator/api/v1beta1"
kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2"
"github.com/kuadrant/kuadrant-operator/controllers"
"github.com/kuadrant/kuadrant-operator/pkg/library/fieldindexers"
"github.com/kuadrant/kuadrant-operator/pkg/library/kuadrant"
"github.com/kuadrant/kuadrant-operator/pkg/library/reconcilers"
"github.com/kuadrant/kuadrant-operator/pkg/log"
Expand Down Expand Up @@ -132,6 +133,14 @@ func main() {
os.Exit(1)
}

if err := fieldindexers.HTTPRouteIndexByGateway(
mgr,
log.Log.WithName("kuadrant").WithName("indexer").WithName("routeIndexByGateway"),
); err != nil {
setupLog.Error(err, "unable to add indexer")
os.Exit(1)
}

kuadrantBaseReconciler := reconcilers.NewBaseReconciler(
mgr.GetClient(), mgr.GetScheme(), mgr.GetAPIReader(),
log.Log.WithName("kuadrant"),
Expand Down
44 changes: 44 additions & 0 deletions pkg/library/fieldindexers/httproute_parents.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package fieldindexers

import (
"context"
"fmt"

"github.com/go-logr/logr"
"github.com/kuadrant/kuadrant-operator/pkg/library/utils"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
gatewayapiv1 "sigs.k8s.io/gateway-api/apis/v1"

kuadrantgatewayapi "github.com/kuadrant/kuadrant-operator/pkg/library/gatewayapi"
)

const (
HTTPRouteGatewayParentField = ".metadata.parentRefs.gateway"
)

// HTTPRouteByGatewayIndexer declares an index key that we can later use with the client as a pseudo-field name,
// allowing to query all the routes parented by a given gateway
// to prevent creating the same index field multiple times, the function is declared private to be
// called only by this controller
func HTTPRouteIndexByGateway(mgr ctrl.Manager, baseLogger logr.Logger) error {
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &gatewayapiv1.HTTPRoute{}, HTTPRouteGatewayParentField, func(rawObj client.Object) []string {
// grab the route object, extract the parents
route, assertionOk := rawObj.(*gatewayapiv1.HTTPRoute)
if !assertionOk {
baseLogger.V(1).Error(fmt.Errorf("%T is not a *gatewayapiv1.HTTPRoute", rawObj), "cannot map")
return nil
}

logger := baseLogger.WithValues("route", client.ObjectKeyFromObject(route).String())

return utils.Map(kuadrantgatewayapi.GetRouteAcceptedGatewayParentKeys(route), func(key client.ObjectKey) string {
logger.V(1).Info("new gateway added", "key", key.String())
return key.String()
})
}); err != nil {
return err
}

return nil
}
Loading

0 comments on commit db8fe5c

Please sign in to comment.