Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Follow Up]Antrea Network Policy Audit Logging Deduplication and Unit Tests #2578

Merged
merged 6 commits into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func run(o *Options) error {
// In Antrea agent, status manager and audit logging will automatically be enabled
// if AntreaPolicy feature is enabled.
statusManagerEnabled := antreaPolicyEnabled
loggingEnabled := antreaPolicyEnabled

var denyConnStore *connections.DenyConnectionStore
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
Expand All @@ -211,6 +212,7 @@ func run(o *Options) error {
entityUpdates,
antreaPolicyEnabled,
statusManagerEnabled,
loggingEnabled,
denyConnStore,
asyncRuleDeleteInterval)
if err != nil {
Expand Down
59 changes: 31 additions & 28 deletions pkg/agent/controller/networkpolicy/audit_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
// AntreaPolicyLogger is used for Antrea policy audit logging.
// Includes a lumberjack logger and a map used for log deduplication.
type AntreaPolicyLogger struct {
bufferLength time.Duration
anpLogger *log.Logger
logDeduplication logRecordDedupMap
}
Expand Down Expand Up @@ -67,57 +68,58 @@ type logRecordDedupMap struct {
}

// getLogKey returns the log record in logDeduplication map by logMsg.
func (a *AntreaPolicyLogger) getLogKey(logMsg string) *logDedupRecord {
a.logDeduplication.logMutex.Lock()
defer a.logDeduplication.logMutex.Unlock()
return a.logDeduplication.logMap[logMsg]
func (l *AntreaPolicyLogger) getLogKey(logMsg string) *logDedupRecord {
l.logDeduplication.logMutex.Lock()
defer l.logDeduplication.logMutex.Unlock()
return l.logDeduplication.logMap[logMsg]
}

// logAfterTimer runs concurrently until buffer timer stops, then call terminateLogKey.
func (a *AntreaPolicyLogger) logAfterTimer(logMsg string) {
logRecordTimer := a.getLogKey(logMsg).bufferTimer
func (l *AntreaPolicyLogger) logAfterTimer(logMsg string) {
logRecordTimer := l.getLogKey(logMsg).bufferTimer
<-logRecordTimer.C
a.terminateLogKey(logMsg)
l.terminateLogKey(logMsg)
}

// terminateLogKey logs and deletes the log record in logDeduplication map by logMsg.
func (a *AntreaPolicyLogger) terminateLogKey(logMsg string) {
a.logDeduplication.logMutex.Lock()
defer a.logDeduplication.logMutex.Unlock()
if a.logDeduplication.logMap[logMsg].count == 1 {
a.anpLogger.Printf(logMsg)
func (l *AntreaPolicyLogger) terminateLogKey(logMsg string) {
l.logDeduplication.logMutex.Lock()
defer l.logDeduplication.logMutex.Unlock()
logRecord := l.logDeduplication.logMap[logMsg]
if logRecord.count == 1 {
l.anpLogger.Printf(logMsg)
} else {
a.anpLogger.Printf("%s [%d packets in %s]", logMsg, a.logDeduplication.logMap[logMsg].count, time.Since(a.logDeduplication.logMap[logMsg].initTime))
l.anpLogger.Printf("%s [%d packets in %s]", logMsg, logRecord.count, time.Since(logRecord.initTime))
}
delete(a.logDeduplication.logMap, logMsg)
delete(l.logDeduplication.logMap, logMsg)
}

// updateLogKey initiates record or increases the count in logDeduplication corresponding to given logMsg.
func (a *AntreaPolicyLogger) updateLogKey(logMsg string, bufferLength time.Duration) bool {
a.logDeduplication.logMutex.Lock()
defer a.logDeduplication.logMutex.Unlock()
_, exists := a.logDeduplication.logMap[logMsg]
func (l *AntreaPolicyLogger) updateLogKey(logMsg string, bufferLength time.Duration) bool {
l.logDeduplication.logMutex.Lock()
defer l.logDeduplication.logMutex.Unlock()
_, exists := l.logDeduplication.logMap[logMsg]
if exists {
a.logDeduplication.logMap[logMsg].count++
l.logDeduplication.logMap[logMsg].count++
} else {
record := logDedupRecord{1, time.Now(), time.NewTimer(bufferLength)}
a.logDeduplication.logMap[logMsg] = &record
l.logDeduplication.logMap[logMsg] = &record
}
return exists
}

// logDedupPacket logs information in ob based on disposition and duplication conditions.
func (a *AntreaPolicyLogger) logDedupPacket(ob *logInfo, bufferLength time.Duration) {
// LogDedupPacket logs information in ob based on disposition and duplication conditions.
func (l *AntreaPolicyLogger) LogDedupPacket(ob *logInfo) {
// Deduplicate non-Allow packet log.
logMsg := fmt.Sprintf("%s %s %s %s SRC: %s DEST: %s %d %s", ob.tableName, ob.npRef, ob.disposition, ob.ofPriority, ob.srcIP, ob.destIP, ob.pktLength, ob.protocolStr)
if ob.disposition == openflow.DispositionToString[openflow.DispositionAllow] {
a.anpLogger.Printf(logMsg)
l.anpLogger.Printf(logMsg)
} else {
// Increase count if duplicated within 1 sec, create buffer otherwise.
exists := a.updateLogKey(logMsg, bufferLength)
exists := l.updateLogKey(logMsg, l.bufferLength)
if !exists {
// Go routine for logging when buffer timer stops.
go a.logAfterTimer(logMsg)
go l.logAfterTimer(logMsg)
}
}
}
Expand All @@ -143,10 +145,11 @@ func newAntreaPolicyLogger() (*AntreaPolicyLogger, error) {
Compress: true, // compress the old log files for backup
}

cAntreaPolicyLogger := &AntreaPolicyLogger{
antreaPolicyLogger := &AntreaPolicyLogger{
bufferLength: time.Second,
anpLogger: log.New(logOutput, "", log.Ldate|log.Lmicroseconds),
logDeduplication: logRecordDedupMap{logMap: make(map[string]*logDedupRecord)},
}
klog.V(2).Infof("Initialized Antrea-native Policy Logger for audit logging with log file '%s'", logFile)
return cAntreaPolicyLogger, nil
klog.InfoS("Initialized Antrea-native Policy Logger for audit logging", "logFile", logFile)
return antreaPolicyLogger, nil
}
64 changes: 46 additions & 18 deletions pkg/agent/controller/networkpolicy/audit_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@ import (
"errors"
"fmt"
"log"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

const (
testBufferLength time.Duration = 100 * time.Millisecond
)

// mockLogger implements io.Writer.
type mockLogger struct {
mu sync.Mutex
Expand All @@ -42,17 +48,18 @@ func (l *mockLogger) Write(p []byte) (n int, err error) {
return len(msg), nil
}

func newTestAntreaPolicyLogger() (*AntreaPolicyLogger, *mockLogger) {
func newTestAntreaPolicyLogger(bufferLength time.Duration) (*AntreaPolicyLogger, *mockLogger) {
mockAnpLogger := &mockLogger{logged: make(chan string, 100)}
antreaLogger := &AntreaPolicyLogger{
bufferLength: bufferLength,
anpLogger: log.New(mockAnpLogger, "", log.Ldate),
logDeduplication: logRecordDedupMap{logMap: make(map[string]*logDedupRecord)},
}
return antreaLogger, mockAnpLogger
}

func newLogInfo(disposition string) (*logInfo, string) {
expected := "AntreaPolicyIngressRule AntreaNetworkPolicy:default/test " + disposition + " 0 SRC: 0.0.0.0 DEST: 1.1.1.1 60 TCP"
expected := fmt.Sprintf("AntreaPolicyIngressRule AntreaNetworkPolicy:default/test %s 0 SRC: 0.0.0.0 DEST: 1.1.1.1 60 TCP", disposition)
return &logInfo{
tableName: "AntreaPolicyIngressRule",
npRef: "AntreaNetworkPolicy:default/test",
Expand All @@ -65,11 +72,11 @@ func newLogInfo(disposition string) (*logInfo, string) {
}, expected
}

func sendMultiplePackets(antreaLogger *AntreaPolicyLogger, ob *logInfo, numPackets int) {
func sendMultiplePackets(antreaLogger *AntreaPolicyLogger, ob *logInfo, numPackets int, sendInterval time.Duration) {
count := 0
for range time.Tick(12 * time.Millisecond) {
for range time.Tick(sendInterval) {
count += 1
antreaLogger.logDedupPacket(ob, 100*time.Millisecond)
antreaLogger.LogDedupPacket(ob)
if count == numPackets {
break
}
Expand All @@ -81,41 +88,62 @@ func expectedLogWithCount(msg string, count int) string {
}

func TestAllowPacketLog(t *testing.T) {
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger()
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger(testBufferLength)
ob, expected := newLogInfo("Allow")

antreaLogger.logDedupPacket(ob, 100*time.Millisecond)
antreaLogger.LogDedupPacket(ob)
actual := <-mockAnpLogger.logged
assert.Contains(t, actual, expected)
}

func TestDropPacketLog(t *testing.T) {
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger()
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger(testBufferLength)
ob, expected := newLogInfo("Drop")

antreaLogger.logDedupPacket(ob, 100*time.Millisecond)
antreaLogger.LogDedupPacket(ob)
actual := <-mockAnpLogger.logged
assert.Contains(t, actual, expected)
}

func TestDropPacketDedupLog(t *testing.T) {
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger()
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger(testBufferLength)
ob, expected := newLogInfo("Drop")
// add the additional log info for duplicate packets
// Add the additional log info for duplicate packets.
expected = expectedLogWithCount(expected, 2)

go sendMultiplePackets(antreaLogger, ob, 2)
go sendMultiplePackets(antreaLogger, ob, 2, time.Millisecond)
actual := <-mockAnpLogger.logged
assert.Contains(t, actual, expected)
}

func TestDropPacketMultiDedupLog(t *testing.T) {
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger()
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger(testBufferLength)
ob, expected := newLogInfo("Drop")

go sendMultiplePackets(antreaLogger, ob, 10)
actual := <-mockAnpLogger.logged
assert.Contains(t, actual, expectedLogWithCount(expected, 9))
actual = <-mockAnpLogger.logged
assert.Contains(t, actual, expected)
numPackets := 4
go sendMultiplePackets(antreaLogger, ob, numPackets, 40*time.Millisecond)
// Close the channel listening for logged msg after 500ms.
time.AfterFunc(500*time.Millisecond, func() {
mockAnpLogger.mu.Lock()
defer mockAnpLogger.mu.Unlock()
close(mockAnpLogger.logged)
})

receivedPacket, countLog := 0, 0
for actual := range mockAnpLogger.logged {
assert.Contains(t, actual, expected)
countLog++
begin := strings.Index(actual, "[")
end := strings.Index(actual, " packets")
if begin == -1 {
receivedPacket += 1
} else {
countLoggedMsg, _ := strconv.Atoi(actual[(begin + 1):end])
receivedPacket += countLoggedMsg
}
}
// Test two messages are logged for all packets.
assert.Equal(t, 2, countLog)
// Test all packets are accounted for.
assert.Equal(t, numPackets, receivedPacket)
}
16 changes: 11 additions & 5 deletions pkg/agent/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type Controller struct {
antreaPolicyEnabled bool
// statusManagerEnabled indicates whether a statusManager is configured.
statusManagerEnabled bool
// loggingEnabled indicates where Antrea policy audit logging is enabled.
loggingEnabled bool
// antreaClientProvider provides interfaces to get antreaClient, which can be
// used to watch Antrea AddressGroups, AppliedToGroups, and NetworkPolicies.
// We need to get antreaClient dynamically because the apiserver cert can be
Expand Down Expand Up @@ -103,6 +105,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
entityUpdates <-chan types.EntityReference,
antreaPolicyEnabled bool,
statusManagerEnabled bool,
loggingEnabled bool,
denyConnStore *connections.DenyConnectionStore,
asyncRuleDeleteInterval time.Duration) (*Controller, error) {
c := &Controller{
Expand All @@ -112,6 +115,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
ofClient: ofClient,
antreaPolicyEnabled: antreaPolicyEnabled,
statusManagerEnabled: statusManagerEnabled,
loggingEnabled: loggingEnabled,
denyConnStore: denyConnStore,
}
c.ruleCache = newRuleCache(c.enqueueRule, entityUpdates)
Expand All @@ -128,12 +132,14 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
if c.ofClient != nil && antreaPolicyEnabled {
// Register packetInHandler
c.ofClient.RegisterPacketInHandler(uint8(openflow.PacketInReasonNP), "networkpolicy", c)
// Initiate logger for Antrea Policy audit logging
antreaPolicyLogger, err := newAntreaPolicyLogger()
if err != nil {
return nil, err
if loggingEnabled {
// Initiate logger for Antrea Policy audit logging
antreaPolicyLogger, err := newAntreaPolicyLogger()
if err != nil {
return nil, err
}
c.antreaPolicyLogger = antreaPolicyLogger
}
c.antreaPolicyLogger = antreaPolicyLogger
}

// Use nodeName to filter resources when watching resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
clientset := &fake.Clientset{}
ch := make(chan agenttypes.EntityReference, 100)
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", ch,
true, true, nil, testAsyncDeleteInterval)
true, true, true, nil, testAsyncDeleteInterval)
reconciler := newMockReconciler()
controller.reconciler = reconciler
controller.antreaPolicyLogger = nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (c *Controller) logPacket(pktIn *ofctrl.PacketIn) error {
}

// Log the ob info to corresponding file w/ deduplication
c.antreaPolicyLogger.logDedupPacket(ob, time.Second)
c.antreaPolicyLogger.LogDedupPacket(ob)
return nil
}

Expand Down