Skip to content

Commit

Permalink
Address updateCNP comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Ding <dingyang@vmware.com>
  • Loading branch information
Dyanngg committed May 26, 2021
1 parent b252f38 commit 06e937e
Showing 1 changed file with 47 additions and 6 deletions.
53 changes: 47 additions & 6 deletions pkg/controller/networkpolicy/clusternetworkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,14 @@ func (n *NetworkPolicyController) deleteCNP(old interface{}) {
defer n.heartbeat("deleteCNP")
klog.Infof("Processing ClusterNetworkPolicy %s DELETE event", cnp.Name)
key := internalNetworkPolicyKeyFunc(cnp)
// Lock access to internal NetworkPolicy store so that concurrent reprocessCNP
// calls will not re-process and add a CNP that has already been deleted.
n.internalNetworkPolicyMutex.Lock()
oldInternalNPObj, _, _ := n.internalNetworkPolicyStore.Get(key)
oldInternalNP := oldInternalNPObj.(*antreatypes.NetworkPolicy)
klog.V(2).Infof("Deleting internal NetworkPolicy %s for %s", oldInternalNP.Name, oldInternalNP.SourceRef.ToString())
err := n.internalNetworkPolicyStore.Delete(key)
n.internalNetworkPolicyMutex.Unlock()
if err != nil {
klog.Errorf("Error deleting internal NetworkPolicy during NetworkPolicy %s delete: %v", cnp.Name, err)
return
Expand All @@ -122,6 +126,43 @@ func (n *NetworkPolicyController) deleteCNP(old interface{}) {
n.deleteDereferencedAddressGroups(oldInternalNP)
}

// reprocessCNP is triggered by Namespace ADD/UPDATE/DELETE events when they impact the
// per-namespace rules of a CNP.
func (n *NetworkPolicyController) reprocessCNP(cnp *crdv1alpha1.ClusterNetworkPolicy) {
key := internalNetworkPolicyKeyFunc(cnp)
n.internalNetworkPolicyMutex.Lock()
oldInternalNPObj, exist, _ := n.internalNetworkPolicyStore.Get(key)
if !exist {
klog.V(2).Infof("Cannot find the original internal NetworkPolicy, skip reprocessCNP")
n.internalNetworkPolicyMutex.Unlock()
return
}
defer n.heartbeat("reprocessCNP")
klog.Infof("Processing ClusterNetworkPolicy %s REPROCESS event", cnp.Name)
oldInternalNP := oldInternalNPObj.(*antreatypes.NetworkPolicy)
curInternalNP := n.processClusterNetworkPolicy(cnp)
// Must preserve old internal NetworkPolicy Span.
curInternalNP.SpanMeta = oldInternalNP.SpanMeta
n.internalNetworkPolicyStore.Update(curInternalNP)
n.internalNetworkPolicyMutex.Unlock()
// Enqueue addressGroup keys to update their Node span.
for _, rule := range curInternalNP.Rules {
for _, addrGroupName := range rule.From.AddressGroups {
n.enqueueAddressGroup(addrGroupName)
}
for _, addrGroupName := range rule.To.AddressGroups {
n.enqueueAddressGroup(addrGroupName)
}
}
n.enqueueInternalNetworkPolicy(key)
for _, atg := range oldInternalNP.AppliedToGroups {
// Delete the old AppliedToGroup object if it is not referenced
// by any internal NetworkPolicy.
n.deleteDereferencedAppliedToGroup(atg)
}
n.deleteDereferencedAddressGroups(oldInternalNP)
}

// filterPerNamespaceRuleACNPsByNSLabels gets all ClusterNetworkPolicy names that will need to be
// re-processed based on the entire label set of an added/updated/deleted Namespace.
func (n *NetworkPolicyController) filterPerNamespaceRuleACNPsByNSLabels(nsLabels labels.Set) sets.String {
Expand Down Expand Up @@ -155,7 +196,7 @@ func (n *NetworkPolicyController) addNamespace(obj interface{}) {
affectedACNPs := n.filterPerNamespaceRuleACNPsByNSLabels(namespace.Labels)
for cnpName := range affectedACNPs {
if cnp, err := n.cnpLister.Get(cnpName); err == nil {
n.updateCNP(cnp, cnp)
n.reprocessCNP(cnp)
}
}
}
Expand All @@ -172,7 +213,7 @@ func (n *NetworkPolicyController) updateNamespace(oldObj, curObj interface{}) {
affectedACNPs := utilsets.SymmetricDifference(affectedACNPsByOldLabels, affectedACNPsByCurLabels)
for cnpName := range affectedACNPs {
if cnp, err := n.cnpLister.Get(cnpName); err == nil {
n.updateCNP(cnp, cnp)
n.reprocessCNP(cnp)
}
}
}
Expand Down Expand Up @@ -202,7 +243,7 @@ func (n *NetworkPolicyController) deleteNamespace(old interface{}) {
klog.Errorf("Error getting Antrea ClusterNetworkPolicy %s", cnpName)
continue
}
n.updateCNP(cnp, cnp)
n.reprocessCNP(cnp)
}
}

Expand All @@ -223,9 +264,9 @@ func (n *NetworkPolicyController) processClusterNetworkPolicy(cnp *crdv1alpha1.C
// The span calculation and stale appliedToGroup cleanup logic would work seamlessly for both cases.
atgNamesSet := sets.String{}
// affectedNamespaceSelectors tracks all the appliedTo's namespaceSelectors of per-namespace rules.
// It is used as an index for internalNetworkPolicyStore, so that Namespace updates can trigger
// ACNPs that selects this Namespace's label to be re-processed, and corresponding rules
// to re-calculate affected Namespaces.
// It is used by the PerNamespaceRuleIndex for internalNetworkPolicyStore to filter out internal NPs
// that has per-namespace rules, and in Namespace ADD/UPDATE/DELETE events, trigger ACNPs that selects
// this Namespace's label to be re-processed, and corresponding rules to re-calculate affected Namespaces.
var affectedNamespaceSelectors []labels.Selector
// If appliedTo is set at spec level and the ACNP has per-namespace rules, then each appliedTo needs
// to be split into appliedToGroups for each of its affected Namespace.
Expand Down

0 comments on commit 06e937e

Please sign in to comment.