Skip to content

Commit

Permalink
Support Allocating Egress IPs from ExternalIPPools
Browse files Browse the repository at this point in the history
If users don't specify EgressIP of an Egress, antrea-controller will
allocate one IP from the specified ExternalIPPool if it's available.

Signed-off-by: Quan Tian <qtian@vmware.com>
  • Loading branch information
tnqn committed Jun 3, 2021
1 parent 13e0f36 commit eee68d8
Show file tree
Hide file tree
Showing 5 changed files with 595 additions and 20 deletions.
3 changes: 2 additions & 1 deletion cmd/antrea-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func run(o *Options) error {
cgv1a2Informer := crdInformerFactory.Crd().V1alpha2().ClusterGroups()
cgInformer := crdInformerFactory.Crd().V1alpha3().ClusterGroups()
egressInformer := crdInformerFactory.Crd().V1alpha2().Egresses()
externalIPPoolInformer := crdInformerFactory.Crd().V1alpha2().ExternalIPPools()

clusterIdentityAllocator := clusteridentity.NewClusterIdentityAllocator(
env.GetAntreaNamespace(),
Expand Down Expand Up @@ -226,7 +227,7 @@ func run(o *Options) error {

var egressController *egress.EgressController
if features.DefaultFeatureGate.Enabled(features.Egress) {
egressController = egress.NewEgressController(groupEntityIndex, egressInformer, egressGroupStore)
egressController = egress.NewEgressController(crdClient, groupEntityIndex, egressInformer, externalIPPoolInformer, egressGroupStore)
}

var traceflowController *traceflow.Controller
Expand Down
276 changes: 258 additions & 18 deletions pkg/controller/egress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@
package egress

import (
"context"
"fmt"
"net"
"reflect"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
Expand All @@ -27,7 +33,10 @@ import (
"antrea.io/antrea/pkg/apis/controlplane"
egressv1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
"antrea.io/antrea/pkg/apiserver/storage"
clientset "antrea.io/antrea/pkg/client/clientset/versioned"
egressinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2"
egresslisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2"
"antrea.io/antrea/pkg/controller/egress/ipallocator"
"antrea.io/antrea/pkg/controller/grouping"
antreatypes "antrea.io/antrea/pkg/controller/types"
)
Expand All @@ -45,9 +54,28 @@ const (
egressGroupType grouping.GroupType = "egressGroup"
)

// ipAllocation contains the IP and the Allocator which allocates it.
type ipAllocation struct {
IP net.IP
Allocator ipallocator.MultiIPSetAllocator
}

// EgressController is responsible for synchronizing the EgressGroups selected by Egresses.
type EgressController struct {
crdClient clientset.Interface
externalIPPoolLister egresslisters.ExternalIPPoolLister
externalIPPoolListerSynced cache.InformerSynced
// ipAllocatorMap is a map from ExternalIPPool name to MultiIPSetAllocator.
ipAllocatorMap map[string]ipallocator.MultiIPSetAllocator
ipAllocatorMutex sync.RWMutex

// ipAllocationMap is a map from Egress name to ipAllocation, which is used to check whether the Egress's IP has
// changed and to release the IP after the Egress is removed.
ipAllocationMap map[string]*ipAllocation
ipAllocationMutex sync.RWMutex

egressInformer egressinformers.EgressInformer
egressLister egresslisters.EgressLister
// egressListerSynced is a function which returns true if the Egresses shared informer has been synced at least once.
egressListerSynced cache.InformerSynced
// egressGroupStore is the storage where the EgressGroups are stored.
Expand All @@ -61,16 +89,23 @@ type EgressController struct {
}

// NewEgressController returns a new *EgressController.
func NewEgressController(groupingInterface grouping.Interface,
func NewEgressController(crdClient clientset.Interface, groupingInterface grouping.Interface,
egressInformer egressinformers.EgressInformer,
externalIPPoolInformer egressinformers.ExternalIPPoolInformer,
egressGroupStore storage.Interface) *EgressController {
c := &EgressController{
egressInformer: egressInformer,
egressListerSynced: egressInformer.Informer().HasSynced,
egressGroupStore: egressGroupStore,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "egress"),
groupingInterface: groupingInterface,
groupingInterfaceSynced: groupingInterface.HasSynced,
crdClient: crdClient,
egressInformer: egressInformer,
egressLister: egressInformer.Lister(),
egressListerSynced: egressInformer.Informer().HasSynced,
externalIPPoolLister: externalIPPoolInformer.Lister(),
externalIPPoolListerSynced: externalIPPoolInformer.Informer().HasSynced,
egressGroupStore: egressGroupStore,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "egress"),
groupingInterface: groupingInterface,
groupingInterfaceSynced: groupingInterface.HasSynced,
ipAllocatorMap: map[string]ipallocator.MultiIPSetAllocator{},
ipAllocationMap: map[string]*ipAllocation{},
}
// Add handlers for Group events and Egress events.
c.groupingInterface.AddEventHandler(egressGroupType, c.enqueueEgressGroup)
Expand All @@ -82,6 +117,14 @@ func NewEgressController(groupingInterface grouping.Interface,
},
resyncPeriod,
)
externalIPPoolInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addExternalIPPool,
UpdateFunc: c.updateExternalIPPool,
DeleteFunc: c.deleteExternalIPPool,
},
resyncPeriod,
)
return c
}

Expand All @@ -92,17 +135,99 @@ func (c *EgressController) Run(stopCh <-chan struct{}) {
klog.Infof("Starting %s", controllerName)
defer klog.Infof("Shutting down %s", controllerName)

cacheSyncs := []cache.InformerSynced{c.egressListerSynced, c.groupingInterfaceSynced}
cacheSyncs := []cache.InformerSynced{c.egressListerSynced, c.externalIPPoolListerSynced, c.groupingInterfaceSynced}
if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) {
return
}

ipPools, _ := c.externalIPPoolLister.List(labels.Everything())
for _, ipPool := range ipPools {
c.createOrUpdateIPAllocator(ipPool)
}

egresses, _ := c.egressLister.List(labels.Everything())
for _, egress := range egresses {
// Ignore Egress that is not associated to ExternalIPPool or doesn't have EgressIP assigned.
if egress.Spec.ExternalIPPool == "" || egress.Spec.EgressIP == "" {
continue
}
ipAllocator, exists := c.getIPAllocator(egress.Spec.ExternalIPPool)
if !exists {
klog.Error("The Egress IP of %s was allocated from an unknown ExternalIPPool", egress.Name)
continue
}
ip := net.ParseIP(egress.Spec.EgressIP)
err := ipAllocator.AllocateIP(ip)
if err != nil {
klog.Error("Failed to allocate IP %s to Egress %s", egress.Spec.EgressIP, egress.Name)
continue
}
// Record the valid IP allocation.
c.setIPAllocation(egress.Name, ip, ipAllocator)
}

for i := 0; i < defaultWorkers; i++ {
go wait.Until(c.egressGroupWorker, time.Second, stopCh)
}
<-stopCh
}

