Skip to content

Commit

Permalink
Support Rule Level NetworkPolicy statistics (#1780)
Browse files Browse the repository at this point in the history
This PR supports collecting and querying the NetworkPolicy statistics for
Antrea Networkpolicies. Native k8s Networkpolicies are not supported
  • Loading branch information
ceclinux authored Mar 15, 2021
1 parent d77f8ff commit e80ab3b
Show file tree
Hide file tree
Showing 34 changed files with 1,959 additions and 370 deletions.
3 changes: 3 additions & 0 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type rule struct {
To v1beta.NetworkPolicyPeer
// Protocols and Ports of this rule.
Services []v1beta.Service
// Name of this rule. Empty for k8s NetworkPolicy.
Name string
// Action of this rule. nil for k8s NetworkPolicy.
Action *secv1alpha1.RuleAction
// Priority of this rule within the NetworkPolicy. Defaults to -1 for K8s NetworkPolicy.
Expand Down Expand Up @@ -548,6 +550,7 @@ func toRule(r *v1beta.NetworkPolicyRule, policy *v1beta.NetworkPolicy, maxPriori
PolicyPriority: policy.Priority,
TierPriority: policy.TierPriority,
AppliedToGroups: appliedToGroups,
Name: r.Name,
PolicyUID: policy.UID,
SourceRef: policy.SourceRef,
EnableLogging: r.EnableLogging,
Expand Down
10 changes: 9 additions & 1 deletion pkg/agent/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,14 @@ func (c *Controller) GetAppliedToGroups() []v1beta2.AppliedToGroup {
}

func (c *Controller) GetNetworkPolicyByRuleFlowID(ruleFlowID uint32) *v1beta2.NetworkPolicyReference {
rule := c.GetRuleByFlowID(ruleFlowID)
if rule == nil {
return nil
}
return rule.PolicyRef
}

func (c *Controller) GetRuleByFlowID(ruleFlowID uint32) *types.PolicyRule {
rule, exists, err := c.reconciler.GetRuleByFlowID(ruleFlowID)
if err != nil {
klog.Errorf("Error when getting network policy by rule flow ID: %v", err)
Expand All @@ -354,7 +362,7 @@ func (c *Controller) GetNetworkPolicyByRuleFlowID(ruleFlowID uint32) *v1beta2.Ne
if !exists {
return nil
}
return rule.PolicyRef
return rule
}

func (c *Controller) GetControllerConnectionStatus() bool {
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/controller/networkpolicy/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1
To: ofPortsToOFAddresses(ofPorts),
Service: filterUnresolvablePort(servicesMap[svcKey]),
Action: rule.Action,
Name: rule.Name,
Priority: ofPriority,
TableID: table,
PolicyRef: rule.SourceRef,
Expand All @@ -450,6 +451,7 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1
Service: filterUnresolvablePort(servicesMap[svcKey]),
Action: rule.Action,
Priority: ofPriority,
Name: rule.Name,
TableID: table,
PolicyRef: rule.SourceRef,
EnableLogging: rule.EnableLogging,
Expand All @@ -472,6 +474,7 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1
To: []types.Address{},
Service: filterUnresolvablePort(rule.Services),
Action: rule.Action,
Name: rule.Name,
Priority: nil,
TableID: table,
PolicyRef: rule.SourceRef,
Expand Down
122 changes: 98 additions & 24 deletions pkg/agent/stats/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/vmware-tanzu/antrea/pkg/agent"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
agenttypes "github.com/vmware-tanzu/antrea/pkg/agent/types"
cpv1beta "github.com/vmware-tanzu/antrea/pkg/apis/controlplane/v1beta2"
statsv1alpha1 "github.com/vmware-tanzu/antrea/pkg/apis/stats/v1alpha1"
"github.com/vmware-tanzu/antrea/pkg/querier"
Expand All @@ -40,9 +41,9 @@ type statsCollection struct {
// networkPolicyStats is a mapping from K8s NetworkPolicy UIDs to their traffic stats.
networkPolicyStats map[types.UID]*statsv1alpha1.TrafficStats
// antreaClusterNetworkPolicyStats is a mapping from Antrea ClusterNetworkPolicy UIDs to their traffic stats.
antreaClusterNetworkPolicyStats map[types.UID]*statsv1alpha1.TrafficStats
antreaClusterNetworkPolicyStats map[types.UID]map[string]*statsv1alpha1.TrafficStats
// antreaNetworkPolicyStats is a mapping from Antrea NetworkPolicy UIDs to their traffic stats.
antreaNetworkPolicyStats map[types.UID]*statsv1alpha1.TrafficStats
antreaNetworkPolicyStats map[types.UID]map[string]*statsv1alpha1.TrafficStats
}

// Collector is responsible for collecting stats from the Openflow client, calculating the delta compared with the last
Expand Down Expand Up @@ -104,36 +105,27 @@ func (m *Collector) Run(stopCh <-chan struct{}) {
func (m *Collector) collect() *statsCollection {
ruleStatsMap := m.ofClient.NetworkPolicyMetrics()
npStatsMap := map[types.UID]*statsv1alpha1.TrafficStats{}
acnpStatsMap := map[types.UID]*statsv1alpha1.TrafficStats{}
anpStatsMap := map[types.UID]*statsv1alpha1.TrafficStats{}
acnpStatsMap := map[types.UID]map[string]*statsv1alpha1.TrafficStats{}
anpStatsMap := map[types.UID]map[string]*statsv1alpha1.TrafficStats{}

for ofID, ruleStats := range ruleStatsMap {
policyRef := m.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(ofID)
if policyRef == nil {
rule := m.networkPolicyQuerier.GetRuleByFlowID(ofID)
if rule == nil {
// This should not happen because the rule flow ID to rule mapping is
// preserved for at least 5 seconds even after the rule deletion.
klog.Warningf("Cannot find NetworkPolicy that has ofID %v", ofID)
klog.Warningf("Cannot find NetworkPolicy Rule that has ofID %v", ofID)
continue
}
klog.V(4).Infof("Converting ofID %v to policy %s", ofID, policyRef.ToString())

var statsMap map[types.UID]*statsv1alpha1.TrafficStats
switch policyRef.Type {
klog.V(4).Infof("Converting ofID %v to policy %s", ofID, rule.PolicyRef.ToString())
switch rule.PolicyRef.Type {
case cpv1beta.K8sNetworkPolicy:
statsMap = npStatsMap
addPolicyStatsUp(npStatsMap, ruleStats, rule)
case cpv1beta.AntreaClusterNetworkPolicy:
statsMap = acnpStatsMap
addRuleStatsUp(acnpStatsMap, ruleStats, rule)
case cpv1beta.AntreaNetworkPolicy:
statsMap = anpStatsMap
}

policyStats, exists := statsMap[policyRef.UID]
if !exists {
policyStats = new(statsv1alpha1.TrafficStats)
statsMap[policyRef.UID] = policyStats
addRuleStatsUp(anpStatsMap, ruleStats, rule)
}
policyStats.Bytes += int64(ruleStats.Bytes)
policyStats.Sessions += int64(ruleStats.Sessions)
policyStats.Packets += int64(ruleStats.Packets)
}
return &statsCollection{
networkPolicyStats: npStatsMap,
Expand All @@ -142,11 +134,40 @@ func (m *Collector) collect() *statsCollection {
}
}

func addPolicyStatsUp(statsMap map[types.UID]*statsv1alpha1.TrafficStats, ruleStats *agenttypes.RuleMetric, rule *agenttypes.PolicyRule) {
policyStats, exists := statsMap[rule.PolicyRef.UID]
if !exists {
policyStats = new(statsv1alpha1.TrafficStats)
statsMap[rule.PolicyRef.UID] = policyStats
}
addUp(policyStats, ruleStats)
}

func addRuleStatsUp(ruleStatsMap map[types.UID]map[string]*statsv1alpha1.TrafficStats, ruleStats *agenttypes.RuleMetric, rule *agenttypes.PolicyRule) {
lastRuleStats, exists := ruleStatsMap[rule.PolicyRef.UID]
if !exists {
lastRuleStats = make(map[string]*statsv1alpha1.TrafficStats)
ruleStatsMap[rule.PolicyRef.UID] = lastRuleStats
}
trafficStats, trafficStatsExists := lastRuleStats[rule.Name]
if !trafficStatsExists {
trafficStats = new(statsv1alpha1.TrafficStats)
lastRuleStats[rule.Name] = trafficStats
}
addUp(trafficStats, ruleStats)
}

func addUp(stats *statsv1alpha1.TrafficStats, inc *agenttypes.RuleMetric) {
stats.Sessions += int64(inc.Sessions)
stats.Packets += int64(inc.Packets)
stats.Bytes += int64(inc.Bytes)
}

// report calculates the delta of the stats and pushes it to the antrea-controller summary API.
func (m *Collector) report(curStatsCollection *statsCollection) error {
npStats := calculateDiff(curStatsCollection.networkPolicyStats, m.lastStatsCollection.networkPolicyStats)
acnpStats := calculateDiff(curStatsCollection.antreaClusterNetworkPolicyStats, m.lastStatsCollection.antreaClusterNetworkPolicyStats)
anpStats := calculateDiff(curStatsCollection.antreaNetworkPolicyStats, m.lastStatsCollection.antreaNetworkPolicyStats)
acnpStats := calculateRuleDiff(curStatsCollection.antreaClusterNetworkPolicyStats, m.lastStatsCollection.antreaClusterNetworkPolicyStats)
anpStats := calculateRuleDiff(curStatsCollection.antreaNetworkPolicyStats, m.lastStatsCollection.antreaNetworkPolicyStats)
if len(npStats) == 0 && len(acnpStats) == 0 && len(anpStats) == 0 {
klog.V(4).Info("No stats to report, skip reporting")
return nil
Expand All @@ -173,6 +194,59 @@ func (m *Collector) report(curStatsCollection *statsCollection) error {
return nil
}

func calculateRuleDiff(curStatsMap, lastStatsMap map[types.UID]map[string]*statsv1alpha1.TrafficStats) []cpv1beta.NetworkPolicyStats {
if len(curStatsMap) == 0 {
return nil
}
statsList := make([]cpv1beta.NetworkPolicyStats, 0, len(curStatsMap))
for uid, curStats := range curStatsMap {
lastStats, exists := lastStatsMap[uid]
stats := make([]statsv1alpha1.RuleTrafficStats, 0, len(curStats))
if !exists {
for name, curRuleStats := range curStats {
if curRuleStats.Bytes != 0 {
ruleTrafficStats := statsv1alpha1.RuleTrafficStats{
Name: name,
TrafficStats: *curRuleStats,
}
stats = append(stats, ruleTrafficStats)
}
}
} else {
for name, curRuleStats := range curStats {
lastRuleStats, ruleStatsExists := lastStats[name]
// curRuleStats.Bytes < lastRuleStats.Bytes could happen
// as rules with same name can be deleted and recreated later.
if (!ruleStatsExists || curRuleStats.Bytes < lastRuleStats.Bytes) && curRuleStats.Bytes != 0 {
ruleTrafficStats := statsv1alpha1.RuleTrafficStats{
Name: name,
TrafficStats: *curRuleStats,
}
stats = append(stats, ruleTrafficStats)
} else if curRuleStats.Bytes > lastRuleStats.Bytes {
ruleTrafficStats := statsv1alpha1.RuleTrafficStats{
Name: name,
TrafficStats: statsv1alpha1.TrafficStats{
Bytes: curRuleStats.Bytes - lastRuleStats.Bytes,
Sessions: curRuleStats.Sessions - lastRuleStats.Sessions,
Packets: curRuleStats.Packets - lastRuleStats.Packets,
},
}
stats = append(stats, ruleTrafficStats)
}
}
}
if len(stats) != 0 {
policyStats := cpv1beta.NetworkPolicyStats{
NetworkPolicy: cpv1beta.NetworkPolicyReference{UID: uid},
RuleTrafficStats: stats,
}
statsList = append(statsList, policyStats)
}
}
return statsList
}

func calculateDiff(curStatsMap, lastStatsMap map[types.UID]*statsv1alpha1.TrafficStats) []cpv1beta.NetworkPolicyStats {
if len(curStatsMap) == 0 {
return nil
Expand Down
Loading

0 comments on commit e80ab3b

Please sign in to comment.