Skip to content

Commit

Permalink
Add asyncRuleCache to idAllocator in Antrea Agent (#1411)
Browse files Browse the repository at this point in the history
* Add asyncRuleCache to idAllocator in Agent

This is for services such as networkPolicy stats, flow exporter,
traceflow etc. to query networkPolicy/networkPolicyRule using
rule flow ID (also tagged as ruleOfID in pkg/agent/openflow).

We delete the rules asynchronously after a delay as the above mentioned
services would need the info of network policy rule even after its
deletion.

We provide the interface to this query in AgentNetworkPolicyInfoQuerier.
  • Loading branch information
srikartati authored Nov 10, 2020
1 parent 71671df commit 85dedc3
Show file tree
Hide file tree
Showing 18 changed files with 310 additions and 123 deletions.
46 changes: 24 additions & 22 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,6 @@ func run(o *Options) error {
features.DefaultFeatureGate.Enabled(features.AntreaProxy),
features.DefaultFeatureGate.Enabled(features.AntreaPolicy))

// statsCollector collects stats and reports to the antrea-controller periodically. For now it's only used for
// NetworkPolicy stats.
var statsCollector *stats.Collector
if features.DefaultFeatureGate.Enabled(features.NetworkPolicyStats) {
statsCollector = stats.NewCollector(antreaClientProvider, ofClient)
}

_, serviceCIDRNet, _ := net.ParseCIDR(o.config.ServiceCIDR)
_, encapMode := config.GetTrafficEncapModeFromStr(o.config.TrafficEncapMode)
networkConfig := &config.NetworkConfig{
Expand Down Expand Up @@ -148,21 +141,6 @@ func run(o *Options) error {
networkConfig,
nodeConfig)

var traceflowController *traceflow.Controller
if features.DefaultFeatureGate.Enabled(features.Traceflow) {
traceflowController = traceflow.NewTraceflowController(
k8sClient,
informerFactory,
crdClient,
traceflowInformer,
ofClient,
ovsBridgeClient,
ifaceStore,
networkConfig,
nodeConfig,
serviceCIDRNet)
}

// podUpdates is a channel for receiving Pod updates from CNIServer and
// notifying NetworkPolicyController to reconcile rules related to the
// updated Pods.
Expand All @@ -177,6 +155,14 @@ func run(o *Options) error {
if err != nil {
return fmt.Errorf("error creating new NetworkPolicy controller: %v", err)
}

// statsCollector collects stats and reports to the antrea-controller periodically. For now it's only used for
// NetworkPolicy stats.
var statsCollector *stats.Collector
if features.DefaultFeatureGate.Enabled(features.NetworkPolicyStats) {
statsCollector = stats.NewCollector(antreaClientProvider, ofClient, networkPolicyController)
}

isChaining := false
if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
isChaining = true
Expand All @@ -199,6 +185,22 @@ func run(o *Options) error {
return fmt.Errorf("error initializing CNI server: %v", err)
}

var traceflowController *traceflow.Controller
if features.DefaultFeatureGate.Enabled(features.Traceflow) {
traceflowController = traceflow.NewTraceflowController(
k8sClient,
informerFactory,
crdClient,
traceflowInformer,
ofClient,
networkPolicyController,
ovsBridgeClient,
ifaceStore,
networkConfig,
nodeConfig,
serviceCIDRNet)
}

// TODO: we should call this after installing flows for initial node routes
// and initial NetworkPolicies so that no packets will be mishandled.
if err := agentInitializer.FlowRestoreComplete(); err != nil {
Expand Down
8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/cenkalti/hub v1.0.1 // indirect
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect
github.com/cheggaaa/pb/v3 v3.0.4
github.com/confluentinc/bincover v0.0.0-20200910210245-839e88185831
github.com/confluentinc/bincover v0.1.0
github.com/containernetworking/cni v0.7.1
github.com/containernetworking/plugins v0.8.2-0.20190724153215-ded2f1757770
github.com/contiv/libOpenflow v0.0.0-20201014051314-c1702744526c
Expand All @@ -41,13 +41,11 @@ require (
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/go-ipfix v0.2.1
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495
golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/grpc v1.26.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.2.8
Expand Down
16 changes: 6 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ github.com/cheggaaa/pb/v3 v3.0.4/go.mod h1:7rgWxLrAUcFMkvJuv09+DYi7mMUYi8nO9iOWc
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/confluentinc/bincover v0.0.0-20200910210245-839e88185831 h1:ywdQifxYw0VXYZfWtykHW785ueW1PgLrYuSdHA31gk4=
github.com/confluentinc/bincover v0.0.0-20200910210245-839e88185831/go.mod h1:qeI1wx0RxdGTZtrJY0HVlgJ4NqC/X2Z+fHbvy87tgHE=
github.com/confluentinc/bincover v0.1.0 h1:M4Gfj4rCXuUQVe8TqT/VXcAMjLyvN81oDRy79fjSv3o=
github.com/confluentinc/bincover v0.1.0/go.mod h1:qeI1wx0RxdGTZtrJY0HVlgJ4NqC/X2Z+fHbvy87tgHE=
github.com/containerd/cgroups v0.0.0-20190919134610-bf292b21730f h1:tSNMc+rJDfmYntojat8lljbt1mgKNpTxUZJsSzJ9Y1s=
github.com/containerd/cgroups v0.0.0-20190919134610-bf292b21730f/go.mod h1:OApqhQ4XNSNC13gXIwDjhOQxjWa/NxkwZXJ1EvqT0ko=
github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw=
Expand Down Expand Up @@ -421,9 +421,8 @@ golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 h1:/Tl7pH94bvbAAHBdZJT947M/+gp0+CqQXDtMRC0fseo=
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495 h1:I6A9Ag9FpEKOjcKrRNjQkPHawoXIhKyTGfvvjFAiiAk=
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -452,9 +451,8 @@ golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0=
Expand All @@ -464,9 +462,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -521,9 +518,8 @@ golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBn
golang.org/x/tools v0.0.0-20190614205625-5aca471b1d59/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down
101 changes: 93 additions & 8 deletions pkg/agent/controller/networkpolicy/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,19 @@ package networkpolicy
import (
"fmt"
"math"
"strconv"
"sync"
"time"

"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/types"
)

var (
asyncDeleteInterval = time.Second * 5
)

// idAllocator provides interfaces to allocate and release uint32 IDs. It's thread-safe.
Expand All @@ -36,13 +48,27 @@ type idAllocator struct {
availableSet map[uint32]struct{}
// availableSlice maintains the order of release.
availableSlice []uint32
// asyncRuleCache maintains rules in a cache and deletes the rules asynchronously
// after a given delete interval.
asyncRuleCache cache.Store
// deleteQueue is used to place a rule ID after a given delay for deleting the
// the rule in the asyncRuleCache.
deleteQueue workqueue.DelayingInterface
}

// asyncRuleCacheKeyFunc knows how to get key of a *rule.
func asyncRuleCacheKeyFunc(obj interface{}) (string, error) {
rule := obj.(*types.PolicyRule)
return strconv.Itoa(int(rule.FlowID)), nil
}

// newIDAllocator returns a new *idAllocator.
// It takes a list of allocated IDs, which can be used for the restart case.
func newIDAllocator(allocatedIDs ...uint32) *idAllocator {
allocator := &idAllocator{
availableSet: make(map[uint32]struct{}),
availableSet: make(map[uint32]struct{}),
asyncRuleCache: cache.NewStore(asyncRuleCacheKeyFunc),
deleteQueue: workqueue.NewNamedDelayingQueue("async_delete_networkpolicyrule"),
}

var maxID uint32
Expand All @@ -63,24 +89,83 @@ func newIDAllocator(allocatedIDs ...uint32) *idAllocator {
return allocator
}

// allocate allocates an uint32 ID if there's available, otherwise error is returned.
// It will try to reuse IDs that have been released first, then allocate a new ID by
// incrementing the last allocated one.
func (a *idAllocator) allocate() (uint32, error) {
// allocateForRule allocates an uint32 ID for a given rule if it's available, otherwise
// an error is returned. It will try to reuse the IDs that have been released first,
// then allocate a new ID by incrementing the last allocated one.
func (a *idAllocator) allocateForRule(rule *types.PolicyRule) error {
a.Lock()
defer a.Unlock()

if len(a.availableSlice) > 0 {
var id uint32
id, a.availableSlice = a.availableSlice[0], a.availableSlice[1:]
delete(a.availableSet, id)
return id, nil

// Add ID to the rule and the rule to asyncRuleCache.
rule.FlowID = id
a.asyncRuleCache.Add(rule)

return nil
}
if a.lastAllocatedID == math.MaxUint32 {
return 0, fmt.Errorf("no ID available")
return fmt.Errorf("no ID available")
}
a.lastAllocatedID++
return a.lastAllocatedID, nil

// Add ID to the rule and the rule to asyncRuleCache.
rule.FlowID = a.lastAllocatedID
a.asyncRuleCache.Add(rule)

return nil
}

// forgetRule adds the rule to the async delete queue with a given delay.
func (a *idAllocator) forgetRule(ruleID uint32, deleteAfter time.Duration) {
a.deleteQueue.AddAfter(ruleID, deleteAfter)
}

func (a *idAllocator) getRuleFromAsyncCache(ruleID uint32) (*types.PolicyRule, bool, error) {
rule, exists, err := a.asyncRuleCache.GetByKey(strconv.Itoa(int(ruleID)))
if err != nil || !exists {
return nil, exists, err
}
return rule.(*types.PolicyRule), exists, nil
}

// worker runs a worker thread that just dequeues item from deleteQueue,
// deletes them from the asyncRuleCache, and releases the associated ID.
func (a *idAllocator) worker() {
for a.processDeleteQueueItem() {
}
}

func (a *idAllocator) processDeleteQueueItem() bool {
key, quit := a.deleteQueue.Get()
if quit {
return false
}
defer a.deleteQueue.Done(key)

rule, exists, err := a.getRuleFromAsyncCache(key.(uint32))
if !exists {
klog.Warningf("Rule with id %v is not present in the async rule cache", key.(uint32))
return true
}
if err != nil {
klog.Errorf("Unexpected error when trying to get rule with id %d: %v", key.(uint32), err)
return true
}
if err := a.asyncRuleCache.Delete(rule); err != nil {
klog.Errorf("Unexpected error when trying to delete rule: %v", err)
return true
}

if err := a.release(key.(uint32)); err != nil {
klog.Errorf("Unexpected error when releasing id %d: %v", key.(uint32), err)
return true
}

return true
}

// release releases an uint32 ID if it has been allocated before, otherwise error is returned.
Expand Down
Loading

0 comments on commit 85dedc3

Please sign in to comment.