From c3cd2f4198e8dec222afd81d53ddb350668803b6 Mon Sep 17 00:00:00 2001 From: Srikar Tati Date: Fri, 16 Jul 2021 16:55:07 -0700 Subject: [PATCH] Fix the deadlock between exporter and conntrack polling goroutines The deadlock occurs because the exporter goroutine, while holding the flow record map lock, waits to acquire the connection map lock in order to update the "DoneExport" flag of the stored connection. At the same time, the connection polling goroutine holds the connection map lock and waits to acquire the flow record map lock, which is held by the exporter goroutine. This was caught in scale testing. This is resolved through a temporary fix that adds the same flag to the flow record data struct. The connection and record deletion logic will be re-evaluated through PR #2360 as it refactors the related code quite a bit. Signed-off-by: Srikar Tati --- .../connections/conntrack_connections.go | 15 +------- .../conntrack_connections_perf_test.go | 8 ++-- pkg/agent/flowexporter/exporter/exporter.go | 13 ++----- .../exporter/exporter_perf_test.go | 2 +- .../flowexporter/exporter/exporter_test.go | 6 ++- .../flowexporter/flowrecords/flow_records.go | 38 +++++++++++-------- pkg/agent/flowexporter/types.go | 5 ++- 7 files changed, 39 insertions(+), 48 deletions(-) diff --git a/pkg/agent/flowexporter/connections/conntrack_connections.go b/pkg/agent/flowexporter/connections/conntrack_connections.go index 9a6b2a61d11..3b80bd440b6 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections.go @@ -103,7 +103,7 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) { // Reset IsPresent flag for all connections in connection map before dumping flows in conntrack module. // if the connection does not exist in conntrack table and has been exported, we will delete it from connection map. 
deleteIfStaleOrResetConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error { - if !conn.IsPresent && conn.DoneExport { + if !conn.IsPresent && conn.DyingAndDoneExport { if err := cs.DeleteConnWithoutLock(key); err != nil { return err } @@ -247,16 +247,3 @@ func (cs *ConntrackConnectionStore) DeleteConnWithoutLock(connKey flowexporter.C metrics.TotalAntreaConnectionsInConnTrackTable.Dec() return nil } - -// SetExportDone sets DoneExport field of conntrack connection to true given the connection key. -func (cs *ConntrackConnectionStore) SetExportDone(connKey flowexporter.ConnectionKey) error { - cs.mutex.Lock() - defer cs.mutex.Unlock() - - if conn, found := cs.connections[connKey]; !found { - return fmt.Errorf("connection with key %v does not exist in connection map", connKey) - } else { - conn.DoneExport = true - return nil - } -} diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go index cf49bab18f2..0dc44c4ca0e 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go @@ -121,7 +121,7 @@ func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Conn updatedConns := make([]*flowexporter.Connection, length) for i := 0; i < len(conns); i++ { // replace deleted connection with new connection - if conns[i].DoneExport == true { + if conns[i].DyingAndDoneExport == true { conns[i] = getNewConn() } else { // update rest of connections conns[i].OriginalPackets += 5 @@ -136,9 +136,9 @@ func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Conn } randomNum := getRandomNum(int64(length - testNumOfDeletedConns)) for i := randomNum; i < testNumOfDeletedConns+randomNum; i++ { - // hardcode DoneExport here for testing deletion of connections + // hardcode DyingAndDoneExport here for testing deletion of connections 
// not valid for testing update and export of records - updatedConns[i].DoneExport = true + updatedConns[i].DyingAndDoneExport = true } return updatedConns } @@ -154,7 +154,7 @@ func getNewConn() *flowexporter.Connection { StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second), StopTime: time.Now(), IsPresent: true, - DoneExport: false, + DyingAndDoneExport: false, FlowKey: flowKey, OriginalPackets: 10, OriginalBytes: 100, diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index dd364c1f337..332f5bfd5eb 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -301,15 +301,10 @@ func (exp *flowExporter) sendFlowRecords() error { exp.numDataSetsSent = exp.numDataSetsSent + 1 if flowexporter.IsConnectionDying(&record.Conn) { - // If the connection is in dying state or connection is not in conntrack table, - // we will delete the flow records from records map. - klog.V(2).Infof("Deleting the inactive flow records with key: %v from record map", key) - if err := exp.flowRecords.DeleteFlowRecordWithoutLock(key); err != nil { - return err - } - if err := exp.conntrackConnStore.SetExportDone(key); err != nil { - return err - } + // If the connection is in dying state or connection is not in conntrack + // table, we set the DyingAndDoneExport flag to do the deletion later. 
+ record.DyingAndDoneExport = true + exp.flowRecords.AddFlowRecordWithoutLock(&key, &record) } else { exp.flowRecords.ValidateAndUpdateStats(key, record) } diff --git a/pkg/agent/flowexporter/exporter/exporter_perf_test.go b/pkg/agent/flowexporter/exporter/exporter_perf_test.go index 0ea6fc63c7b..db967c703ce 100644 --- a/pkg/agent/flowexporter/exporter/exporter_perf_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_perf_test.go @@ -187,7 +187,7 @@ func addConnsAndGetRecords(connStore *connections.ConntrackConnectionStore) *flo StopTime: time.Now(), LastExportTime: time.Now().Add(-time.Duration(randomNum1)*time.Millisecond - testActiveFlowTimeout), IsPresent: true, - DoneExport: false, + DyingAndDoneExport: false, FlowKey: flowKey, OriginalPackets: 100, OriginalBytes: 10, diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 0d23049c893..d5e795d2179 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -452,7 +452,7 @@ func runSendFlowRecordTests(t *testing.T, flowExp *flowExporter, isIPv6 bool) { connKey := flowexporter.NewConnectionKey(conn) flowExp.conntrackConnStore.AddOrUpdateConn(conn) flowExp.flowRecords = flowrecords.NewFlowRecords() - err := flowExp.flowRecords.AddOrUpdateFlowRecord(connKey, conn) + err := flowExp.conntrackConnStore.ForAllConnectionsDo(flowExp.flowRecords.AddOrUpdateFlowRecord) assert.NoError(t, err) flowExp.numDataSetsSent = 0 @@ -498,10 +498,12 @@ func runSendFlowRecordTests(t *testing.T, flowExp *flowExporter, isIPv6 bool) { assert.Equal(t, getNumOfConnections(flowExp.denyConnStore), 0) } if tt.isRecordActive && flowexporter.IsConnectionDying(conn) { + err = flowExp.conntrackConnStore.ForAllConnectionsDo(flowExp.flowRecords.AddOrUpdateFlowRecord) + assert.NoError(t, err) _, recPresent := flowExp.flowRecords.GetFlowRecordFromMap(&connKey) assert.Falsef(t, recPresent, "record should not be in the map") 
connection, _ := flowExp.conntrackConnStore.GetConnByKey(connKey) - assert.True(t, connection.DoneExport) + assert.True(t, connection.DyingAndDoneExport) } }) } diff --git a/pkg/agent/flowexporter/flowrecords/flow_records.go b/pkg/agent/flowexporter/flowrecords/flow_records.go index 6805d360c35..1c8a61e99bf 100644 --- a/pkg/agent/flowexporter/flowrecords/flow_records.go +++ b/pkg/agent/flowexporter/flowrecords/flow_records.go @@ -15,7 +15,6 @@ package flowrecords import ( - "fmt" "sync" "time" @@ -37,11 +36,12 @@ func NewFlowRecords() *FlowRecords { // AddOrUpdateFlowRecord adds or updates the flow record in the record map given the connection. // It makes a copy of the connection object to record, to avoid race conditions between the -// connection store and the flow exporter. +// connection store and the flow exporter. We expect caller to hold the lock for +// the connection store. func (fr *FlowRecords) AddOrUpdateFlowRecord(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error { - // If the connection is in dying state and the corresponding flow records are already - // exported, then there is no need to add or update the record. - if flowexporter.IsConnectionDying(conn) && conn.DoneExport { + // If the connection is in dying state and is already exported, then there is + // no need to add or update the record. + if conn.DyingAndDoneExport { return nil } @@ -63,8 +63,19 @@ func (fr *FlowRecords) AddOrUpdateFlowRecord(key flowexporter.ConnectionKey, con IsIPv6: isIPv6, LastExportTime: conn.StartTime, IsActive: true, + DyingAndDoneExport: false, } } else { + // If the connection is in dying state and the corresponding flow records are already + // exported, then update the DyingAndDoneExport flag on the connection. + if record.DyingAndDoneExport { + // It is safe to update the connection as we hold the connection map + // lock when calling this function. 
+ conn.DyingAndDoneExport = true + delete(fr.recordsMap, key) + klog.V(2).InfoS("Deleting the inactive flow records in record map", "FlowKey", key) + return nil + } // set IsActive flag to true when there are changes either in stats or TCP state if (conn.OriginalPackets > record.PrevPackets) || (conn.ReversePackets > record.PrevReversePackets) || record.Conn.TCPState != conn.TCPState { record.IsActive = true @@ -83,6 +94,12 @@ func (fr *FlowRecords) AddFlowRecordToMap(connKey *flowexporter.ConnectionKey, r fr.recordsMap[*connKey] = *record } +// AddFlowRecordWithoutLock adds the flow record to the record map given connection key. +// Caller is expected to grab the lock of the record map. +func (fr *FlowRecords) AddFlowRecordWithoutLock(connKey *flowexporter.ConnectionKey, record *flowexporter.FlowRecord) { + fr.recordsMap[*connKey] = *record +} + // GetFlowRecordFromMap gets the flow record from record map given connection key. // This is used only for unit tests. func (fr *FlowRecords) GetFlowRecordFromMap(connKey *flowexporter.ConnectionKey) (*flowexporter.FlowRecord, bool) { @@ -92,17 +109,6 @@ func (fr *FlowRecords) GetFlowRecordFromMap(connKey *flowexporter.ConnectionKey) return &record, exists } -// DeleteFlowRecordWithoutLock deletes the record from the record map given -// the connection key without grabbing the lock. Caller is expected to grab lock. -func (fr *FlowRecords) DeleteFlowRecordWithoutLock(connKey flowexporter.ConnectionKey) error { - _, exists := fr.recordsMap[connKey] - if !exists { - return fmt.Errorf("flow record with key %v doesn't exist in map", connKey) - } - delete(fr.recordsMap, connKey) - return nil -} - // ValidateAndUpdateStats validates and updates the flow record given the connection // key. Caller is expected to grab lock. 
func (fr *FlowRecords) ValidateAndUpdateStats(connKey flowexporter.ConnectionKey, record flowexporter.FlowRecord) { diff --git a/pkg/agent/flowexporter/types.go b/pkg/agent/flowexporter/types.go index 182d98b34df..4cb01bd849a 100644 --- a/pkg/agent/flowexporter/types.go +++ b/pkg/agent/flowexporter/types.go @@ -43,9 +43,9 @@ type Connection struct { StopTime time.Time // IsPresent flag helps in cleaning up connections when they are not in conntrack table anymore. IsPresent bool - // DoneExport marks whether the related flow records are already exported or not so that we can + // DyingAndDoneExport marks whether the related flow records are already exported or not so that we can // safely delete the connection from the connection map. - DoneExport bool + DyingAndDoneExport bool Zone uint16 Mark uint32 StatusFlag uint32 @@ -88,5 +88,6 @@ type FlowRecord struct { PrevReverseBytes uint64 IsIPv6 bool LastExportTime time.Time + DyingAndDoneExport bool IsActive bool }