Skip to content

Commit

Permalink
Add cluster's Service CIDR to Multi-cluster Gateway
Browse files Browse the repository at this point in the history
Signed-off-by: hjiajing <hjiajing@vmware.com>
  • Loading branch information
hjiajing committed Mar 22, 2023
1 parent be1b86d commit d6352b0
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 33 deletions.
2 changes: 2 additions & 0 deletions multicluster/apis/multicluster/v1alpha1/gateway_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Gateway struct {
GatewayIP string `json:"gatewayIP,omitempty"`
// In-cluster tunnel IP of the Gateway.
InternalIP string `json:"internalIP,omitempty"`
// Service CIDR of the local cluster.
ServiceCIDR string `json:"serviceCIDR,omitempty"`
}

type ClusterInfo struct {
Expand Down
3 changes: 3 additions & 0 deletions multicluster/build/yamls/antrea-multicluster-member.yml
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ spec:
type: string
metadata:
type: object
serviceCIDR:
description: Service CIDR of the local cluster.
type: string
type: object
served: true
storage: true
Expand Down
8 changes: 8 additions & 0 deletions multicluster/cmd/multicluster-controller/gateway_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ func (v *gatewayValidator) Handle(ctx context.Context, req admission.Request) ad
klog.ErrorS(err, "failed to create Gateway", "Gateway", klog.KObj(gateway), "Namespace", v.namespace)
return admission.Errored(http.StatusPreconditionFailed, err)
}
if req.Operation == admissionv1.Update && len(gatewayList.Items) > 0 {
oldGateway := gatewayList.Items[0]
if oldGateway.ServiceCIDR != gateway.ServiceCIDR {
klog.ErrorS(err, "The field 'serviceCIDR' is immutable", "Gateway", klog.KObj(gateway))
return admission.Denied("the field 'serviceCIDR' is immutable")
}
return admission.Allowed("")
}
return admission.Allowed("")
}

Expand Down
36 changes: 36 additions & 0 deletions multicluster/cmd/multicluster-controller/gateway_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,16 @@ func TestWebhookGatewayEvents(t *testing.T) {
Name: "node-2",
},
}
updatedGateway := &mcsv1alpha1.Gateway{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "node-2",
},
ServiceCIDR: "10.100.0.0/16",
}

newGW, _ := json.Marshal(newGateway)
updatedGW, _ := json.Marshal(updatedGateway)

newReq := admission.Request{
AdmissionRequest: v1.AdmissionRequest{
Expand All @@ -77,6 +85,28 @@ func TestWebhookGatewayEvents(t *testing.T) {
},
}

updateReq := admission.Request{
AdmissionRequest: v1.AdmissionRequest{
UID: "07e52e8d-4513-11e9-a716-42010a800270",
Kind: metav1.GroupVersionKind{
Group: "multicluster.crd.antrea.io",
Version: "v1alpha1",
Kind: "Gateway",
},
Resource: metav1.GroupVersionResource{
Group: "multicluster.crd.antrea.io",
Version: "v1alpha1",
Resource: "Gateways",
},
Name: "node-2",
Namespace: "default",
Operation: v1.Update,
Object: runtime.RawExtension{
Raw: updatedGW,
},
},
}

newReqCopy := newReq.DeepCopy()
invalidReq := admission.Request{
AdmissionRequest: *newReqCopy,
Expand Down Expand Up @@ -106,6 +136,12 @@ func TestWebhookGatewayEvents(t *testing.T) {
req: invalidReq,
isAllowed: false,
},
{
name: "failed to update a Gateway's ServiceCIDR field",
existingGateway: existingGateway,
req: updateReq,
isAllowed: false,
},
}

newScheme := runtime.NewScheme()
Expand Down
1 change: 1 addition & 0 deletions multicluster/cmd/multicluster-controller/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func runMember(o *Options) error {
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
opts.ServiceCIDR,
opts.GatewayIPPrecedence)
if err = nodeReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error creating Node controller: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ spec:
type: string
metadata:
type: object
serviceCIDR:
description: Service CIDR of the local cluster.
type: string
type: object
served: true
storage: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package member

import (
"context"
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -85,10 +84,6 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{Requeue: true}, err
}
r.leaderNamespace = commonArea.GetNamespace()
err = r.getServiceCIDR(ctx)
if err != nil {
return ctrl.Result{}, err
}

resExportName := common.NewClusterInfoResourceExportName(r.localClusterID)
resExportNamespacedName := types.NamespacedName{
Expand Down Expand Up @@ -132,6 +127,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
return ctrl.Result{}, nil
}
r.serviceCIDR = gw.ServiceCIDR

if err := createOrUpdate(gw.GatewayIP); err != nil {
return ctrl.Result{}, err
Expand Down Expand Up @@ -206,15 +202,3 @@ func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error {
}).
Complete(r)
}

