Skip to content

Commit

Permalink
Improve unit test of flowexporter package (antrea-io#4182)
Browse files Browse the repository at this point in the history
Add more test cases for pkg/agent/flowexporter package and
its subpackages: connections, exporter, priorityqueue.

For antrea-io#4142

Signed-off-by: heanlan <hanlan@vmware.com>
  • Loading branch information
heanlan committed Mar 29, 2023
1 parent c175dc0 commit e6c04f9
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 15 deletions.
21 changes: 21 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,24 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) {
antreaFlow = NetlinkFlowToAntreaConnection(netlinkFlow)
assert.Equalf(t, expectedAntreaFlow, antreaFlow, "both flows should be equal")
}

func TestStateToString(t *testing.T) {
for _, tc := range []struct {
state uint8
expectedResult string
}{
{0, "NONE"},
{1, "SYN_SENT"},
{2, "SYN_RECV"},
{3, "ESTABLISHED"},
{4, "FIN_WAIT"},
{5, "CLOSE_WAIT"},
{6, "LAST_ACK"},
{7, "TIME_WAIT"},
{8, "CLOSE"},
{9, "SYN_SENT2"},
} {
result := stateToString(tc.state)
assert.Equal(t, tc.expectedResult, result)
}
}
93 changes: 78 additions & 15 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,23 +266,44 @@ func TestFlowExporter_initFlowExporter(t *testing.T) {
metrics.InitializeConnectionMetrics()
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
if err != nil {
t.Fatalf("error when resolving UDP address: %v", err)
t.Fatalf("Error when resolving UDP address: %v", err)
}
conn, err := net.ListenUDP("udp", udpAddr)
conn1, err := net.ListenUDP("udp", udpAddr)
if err != nil {
t.Fatalf("error when creating a local server: %v", err)
}
defer conn.Close()
exp := &FlowExporter{
process: nil,
exporterInput: exporter.ExporterInput{
CollectorProtocol: conn.LocalAddr().Network(),
CollectorAddress: conn.LocalAddr().String(),
},
t.Fatalf("Error when creating a local server: %v", err)
}
defer conn1.Close()
tcpAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Error when resolving TCP address: %v", err)
}
conn2, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
t.Fatalf("Error when creating a local server: %v", err)
}
defer conn2.Close()

for _, tc := range []struct {
protocol string
address string
expectedTempRefTimeout uint32
}{
{conn1.LocalAddr().Network(), conn1.LocalAddr().String(), uint32(1800)},
{conn2.Addr().Network(), conn2.Addr().String(), uint32(0)},
} {
exp := &FlowExporter{
process: nil,
exporterInput: exporter.ExporterInput{
CollectorProtocol: tc.protocol,
CollectorAddress: tc.address,
},
}
err = exp.initFlowExporter()
assert.NoError(t, err)
assert.Equal(t, tc.expectedTempRefTimeout, exp.exporterInput.TempRefTimeout)
checkTotalReconnectionsMetric(t)
metrics.ReconnectionsToFlowCollector.Dec()
}
err = exp.initFlowExporter()
assert.NoError(t, err)
checkTotalReconnectionsMetric(t)
}

func checkTotalReconnectionsMetric(t *testing.T) {
Expand Down Expand Up @@ -422,7 +443,8 @@ func testSendFlowRecords(t *testing.T, v4Enabled bool, v6Enabled bool) {
elementsListv6: elemListv6,
templateIDv4: testTemplateIDv4,
templateIDv6: testTemplateIDv6,
v4Enabled: true}
v4Enabled: v4Enabled,
v6Enabled: v6Enabled}

if v4Enabled {
runSendFlowRecordTests(t, flowExp, false)
Expand Down Expand Up @@ -651,3 +673,44 @@ func createElement(name string, enterpriseID uint32) ipfixentities.InfoElementWi
ieWithValue, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
return ieWithValue
}

func TestFlowExporter_prepareExporterInputArgs(t *testing.T) {
for _, tc := range []struct {
collectorAddr string
collectorProto string
nodeName string
expectedObservationDomainID uint32
expectedIsEncrypted bool
expectedProto string
}{
{"10.10.0.79:4739", "tls", "kind-worker", 801257890, true, "tcp"},
{"10.10.0.80:4739", "tcp", "kind-worker", 801257890, false, "tcp"},
{"10.10.0.81:4739", "udp", "kind-worker", 801257890, false, "udp"},
} {
expInput := prepareExporterInputArgs(tc.collectorAddr, tc.collectorProto, tc.nodeName)
assert.Equal(t, tc.collectorAddr, expInput.CollectorAddress)
assert.Equal(t, tc.expectedObservationDomainID, expInput.ObservationDomainID)
assert.Equal(t, tc.expectedIsEncrypted, expInput.IsEncrypted)
assert.Equal(t, tc.expectedProto, expInput.CollectorProtocol)
}
}

func TestFlowExporter_findFlowType(t *testing.T) {
conn1 := flowexporter.Connection{SourcePodName: "podA", DestinationPodName: "podB"}
conn2 := flowexporter.Connection{SourcePodName: "podA", DestinationPodName: ""}
for _, tc := range []struct {
isNetworkPolicyOnly bool
conn flowexporter.Connection
expectedFlowType uint8
}{
{true, conn1, 1},
{true, conn2, 2},
{false, conn1, 0},
} {
flowExp := &FlowExporter{
isNetworkPolicyOnly: tc.isNetworkPolicyOnly,
}
flowType := flowExp.findFlowType(tc.conn)
assert.Equal(t, tc.expectedFlowType, flowType)
}
}
70 changes: 70 additions & 0 deletions pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,73 @@ func TestExpirePriorityQueue(t *testing.T) {
}
}
}

