Skip to content

Commit

Permalink
Fix network policy issue in userspace datapath of Flow Exporter
Browse files Browse the repository at this point in the history
In this commit, we add the parsing code of labels field for
dump-conntrack command output in OVS userspace datapath situation.
This will fix the missing network policy information issue in flow
records in OVS userspace datapath, like in Kind cluster.

Signed-off-by: Yongming Ding <dyongming@vmware.com>
  • Loading branch information
dreamtalen committed May 19, 2021
1 parent 335e6bb commit 321422e
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions pkg/agent/flowexporter/connections/conntrack_ovs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package connections

import (
"encoding/hex"
"fmt"
"net"
"strconv"
Expand Down Expand Up @@ -125,7 +126,7 @@ func (ct *connTrackOvsCtl) ovsAppctlDumpConnections(zoneFilter uint16) ([]*flowe

// flowStringToAntreaConnection parses the flow string and converts to Antrea connection.
// Example of flow string:
// "tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)"
// "tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,labels=0x200000001,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)"
func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter.Connection, error) {
conn := flowexporter.Connection{}
flowSlice := strings.Split(flow, ",")
Expand Down Expand Up @@ -163,7 +164,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 16)
if err != nil {
return nil, fmt.Errorf("conversion of sport %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of sport %s to int failed with error: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.TupleOrig.SourcePort = uint16(val)
Expand All @@ -176,7 +177,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 16)
if err != nil {
return nil, fmt.Errorf("conversion of dport %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of dport %s to int failed with error: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.TupleOrig.DestinationPort = uint16(val)
Expand All @@ -187,7 +188,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 64)
if err != nil {
return nil, fmt.Errorf("conversion of packets %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of packets %s to int failed with error: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.OriginalPackets = uint64(val)
Expand All @@ -199,7 +200,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 64)
if err != nil {
return nil, fmt.Errorf("conversion of bytes %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of bytes %s to int failed with error: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.OriginalBytes = uint64(val)
Expand All @@ -213,7 +214,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
timeString := fields[len(fields)-1] + "Z"
val, err := time.Parse(time.RFC3339, timeString)
if err != nil {
return nil, fmt.Errorf("parsing start time %s failed", timeString)
return nil, fmt.Errorf("parsing start time %s failed with error: %v", timeString, err)
}
conn.StartTime = val
// TODO: We didn't find stoptime related field in flow string right now, need to investigate how stoptime is recorded and dumped.
Expand All @@ -224,7 +225,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 16)
if err != nil {
return nil, fmt.Errorf("conversion of zone %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of zone %s to int failed with error: %v", fields[len(fields)-1], err)
}
if zoneFilter != uint16(val) {
break
Expand All @@ -236,21 +237,37 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 32)
if err != nil {
return nil, fmt.Errorf("conversion of mark '%s' to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of mark '%s' to int failed with error: %v", fields[len(fields)-1], err)
}
conn.Mark = uint32(val)
case strings.Contains(fs, "timeout"):
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 32)
if err != nil {
return nil, fmt.Errorf("conversion of timeout %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of timeout %s to int failed with error: %v", fields[len(fields)-1], err)
}
conn.Timeout = uint32(val)
case strings.Contains(fs, "labels"):
fields := strings.Split(fs, "=")
labelStr := strings.Replace(fields[len(fields)-1], "0x", "", -1)
// Add leading zeros since DecodeString() expects the input string has even length
if len(labelStr) < 16 {
labelStr = strings.Repeat("0", 16-len(labelStr)) + labelStr
}
hexval, err := hex.DecodeString(labelStr)
if err != nil {
return nil, fmt.Errorf("conversion of label string %s to []byte failed with error: %v", labelStr, err)
}
// Reverse the []byte slice to align with kernel side's result which is little endian
for i := 0; i < len(hexval)/2; i++ {
hexval[i], hexval[len(hexval)-i-1] = hexval[len(hexval)-i-1], hexval[i]
}
conn.Labels = hexval
case strings.Contains(fs, "id"):
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 32)
if err != nil {
return nil, fmt.Errorf("conversion of id %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of id %s to int failed with error: %v", fields[len(fields)-1], err)
}
conn.ID = uint32(val)
case strings.Contains(fs, "protoinfo"):
Expand Down

0 comments on commit 321422e

Please sign in to comment.