Skip to content

Commit

Permalink
Add cluster's Service CIDR to Multi-cluster Gateway (#4737)
Browse files Browse the repository at this point in the history
In order to keep Service CIDR in antrea-mc-controller and
antrea-agent consistent, the node controller will store the
Service CIDR to Gateway's field. When the Service CIDR is used
in antrea-agent, controllers could get the Service CIDR in
Multi-cluster Gateway.

Signed-off-by: hjiajing <hjiajing@vmware.com>
  • Loading branch information
hjiajing authored Mar 31, 2023
1 parent 02b3779 commit d7ffc85
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 63 deletions.
2 changes: 2 additions & 0 deletions multicluster/apis/multicluster/v1alpha1/gateway_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Gateway struct {
GatewayIP string `json:"gatewayIP,omitempty"`
// In-cluster tunnel IP of the Gateway.
InternalIP string `json:"internalIP,omitempty"`
// Service CIDR of the local member cluster.
ServiceCIDR string `json:"serviceCIDR,omitempty"`
}

type ClusterInfo struct {
Expand Down
3 changes: 3 additions & 0 deletions multicluster/build/yamls/antrea-multicluster-member.yml
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ spec:
type: string
metadata:
type: object
serviceCIDR:
description: Service CIDR of the local member cluster.
type: string
type: object
served: true
storage: true
Expand Down
23 changes: 12 additions & 11 deletions multicluster/cmd/multicluster-controller/gateway_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"

admissionv1 "k8s.io/api/admission/v1"
"k8s.io/apiserver/pkg/authentication/serviceaccount"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
Expand All @@ -47,17 +48,17 @@ func (v *gatewayValidator) Handle(ctx context.Context, req admission.Request) ad
return admission.Errored(http.StatusBadRequest, err)
}

// Check if there is any existing Gateway.
gatewayList := &mcv1alpha1.GatewayList{}
if err := v.Client.List(context.TODO(), gatewayList, client.InNamespace(v.namespace)); err != nil {
klog.ErrorS(err, "Error reading Gateway", "Namespace", v.namespace)
return admission.Errored(http.StatusPreconditionFailed, err)
}

if req.Operation == admissionv1.Create && len(gatewayList.Items) > 0 {
err := fmt.Errorf("multiple Gateways in a Namespace are not allowed")
klog.ErrorS(err, "failed to create Gateway", "Gateway", klog.KObj(gateway), "Namespace", v.namespace)
return admission.Errored(http.StatusPreconditionFailed, err)
// Gateway can only be updated or created by antrea-mc-controller
if req.Operation == admissionv1.Update || req.Operation == admissionv1.Create {
ui := req.UserInfo
_, saName, err := serviceaccount.SplitUsername(ui.Username)
if err != nil {
klog.ErrorS(err, "Error getting ServiceAccount name", "Gateway", req.Namespace+"/"+req.Name)
return admission.Errored(http.StatusBadRequest, err)
}
if saName != mcControllerSAName {
return admission.Errored(http.StatusPreconditionFailed, fmt.Errorf("Gateway can only be created or updated by Antrea Multi-cluster controller"))
}
}
return admission.Allowed("")
}
Expand Down
77 changes: 72 additions & 5 deletions multicluster/cmd/multicluster-controller/gateway_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/admission/v1"
authenticationv1 "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -52,8 +53,19 @@ func TestWebhookGatewayEvents(t *testing.T) {
Name: "node-2",
},
}
updatedGateway := &mcsv1alpha1.Gateway{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "node-2",
},
ServiceCIDR: "10.100.0.0/16",
}
oldGateway := updatedGateway.DeepCopy()
oldGateway.ServiceCIDR = "10.101.0.0/16"

newGW, _ := json.Marshal(newGateway)
updatedGW, _ := json.Marshal(updatedGateway)
oldGW, _ := json.Marshal(oldGateway)

