diff --git a/pkg/agent/flowexporter/connections/conntrack_connections.go b/pkg/agent/flowexporter/connections/conntrack_connections.go index 9a6b2a61d11..3b80bd440b6 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections.go @@ -103,7 +103,7 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) { // Reset IsPresent flag for all connections in connection map before dumping flows in conntrack module. // if the connection does not exist in conntrack table and has been exported, we will delete it from connection map. deleteIfStaleOrResetConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error { - if !conn.IsPresent && conn.DoneExport { + if !conn.IsPresent && conn.DyingAndDoneExport { if err := cs.DeleteConnWithoutLock(key); err != nil { return err } @@ -247,16 +247,3 @@ func (cs *ConntrackConnectionStore) DeleteConnWithoutLock(connKey flowexporter.C metrics.TotalAntreaConnectionsInConnTrackTable.Dec() return nil } - -// SetExportDone sets DoneExport field of conntrack connection to true given the connection key. -func (cs *ConntrackConnectionStore) SetExportDone(connKey flowexporter.ConnectionKey) error { - cs.mutex.Lock() - defer cs.mutex.Unlock() - - if conn, found := cs.connections[connKey]; !found { - return fmt.Errorf("connection with key %v does not exist in connection map", connKey) - } else { - conn.DoneExport = true - return nil - } -} diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go index cf49bab18f2..0dc44c4ca0e 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go @@ -121,7 +121,7 @@ func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Conn updatedConns := make([]*flowexporter.Connection, length) for i := 0; i < len(conns); i++ { // replace deleted connection with new connection - if conns[i].DoneExport == true { + if conns[i].DyingAndDoneExport == true { conns[i] = getNewConn() } else { // update rest of connections conns[i].OriginalPackets += 5 @@ -136,9 +136,9 @@ func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Conn } randomNum := getRandomNum(int64(length - testNumOfDeletedConns)) for i := randomNum; i < testNumOfDeletedConns+randomNum; i++ { - // hardcode DoneExport here for testing deletion of connections + // hardcode DyingAndDoneExport here for testing deletion of connections // not valid for testing update and export of records - updatedConns[i].DoneExport = true + updatedConns[i].DyingAndDoneExport = true } return updatedConns } @@ -154,7 +154,7 @@ func getNewConn() *flowexporter.Connection { StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second), StopTime: time.Now(), IsPresent: true, - DoneExport: false, + DyingAndDoneExport: false, FlowKey: flowKey, OriginalPackets: 10, OriginalBytes: 100, diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index dd364c1f337..332f5bfd5eb 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -301,15 +301,10 @@ func (exp *flowExporter) sendFlowRecords() error { exp.numDataSetsSent = exp.numDataSetsSent + 1 if flowexporter.IsConnectionDying(&record.Conn) { - // If the connection is in dying state or connection is not in conntrack table, - // we will delete the flow records from records map. - klog.V(2).Infof("Deleting the inactive flow records with key: %v from record map", key) - if err := exp.flowRecords.DeleteFlowRecordWithoutLock(key); err != nil { - return err - } - if err := exp.conntrackConnStore.SetExportDone(key); err != nil { - return err - } + // If the connection is in dying state or connection is not in conntrack + // table, we set the DyingAndDoneExport flag to do the deletion later. + record.DyingAndDoneExport = true + exp.flowRecords.AddFlowRecordWithoutLock(&key, &record) } else { exp.flowRecords.ValidateAndUpdateStats(key, record) } diff --git a/pkg/agent/flowexporter/exporter/exporter_perf_test.go b/pkg/agent/flowexporter/exporter/exporter_perf_test.go index 0ea6fc63c7b..db967c703ce 100644 --- a/pkg/agent/flowexporter/exporter/exporter_perf_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_perf_test.go @@ -187,7 +187,7 @@ func addConnsAndGetRecords(connStore *connections.ConntrackConnectionStore) *flo StopTime: time.Now(), LastExportTime: time.Now().Add(-time.Duration(randomNum1)*time.Millisecond - testActiveFlowTimeout), IsPresent: true, - DoneExport: false, + DyingAndDoneExport: false, FlowKey: flowKey, OriginalPackets: 100, OriginalBytes: 10, diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 0d23049c893..d5e795d2179 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -452,7 +452,7 @@ func runSendFlowRecordTests(t *testing.T, flowExp *flowExporter, isIPv6 bool) { connKey := flowexporter.NewConnectionKey(conn) flowExp.conntrackConnStore.AddOrUpdateConn(conn) flowExp.flowRecords = flowrecords.NewFlowRecords() - err := flowExp.flowRecords.AddOrUpdateFlowRecord(connKey, conn) + err := flowExp.conntrackConnStore.ForAllConnectionsDo(flowExp.flowRecords.AddOrUpdateFlowRecord) assert.NoError(t, err) flowExp.numDataSetsSent = 0 @@ -498,10 +498,12 @@ func runSendFlowRecordTests(t *testing.T, flowExp *flowExporter, isIPv6 bool) { assert.Equal(t, getNumOfConnections(flowExp.denyConnStore), 0) } if tt.isRecordActive && flowexporter.IsConnectionDying(conn) { + err = flowExp.conntrackConnStore.ForAllConnectionsDo(flowExp.flowRecords.AddOrUpdateFlowRecord) + assert.NoError(t, err) _, recPresent := flowExp.flowRecords.GetFlowRecordFromMap(&connKey) assert.Falsef(t, recPresent, "record should not be in the map") connection, _ := flowExp.conntrackConnStore.GetConnByKey(connKey) - assert.True(t, connection.DoneExport) + assert.True(t, connection.DyingAndDoneExport) } }) } diff --git a/pkg/agent/flowexporter/flowrecords/flow_records.go b/pkg/agent/flowexporter/flowrecords/flow_records.go index 6805d360c35..1c8a61e99bf 100644 --- a/pkg/agent/flowexporter/flowrecords/flow_records.go +++ b/pkg/agent/flowexporter/flowrecords/flow_records.go @@ -15,7 +15,6 @@ package flowrecords import ( - "fmt" "sync" "time" @@ -37,11 +36,12 @@ func NewFlowRecords() *FlowRecords { // AddOrUpdateFlowRecord adds or updates the flow record in the record map given the connection. // It makes a copy of the connection object to record, to avoid race conditions between the -// connection store and the flow exporter. +// connection store and the flow exporter. We expect caller to hold the lock for +// the connection store. func (fr *FlowRecords) AddOrUpdateFlowRecord(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error { - // If the connection is in dying state and the corresponding flow records are already - // exported, then there is no need to add or update the record. - if flowexporter.IsConnectionDying(conn) && conn.DoneExport { + // If the connection is in dying state and is already exported, then there is + // no need to add or update the record. + if conn.DyingAndDoneExport { return nil } @@ -63,8 +63,19 @@ func (fr *FlowRecords) AddOrUpdateFlowRecord(key flowexporter.ConnectionKey, con IsIPv6: isIPv6, LastExportTime: conn.StartTime, IsActive: true, + DyingAndDoneExport: false, } } else { + // If the connection is in dying state and the corresponding flow records are already + // exported, then update the DyingAndDoneExport flag on the connection. + if record.DyingAndDoneExport { + // It is safe to update the connection as we hold the connection map + // lock when calling this function. + conn.DyingAndDoneExport = true + delete(fr.recordsMap, key) + klog.V(2).InfoS("Deleting the inactive flow records in record map", "FlowKey", key) + return nil + } // set IsActive flag to true when there are changes either in stats or TCP state if (conn.OriginalPackets > record.PrevPackets) || (conn.ReversePackets > record.PrevReversePackets) || record.Conn.TCPState != conn.TCPState { record.IsActive = true @@ -83,6 +94,12 @@ func (fr *FlowRecords) AddFlowRecordToMap(connKey *flowexporter.ConnectionKey, r fr.recordsMap[*connKey] = *record } +// AddFlowRecordWithoutLock adds the flow record from record map given connection key. +// Caller is expected to grab the lock the record map. +func (fr *FlowRecords) AddFlowRecordWithoutLock(connKey *flowexporter.ConnectionKey, record *flowexporter.FlowRecord) { + fr.recordsMap[*connKey] = *record +} + // GetFlowRecordFromMap gets the flow record from record map given connection key. // This is used only for unit tests. func (fr *FlowRecords) GetFlowRecordFromMap(connKey *flowexporter.ConnectionKey) (*flowexporter.FlowRecord, bool) { @@ -92,17 +109,6 @@ func (fr *FlowRecords) GetFlowRecordFromMap(connKey *flowexporter.ConnectionKey) return &record, exists } -// DeleteFlowRecordWithoutLock deletes the record from the record map given -// the connection key without grabbing the lock. Caller is expected to grab lock. -func (fr *FlowRecords) DeleteFlowRecordWithoutLock(connKey flowexporter.ConnectionKey) error { - _, exists := fr.recordsMap[connKey] - if !exists { - return fmt.Errorf("flow record with key %v doesn't exist in map", connKey) - } - delete(fr.recordsMap, connKey) - return nil -} - // ValidateAndUpdateStats validates and updates the flow record given the connection // key. Caller is expected to grab lock. func (fr *FlowRecords) ValidateAndUpdateStats(connKey flowexporter.ConnectionKey, record flowexporter.FlowRecord) { diff --git a/pkg/agent/flowexporter/types.go b/pkg/agent/flowexporter/types.go index 182d98b34df..4cb01bd849a 100644 --- a/pkg/agent/flowexporter/types.go +++ b/pkg/agent/flowexporter/types.go @@ -43,9 +43,9 @@ type Connection struct { StopTime time.Time // IsPresent flag helps in cleaning up connections when they are not in conntrack table anymore. IsPresent bool - // DoneExport marks whether the related flow records are already exported or not so that we can + // DyingAndDoneExport marks whether the related flow records are already exported or not so that we can // safely delete the connection from the connection map. - DoneExport bool + DyingAndDoneExport bool Zone uint16 Mark uint32 StatusFlag uint32 @@ -88,5 +88,6 @@ type FlowRecord struct { PrevReverseBytes uint64 IsIPv6 bool LastExportTime time.Time + DyingAndDoneExport bool IsActive bool }