func TestExpirePriorityQueue_GetExpiryFromExpirePriorityQueue(t *testing.T) {
startTime := time.Now()
item1 := &flowexporter.ItemToExpire{
ActiveExpireTime: startTime.Add(10 * time.Second),
IdleExpireTime: startTime.Add(20 * time.Second),
Index: 0,
}
item2 := &flowexporter.ItemToExpire{
ActiveExpireTime: startTime.Add(-10 * time.Second),
IdleExpireTime: startTime,
Index: 0,
}

for _, tc := range []struct {
pqActiveTimeout time.Duration
pqIdleTimeout time.Duration
pqItem *flowexporter.ItemToExpire
expectedResult time.Duration
}{
{1 * time.Second, 1 * time.Second, item1, minExpiryTime + 10*time.Second}, // should return expiryDuration
{1 * time.Second, 1 * time.Second, item2, minExpiryTime}, // should return minExpiryTime
{1 * time.Second, 2 * time.Second, nil, 1 * time.Second}, // should return activeFlowTimeout
{1 * time.Second, 500 * time.Millisecond, nil, 500 * time.Millisecond}, // should return idleFlowTimeout
} {
pq := NewExpirePriorityQueue(tc.pqActiveTimeout, tc.pqIdleTimeout)
if tc.pqItem != nil {
heap.Push(pq, tc.pqItem)
}
result := pq.GetExpiryFromExpirePriorityQueue()
// We are unable to get the real currTime value in while executing
// GetExpiryFromExpirePriorityQueue, but it should be greater than startTime.
// Therefore, minExpiryTime + startTime.Add(10 * time.Second).Sub(currTime)
// should be less than minExpiryTime + 10 * time.Second
if tc.pqItem == item1 {
assert.GreaterOrEqual(t, tc.expectedResult, result)
assert.NotEqual(t, minExpiryTime, result)
assert.NotEqual(t, tc.pqActiveTimeout, result)
assert.NotEqual(t, tc.pqIdleTimeout, result)
} else {
assert.Equal(t, tc.expectedResult, result)
}

}
}

func TestExpirePriorityQueue_GetTopExpiredItem(t *testing.T) {
startTime := time.Now()
item := &flowexporter.ItemToExpire{
ActiveExpireTime: startTime.Add(10 * time.Second),
IdleExpireTime: startTime.Add(20 * time.Second),
Index: 0,
}
for _, tc := range []struct {
currTime time.Time
topItem *flowexporter.ItemToExpire
expectedResult *flowexporter.ItemToExpire
}{
{startTime, nil, nil}, // topItem is nil
{startTime, item, nil}, // topItem has not expired
{startTime.Add(15 * time.Second), item, item}, // topItem has expired
} {
pq := NewExpirePriorityQueue(1*time.Second, 1*time.Second)
if tc.topItem != nil {
heap.Push(pq, tc.topItem)
}
result := pq.GetTopExpiredItem(tc.currTime)
assert.Equal(t, tc.expectedResult, result)
}
}
98 changes: 98 additions & 0 deletions pkg/agent/flowexporter/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2022 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the “License”);
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an “AS IS” BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flowexporter

import (
"testing"

"github.com/stretchr/testify/assert"

"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
)

func TestIsConnectionDying(t *testing.T) {
for _, tc := range []struct {
tcpState string
statusFlag uint32
isPresent bool
expectedResult bool
}{
{"ESTABLISHED", 256, true, false},
{"TIME_WAIT", 256, true, true},
{"CLOSE", 256, true, true},
{"", 512, true, true},
{"ESTABLISHED", 256, false, true},
} {
conn := &Connection{
TCPState: tc.tcpState,
StatusFlag: tc.statusFlag,
IsPresent: tc.isPresent,
}
result := IsConnectionDying(conn)
assert.Equal(t, tc.expectedResult, result)
}
}

func TestConntrackConnActive(t *testing.T) {
for _, tc := range []struct {
originalPackets, prevPackets, reversePackets, prevReversePackets uint64
tcpState, prevTCPState string
expectedResult bool
}{
{1, 0, 0, 0, "ESTABLISHED", "ESTABLISHED", true},
{0, 0, 1, 0, "ESTABLISHED", "ESTABLISHED", true},
{0, 0, 0, 0, "TIME_WAIT", "ESTABLISHED", true},
{0, 0, 0, 0, "ESTABLISHED", "ESTABLISHED", false},
} {
conn := &Connection{
OriginalPackets: tc.originalPackets,
PrevPackets: tc.prevPackets,
ReversePackets: tc.reversePackets,
PrevReversePackets: tc.prevReversePackets,
TCPState: tc.tcpState,
PrevTCPState: tc.prevTCPState,
}
result := CheckConntrackConnActive(conn)
assert.Equal(t, tc.expectedResult, result)
}
}

func TestRuleActionToUint8(t *testing.T) {
for _, tc := range []struct {
action string
expectedResult uint8
}{
{"Allow", 1},
{"Drop", 2},
{"Reject", 3},
{"", 0},
} {
result := RuleActionToUint8(tc.action)
assert.Equal(t, tc.expectedResult, result)
}
}

func TestPolicyTypeToUint8(t *testing.T) {
for _, tc := range []struct {
policyType v1beta2.NetworkPolicyType
expectedResult uint8
}{
{v1beta2.K8sNetworkPolicy, 1},
{v1beta2.AntreaNetworkPolicy, 2},
{v1beta2.AntreaClusterNetworkPolicy, 3},
} {
result := PolicyTypeToUint8(tc.policyType)
assert.Equal(t, tc.expectedResult, result)
}
}

0 comments on commit e6c04f9

Please sign in to comment.