Skip to content

Commit

Permalink
Add cluster's Service CIDR to Multi-cluster Gateway
Browse files Browse the repository at this point in the history
In order to keep Service CIDR in antrea-mc-controller and
antrea-agent consistent, the node controller will store the
Service CIDR to Gateway's field. When the Service CIDR is used
in antrea-agent, controllers could get the Service CIDR in
Multi-cluster Gateway.

Signed-off-by: hjiajing <hjiajing@vmware.com>
  • Loading branch information
hjiajing committed Mar 28, 2023
1 parent be1b86d commit 1d39bac
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 48 deletions.
2 changes: 2 additions & 0 deletions multicluster/apis/multicluster/v1alpha1/gateway_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Gateway struct {
GatewayIP string `json:"gatewayIP,omitempty"`
// In-cluster tunnel IP of the Gateway.
InternalIP string `json:"internalIP,omitempty"`
// Service CIDR of the local member cluster.
ServiceCIDR string `json:"serviceCIDR,omitempty"`
}

type ClusterInfo struct {
Expand Down
3 changes: 3 additions & 0 deletions multicluster/build/yamls/antrea-multicluster-member.yml
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ spec:
type: string
metadata:
type: object
serviceCIDR:
description: Service CIDR of the local member cluster.
type: string
type: object
served: true
storage: true
Expand Down
25 changes: 25 additions & 0 deletions multicluster/cmd/multicluster-controller/gateway_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package main

import (
"context"
"encoding/json"
"fmt"
"net/http"

admissionv1 "k8s.io/api/admission/v1"
"k8s.io/apiserver/pkg/authentication/serviceaccount"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
Expand Down Expand Up @@ -59,10 +61,33 @@ func (v *gatewayValidator) Handle(ctx context.Context, req admission.Request) ad
klog.ErrorS(err, "failed to create Gateway", "Gateway", klog.KObj(gateway), "Namespace", v.namespace)
return admission.Errored(http.StatusPreconditionFailed, err)
}
if req.Operation == admissionv1.Update {
oldGateway := &mcv1alpha1.Gateway{}
if req.OldObject.Raw != nil {
if err := json.Unmarshal(req.OldObject.Raw, oldGateway); err != nil {
klog.ErrorS(err, "Error while decoding old Gateway")
return admission.Errored(http.StatusBadRequest, err)
}
}
ui := req.UserInfo
_, saName, err := serviceaccount.SplitUsername(ui.Username)
if err != nil {
klog.ErrorS(err, "Error getting ServiceAccount name", "Gateway", req.Namespace+"/"+req.Name)
return admission.Errored(http.StatusBadRequest, err)
}
if saName != mcControllerSAName && isServiceCIDRChanged(oldGateway, gateway) {
return admission.Errored(http.StatusPreconditionFailed, fmt.Errorf("ServiceCIDR can only be updated by Antrea Multi-cluster Controller"))
}
return admission.Allowed("")
}
return admission.Allowed("")
}

func (v *gatewayValidator) InjectDecoder(d *admission.Decoder) error {
v.decoder = d
return nil
}

func isServiceCIDRChanged(old, cur *mcv1alpha1.Gateway) bool {
return old.ServiceCIDR != cur.ServiceCIDR
}
47 changes: 47 additions & 0 deletions multicluster/cmd/multicluster-controller/gateway_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/admission/v1"
authenticationv1 "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -52,8 +53,19 @@ func TestWebhookGatewayEvents(t *testing.T) {
Name: "node-2",
},
}
updatedGateway := &mcsv1alpha1.Gateway{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "node-2",
},
ServiceCIDR: "10.100.0.0/16",
}
oldGateway := updatedGateway.DeepCopy()
oldGateway.ServiceCIDR = "10.101.0.0/16"

newGW, _ := json.Marshal(newGateway)
updatedGW, _ := json.Marshal(updatedGateway)
oldGW, _ := json.Marshal(oldGateway)

