Skip to content

Commit

Permalink
Fix network policy issue in userspace datapath of Flow Exporter
Browse files Browse the repository at this point in the history
In this commit, we add the parsing code of labels field for
dump-conntrack command output in OVS userspace datapath situation.
This will fix the missing network policy information issue in flow
records in OVS userspace datapath, like in Kind cluster.

Signed-off-by: Yongming Ding <dyongming@vmware.com>
  • Loading branch information
dreamtalen committed May 21, 2021
1 parent 335e6bb commit 3306145
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 11 deletions.
3 changes: 3 additions & 0 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (cs *connectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
// IDs stored in the connection label.
if len(conn.Labels) != 0 {
klog.V(4).Infof("connection label: %x; label masks: %x", conn.Labels, conn.LabelsMask)
// We always expect labels from conntrack dumper to be added in little-endian format right now
// In kernel datapath, the labels uses the "native" endianness for the system, which are little-endian
// on most of the modern CPUs based on x86 architecture like Intel, AMD, etc.
ingressOfID := binary.LittleEndian.Uint32(conn.Labels[:4])
egressOfID := binary.LittleEndian.Uint32(conn.Labels[4:8])
if ingressOfID != 0 {
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type connTrackSystem struct {
connTrack NetFilterConnTrack
}

// TODO: detect the endianness of the system when initializing conntrack dumper to handle situations on big-endian platforms.
// All connection labels are required to store in little endian format in conntrack dumper.
func NewConnTrackSystem(nodeConfig *config.NodeConfig, serviceCIDRv4 *net.IPNet, serviceCIDRv6 *net.IPNet, isAntreaProxyEnabled bool) *connTrackSystem {
if err := SetupConntrackParameters(); err != nil {
// Do not fail, but continue after logging an error as we can still dump flows with missing information.
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/flowexporter/connections/conntrack_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) {
// Set expect call for mock ovsCtlClient
ovsctlCmdOutput := []byte("tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)\n" +
"tcp,orig=(src=127.0.0.1,dst=8.7.6.5,sport=45170,dport=2379,packets=80743,bytes=5416239),reply=(src=8.7.6.5,dst=127.0.0.1,sport=2379,dport=45170,packets=63361,bytes=4811261),start=2020-07-24T05:07:01.591,id=462801621,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86397,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)\n" +
"tcp,orig=(src=100.10.0.105,dst=10.96.0.1,sport=41284,dport=443,packets=343260,bytes=19340621),reply=(src=100.10.0.106,dst=100.10.0.105,sport=6443,dport=41284,packets=381035,bytes=181176472),start=2020-07-25T08:40:08.959,id=982464968,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|DST_NAT|DST_NAT_DONE,timeout=86399,mark=33,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)")
"tcp,orig=(src=100.10.0.105,dst=10.96.0.1,sport=41284,dport=443,packets=343260,bytes=19340621),reply=(src=100.10.0.106,dst=100.10.0.105,sport=6443,dport=41284,packets=381035,bytes=181176472),start=2020-07-25T08:40:08.959,id=982464968,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|DST_NAT|DST_NAT_DONE,timeout=86399,labels=0x200000001,mark=33,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)")
outputFlow := strings.Split(string(ovsctlCmdOutput), "\n")
expConn := &flowexporter.Connection{
ID: 982464968,
Expand Down Expand Up @@ -177,6 +177,7 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) {
DestinationPodNamespace: "",
DestinationPodName: "",
TCPState: "ESTABLISHED",
Labels: []byte{1, 0, 0, 0, 2, 0, 0, 0},
}
mockOVSCtlClient.EXPECT().RunAppctlCmd("dpctl/dump-conntrack", false, "-m", "-s").Return(ovsctlCmdOutput, nil)

Expand Down
37 changes: 27 additions & 10 deletions pkg/agent/flowexporter/connections/conntrack_ovs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package connections

import (
"encoding/hex"
"fmt"
"net"
"strconv"
Expand Down Expand Up @@ -125,7 +126,7 @@ func (ct *connTrackOvsCtl) ovsAppctlDumpConnections(zoneFilter uint16) ([]*flowe

// flowStringToAntreaConnection parses the flow string and converts to Antrea connection.
// Example of flow string:
// "tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)"
// "tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,labels=0x200000001,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)"
func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter.Connection, error) {
conn := flowexporter.Connection{}
flowSlice := strings.Split(flow, ",")
Expand Down Expand Up @@ -163,7 +164,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 16)
if err != nil {
return nil, fmt.Errorf("conversion of sport %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of sport %s to int failed: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.TupleOrig.SourcePort = uint16(val)
Expand All @@ -176,7 +177,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 16)
if err != nil {
return nil, fmt.Errorf("conversion of dport %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of dport %s to int failed: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.TupleOrig.DestinationPort = uint16(val)
Expand All @@ -187,7 +188,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 64)
if err != nil {
return nil, fmt.Errorf("conversion of packets %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of packets %s to int failed: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.OriginalPackets = uint64(val)
Expand All @@ -199,7 +200,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 64)
if err != nil {
return nil, fmt.Errorf("conversion of bytes %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of bytes %s to int failed: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.OriginalBytes = uint64(val)
Expand All @@ -213,7 +214,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
timeString := fields[len(fields)-1] + "Z"
val, err := time.Parse(time.RFC3339, timeString)
if err != nil {
return nil, fmt.Errorf("parsing start time %s failed", timeString)
return nil, fmt.Errorf("parsing start time %s failed: %v", timeString, err)
}
conn.StartTime = val
// TODO: We didn't find stoptime related field in flow string right now, need to investigate how stoptime is recorded and dumped.
Expand All @@ -224,7 +225,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 16)
if err != nil {
return nil, fmt.Errorf("conversion of zone %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of zone %s to int failed: %v", fields[len(fields)-1], err)
}
if zoneFilter != uint16(val) {
break
Expand All @@ -236,21 +237,37 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 32)
if err != nil {
return nil, fmt.Errorf("conversion of mark '%s' to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of mark '%s' to int failed: %v", fields[len(fields)-1], err)
}
conn.Mark = uint32(val)
case strings.Contains(fs, "timeout"):
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 32)
if err != nil {
return nil, fmt.Errorf("conversion of timeout %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of timeout %s to int failed: %v", fields[len(fields)-1], err)
}
conn.Timeout = uint32(val)
case strings.Contains(fs, "labels"):
fields := strings.Split(fs, "=")
labelStr := strings.Replace(fields[len(fields)-1], "0x", "", -1)
// Add leading zeros since DecodeString() expects the input string has even length
if len(labelStr) < 16 {
labelStr = strings.Repeat("0", 16-len(labelStr)) + labelStr
}
hexval, err := hex.DecodeString(labelStr)
if err != nil {
return nil, fmt.Errorf("conversion of label string %s to []byte failed: %v", labelStr, err)
}
// Reverse the []byte slice to align with kernel side's result which is little endian
for i := 0; i < len(hexval)/2; i++ {
hexval[i], hexval[len(hexval)-i-1] = hexval[len(hexval)-i-1], hexval[i]
}
conn.Labels = hexval
case strings.Contains(fs, "id"):
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 32)
if err != nil {
return nil, fmt.Errorf("conversion of id %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of id %s to int failed: %v", fields[len(fields)-1], err)
}
conn.ID = uint32(val)
case strings.Contains(fs, "protoinfo"):
Expand Down

0 comments on commit 3306145

Please sign in to comment.