Skip to content

Commit

Permalink
Improve NetworkPolicy batch installation (#2479)
Browse files Browse the repository at this point in the history
BatchInstallPolicyRuleFlows first generates all flows then installs them
via single bundle. However, it generates the flows incrementally. If an
address is shared by multiple rules, intermediate flows will be
generated, for instance:

expected flow:
add "nw_dst=10.128.119.108 actions=conjunction(64,2/2),conjunction(65,2/2),conjunction(66,2/2)"

actual flows:
add "nw_dst=10.128.119.108 actions=conjunction(64,2/2)"
mod "nw_dst=10.128.119.108 actions=conjunction(64,2/2),conjunction(65,2/2)"
mod "nw_dst=10.128.119.108 actions=conjunction(64,2/2),conjunction(65,2/2),conjunction(66,2/2)"

The number of flows for this address will be same as the number of the
actions. The number of actual actions in these flows will be O(N^2),
N=number of desired actions. This increases CPU and memory usage
greatly in a high scale cluster.

This patches optimizes it by generating flows based on final state,
reducing the time and space complexity from O(N^2) to O(N). In same
cluster, it's observed that the memory usage was reduced from 1.1G to
500M, the execution time was reduced from 10s to 2s.

benchmark comparison:

name                            old time/op    new time/op    delta
BatchInstallPolicyRuleFlows-48     458ms ± 4%     169ms ± 1%  -63.22%  (p=0.008 n=5+5)

name                            old alloc/op   new alloc/op   delta
BatchInstallPolicyRuleFlows-48     205MB ± 0%      56MB ± 0%  -72.47%  (p=0.008 n=5+5)

name                            old allocs/op  new allocs/op  delta
BatchInstallPolicyRuleFlows-48     4.29M ± 0%     0.92M ± 0%  -78.60%  (p=0.008 n=5+5)

Signed-off-by: Quan Tian <qtian@vmware.com>
  • Loading branch information
tnqn authored Jul 30, 2021
1 parent 7a7e1da commit ae98a19
Show file tree
Hide file tree
Showing 5 changed files with 450 additions and 125 deletions.
141 changes: 106 additions & 35 deletions pkg/agent/openflow/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ type conjMatchFlowContext struct {

// createOrUpdateConjunctiveMatchFlow creates or updates the conjunctive match flow with the latest actions. It returns
// the flowChange including the changed OpenFlow entry and the expected operation which need to be applied on the OVS bridge.
func (ctx *conjMatchFlowContext) createOrUpdateConjunctiveMatchFlow(actions []*conjunctiveAction, priority *uint16) *flowChange {
func (ctx *conjMatchFlowContext) createOrUpdateConjunctiveMatchFlow(actions []*conjunctiveAction) *flowChange {
// Check if flow is already installed. If not, create a new flow.
if ctx.flow == nil {
// Check the number of valid conjunctiveActions, and return nil immediately if it is 0. It happens when the match
Expand All @@ -305,7 +305,7 @@ func (ctx *conjMatchFlowContext) createOrUpdateConjunctiveMatchFlow(actions []*c

// Create the conjunctive match flow entry. The actions here should not be empty for either add or update case.
// The expected operation for a new Openflow entry should be "insertion".
flow := ctx.client.conjunctiveMatchFlow(ctx.tableID, ctx.matchKey, ctx.matchValue, priority, actions...)
flow := ctx.client.conjunctiveMatchFlow(ctx.tableID, ctx.matchKey, ctx.matchValue, ctx.priority, actions)
return &flowChange{
flow: flow,
changeType: insertion,
Expand All @@ -326,7 +326,7 @@ func (ctx *conjMatchFlowContext) createOrUpdateConjunctiveMatchFlow(actions []*c

// deleteAction deletes the specified policyRuleConjunction from conjunctiveMatchFlow's actions, and then returns the
// flowChange.
func (ctx *conjMatchFlowContext) deleteAction(conjID uint32, priority *uint16) *flowChange {
func (ctx *conjMatchFlowContext) deleteAction(conjID uint32) *flowChange {
// If the specified conjunctive action is the last one, delete the conjunctive match flow entry from the OVS bridge.
// No need to check if the conjunction ID of the only conjunctive action is the specified ID or not, as it
// has been checked in the caller.
Expand All @@ -343,12 +343,12 @@ func (ctx *conjMatchFlowContext) deleteAction(conjID uint32, priority *uint16) *
actions = append(actions, act)
}
}
return ctx.createOrUpdateConjunctiveMatchFlow(actions, priority)
return ctx.createOrUpdateConjunctiveMatchFlow(actions)
}
}

// addAction adds the specified policyRuleConjunction into conjunctiveMatchFlow's actions, and then returns the flowChange.
func (ctx *conjMatchFlowContext) addAction(action *conjunctiveAction, priority *uint16) *flowChange {
func (ctx *conjMatchFlowContext) addAction(action *conjunctiveAction) *flowChange {
// Check if the conjunction exists in conjMatchFlowContext actions or not. If yes, return nil immediately.
_, found := ctx.actions[action.conjID]
if found {
Expand All @@ -360,7 +360,7 @@ func (ctx *conjMatchFlowContext) addAction(action *conjunctiveAction, priority *
for _, act := range ctx.actions {
actions = append(actions, act)
}
return ctx.createOrUpdateConjunctiveMatchFlow(actions, priority)
return ctx.createOrUpdateConjunctiveMatchFlow(actions)
}

func (ctx *conjMatchFlowContext) addDenyAllRule(ruleID uint32) {
Expand Down Expand Up @@ -567,7 +567,7 @@ func (c *clause) addConjunctiveMatchFlow(client *client, match *conjunctiveMatch
}
if c.action.nClause > 1 {
// Append the conjunction to conjunctiveFlowContext's actions, and add the changed flow into the conjMatchFlowContextChange.
flowChange := context.addAction(c.action, match.priority)
flowChange := context.addAction(c.action)
if flowChange != nil {
ctxChanges.matchFlow = flowChange
ctxChanges.actChange.action = c.action
Expand All @@ -583,11 +583,11 @@ func (c *clause) addConjunctiveMatchFlow(client *client, match *conjunctiveMatch
return ctxChanges
}

func (c *clause) generateAddressConjMatch(addr types.Address, addrType types.AddressType, priority *uint16) *conjunctiveMatch {
func generateAddressConjMatch(ruleTableID binding.TableIDType, addr types.Address, addrType types.AddressType, priority *uint16) *conjunctiveMatch {
matchKey := addr.GetMatchKey(addrType)
matchValue := addr.GetValue()
match := &conjunctiveMatch{
tableID: c.ruleTable.GetID(),
tableID: ruleTableID,
matchKey: matchKey,
matchValue: matchValue,
priority: priority,
Expand Down Expand Up @@ -625,15 +625,15 @@ func getServiceMatchType(protocol *v1beta2.Protocol, ipv4Enabled, ipv6Enabled bo
return matchKeys
}

func (c *clause) generateServicePortConjMatches(service v1beta2.Service, priority *uint16, ipv4Enabled, ipv6Enabled bool) []*conjunctiveMatch {
func generateServicePortConjMatches(ruleTableID binding.TableIDType, service v1beta2.Service, priority *uint16, ipv4Enabled, ipv6Enabled bool) []*conjunctiveMatch {
matchKeys := getServiceMatchType(service.Protocol, ipv4Enabled, ipv6Enabled)
ovsBitRanges := c.serviceToBitRanges(service)
ovsBitRanges := serviceToBitRanges(service)
var matches []*conjunctiveMatch
for _, matchKey := range matchKeys {
for _, ovsBitRange := range ovsBitRanges {
matches = append(matches,
&conjunctiveMatch{
tableID: c.ruleTable.GetID(),
tableID: ruleTableID,
matchKey: matchKey,
matchValue: ovsBitRange,
priority: priority,
Expand All @@ -644,7 +644,7 @@ func (c *clause) generateServicePortConjMatches(service v1beta2.Service, priorit
}

// serviceToBitRanges converts a Service to a list of BitRange.
func (c *clause) serviceToBitRanges(service v1beta2.Service) []types.BitRange {
func serviceToBitRanges(service v1beta2.Service) []types.BitRange {
var ovsBitRanges []types.BitRange
// If `EndPort` is equal to `Port`, then treat it as single port case.
if service.EndPort != nil && *service.EndPort > service.Port.IntVal {
Expand Down Expand Up @@ -683,7 +683,7 @@ func (c *clause) addAddrFlows(client *client, addrType types.AddressType, addres
var conjMatchFlowContextChanges []*conjMatchFlowContextChange
// Calculate Openflow changes for the added addresses.
for _, addr := range addresses {
match := c.generateAddressConjMatch(addr, addrType, priority)
match := generateAddressConjMatch(c.ruleTable.GetID(), addr, addrType, priority)
ctxChange := c.addConjunctiveMatchFlow(client, match)
if ctxChange != nil {
conjMatchFlowContextChanges = append(conjMatchFlowContextChanges, ctxChange)
Expand All @@ -697,7 +697,7 @@ func (c *clause) addAddrFlows(client *client, addrType types.AddressType, addres
func (c *clause) addServiceFlows(client *client, ports []v1beta2.Service, priority *uint16) []*conjMatchFlowContextChange {
var conjMatchFlowContextChanges []*conjMatchFlowContextChange
for _, port := range ports {
matches := c.generateServicePortConjMatches(port, priority, client.IsIPv4Enabled(), client.IsIPv6Enabled())
matches := generateServicePortConjMatches(c.ruleTable.GetID(), port, priority, client.IsIPv4Enabled(), client.IsIPv6Enabled())
for _, match := range matches {
ctxChange := c.addConjunctiveMatchFlow(client, match)
conjMatchFlowContextChanges = append(conjMatchFlowContextChanges, ctxChange)
Expand Down Expand Up @@ -729,7 +729,7 @@ func (c *clause) deleteConjunctiveMatchFlow(flowContextKey string) *conjMatchFlo
// Delete the conjunctive action if it is in context actions.
action, found := context.actions[conjID]
if found {
ctxChange.matchFlow = context.deleteAction(conjID, ctxChange.context.priority)
ctxChange.matchFlow = context.deleteAction(conjID)
ctxChange.actChange.action = action
expectedConjunctiveActions--
}
Expand Down Expand Up @@ -763,7 +763,7 @@ func (c *clause) deleteConjunctiveMatchFlow(flowContextKey string) *conjMatchFlo
func (c *clause) deleteAddrFlows(addrType types.AddressType, addresses []types.Address, priority *uint16) []*conjMatchFlowContextChange {
var ctxChanges []*conjMatchFlowContextChange
for _, addr := range addresses {
match := c.generateAddressConjMatch(addr, addrType, priority)
match := generateAddressConjMatch(c.ruleTable.GetID(), addr, addrType, priority)
contextKey := match.generateGlobalMapKey()
ctxChange := c.deleteConjunctiveMatchFlow(contextKey)
if ctxChange != nil {
Expand Down Expand Up @@ -826,7 +826,7 @@ func (c *client) InstallPolicyRuleFlows(rule *types.PolicyRule) error {

c.conjMatchFlowLock.Lock()
defer c.conjMatchFlowLock.Unlock()
ctxChanges := c.calculateMatchFlowChangesForRule(conj, rule, false)
ctxChanges := c.calculateMatchFlowChangesForRule(conj, rule)

if err := c.ofEntryOperations.AddAll(conj.metricFlows); err != nil {
return err
Expand Down Expand Up @@ -884,49 +884,120 @@ func (c *client) calculateActionFlowChangesForRule(rule *types.PolicyRule) *poli
}

// calculateMatchFlowChangesForRule calculates the contextChanges for the policyRule, and updates the context status in case of batch install.
func (c *client) calculateMatchFlowChangesForRule(conj *policyRuleConjunction, rule *types.PolicyRule, isBatchInstall bool) []*conjMatchFlowContextChange {
func (c *client) calculateMatchFlowChangesForRule(conj *policyRuleConjunction, rule *types.PolicyRule) []*conjMatchFlowContextChange {
// Calculate the conjMatchFlowContext changes. The changed Openflow entries are included in the conjMatchFlowContext change.
ctxChanges := conj.calculateChangesForRuleCreation(c, rule)
// Update conjunctiveMatchContext if during batch flow install, otherwise the subsequent contextChange
// calculations will not be based on the previous flowChanges that have not been sent to OVS bridge.
if isBatchInstall {
for _, ctxChange := range ctxChanges {
ctxChange.updateContextStatus()
return ctxChanges
}

// addRuleToConjunctiveMatch adds a rule's clauses to corresponding conjunctive match contexts.
// Unlike calculateMatchFlowChangesForRule, it updates the context status directly and doesn't calculate flow changes.
// It's used in initial batch install where we first add all rules then calculates flows change based on final state.
func (c *client) addRuleToConjunctiveMatch(conj *policyRuleConjunction, rule *types.PolicyRule) {
if conj.fromClause != nil {
for _, addr := range rule.From {
match := generateAddressConjMatch(conj.fromClause.ruleTable.GetID(), addr, types.SrcAddress, rule.Priority)
c.addActionToConjunctiveMatch(conj.fromClause, match)
}
}
if conj.toClause != nil {
for _, addr := range rule.To {
match := generateAddressConjMatch(conj.toClause.ruleTable.GetID(), addr, types.DstAddress, rule.Priority)
c.addActionToConjunctiveMatch(conj.toClause, match)
}
}
if conj.serviceClause != nil {
for _, port := range rule.Service {
matches := generateServicePortConjMatches(conj.serviceClause.ruleTable.GetID(), port, rule.Priority, c.IsIPv4Enabled(), c.IsIPv6Enabled())
for _, match := range matches {
c.addActionToConjunctiveMatch(conj.serviceClause, match)
}
}
}
return ctxChanges
}

// addActionToConjunctiveMatch adds a clause to corresponding conjunctive match context.
// It updates the context status directly and doesn't calculate the match flow, which is supposed to be calculated after
// all actions are added. It's used in initial batch install only.
func (c *client) addActionToConjunctiveMatch(clause *clause, match *conjunctiveMatch) {
matcherKey := match.generateGlobalMapKey()
_, found := clause.matches[matcherKey]
if found {
klog.V(2).InfoS("Conjunctive match flow is already added for rule", "matcherKey", matcherKey, "ruleID", clause.action.conjID)
return
}

var context *conjMatchFlowContext
// Get conjMatchFlowContext from globalConjMatchFlowCache. If it doesn't exist, create a new one and add into the cache.
context, found = c.globalConjMatchFlowCache[matcherKey]
if !found {
context = &conjMatchFlowContext{
conjunctiveMatch: match,
actions: make(map[uint32]*conjunctiveAction),
client: c,
}
// Generate the default drop flow if dropTable is not nil.
if clause.dropTable != nil {
context.dropFlow = context.client.defaultDropFlow(clause.dropTable.GetID(), match.matchKey, match.matchValue)
}
c.globalConjMatchFlowCache[matcherKey] = context
}
clause.matches[matcherKey] = context

if clause.action.nClause > 1 {
// Add the conjunction to the conjunctiveFlowContext's actions.
context.actions[clause.action.conjID] = clause.action
} else {
// Add the conjunction ID to the conjunctiveFlowContext's denyAllRules.
context.addDenyAllRule(clause.action.conjID)
}
}

// BatchInstallPolicyRuleFlows installs flows for NetworkPolicy rules in case of agent restart. It calculates and
// accumulates all Openflow entry updates required and installs all of them on OVS bridge in one bundle.
// It resets the global conjunctive match flow cache upon failure, and should NOT be used after any rule is installed
// via the InstallPolicyRuleFlows method. Otherwise the cache would be out of sync.
func (c *client) BatchInstallPolicyRuleFlows(ofPolicyRules []*types.PolicyRule) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

var allCtxChanges []*conjMatchFlowContextChange
var allFlows []binding.Flow
var updatedConjunctions []*policyRuleConjunction
var conjunctions []*policyRuleConjunction

for _, rule := range ofPolicyRules {
conj := c.calculateActionFlowChangesForRule(rule)
ctxChanges := c.calculateMatchFlowChangesForRule(conj, rule, true)
c.addRuleToConjunctiveMatch(conj, rule)
allFlows = append(allFlows, conj.actionFlows...)
allFlows = append(allFlows, conj.metricFlows...)
allCtxChanges = append(allCtxChanges, ctxChanges...)
updatedConjunctions = append(updatedConjunctions, conj)
conjunctions = append(conjunctions, conj)
}

for _, ctx := range c.globalConjMatchFlowCache {
// In theory there must be at least one action but InstallPolicyRuleFlows currently handles the 1 clause case
// and we do the same in addRuleToConjunctiveMatch. The check is added only for consistency. Later we should
// return error if clients install a rule with only 1 clause, and should remove the extra code for processing it.
if len(ctx.actions) > 0 {
actions := make([]*conjunctiveAction, 0, len(ctx.actions))
for _, action := range ctx.actions {
actions = append(actions, action)
}
ctx.flow = c.conjunctiveMatchFlow(ctx.tableID, ctx.matchKey, ctx.matchValue, ctx.priority, actions)
allFlows = append(allFlows, ctx.flow)
}
if ctx.dropFlow != nil {
allFlows = append(allFlows, ctx.dropFlow)
}
}

// Send the changed Openflow entries to the OVS bridge.
if err := c.sendConjunctiveFlows(allCtxChanges, allFlows); err != nil {
if err := c.ofEntryOperations.AddAll(allFlows); err != nil {
// Reset the global conjunctive match flow cache since the OpenFlow bundle, which contains
// all the match flows to be installed, was not applied successfully.
// Conjunction contexts need to be updated during flow change calculations for batch install,
// otherwise subsequent contextChanges would not be correct as they would not take into account
// the previous flowChanges that have not yet have been realized in the OVS bridge.
c.globalConjMatchFlowCache = map[string]*conjMatchFlowContext{}
return err
}
// Update conjMatchFlowContexts as the expected status.
for _, conj := range updatedConjunctions {
for _, conj := range conjunctions {
// Add the policyRuleConjunction into policyCache
c.policyCache.Add(conj)
}
Expand Down
Loading

0 comments on commit ae98a19

Please sign in to comment.