// getServiceCIDR gets Service ClusterIP CIDR used in the member cluster.
func (r *GatewayReconciler) getServiceCIDR(ctx context.Context) error {
if len(r.serviceCIDR) == 0 {
serviceCIDR, err := common.DiscoverServiceCIDRByInvalidServiceCreation(ctx, r.Client, r.namespace)
if err != nil {
return fmt.Errorf("failed to find Service ClusterIP range automatically, you may set the 'serviceCIDR' config as an alternative, err: %v, ", err)
}
r.serviceCIDR = serviceCIDR
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package member

import (
"context"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -186,10 +185,3 @@ func TestGatewayReconciler(t *testing.T) {
})
}
}

func TestGetServiceCIDR(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects().Build()
r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", "", []string{"10.200.1.1/16"}, nil)
err := r.getServiceCIDR(context.TODO())
assert.Contains(t, err.Error(), "expected a specific error but none was returned")
}
39 changes: 33 additions & 6 deletions multicluster/controllers/multicluster/member/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type (
precedence mcsv1alpha1.Precedence
gatewayCandidates map[string]bool
activeGateway string
serviceCIDR string
initialized bool
}
)
Expand All @@ -57,6 +58,7 @@ func NewNodeReconciler(
client client.Client,
scheme *runtime.Scheme,
namespace string,
serviceCIDR string,
precedence mcsv1alpha1.Precedence) *NodeReconciler {
if string(precedence) == "" {
precedence = mcsv1alpha1.PrecedenceInternal
Expand All @@ -65,6 +67,7 @@ func NewNodeReconciler(
Client: client,
Scheme: scheme,
namespace: namespace,
serviceCIDR: serviceCIDR,
precedence: precedence,
gatewayCandidates: make(map[string]bool),
}
Expand Down Expand Up @@ -116,12 +119,15 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
var isValidGateway bool

if stillGatewayNode {
gw.ServiceCIDR, err = r.getServiceCIDR()
if err != nil {
klog.ErrorS(err, "Failed to discover Service CIDR in cluster")
}
gw.InternalIP, gw.GatewayIP, err = r.getGatawayNodeIP(node)
if err != nil {
klog.ErrorS(err, "There is no valid Gateway IP for Node", "node", node.Name)
} else {
isValidGateway = true
}
isValidGateway = err == nil
}

if isActiveGateway {
Expand Down Expand Up @@ -194,11 +200,13 @@ func (r *NodeReconciler) updateActiveGateway(ctx context.Context, newGateway *mc
if err := r.Client.Get(ctx, types.NamespacedName{Name: newGateway.Name, Namespace: r.namespace}, existingGW); err != nil {
return err
}
if existingGW.GatewayIP == newGateway.GatewayIP && existingGW.InternalIP == newGateway.InternalIP {
if existingGW.GatewayIP == newGateway.GatewayIP && existingGW.InternalIP == newGateway.InternalIP &&
existingGW.ServiceCIDR == newGateway.ServiceCIDR {
return nil
}
existingGW.GatewayIP = newGateway.GatewayIP
existingGW.InternalIP = newGateway.InternalIP
existingGW.ServiceCIDR = newGateway.ServiceCIDR
// If the Gateway version in the client cache is stale, the update operation will fail,
// then the reconciler will retry with latest state again.
if err := r.Client.Update(ctx, existingGW, &client.UpdateOptions{}); err != nil {
Expand Down Expand Up @@ -230,8 +238,13 @@ func (r *NodeReconciler) recreateActiveGateway(ctx context.Context, gateway *mcs
// creates a Gateway. It returns no error if no good Gateway candidate.
func (r *NodeReconciler) getValidGatewayFromCandidates() (*mcsv1alpha1.Gateway, error) {
var activeGateway *mcsv1alpha1.Gateway
var internalIP, gwIP string
var internalIP, gwIP, serviceCIDR string
var err error

if serviceCIDR, err = r.getServiceCIDR(); err != nil {
klog.V(2).ErrorS(err, "Failed to discover Service CIDR in local cluster")
return nil, err
}
gatewayNode := &corev1.Node{}
for name := range r.gatewayCandidates {
if err = r.Client.Get(context.Background(), types.NamespacedName{Name: name}, gatewayNode); err == nil {
Expand All @@ -242,13 +255,15 @@ func (r *NodeReconciler) getValidGatewayFromCandidates() (*mcsv1alpha1.Gateway,
klog.V(2).ErrorS(err, "Node has no valid IP", "node", gatewayNode.Name)
continue
}

activeGateway = &mcsv1alpha1.Gateway{
ObjectMeta: metav1.ObjectMeta{
Name: gatewayNode.Name,
Namespace: r.namespace,
},
GatewayIP: gwIP,
InternalIP: internalIP,
GatewayIP: gwIP,
InternalIP: internalIP,
ServiceCIDR: serviceCIDR,
}
klog.InfoS("Found good Gateway candidate", "node", gatewayNode.Name)
return activeGateway, nil
Expand Down Expand Up @@ -297,6 +312,18 @@ func (r *NodeReconciler) getGatawayNodeIP(node *corev1.Node) (string, string, er
return internalIP, gatewayIP, nil
}

func (r *NodeReconciler) getServiceCIDR() (string, error) {
if r.serviceCIDR != "" {
return r.serviceCIDR, nil
}
serviceCIDR, err := common.DiscoverServiceCIDRByInvalidServiceCreation(context.TODO(), r.Client, r.namespace)
if err != nil {
return "", err
}
r.serviceCIDR = serviceCIDR
return r.serviceCIDR, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func TestNodeReconciler(t *testing.T) {
obj = append(obj, tt.existingGW)
}
fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(obj...).Build()
r := NewNodeReconciler(fakeClient, common.TestScheme, "default", tt.precedence)
r := NewNodeReconciler(fakeClient, common.TestScheme, "default", "10.100.0.0/16", tt.precedence)
r.activeGateway = tt.activeGateway
if _, err := r.Reconcile(common.TestCtx, tt.req); err != nil {
t.Errorf("Node Reconciler should handle Node events successfully but got error = %v", err)
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestInitialize(t *testing.T) {
obj = append(obj, tt.existingGW)
}
fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(obj...).Build()
r := NewNodeReconciler(fakeClient, common.TestScheme, "default", mcsv1alpha1.PrecedencePublic)
r := NewNodeReconciler(fakeClient, common.TestScheme, "default", "10.100.0.0/16", mcsv1alpha1.PrecedencePublic)
if err := r.initialize(); err != nil {
t.Errorf("Expected initialize() successfully but got err: %v", err)
} else {
Expand All @@ -325,3 +325,15 @@ func TestInitialize(t *testing.T) {
})
}
}

func TestGetServiceCIDR(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects().Build()
r := NewNodeReconciler(fakeClient, common.TestScheme, "default", "", mcsv1alpha1.PrecedencePublic)

_, err := r.getServiceCIDR()
assert.Contains(t, err.Error(), "expected a specific error but none was returned")
r.serviceCIDR = "10.100.0.0/16"
serviceCIDR, err := r.getServiceCIDR()
assert.Nil(t, err)
assert.Equal(t, "10.100.0.0/16", serviceCIDR)
}

0 comments on commit d6352b0

Please sign in to comment.