Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add network policy info to IPFIX flow records as part of Flow Exporter #1268

Merged
merged 1 commit into from
Nov 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,19 @@ func run(o *Options) error {
// notifying NetworkPolicyController to reconcile rules related to the
// updated Pods.
podUpdates := make(chan v1beta2.PodReference, 100)
// We set flow poll interval as the time interval for rule deletion in the async
// rule cache, which is implemented as part of the idAllocator. This is to preserve
// the rule info for populating NetworkPolicy fields in the Flow Exporter even
// after rule deletion.
asyncRuleDeleteInterval := o.pollInterval
networkPolicyController, err := networkpolicy.NewNetworkPolicyController(
antreaClientProvider,
ofClient,
ifaceStore,
nodeConfig.Name,
podUpdates,
features.DefaultFeatureGate.Enabled(features.AntreaPolicy))
features.DefaultFeatureGate.Enabled(features.AntreaPolicy),
asyncRuleDeleteInterval)
if err != nil {
return fmt.Errorf("error creating new NetworkPolicy controller: %v", err)
}
Expand Down Expand Up @@ -296,6 +302,7 @@ func run(o *Options) error {
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, o.config.OVSDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)),
ifaceStore,
proxier,
networkPolicyController,
o.pollInterval)
pollDone := make(chan struct{})
go connStore.Run(stopCh, pollDone)
Expand Down
17 changes: 13 additions & 4 deletions pkg/agent/controller/networkpolicy/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

var (
asyncDeleteInterval = time.Second * 5
minAsyncDeleteInterval = time.Second * 5
)

// idAllocator provides interfaces to allocate and release uint32 IDs. It's thread-safe.
Expand All @@ -54,6 +54,8 @@ type idAllocator struct {
// deleteQueue is used to place a rule ID after a given delay for deleting the
// the rule in the asyncRuleCache.
deleteQueue workqueue.DelayingInterface
// deleteInterval is the delay interval for deleting the rule in the asyncRuleCache.
deleteInterval time.Duration
}