newReq := admission.Request{
AdmissionRequest: v1.AdmissionRequest{
Expand All @@ -77,6 +89,35 @@ func TestWebhookGatewayEvents(t *testing.T) {
},
}

updateReq := admission.Request{
AdmissionRequest: v1.AdmissionRequest{
UID: "07e52e8d-4513-11e9-a716-42010a800270",
Kind: metav1.GroupVersionKind{
Group: "multicluster.crd.antrea.io",
Version: "v1alpha1",
Kind: "Gateway",
},
Resource: metav1.GroupVersionResource{
Group: "multicluster.crd.antrea.io",
Version: "v1alpha1",
Resource: "Gateways",
},
Name: "node-2",
Namespace: "default",
Operation: v1.Update,
OldObject: runtime.RawExtension{
Raw: oldGW,
},
Object: runtime.RawExtension{
Raw: updatedGW,
},
UserInfo: authenticationv1.UserInfo{
Username: "system:serviceaccount:mcs1:other-sa",
UID: "4842eb60-68e3-4e38-adad-3abfd6117241",
},
},
}

newReqCopy := newReq.DeepCopy()
invalidReq := admission.Request{
AdmissionRequest: *newReqCopy,
Expand Down Expand Up @@ -106,6 +147,12 @@ func TestWebhookGatewayEvents(t *testing.T) {
req: invalidReq,
isAllowed: false,
},
{
name: "failed to update a Gateway's ServiceCIDR field",
existingGateway: existingGateway,
req: updateReq,
isAllowed: false,
},
}

newScheme := runtime.NewScheme()
Expand Down
2 changes: 1 addition & 1 deletion multicluster/cmd/multicluster-controller/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func runMember(o *Options) error {
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
opts.ServiceCIDR,
opts.PodCIDRs,
commonAreaGetter)
if err = gwReconciler.SetupWithManager(mgr); err != nil {
Expand All @@ -107,6 +106,7 @@ func runMember(o *Options) error {
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
opts.ServiceCIDR,
opts.GatewayIPPrecedence)
if err = nodeReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error creating Node controller: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ spec:
type: string
metadata:
type: object
serviceCIDR:
description: Service CIDR of the local member cluster.
type: string
type: object
served: true
storage: true
Expand Down
40 changes: 10 additions & 30 deletions multicluster/controllers/multicluster/member/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package member

import (
"context"
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -43,7 +42,6 @@ type (
commonAreaGetter commonarea.RemoteCommonAreaGetter
namespace string
localClusterID string
serviceCIDR string
podCIDRs []string
leaderNamespace string
}
Expand All @@ -55,14 +53,12 @@ func NewGatewayReconciler(
client client.Client,
scheme *runtime.Scheme,
namespace string,
serviceCIDR string,
podCIDRs []string,
commonAreaGetter commonarea.RemoteCommonAreaGetter) *GatewayReconciler {
reconciler := &GatewayReconciler{
Client: client,
Scheme: scheme,
namespace: namespace,
serviceCIDR: serviceCIDR,
podCIDRs: podCIDRs,
commonAreaGetter: commonAreaGetter,
}
Expand All @@ -85,10 +81,6 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{Requeue: true}, err
}
r.leaderNamespace = commonArea.GetNamespace()
err = r.getServiceCIDR(ctx)
if err != nil {
return ctrl.Result{}, err
}

resExportName := common.NewClusterInfoResourceExportName(r.localClusterID)
resExportNamespacedName := types.NamespacedName{
Expand All @@ -102,21 +94,21 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
},
}

createOrUpdate := func(gwIP string) error {
createOrUpdate := func(gateway *mcsv1alpha1.Gateway) error {
existingResExport := &mcsv1alpha1.ResourceExport{}
err := commonArea.Get(ctx, resExportNamespacedName, existingResExport)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if apierrors.IsNotFound(err) || !existingResExport.DeletionTimestamp.IsZero() {
if err = r.createResourceExport(ctx, req, commonArea, gwIP); err != nil {
if err = r.createResourceExport(ctx, req, commonArea, gateway); err != nil {
return err
}
return nil
}
// updateResourceExport will update latest Gateway information with the existing ResourceExport's resourceVersion.
// It will return an error and retry when there is a version conflict.
if err = r.updateResourceExport(ctx, req, commonArea, existingResExport, &mcsv1alpha1.GatewayInfo{GatewayIP: gwIP}); err != nil {
if err = r.updateResourceExport(ctx, req, commonArea, existingResExport, gateway); err != nil {
return err
}
return nil
Expand All @@ -133,14 +125,14 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

if err := createOrUpdate(gw.GatewayIP); err != nil {
if err := createOrUpdate(gw); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.Request,
commonArea commonarea.RemoteCommonArea, existingResExport *mcsv1alpha1.ResourceExport, gwInfo *mcsv1alpha1.GatewayInfo) error {
commonArea commonarea.RemoteCommonArea, existingResExport *mcsv1alpha1.ResourceExport, gw *mcsv1alpha1.Gateway) error {
resExportSpec := mcsv1alpha1.ResourceExportSpec{
Kind: constants.ClusterInfoKind,
ClusterID: r.localClusterID,
Expand All @@ -149,9 +141,9 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R
}
resExportSpec.ClusterInfo = &mcsv1alpha1.ClusterInfo{
ClusterID: r.localClusterID,
ServiceCIDR: r.serviceCIDR,
ServiceCIDR: gw.ServiceCIDR,
PodCIDRs: r.podCIDRs,
GatewayInfos: []mcsv1alpha1.GatewayInfo{*gwInfo},
GatewayInfos: []mcsv1alpha1.GatewayInfo{{GatewayIP: gw.GatewayIP}},
}
klog.V(2).InfoS("Updating ClusterInfo kind of ResourceExport", "clusterinfo", klog.KObj(existingResExport),
"gateway", req.NamespacedName)
Expand All @@ -163,7 +155,7 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R
}

func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.Request,
commonArea commonarea.RemoteCommonArea, gatewayIP string) error {
commonArea commonarea.RemoteCommonArea, gateway *mcsv1alpha1.Gateway) error {
resExportSpec := mcsv1alpha1.ResourceExportSpec{
Kind: constants.ClusterInfoKind,
ClusterID: r.localClusterID,
Expand All @@ -172,11 +164,11 @@ func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.R
}
resExportSpec.ClusterInfo = &mcsv1alpha1.ClusterInfo{
ClusterID: r.localClusterID,
ServiceCIDR: r.serviceCIDR,
ServiceCIDR: gateway.ServiceCIDR,
PodCIDRs: r.podCIDRs,
GatewayInfos: []mcsv1alpha1.GatewayInfo{
{
GatewayIP: gatewayIP,
GatewayIP: gateway.GatewayIP,
},
},
}
Expand Down Expand Up @@ -206,15 +198,3 @@ func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error {
}).
Complete(r)
}

// getServiceCIDR gets Service ClusterIP CIDR used in the member cluster.
func (r *GatewayReconciler) getServiceCIDR(ctx context.Context) error {
if len(r.serviceCIDR) == 0 {
serviceCIDR, err := common.DiscoverServiceCIDRByInvalidServiceCreation(ctx, r.Client, r.namespace)
if err != nil {
return fmt.Errorf("failed to find Service ClusterIP range automatically, you may set the 'serviceCIDR' config as an alternative, err: %v, ", err)
}
r.serviceCIDR = serviceCIDR
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package member

import (
"context"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -157,7 +156,7 @@ func TestGatewayReconciler(t *testing.T) {
mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false)
mcReconciler.SetRemoteCommonArea(commonArea)
commonAreaGatter := mcReconciler
r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", "10.96.0.0/12", []string{"10.200.1.1/16"}, commonAreaGatter)
r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", []string{"10.200.1.1/16"}, commonAreaGatter)
t.Run(tt.name, func(t *testing.T) {
req := ctrl.Request{NamespacedName: tt.namespacedName}
if _, err := r.Reconcile(common.TestCtx, req); err != nil {
Expand Down Expand Up @@ -186,10 +185,3 @@ func TestGatewayReconciler(t *testing.T) {
})
}
}

func TestGetServiceCIDR(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects().Build()
r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", "", []string{"10.200.1.1/16"}, nil)
err := r.getServiceCIDR(context.TODO())
assert.Contains(t, err.Error(), "expected a specific error but none was returned")
}
Loading

0 comments on commit 1d39bac

Please sign in to comment.