newReq := admission.Request{
AdmissionRequest: v1.AdmissionRequest{
Expand All @@ -74,6 +86,39 @@ func TestWebhookGatewayEvents(t *testing.T) {
Object: runtime.RawExtension{
Raw: newGW,
},
UserInfo: authenticationv1.UserInfo{
Username: "system:serviceaccount:mcs1:antrea-mc-controller",
UID: "4842eb60-68e3-4e38-adad-3abfd6117241",
},
},
}

updateReqWithInvalidSA := admission.Request{
AdmissionRequest: v1.AdmissionRequest{
UID: "07e52e8d-4513-11e9-a716-42010a800270",
Kind: metav1.GroupVersionKind{
Group: "multicluster.crd.antrea.io",
Version: "v1alpha1",
Kind: "Gateway",
},
Resource: metav1.GroupVersionResource{
Group: "multicluster.crd.antrea.io",
Version: "v1alpha1",
Resource: "Gateways",
},
Name: "node-2",
Namespace: "default",
Operation: v1.Update,
OldObject: runtime.RawExtension{
Raw: oldGW,
},
Object: runtime.RawExtension{
Raw: updatedGW,
},
UserInfo: authenticationv1.UserInfo{
Username: "system:serviceaccount:mcs1:other-sa",
UID: "4842eb60-68e3-4e38-adad-3abfd6117241",
},
},
}

Expand All @@ -82,6 +127,16 @@ func TestWebhookGatewayEvents(t *testing.T) {
AdmissionRequest: *newReqCopy,
}
invalidReq.Object = runtime.RawExtension{Raw: []byte("a")}
updateReqCopy := updateReqWithInvalidSA.DeepCopy()
updateReqCopy.UserInfo.Username = "system:serviceaccount:mcs1:antrea-mc-controller"
updateReq := admission.Request{
AdmissionRequest: *updateReqCopy,
}
connectReqCopy := updateReqWithInvalidSA.DeepCopy()
connectReqCopy.Operation = v1.Connect
connectReq := admission.Request{
AdmissionRequest: *connectReqCopy,
}

tests := []struct {
name string
Expand All @@ -96,15 +151,27 @@ func TestWebhookGatewayEvents(t *testing.T) {
isAllowed: true,
},
{
name: "failed to create a Gateway when there is an existing one",
name: "failed to decode request",
req: invalidReq,
isAllowed: false,
},
{
name: "failed to update a Gateway with other ServiceAccount",
existingGateway: existingGateway,
req: newReq,
req: updateReqWithInvalidSA,
isAllowed: false,
},
{
name: "failed to decode request",
req: invalidReq,
isAllowed: false,
name: "connect to a Gateway with other ServiceAccount successfully",
existingGateway: existingGateway,
req: connectReq,
isAllowed: true,
},
{
name: "update a Gateway with ServiceAccount antrea-mc-controller successfully",
existingGateway: existingGateway,
req: updateReq,
isAllowed: true,
},
}

Expand Down
2 changes: 1 addition & 1 deletion multicluster/cmd/multicluster-controller/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func runMember(o *Options) error {
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
opts.ServiceCIDR,
opts.PodCIDRs,
commonAreaGetter)
if err = gwReconciler.SetupWithManager(mgr); err != nil {
Expand All @@ -107,6 +106,7 @@ func runMember(o *Options) error {
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
opts.ServiceCIDR,
opts.GatewayIPPrecedence)
if err = nodeReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error creating Node controller: %v", err)
Expand Down
11 changes: 11 additions & 0 deletions multicluster/cmd/multicluster-controller/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"antrea.io/antrea/multicluster/controllers/multicluster/member"
"antrea.io/antrea/multicluster/test/mocks"
)

Expand Down Expand Up @@ -75,6 +77,12 @@ func TestRunMember(t *testing.T) {
name: "Start member controller successfully with stretchedNetworkPolicy enabled",
options: &Options{EnableStretchedNetworkPolicy: true},
},
{
name: "Start member controller successfully with ServiceCIDR",
options: &Options{
ServiceCIDR: "10.101.0.0/16",
},
},
}