// asyncRuleCacheKeyFunc knows how to get key of a *rule.
Expand All @@ -64,13 +66,20 @@ func asyncRuleCacheKeyFunc(obj interface{}) (string, error) {

// newIDAllocator returns a new *idAllocator.
// It takes a list of allocated IDs, which can be used for the restart case.
func newIDAllocator(allocatedIDs ...uint32) *idAllocator {
func newIDAllocator(asyncRuleDeleteInterval time.Duration, allocatedIDs ...uint32) *idAllocator {
allocator := &idAllocator{
availableSet: make(map[uint32]struct{}),
asyncRuleCache: cache.NewStore(asyncRuleCacheKeyFunc),
deleteQueue: workqueue.NewNamedDelayingQueue("async_delete_networkpolicyrule"),
}

// Set the deleteInterval.
if minAsyncDeleteInterval > asyncRuleDeleteInterval {
allocator.deleteInterval = minAsyncDeleteInterval
} else {
allocator.deleteInterval = asyncRuleDeleteInterval
}

var maxID uint32
allocatedSet := make(map[uint32]struct{}, len(allocatedIDs))
for _, id := range allocatedIDs {
Expand Down Expand Up @@ -120,8 +129,8 @@ func (a *idAllocator) allocateForRule(rule *types.PolicyRule) error {
}

// forgetRule adds the rule to the async delete queue with a given delay.
func (a *idAllocator) forgetRule(ruleID uint32, deleteAfter time.Duration) {
a.deleteQueue.AddAfter(ruleID, deleteAfter)
func (a *idAllocator) forgetRule(ruleID uint32) {
a.deleteQueue.AddAfter(ruleID, a.deleteInterval)
}

func (a *idAllocator) getRuleFromAsyncCache(ruleID uint32) (*types.PolicyRule, bool, error) {
Expand Down
43 changes: 31 additions & 12 deletions pkg/agent/controller/networkpolicy/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"github.com/vmware-tanzu/antrea/pkg/apis/controlplane/v1beta2"
)

var (
testDeleteInterval = 5 * time.Millisecond
)

func TestNewIDAllocator(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -59,7 +63,7 @@ func TestNewIDAllocator(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := newIDAllocator(tt.args...)
got := newIDAllocator(testDeleteInterval, tt.args...)
assert.Equalf(t, tt.expectedLastAllocatedID, got.lastAllocatedID, "Got lastAllocatedID %v, expected %v", got.lastAllocatedID, tt.expectedLastAllocatedID)
assert.Equalf(t, tt.expectedAvailableSets, got.availableSet, "Got availableSet %v, expected %v", got.availableSet, tt.expectedAvailableSets)
assert.Equalf(t, tt.expectedAvailableSlice, got.availableSlice, "Got availableSlice %v, expected %v", got.availableSlice, tt.expectedAvailableSlice)
Expand Down Expand Up @@ -105,7 +109,7 @@ func TestAllocateForRule(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := newIDAllocator(tt.args...)
a := newIDAllocator(testDeleteInterval, tt.args...)
actualErr := a.allocateForRule(tt.rule)
if actualErr != tt.expectedErr {
t.Fatalf("Got error %v, expected %v", actualErr, tt.expectedErr)
Expand Down Expand Up @@ -155,7 +159,7 @@ func TestRelease(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := newIDAllocator(tt.newArgs...)
a := newIDAllocator(testDeleteInterval, tt.newArgs...)
actualErr := a.release(tt.releaseArgs)
assert.Equalf(t, tt.expectedErr, actualErr, "Got error %v, expected %v", actualErr, tt.expectedErr)
assert.Equalf(t, tt.expectedAvailableSets, a.availableSet, "Got availableSet %v, expected %v", a.availableSet, tt.expectedAvailableSets)
Expand All @@ -172,23 +176,34 @@ func TestWorker(t *testing.T) {
Service: nil,
}
tests := []struct {
name string
args []uint32
rule *types.PolicyRule
expectedID uint32
expectedErr error
name string
args []uint32
minDeleteInterval time.Duration
rule *types.PolicyRule
expectedID uint32
expectedErr error
}{
{
"delete-rule-from-async-rule-cache",
"delete-rule-with-async-delete-interval",
nil,
5 * time.Millisecond,
rule,
1,
nil,
},
{
"delete-rule-with-flow-poll-interval",
nil,
1 * time.Millisecond,
rule,
1,
nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := newIDAllocator(tt.args...)
minAsyncDeleteInterval = tt.minDeleteInterval
a := newIDAllocator(testDeleteInterval, tt.args...)
actualErr := a.allocateForRule(tt.rule)
if actualErr != tt.expectedErr {
t.Fatalf("Got error %v, expected %v", actualErr, tt.expectedErr)
Expand All @@ -197,7 +212,8 @@ func TestWorker(t *testing.T) {
defer close(stopCh)
go wait.Until(a.worker, time.Millisecond, stopCh)

a.forgetRule(tt.rule.FlowID, 5*time.Millisecond)
start := time.Now()
a.forgetRule(tt.rule.FlowID)
conditionFunc := func() (bool, error) {
a.Lock()
defer a.Unlock()
Expand All @@ -206,12 +222,15 @@ func TestWorker(t *testing.T) {
}
return false, nil
}
if err := wait.Poll(time.Millisecond, time.Millisecond*10, conditionFunc); err != nil {
if err := wait.PollImmediate(time.Millisecond, time.Millisecond*10, conditionFunc); err != nil {
t.Fatalf("Expect the rule with id %v to be deleted from async rule cache", tt.expectedID)
}
_, exists, err := a.getRuleFromAsyncCache(tt.expectedID)
assert.Falsef(t, exists, "Expect rule to be not present in asyncRuleCache")
assert.NoErrorf(t, err, "getRuleFromAsyncCache should not return any error")

elapsedTime := time.Since(start)
assert.GreaterOrEqualf(t, int64(elapsedTime)/int64(time.Millisecond), int64(5), "rule should be there for at least 5ms")
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
ifaceStore interfacestore.InterfaceStore,
nodeName string,
podUpdates <-chan v1beta2.PodReference,
antreaPolicyEnabled bool) (*Controller, error) {
antreaPolicyEnabled bool,
asyncRuleDeleteInterval time.Duration) (*Controller, error) {
c := &Controller{
antreaClientProvider: antreaClientGetter,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "networkpolicyrule"),
reconciler: newReconciler(ofClient, ifaceStore),
reconciler: newReconciler(ofClient, ifaceStore, asyncRuleDeleteInterval),
ofClient: ofClient,
antreaPolicyEnabled: antreaPolicyEnabled,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (g *antreaClientGetter) GetAntreaClient() (versioned.Interface, error) {
func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
clientset := &fake.Clientset{}
ch := make(chan v1beta2.PodReference, 100)
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", ch, true)
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", ch, true, testDeleteInterval)
reconciler := newMockReconciler()
controller.reconciler = reconciler
return controller, clientset, reconciler
Expand Down
10 changes: 5 additions & 5 deletions pkg/agent/controller/networkpolicy/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ type reconciler struct {
}

// newReconciler returns a new *reconciler.
func newReconciler(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore) *reconciler {
func newReconciler(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, asyncRuleDeleteInterval time.Duration) *reconciler {
priorityAssigners := map[binding.TableIDType]*tablePriorityAssigner{}
for _, table := range openflow.GetAntreaPolicyBaselineTierTables() {
priorityAssigners[table] = &tablePriorityAssigner{
Expand All @@ -209,7 +209,7 @@ func newReconciler(ofClient openflow.Client, ifaceStore interfacestore.Interface
ofClient: ofClient,
ifaceStore: ifaceStore,
lastRealizeds: sync.Map{},
idAllocator: newIDAllocator(),
idAllocator: newIDAllocator(asyncRuleDeleteInterval),
priorityAssigners: priorityAssigners,
}
// Check if ofClient is nil or not to be compatible with unit tests.
Expand Down Expand Up @@ -513,7 +513,7 @@ func (r *reconciler) batchAdd(rules []*CompletedRule, ofPriorities []*uint16) er
}
if err := r.ofClient.BatchInstallPolicyRuleFlows(allOFRules); err != nil {
for _, rule := range allOFRules {
r.idAllocator.forgetRule(rule.FlowID, asyncDeleteInterval)
r.idAllocator.forgetRule(rule.FlowID)
}
return err
}
Expand Down Expand Up @@ -648,7 +648,7 @@ func (r *reconciler) installOFRule(ofRule *types.PolicyRule) error {
klog.V(2).Infof("Installing ofRule %d (Direction: %v, From: %d, To: %d, Service: %d)",
ofRule.FlowID, ofRule.Direction, len(ofRule.From), len(ofRule.To), len(ofRule.Service))
if err := r.ofClient.InstallPolicyRuleFlows(ofRule); err != nil {
r.idAllocator.forgetRule(ofRule.FlowID, asyncDeleteInterval)
r.idAllocator.forgetRule(ofRule.FlowID)
return fmt.Errorf("error installing ofRule %v: %v", ofRule.FlowID, err)
}
return nil
Expand Down Expand Up @@ -700,7 +700,7 @@ func (r *reconciler) uninstallOFRule(ofID uint32, table binding.TableIDType) err
priorityAssigner.assigner.Release(uint16(priorityNum))
}
}
r.idAllocator.forgetRule(ofID, asyncDeleteInterval)
r.idAllocator.forgetRule(ofID)
return nil
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/agent/controller/networkpolicy/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestReconcilerForget(t *testing.T) {
mockOFClient.EXPECT().UninstallPolicyRuleFlows(ofID)
}
}
r := newReconciler(mockOFClient, ifaceStore)
r := newReconciler(mockOFClient, ifaceStore, testDeleteInterval)
for key, value := range tt.lastRealizeds {
r.lastRealizeds.Store(key, value)
}
Expand Down Expand Up @@ -517,7 +517,7 @@ func TestReconcilerReconcile(t *testing.T) {
for i := 0; i < len(tt.expectedOFRules); i++ {
mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any())
}
r := newReconciler(mockOFClient, ifaceStore)
r := newReconciler(mockOFClient, ifaceStore, testDeleteInterval)
if err := r.Reconcile(tt.args); (err != nil) != tt.wantErr {
t.Fatalf("Reconcile() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down Expand Up @@ -627,7 +627,7 @@ func TestReconcilerBatchReconcile(t *testing.T) {
mockOFClient := openflowtest.NewMockClient(controller)
mockOFClient.EXPECT().IsIPv4Enabled().Return(true).AnyTimes()
mockOFClient.EXPECT().IsIPv6Enabled().Return(false).AnyTimes()
r := newReconciler(mockOFClient, ifaceStore)
r := newReconciler(mockOFClient, ifaceStore, testDeleteInterval)
if tt.numInstalledRules > 0 {
// BatchInstall should skip rules already installed
r.lastRealizeds.Store(tt.args[0].ID, newLastRealized(tt.args[0]))
Expand Down Expand Up @@ -843,7 +843,7 @@ func TestReconcilerUpdate(t *testing.T) {
if len(tt.expectedDeletedTo) > 0 {
mockOFClient.EXPECT().DeletePolicyRuleAddress(gomock.Any(), types.DstAddress, gomock.Eq(tt.expectedDeletedTo), priority)
}
r := newReconciler(mockOFClient, ifaceStore)
r := newReconciler(mockOFClient, ifaceStore, testDeleteInterval)
if err := r.Reconcile(tt.originalRule); (err != nil) != tt.wantErr {
t.Fatalf("Reconcile() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down Expand Up @@ -1349,7 +1349,7 @@ func TestReconcilerReconcileIPv6Only(t *testing.T) {
for i := 0; i < len(tt.expectedOFRules); i++ {
mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any())
}
r := newReconciler(mockOFClient, ifaceStore)
r := newReconciler(mockOFClient, ifaceStore, testDeleteInterval)
if err := r.Reconcile(tt.args); (err != nil) != tt.wantErr {
t.Fatalf("Reconcile() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down Expand Up @@ -1752,7 +1752,7 @@ func TestReconcilerReconcileDualStack(t *testing.T) {
for i := 0; i < len(tt.expectedOFRules); i++ {
mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any())
}
r := newReconciler(mockOFClient, ifaceStore)
r := newReconciler(mockOFClient, ifaceStore, testDeleteInterval)
if err := r.Reconcile(tt.args); (err != nil) != tt.wantErr {
t.Fatalf("Reconcile() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down
Loading