Skip to content

Commit

Permalink
Do not send double copies of flow records
Browse files Browse the repository at this point in the history
This will be removed when network policy info is added in flow records.
  • Loading branch information
srikartati committed Jul 30, 2020
1 parent 7694330 commit 9f04cb6
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 4 deletions.
9 changes: 8 additions & 1 deletion pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ func (cs *connectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
conn.DestinationPodName = dIface.ContainerInterfaceConfig.PodName
conn.DestinationPodNamespace = dIface.ContainerInterfaceConfig.PodNamespace
}
// Do not export flow records of connections, who destination is local pod and source is remote pod.
// We export flow records only form "source node", where the connection is originated from. This is to avoid
// 2 copies of flow records. This restriction will be removed when flow records store network policy rule ID.
// TODO: Remove this when network policy rule ID are added to flow records.
if !srcFound && dstFound {
conn.DoExport = false
}
klog.V(4).Infof("New Antrea flow added: %v", conn)
// Add new antrea connection to connection store
cs.connections[connKey] = *conn
Expand All @@ -149,7 +156,7 @@ func (cs *connectionStore) ForAllConnectionsDo(callback flowexporter.ConnectionM
return nil
}

// poll returns number of filtered connections after poll cycle
// Poll returns number of filtered connections after poll cycle
// TODO: Optimize polling cycle--Only poll invalid/close connection during every poll. Poll established right before export
func (cs *connectionStore) Poll() (int, error) {
klog.V(2).Infof("Polling conntrack")
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func (ctnd *connTrackNetdev) DumpFilter(filter interface{}) ([]*flowexporter.Con
}
if inZone {
conn.IsActive = true
conn.DoExport = true
antreaConns = append(antreaConns, &conn)
}
}
Expand Down Expand Up @@ -332,6 +333,7 @@ func createAntreaConn(conn *conntrack.Flow) *flowexporter.Connection {
conn.Timestamp.Start,
conn.Timestamp.Stop,
true,
true,
conn.Zone,
uint32(conn.Status.Value),
tupleOrig,
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/flowexporter/connections/conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func TestConnTackNetdev_DumpFilter(t *testing.T) {
StartTime: time.Time{},
StopTime: time.Time{},
IsActive: true,
DoExport: true,
Zone: 65520,
StatusFlag: 0,
TupleOrig: flowexporter.Tuple{
Expand Down
5 changes: 5 additions & 0 deletions pkg/agent/flowexporter/flowrecords/flow_records.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ func (fr *flowRecords) ForAllFlowRecordsDo(callback flowexporter.FlowRecordCallB
}

func (fr *flowRecords) addOrUpdateFlowRecord(key flowexporter.ConnectionKey, conn flowexporter.Connection) error {
// If DoExport flag is not set return immediately.
if !conn.DoExport {
return nil
}

record, exists := fr.recordsMap[key]
if !exists {
record = flowexporter.FlowRecord{
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ type Connection struct {
// For established connections: StopTime is latest time when it was polled.
StopTime time.Time
// IsActive flag helps in cleaning up connections when they are not in conntrack any module more.
IsActive bool
IsActive bool
// DoExport flag helps in tagging connections that can be exported by Flow Exporter
DoExport bool
Zone uint16
StatusFlag uint32
// TODO: Have a separate field for protocol. No need to keep it in Tuple.
Expand Down
12 changes: 10 additions & 2 deletions test/integration/agent/flowexporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func createConnsForTest() ([]*flowexporter.Connection, []*flowexporter.Connectio
ReverseBytes: 0xbaaa,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
DoExport: true,
}
testConnKey1 := flowexporter.NewConnectionKey(testConn1)
testConns[0] = testConn1
Expand All @@ -69,6 +70,7 @@ func createConnsForTest() ([]*flowexporter.Connection, []*flowexporter.Connectio
ReverseBytes: 0xcbbbb0000000000,
TupleOrig: *tuple2,
TupleReply: *revTuple2,
DoExport: true,
}
testConnKey2 := flowexporter.NewConnectionKey(testConn2)
testConns[1] = testConn2
Expand Down Expand Up @@ -97,8 +99,12 @@ func testBuildFlowRecords(t *testing.T, flowRecords flowrecords.FlowRecords, con
// Check if records in flow records are built as expected or not
for i, expRecConn := range conns {
actualRec, found := flowRecords.GetFlowRecordByConnKey(*connKeys[i])
assert.Equal(t, found, true, "testConn should be part of flow records")
assert.Equal(t, actualRec.Conn, expRecConn, "testConn and connection in connection store should be equal")
if expRecConn.DoExport {
assert.Equal(t, found, true, "testConn should be part of flow records")
assert.Equal(t, actualRec.Conn, expRecConn, "testConn and connection in connection store should be equal")
} else {
assert.Equal(t, found, false, "testConn should be not part of flow records")
}
}
}

Expand Down Expand Up @@ -142,9 +148,11 @@ func TestConnectionStoreAndFlowRecords(t *testing.T) {
if i == 0 {
expConn.SourcePodName = testIfConfigs[i].PodName
expConn.SourcePodNamespace = testIfConfigs[i].PodNamespace
expConn.DoExport = true
} else {
expConn.DestinationPodName = testIfConfigs[i].PodName
expConn.DestinationPodNamespace = testIfConfigs[i].PodNamespace
expConn.DoExport = false
}
actualConn, found := connStore.GetConnByKey(*testConnKeys[i])
assert.Equal(t, found, true, "testConn should be present in connection store")
Expand Down

0 comments on commit 9f04cb6

Please sign in to comment.