// createOrUpdateIPAllocator creates or updates the IP allocator based on the provided IPPool.
func (c *EgressController) createOrUpdateIPAllocator(ipPool *egressv1alpha2.ExternalIPPool) {
c.ipAllocationMutex.Lock()
defer c.ipAllocationMutex.Unlock()

ipAllocator, exists := c.ipAllocatorMap[ipPool.Name]
if !exists {
ipAllocator = ipallocator.MultiIPSetAllocator{}
}
// Save the existing ipSets of the IP allocator.
staleIPSets := sets.StringKeySet(ipAllocator)
for _, ipSet := range ipPool.Spec.IPSets {
ipSetStr := ipSet.CIDR
if ipSet.IPRange != nil {
ipSetStr = fmt.Sprintf("%s-%s", ipSet.IPRange.Start, ipSet.IPRange.End)
}
staleIPSets.Delete(ipSetStr)
// The ipSet is already in the allocator.
if _, exists := ipAllocator[ipSetStr]; exists {
continue
}
var ipSetAllocator *ipallocator.IPSetAllocator
var err error
if ipSet.IPRange != nil {
ipSetAllocator, err = ipallocator.NewIPRangeAllocator(ipSet.IPRange.Start, ipSet.IPRange.End)
} else {
ipSetAllocator, err = ipallocator.NewCIDRAllocator(ipSet.CIDR)
}
if err != nil {
klog.Error("Failed to create allocator for ipSet %v", ipSet)
continue
}
ipAllocator[ipSetStr] = ipSetAllocator
}
// Delete the stale ipSets from the IP allocator.
for ipSet := range staleIPSets {
delete(ipAllocator, ipSet)
}
c.ipAllocatorMap[ipPool.Name] = ipAllocator
}

