Skip to content

Commit

Permalink
Fix the deadlock between exporter and conntrack polling go routines (#…
Browse files Browse the repository at this point in the history
…2429)

Deadlock is due to the access of connection map from exporter goroutine waiting
to acquire connection map lock to update the "DoneExport" flag of the stored
connection. At the same time, the connection polling goroutine acquires the
connection map lock waiting to acquire flow record map, which is acquired by the
exporter goroutine.
This was caught in scale testing.
Resolved this through a temporary fix by adding the same flag in the flow record data struct.

The connection and record deletion logic will be re-evaluated through PR #2360 as it refactors the related code quite a bit.

Signed-off-by: Srikar Tati <stati@vmware.com>
  • Loading branch information
srikartati authored Jul 21, 2021
1 parent 7469d7e commit 365623f
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 48 deletions.
15 changes: 1 addition & 14 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
// Reset IsPresent flag for all connections in connection map before dumping flows in conntrack module.
// if the connection does not exist in conntrack table and has been exported, we will delete it from connection map.
deleteIfStaleOrResetConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
if !conn.IsPresent && conn.DoneExport {
if !conn.IsPresent && conn.DyingAndDoneExport {
if err := cs.DeleteConnWithoutLock(key); err != nil {
return err
}
Expand Down Expand Up @@ -247,16 +247,3 @@ func (cs *ConntrackConnectionStore) DeleteConnWithoutLock(connKey flowexporter.C
metrics.TotalAntreaConnectionsInConnTrackTable.Dec()
return nil
}

// SetExportDone sets DoneExport field of conntrack connection to true given the connection key.
func (cs *ConntrackConnectionStore) SetExportDone(connKey flowexporter.ConnectionKey) error {
cs.mutex.Lock()
defer cs.mutex.Unlock()

if conn, found := cs.connections[connKey]; !found {
return fmt.Errorf("connection with key %v does not exist in connection map", connKey)
} else {
conn.DoneExport = true
return nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Conn
updatedConns := make([]*flowexporter.Connection, length)
for i := 0; i < len(conns); i++ {
// replace deleted connection with new connection
if conns[i].DoneExport == true {
if conns[i].DyingAndDoneExport == true {
conns[i] = getNewConn()
} else { // update rest of connections
conns[i].OriginalPackets += 5
Expand All @@ -136,9 +136,9 @@ func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Conn
}
randomNum := getRandomNum(int64(length - testNumOfDeletedConns))
for i := randomNum; i < testNumOfDeletedConns+randomNum; i++ {
// hardcode DoneExport here for testing deletion of connections
// hardcode DyingAndDoneExport here for testing deletion of connections
// not valid for testing update and export of records
updatedConns[i].DoneExport = true
updatedConns[i].DyingAndDoneExport = true
}
return updatedConns
}
Expand All @@ -154,7 +154,7 @@ func getNewConn() *flowexporter.Connection {
StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second),
StopTime: time.Now(),
IsPresent: true,
DoneExport: false,
DyingAndDoneExport: false,
FlowKey: flowKey,
OriginalPackets: 10,
OriginalBytes: 100,
Expand Down
13 changes: 4 additions & 9 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,15 +301,10 @@ func (exp *flowExporter) sendFlowRecords() error {
exp.numDataSetsSent = exp.numDataSetsSent + 1

if flowexporter.IsConnectionDying(&record.Conn) {
// If the connection is in dying state or connection is not in conntrack table,
// we will delete the flow records from records map.
klog.V(2).Infof("Deleting the inactive flow records with key: %v from record map", key)
if err := exp.flowRecords.DeleteFlowRecordWithoutLock(key); err != nil {
return err
}
if err := exp.conntrackConnStore.SetExportDone(key); err != nil {
return err
}
// If the connection is in dying state or connection is not in conntrack
// table, we set the DyingAndDoneExport flag to do the deletion later.
record.DyingAndDoneExport = true
exp.flowRecords.AddFlowRecordWithoutLock(&key, &record)
} else {
exp.flowRecords.ValidateAndUpdateStats(key, record)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/exporter/exporter_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func addConnsAndGetRecords(connStore *connections.ConntrackConnectionStore) *flo
StopTime: time.Now(),
LastExportTime: time.Now().Add(-time.Duration(randomNum1)*time.Millisecond - testActiveFlowTimeout),
IsPresent: true,
DoneExport: false,
DyingAndDoneExport: false,
FlowKey: flowKey,
OriginalPackets: 100,
OriginalBytes: 10,
Expand Down
6 changes: 4 additions & 2 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func runSendFlowRecordTests(t *testing.T, flowExp *flowExporter, isIPv6 bool) {
connKey := flowexporter.NewConnectionKey(conn)
flowExp.conntrackConnStore.AddOrUpdateConn(conn)
flowExp.flowRecords = flowrecords.NewFlowRecords()
err := flowExp.flowRecords.AddOrUpdateFlowRecord(connKey, conn)
err := flowExp.conntrackConnStore.ForAllConnectionsDo(flowExp.flowRecords.AddOrUpdateFlowRecord)
assert.NoError(t, err)
flowExp.numDataSetsSent = 0

Expand Down Expand Up @@ -498,10 +498,12 @@ func runSendFlowRecordTests(t *testing.T, flowExp *flowExporter, isIPv6 bool) {
assert.Equal(t, getNumOfConnections(flowExp.denyConnStore), 0)
}
if tt.isRecordActive && flowexporter.IsConnectionDying(conn) {
err = flowExp.conntrackConnStore.ForAllConnectionsDo(flowExp.flowRecords.AddOrUpdateFlowRecord)
assert.NoError(t, err)
_, recPresent := flowExp.flowRecords.GetFlowRecordFromMap(&connKey)
assert.Falsef(t, recPresent, "record should not be in the map")
connection, _ := flowExp.conntrackConnStore.GetConnByKey(connKey)
assert.True(t, connection.DoneExport)
assert.True(t, connection.DyingAndDoneExport)
}
})
}
Expand Down
38 changes: 22 additions & 16 deletions pkg/agent/flowexporter/flowrecords/flow_records.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package flowrecords

import (
"fmt"
"sync"
"time"

Expand All @@ -37,11 +36,12 @@ func NewFlowRecords() *FlowRecords {

// AddOrUpdateFlowRecord adds or updates the flow record in the record map given the connection.
// It makes a copy of the connection object to record, to avoid race conditions between the
// connection store and the flow exporter.
// connection store and the flow exporter. We expect caller to hold the lock for
// the connection store.
func (fr *FlowRecords) AddOrUpdateFlowRecord(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
// If the connection is in dying state and the corresponding flow records are already
// exported, then there is no need to add or update the record.
if flowexporter.IsConnectionDying(conn) && conn.DoneExport {
// If the connection is in dying state and is already exported, then there is
// no need to add or update the record.
if conn.DyingAndDoneExport {
return nil
}

Expand All @@ -63,8 +63,19 @@ func (fr *FlowRecords) AddOrUpdateFlowRecord(key flowexporter.ConnectionKey, con
IsIPv6: isIPv6,
LastExportTime: conn.StartTime,
IsActive: true,
DyingAndDoneExport: false,
}
} else {
// If the connection is in dying state and the corresponding flow records are already
// exported, then update the DyingAndDoneExport flag on the connection.
if record.DyingAndDoneExport {
// It is safe to update the connection as we hold the connection map
// lock when calling this function.
conn.DyingAndDoneExport = true
delete(fr.recordsMap, key)
klog.V(2).InfoS("Deleting the inactive flow records in record map", "FlowKey", key)
return nil
}
// set IsActive flag to true when there are changes either in stats or TCP state
if (conn.OriginalPackets > record.PrevPackets) || (conn.ReversePackets > record.PrevReversePackets) || record.Conn.TCPState != conn.TCPState {
record.IsActive = true
Expand All @@ -83,6 +94,12 @@ func (fr *FlowRecords) AddFlowRecordToMap(connKey *flowexporter.ConnectionKey, r
fr.recordsMap[*connKey] = *record
}

// AddFlowRecordWithoutLock adds the flow record from record map given connection key.
// Caller is expected to grab the lock the record map.
func (fr *FlowRecords) AddFlowRecordWithoutLock(connKey *flowexporter.ConnectionKey, record *flowexporter.FlowRecord) {
fr.recordsMap[*connKey] = *record
}

// GetFlowRecordFromMap gets the flow record from record map given connection key.
// This is used only for unit tests.
func (fr *FlowRecords) GetFlowRecordFromMap(connKey *flowexporter.ConnectionKey) (*flowexporter.FlowRecord, bool) {
Expand All @@ -92,17 +109,6 @@ func (fr *FlowRecords) GetFlowRecordFromMap(connKey *flowexporter.ConnectionKey)
return &record, exists
}

// DeleteFlowRecordWithoutLock deletes the record from the record map given
// the connection key without grabbing the lock. Caller is expected to grab lock.
func (fr *FlowRecords) DeleteFlowRecordWithoutLock(connKey flowexporter.ConnectionKey) error {
_, exists := fr.recordsMap[connKey]
if !exists {
return fmt.Errorf("flow record with key %v doesn't exist in map", connKey)
}
delete(fr.recordsMap, connKey)
return nil
}

// ValidateAndUpdateStats validates and updates the flow record given the connection
// key. Caller is expected to grab lock.
func (fr *FlowRecords) ValidateAndUpdateStats(connKey flowexporter.ConnectionKey, record flowexporter.FlowRecord) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ type Connection struct {
StopTime time.Time
// IsPresent flag helps in cleaning up connections when they are not in conntrack table anymore.
IsPresent bool
// DoneExport marks whether the related flow records are already exported or not so that we can
// DyingAndDoneExport marks whether the related flow records are already exported or not so that we can
// safely delete the connection from the connection map.
DoneExport bool
DyingAndDoneExport bool
Zone uint16
Mark uint32
StatusFlag uint32
Expand Down Expand Up @@ -88,5 +88,6 @@ type FlowRecord struct {
PrevReverseBytes uint64
IsIPv6 bool
LastExportTime time.Time
DyingAndDoneExport bool
IsActive bool
}

0 comments on commit 365623f

Please sign in to comment.