for _, tc := range testCases {
Expand All @@ -84,6 +92,9 @@ func TestRunMember(t *testing.T) {
setupManagerAndCertControllerFunc = func(o *Options) (ctrl.Manager, error) {
return mockMemberManager, nil
}
member.ServiceCIDRDiscoverFn = func(ctx context.Context, k8sClient client.Client, namespace string) (string, error) {
return "10.101.0.0/16", nil
}
ctrl.SetupSignalHandler = mockSetupSignalHandler
t.Run(tc.name, func(t *testing.T) {
err := runMember(tc.options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ spec:
type: string
metadata:
type: object
serviceCIDR:
description: Service CIDR of the local member cluster.
type: string
type: object
served: true
storage: true
Expand Down
40 changes: 10 additions & 30 deletions multicluster/controllers/multicluster/member/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package member

import (
"context"
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -43,7 +42,6 @@ type (
commonAreaGetter commonarea.RemoteCommonAreaGetter
namespace string
localClusterID string
serviceCIDR string
podCIDRs []string
leaderNamespace string
}
Expand All @@ -55,14 +53,12 @@ func NewGatewayReconciler(
client client.Client,
scheme *runtime.Scheme,
namespace string,
serviceCIDR string,
podCIDRs []string,
commonAreaGetter commonarea.RemoteCommonAreaGetter) *GatewayReconciler {
reconciler := &GatewayReconciler{
Client: client,
Scheme: scheme,
namespace: namespace,
serviceCIDR: serviceCIDR,
podCIDRs: podCIDRs,
commonAreaGetter: commonAreaGetter,
}
Expand All @@ -85,10 +81,6 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{Requeue: true}, err
}
r.leaderNamespace = commonArea.GetNamespace()
err = r.getServiceCIDR(ctx)
if err != nil {
return ctrl.Result{}, err
}

resExportName := common.NewClusterInfoResourceExportName(r.localClusterID)
resExportNamespacedName := types.NamespacedName{
Expand All @@ -102,21 +94,21 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
},
}

createOrUpdate := func(gwIP string) error {
createOrUpdate := func(gateway *mcsv1alpha1.Gateway) error {
existingResExport := &mcsv1alpha1.ResourceExport{}
err := commonArea.Get(ctx, resExportNamespacedName, existingResExport)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if apierrors.IsNotFound(err) || !existingResExport.DeletionTimestamp.IsZero() {
if err = r.createResourceExport(ctx, req, commonArea, gwIP); err != nil {
if err = r.createResourceExport(ctx, req, commonArea, gateway); err != nil {
return err
}
return nil
}
// updateResourceExport will update latest Gateway information with the existing ResourceExport's resourceVersion.
// It will return an error and retry when there is a version conflict.
if err = r.updateResourceExport(ctx, req, commonArea, existingResExport, &mcsv1alpha1.GatewayInfo{GatewayIP: gwIP}); err != nil {
if err = r.updateResourceExport(ctx, req, commonArea, existingResExport, gateway); err != nil {
return err
}
return nil
Expand All @@ -133,14 +125,14 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

if err := createOrUpdate(gw.GatewayIP); err != nil {
if err := createOrUpdate(gw); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.Request,
commonArea commonarea.RemoteCommonArea, existingResExport *mcsv1alpha1.ResourceExport, gwInfo *mcsv1alpha1.GatewayInfo) error {
commonArea commonarea.RemoteCommonArea, existingResExport *mcsv1alpha1.ResourceExport, gw *mcsv1alpha1.Gateway) error {
resExportSpec := mcsv1alpha1.ResourceExportSpec{
Kind: constants.ClusterInfoKind,
ClusterID: r.localClusterID,
Expand All @@ -149,9 +141,9 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R
}
resExportSpec.ClusterInfo = &mcsv1alpha1.ClusterInfo{
ClusterID: r.localClusterID,
ServiceCIDR: r.serviceCIDR,
ServiceCIDR: gw.ServiceCIDR,
PodCIDRs: r.podCIDRs,
GatewayInfos: []mcsv1alpha1.GatewayInfo{*gwInfo},
GatewayInfos: []mcsv1alpha1.GatewayInfo{{GatewayIP: gw.GatewayIP}},
}
klog.V(2).InfoS("Updating ClusterInfo kind of ResourceExport", "clusterinfo", klog.KObj(existingResExport),
"gateway", req.NamespacedName)
Expand All @@ -163,7 +155,7 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R
}

func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.Request,
commonArea commonarea.RemoteCommonArea, gatewayIP string) error {
commonArea commonarea.RemoteCommonArea, gateway *mcsv1alpha1.Gateway) error {
resExportSpec := mcsv1alpha1.ResourceExportSpec{
Kind: constants.ClusterInfoKind,
ClusterID: r.localClusterID,
Expand All @@ -172,11 +164,11 @@ func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.R
}
resExportSpec.ClusterInfo = &mcsv1alpha1.ClusterInfo{
ClusterID: r.localClusterID,
ServiceCIDR: r.serviceCIDR,
ServiceCIDR: gateway.ServiceCIDR,
PodCIDRs: r.podCIDRs,
GatewayInfos: []mcsv1alpha1.GatewayInfo{
{
GatewayIP: gatewayIP,
GatewayIP: gateway.GatewayIP,
},
},
}
Expand Down Expand Up @@ -206,15 +198,3 @@ func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error {
}).
Complete(r)
}

// getServiceCIDR gets Service ClusterIP CIDR used in the member cluster.
func (r *GatewayReconciler) getServiceCIDR(ctx context.Context) error {
if len(r.serviceCIDR) == 0 {
serviceCIDR, err := common.DiscoverServiceCIDRByInvalidServiceCreation(ctx, r.Client, r.namespace)
if err != nil {
return fmt.Errorf("failed to find Service ClusterIP range automatically, you may set the 'serviceCIDR' config as an alternative, err: %v, ", err)
}
r.serviceCIDR = serviceCIDR
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package member

import (
"context"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -157,7 +156,7 @@ func TestGatewayReconciler(t *testing.T) {
mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false)
mcReconciler.SetRemoteCommonArea(commonArea)
commonAreaGatter := mcReconciler
r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", "10.96.0.0/12", []string{"10.200.1.1/16"}, commonAreaGatter)
r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", []string{"10.200.1.1/16"}, commonAreaGatter)
t.Run(tt.name, func(t *testing.T) {
req := ctrl.Request{NamespacedName: tt.namespacedName}
if _, err := r.Reconcile(common.TestCtx, req); err != nil {
Expand Down Expand Up @@ -186,10 +185,3 @@ func TestGatewayReconciler(t *testing.T) {
})
}
}

func TestGetServiceCIDR(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects().Build()
r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", "", []string{"10.200.1.1/16"}, nil)
err := r.getServiceCIDR(context.TODO())
assert.Contains(t, err.Error(), "expected a specific error but none was returned")
}
Loading

0 comments on commit d7ffc85

Please sign in to comment.