// deleteIPAllocator deletes the IP allocator of given IP pool name.
func (c *EgressController) deleteIPAllocator(ipPoolName string) {
c.ipAllocationMutex.Lock()
defer c.ipAllocationMutex.Unlock()
delete(c.ipAllocatorMap, ipPoolName)
}

// getIPAllocator gets the IP allocator of given IP pool name.
func (c *EgressController) getIPAllocator(ipPoolName string) (ipallocator.MultiIPSetAllocator, bool) {
c.ipAllocationMutex.RLock()
defer c.ipAllocationMutex.RUnlock()
ipAllocator, exists := c.ipAllocatorMap[ipPoolName]
return ipAllocator, exists
}

func (c *EgressController) egressGroupWorker() {
for c.processNextEgressGroupWorkItem() {
}
Expand All @@ -115,7 +240,7 @@ func (c *EgressController) processNextEgressGroupWorkItem() bool {
}
defer c.queue.Done(key)

err := c.syncEgressGroup(key.(string))
err := c.syncEgress(key.(string))
if err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.queue.AddRateLimited(key)
Expand All @@ -128,13 +253,108 @@ func (c *EgressController) processNextEgressGroupWorkItem() bool {
return true
}

func (c *EgressController) syncEgressGroup(key string) error {
func (c *EgressController) getIPAllocation(egressName string) (net.IP, ipallocator.MultiIPSetAllocator, bool) {
c.ipAllocationMutex.RLock()
defer c.ipAllocationMutex.RUnlock()
allocation, exists := c.ipAllocationMap[egressName]
if !exists {
return nil, nil, false
}
return allocation.IP, allocation.Allocator, true
}

func (c *EgressController) deleteIPAllocation(egressName string) {
c.ipAllocationMutex.Lock()
defer c.ipAllocationMutex.Unlock()
delete(c.ipAllocationMap, egressName)
}

func (c *EgressController) setIPAllocation(egressName string, ip net.IP, allocator ipallocator.MultiIPSetAllocator) {
c.ipAllocationMutex.Lock()
defer c.ipAllocationMutex.Unlock()
c.ipAllocationMap[egressName] = &ipAllocation{
IP: ip,
Allocator: allocator,
}
}

func (c *EgressController) syncEgressIP(egress *egressv1alpha2.Egress) error {
prevIP, prevAllocator, exists := c.getIPAllocation(egress.Name)
if exists {
// The Egress IP doesn't change, do nothing.
if prevIP.String() == egress.Spec.EgressIP {
return nil
}
klog.V(2).Infof("Releasing IP %v for Egress %s", prevIP, egress.Name)
// The Egress IP changes, release the previous one first.
if err := prevAllocator.Release(prevIP); err != nil {
klog.Errorf("Failed to release IP %v for Egress %s: %v", prevIP, egress.Name, err)
}
c.deleteIPAllocation(egress.Name)
}

ipSetAllocator, exists := c.ipAllocatorMap[egress.Spec.ExternalIPPool]
if !exists {
return fmt.Errorf("ExternalIPPool %s was not found", egress.Spec.ExternalIPPool)
}

var ip net.IP
// User specifies the Egress IP, try to allocate it. If it fails, the datapath may still work, we just don't track
// the IP allocation so deleting this Egress won't release the IP to the Pool.
if egress.Spec.EgressIP != "" {
ip = net.ParseIP(egress.Spec.EgressIP)
if err := ipSetAllocator.AllocateIP(ip); err != nil {
klog.Errorf("Failed to allocate IP %s for Egress %s from ExternalIPPool %s: %v", egress.Spec.EgressIP, egress.Name, egress.Spec.ExternalIPPool, err)
return nil
}
} else {
var err error
// User doesn't specify the Egress IP, allocate one.
if ip, err = ipSetAllocator.AllocateNext(); err != nil {
return err
}
toUpdate := egress.DeepCopy()
toUpdate.Spec.EgressIP = ip.String()
if _, err = c.crdClient.CrdV1alpha2().Egresses().Update(context.TODO(), toUpdate, metav1.UpdateOptions{}); err != nil {
ipSetAllocator.Release(ip)
return fmt.Errorf("error when updating Egress %v", egress)
}
}
klog.V(2).Infof("Allocated IP %v for Egress %s", ip, egress.Name)
c.setIPAllocation(egress.Name, ip, ipSetAllocator)
return nil
}

func (c *EgressController) syncEgress(key string) error {
startTime := time.Now()
defer func() {
d := time.Since(startTime)
klog.V(2).Infof("Finished syncing EgressGroup %s. (%v)", key, d)
klog.V(2).Infof("Finished syncing Egress %s. (%v)", key, d)
}()

egress, err := c.egressLister.Get(key)
if err != nil {
// The Egress had been deleted, release its EgressIP if there was one.
prevIP, prevAllocator, exists := c.getIPAllocation(key)
if !exists {
return nil
}
klog.V(2).Infof("Releasing IP %v for Egress %s", prevIP, key)
if err := prevAllocator.Release(prevIP); err != nil {
klog.Errorf("Failed to release IP %v for Egress %s: %v", prevIP, key, err)
}
c.deleteIPAllocation(key)
return nil
}

// Sync Egress IP if ExternalIPPool is specified. Otherwise the Egress IP is supposed to be set and configured on
// network interfaces manually.
if egress.Spec.ExternalIPPool != "" {
if err := c.syncEgressIP(egress); err != nil {
return err
}
}

egressGroupObj, found, _ := c.egressGroupStore.Get(key)
if !found {
klog.V(2).Infof("EgressGroup %s not found", key)
Expand Down Expand Up @@ -209,15 +429,12 @@ func (c *EgressController) updateEgress(old, cur interface{}) {
oldEgress := old.(*egressv1alpha2.Egress)
curEgress := cur.(*egressv1alpha2.Egress)
klog.Infof("Processing Egress %s UPDATE event with selector (%s)", curEgress.Name, curEgress.Spec.AppliedTo)
// Do nothing if AppliedTo doesn't change.
// TODO: Define custom Equal function to be more efficient.
if reflect.DeepEqual(oldEgress.Spec.AppliedTo, curEgress.Spec.AppliedTo) {
return
if !reflect.DeepEqual(oldEgress.Spec.AppliedTo, curEgress.Spec.AppliedTo) {
// Update the group's selector in the grouping interface.
groupSelector := antreatypes.NewGroupSelector("", curEgress.Spec.AppliedTo.PodSelector, curEgress.Spec.AppliedTo.NamespaceSelector, nil)
c.groupingInterface.AddGroup(egressGroupType, curEgress.Name, groupSelector)
}

// Update the group's selector in the grouping interface.
groupSelector := antreatypes.NewGroupSelector("", curEgress.Spec.AppliedTo.PodSelector, curEgress.Spec.AppliedTo.NamespaceSelector, nil)
c.groupingInterface.AddGroup(egressGroupType, curEgress.Name, groupSelector)
c.queue.Add(curEgress.Name)
}

Expand All @@ -228,4 +445,27 @@ func (c *EgressController) deleteEgress(obj interface{}) {
c.egressGroupStore.Delete(egress.Name)
// Unregister the group from the grouping interface.
c.groupingInterface.DeleteGroup(egressGroupType, egress.Name)
c.queue.Add(egress.Name)
}

// addExternalIPPool processes ExternalIPPool ADD events and creates corresponding IP allocator.
func (c *EgressController) addExternalIPPool(obj interface{}) {
pool := obj.(*egressv1alpha2.ExternalIPPool)
klog.Infof("Processing ExternalIPPool %s ADD event", pool.Name)
c.createOrUpdateIPAllocator(pool)
// TODO: Trigger the process of Egresses that consume the pool and don't have EgressIP yet.
}

func (c *EgressController) updateExternalIPPool(_, cur interface{}) {
pool := cur.(*egressv1alpha2.ExternalIPPool)
klog.Infof("Processing ExternalIPPool %s UPDATE event", pool.Name)
c.createOrUpdateIPAllocator(pool)
// TODO: Trigger the process of Egresses that consume the pool and don't have EgressIP yet.
}

// deleteEgress processes Egress DELETE events and deletes corresponding EgressGroup.
func (c *EgressController) deleteExternalIPPool(obj interface{}) {
pool := obj.(*egressv1alpha2.ExternalIPPool)
klog.Infof("Processing ExternalIPPool %s DELETE event", pool.Name)
c.deleteIPAllocator(pool.Name)
}
2 changes: 1 addition & 1 deletion pkg/controller/egress/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func newController(objects ...runtime.Object) *egressController {
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Namespaces(),
crdInformerFactory.Crd().V1alpha2().ExternalEntities())
controller := NewEgressController(groupEntityIndex, egressInformer, egressGroupStore)
controller := NewEgressController(crdClient, groupEntityIndex, egressInformer, egressGroupStore)
return &egressController{
controller,
client,
Expand Down
Loading

0 comments on commit eee68d8

Please sign in to comment.