diff --git a/pkg/agent/controller/networkpolicy/audit_logging.go b/pkg/agent/controller/networkpolicy/audit_logging.go index 5077e76adc0..f0077d81239 100644 --- a/pkg/agent/controller/networkpolicy/audit_logging.go +++ b/pkg/agent/controller/networkpolicy/audit_logging.go @@ -29,6 +29,7 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/clock" + "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -37,8 +38,9 @@ import ( ) const ( - logfileSubdir string = "networkpolicy" - logfileName string = "np.log" + logfileSubdir string = "networkpolicy" + logfileName string = "np.log" + nullPlaceholder = "" ) // AntreaPolicyLogger is used for Antrea policy audit logging. @@ -52,18 +54,20 @@ type AntreaPolicyLogger struct { // logInfo will be set by retrieving info from packetin and register. type logInfo struct { - tableName string // name of the table sending packetin - npRef string // Network Policy name reference - ruleName string // Network Policy rule name for Antrea-native policies - logLabel string // Network Policy user-defined log label - disposition string // Allow/Drop of the rule sending packetin - ofPriority string // openflow priority of the flow sending packetin - srcIP string // source IP of the traffic logged - srcPort string // source port of the traffic logged - destIP string // destination IP of the traffic logged - destPort string // destination port of the traffic logged - pktLength uint16 // packet length of packetin - protocolStr string // protocol of the traffic logged + tableName string // name of the table sending packetin + npRef string // Network Policy name reference + ruleName string // Network Policy rule name for Antrea-native policies + direction string // Direction of the Network Policy rule (Ingress / Egress) + logLabel string // Network Policy user-defined log label + disposition string // Allow/Drop of the rule sending packetin + ofPriority string // openflow priority of the flow sending packetin + appliedToRef string // namespace and name of the Pod to which the Network Policy is applied + srcIP string // source IP of the traffic logged + srcPort string // source port of the traffic logged + destIP string // destination IP of the traffic logged + destPort string // destination port of the traffic logged + pktLength string // packet length of packetin + protocolStr string // protocol of the traffic logged } // logDedupRecord will be used as 1 sec buffer for log deduplication. @@ -125,14 +129,16 @@ func buildLogMsg(ob *logInfo) string { ob.tableName, ob.npRef, ob.ruleName, + ob.direction, ob.disposition, ob.ofPriority, + ob.appliedToRef, ob.srcIP, ob.srcPort, ob.destIP, ob.destPort, ob.protocolStr, - strconv.FormatUint(uint64(ob.pktLength), 10), + ob.pktLength, ob.logLabel, }, " ") } @@ -185,13 +191,39 @@ func newAntreaPolicyLogger() (*AntreaPolicyLogger, error) { } // getNetworkPolicyInfo fills in tableName, npName, ofPriority, disposition of logInfo ob. -func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, c *Controller, ob *logInfo) error { +func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, packet *binding.Packet, c *Controller, ob *logInfo) error { matchers := pktIn.GetMatches() var match *ofctrl.MatchField // Get table name. tableID := pktIn.TableId ob.tableName = openflow.GetFlowTableName(tableID) + var localIP string + // We use the tableID to determine the direction of the NP rule. + // The advantage of this method is that it should work for all NP types. + if isAntreaPolicyIngressTable(tableID) || tableID == openflow.IngressRuleTable.GetID() { + ob.direction = "Ingress" + localIP = packet.DestinationIP.String() + } else if isAntreaPolicyEgressTable(tableID) || tableID == openflow.EgressRuleTable.GetID() { + ob.direction = "Egress" + localIP = packet.SourceIP.String() + } else { + // this case should not be possible + klog.InfoS("Cannot determine direction of NetworkPolicy rule") + ob.direction = nullPlaceholder + } + + if localIP != "" { + iface, ok := c.ifaceStore.GetInterfaceByIP(localIP) + if ok && iface.Type == interfacestore.ContainerInterface { + ob.appliedToRef = fmt.Sprintf("%s/%s", iface.ContainerInterfaceConfig.PodNamespace, iface.ContainerInterfaceConfig.PodName) + } + } + if ob.appliedToRef == "" { + klog.InfoS("Cannot determine namespace/name of appliedTo Pod", "ip", localIP) + ob.appliedToRef = nullPlaceholder + } + // Get disposition Allow or Drop. match = getMatchRegField(matchers, openflow.APDispositionField) disposition, err := getInfoInReg(match, openflow.APDispositionField.GetRange().ToNXRange()) @@ -234,10 +266,14 @@ func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, c *Controller, ob *logInfo) er if err != nil { return fmt.Errorf("received error while unloading conjunction id from reg: %v", err) } - ob.npRef, ob.ofPriority, ob.ruleName, ob.logLabel = c.ofClient.GetPolicyInfoFromConjunction(conjID) - if ob.npRef == "" || ob.ofPriority == "" { + ok, npRef, ofPriority, ruleName, logLabel := c.ofClient.GetPolicyInfoFromConjunction(conjID) + if !ok { return fmt.Errorf("networkpolicy not found for conjunction id: %v", conjID) } + ob.npRef = npRef.ToString() + ob.ofPriority = ofPriority + ob.ruleName = ruleName + ob.logLabel = logLabel // Fill in placeholders for Antrea native policies without log labels, // K8s NetworkPolicies without rule names or log labels. fillLogInfoPlaceholders([]*string{&ob.ruleName, &ob.logLabel, &ob.ofPriority}) @@ -248,11 +284,11 @@ func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, c *Controller, ob *logInfo) er func getPacketInfo(packet *binding.Packet, ob *logInfo) { ob.srcIP = packet.SourceIP.String() ob.destIP = packet.DestinationIP.String() - ob.pktLength = packet.IPLength + ob.pktLength = strconv.FormatUint(uint64(packet.IPLength), 10) ob.protocolStr = ip.IPProtocolNumberToString(packet.IPProto, "UnknownProtocol") if ob.protocolStr == "TCP" || ob.protocolStr == "UDP" { - ob.srcPort = strconv.Itoa(int(packet.SourcePort)) - ob.destPort = strconv.Itoa(int(packet.DestinationPort)) + ob.srcPort = strconv.FormatUint(uint64(packet.SourcePort), 10) + ob.destPort = strconv.FormatUint(uint64(packet.DestinationPort), 10) } else { // Placeholders for ICMP packets without port numbers. fillLogInfoPlaceholders([]*string{&ob.srcPort, &ob.destPort}) @@ -262,7 +298,7 @@ func getPacketInfo(packet *binding.Packet, ob *logInfo) { func fillLogInfoPlaceholders(logItems []*string) { for i, v := range logItems { if *v == "" { - *logItems[i] = "" + *logItems[i] = nullPlaceholder } } } @@ -278,7 +314,7 @@ func (c *Controller) logPacket(pktIn *ofctrl.PacketIn) error { } // Set Network Policy and packet info to log. - err = getNetworkPolicyInfo(pktIn, c, ob) + err = getNetworkPolicyInfo(pktIn, packet, c, ob) if err != nil { return fmt.Errorf("received error while retrieving NetworkPolicy info: %v", err) } diff --git a/pkg/agent/controller/networkpolicy/audit_logging_test.go b/pkg/agent/controller/networkpolicy/audit_logging_test.go index e47fcd478ce..bec6b280ad9 100644 --- a/pkg/agent/controller/networkpolicy/audit_logging_test.go +++ b/pkg/agent/controller/networkpolicy/audit_logging_test.go @@ -34,8 +34,11 @@ import ( "k8s.io/utils/clock" clocktesting "k8s.io/utils/clock/testing" + "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" openflowtesting "antrea.io/antrea/pkg/agent/openflow/testing" + "antrea.io/antrea/pkg/agent/util" + "antrea.io/antrea/pkg/apis/controlplane/v1beta2" binding "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/util/ip" ) @@ -48,6 +51,16 @@ var ( actionAllow = openflow.DispositionToString[openflow.DispositionAllow] actionDrop = openflow.DispositionToString[openflow.DispositionDrop] actionRedirect = "Redirect" + testANPRef = &v1beta2.NetworkPolicyReference{ + Type: v1beta2.AntreaNetworkPolicy, + Namespace: "default", + Name: "test", + } + testK8sNPRef = &v1beta2.NetworkPolicyReference{ + Type: v1beta2.K8sNetworkPolicy, + Namespace: "default", + Name: "test", + } ) // mockLogger implements io.Writer. @@ -81,7 +94,7 @@ func newTestAntreaPolicyLogger(bufferLength time.Duration, clock clock.Clock) (* func newLogInfo(disposition string) (*logInfo, string) { testLogInfo := &logInfo{ tableName: openflow.AntreaPolicyIngressRuleTable.GetName(), - npRef: "AntreaNetworkPolicy:default/test", + npRef: testANPRef.ToString(), ruleName: "test-rule", logLabel: "test-label", ofPriority: "0", @@ -91,7 +104,7 @@ func newLogInfo(disposition string) (*logInfo, string) { destIP: "1.1.1.1", destPort: "80", protocolStr: "TCP", - pktLength: 60, + pktLength: "60", } return testLogInfo, buildLogMsg(testLogInfo) } @@ -198,20 +211,46 @@ func TestGetNetworkPolicyInfo(t *testing.T) { prepareMockOFTablesWithCache() generateMatch := func(regID int, data []byte) openflow15.MatchField { return openflow15.MatchField{ - Class: openflow15.OXM_CLASS_PACKET_REGS, + Class: openflow15.OXM_CLASS_PACKET_REGS, + // convert reg (4-byte) ID to xreg (8-byte) ID Field: uint8(regID / 2), HasMask: false, Value: &openflow15.ByteArrayField{Data: data}, } } - testANPRef := "AntreaNetworkPolicy:default/test-anp" - testK8sRef := "K8sNetworkPolicy:default/test-anp" - testPriority, testRule, testLogLabel, defaultLog := "61800", "test-rule", "test-log-label", "" + testPriority, testRule, testLogLabel := "61800", "test-rule", "test-log-label" + // only need 4 bytes of register data for the disposition + // this will go into the openflow.APDispositionField register allowDispositionData := []byte{0x11, 0x00, 0x00, 0x11} dropCNPDispositionData := []byte{0x11, 0x00, 0x0c, 0x11} dropK8sDispositionData := []byte{0x11, 0x00, 0x08, 0x11} redirectDispositionData := []byte{0x11, 0x10, 0x00, 0x11} - ingressData := []byte{0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11} + // need 8 bytes (full register) of data for the conjunction + // this will be used for one of the following registers depending on the test case: + // openflow.APConjIDField, openflow.TFEgressConjIDField, openflow.TFIngressConjIDField + // the data itself is not relevant + conjunctionData := []byte{0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11} + srcIP := net.ParseIP("192.168.1.1") + destIP := net.ParseIP("192.168.1.2") + testPacket := &binding.Packet{ + SourceIP: srcIP, + DestinationIP: destIP, + } + + ifaceStore := interfacestore.NewInterfaceStore() + ifaceStore.AddInterface(&interfacestore.InterfaceConfig{ + InterfaceName: util.GenerateContainerInterfaceName("srcPod", "default", "c1"), + IPs: []net.IP{srcIP}, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{PodName: "srcPod", PodNamespace: "default", ContainerID: "c1"}, + OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: 1}, + }) + ifaceStore.AddInterface(&interfacestore.InterfaceConfig{ + InterfaceName: util.GenerateContainerInterfaceName("destPod", "default", "c2"), + IPs: []net.IP{destIP}, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{PodName: "destPod", PodNamespace: "default", ContainerID: "c2"}, + OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: 2}, + }) + tests := []struct { name string tableID uint8 @@ -222,20 +261,41 @@ func TestGetNetworkPolicyInfo(t *testing.T) { wantErr error }{ { - name: "ANP Allow", + name: "ANP Allow Ingress", tableID: openflow.AntreaPolicyIngressRuleTable.GetID(), expectedCalls: func(mockClient *openflowtesting.MockClientMockRecorder) { mockClient.GetPolicyInfoFromConjunction(gomock.Any()).Return( - testANPRef, testPriority, testRule, testLogLabel) + true, testANPRef, testPriority, testRule, testLogLabel) + }, + dispositionData: allowDispositionData, + wantOb: &logInfo{ + tableName: openflow.AntreaPolicyIngressRuleTable.GetName(), + disposition: actionAllow, + npRef: testANPRef.ToString(), + ofPriority: testPriority, + ruleName: testRule, + direction: "Ingress", + appliedToRef: "default/destPod", + logLabel: testLogLabel, + }, + }, + { + name: "ANP Allow Egress", + tableID: openflow.AntreaPolicyEgressRuleTable.GetID(), + expectedCalls: func(mockClient *openflowtesting.MockClientMockRecorder) { + mockClient.GetPolicyInfoFromConjunction(gomock.Any()).Return( + true, testANPRef, testPriority, testRule, testLogLabel) }, dispositionData: allowDispositionData, wantOb: &logInfo{ - tableName: openflow.AntreaPolicyIngressRuleTable.GetName(), - disposition: actionAllow, - npRef: testANPRef, - ofPriority: testPriority, - ruleName: testRule, - logLabel: testLogLabel, + tableName: openflow.AntreaPolicyEgressRuleTable.GetName(), + disposition: actionAllow, + npRef: testANPRef.ToString(), + ofPriority: testPriority, + ruleName: testRule, + direction: "Egress", + appliedToRef: "default/srcPod", + logLabel: testLogLabel, }, }, { @@ -243,16 +303,18 @@ func TestGetNetworkPolicyInfo(t *testing.T) { tableID: openflow.IngressRuleTable.GetID(), expectedCalls: func(mockClient *openflowtesting.MockClientMockRecorder) { mockClient.GetPolicyInfoFromConjunction(gomock.Any()).Return( - testK8sRef, testPriority, "", "") + true, testK8sNPRef, testPriority, "", "") }, dispositionData: allowDispositionData, wantOb: &logInfo{ - tableName: openflow.IngressRuleTable.GetName(), - disposition: actionAllow, - npRef: testK8sRef, - ofPriority: testPriority, - ruleName: defaultLog, - logLabel: defaultLog, + tableName: openflow.IngressRuleTable.GetName(), + disposition: actionAllow, + npRef: testK8sNPRef.ToString(), + ofPriority: testPriority, + ruleName: nullPlaceholder, + direction: "Ingress", + appliedToRef: "default/destPod", + logLabel: nullPlaceholder, }, }, { @@ -260,16 +322,18 @@ func TestGetNetworkPolicyInfo(t *testing.T) { tableID: openflow.AntreaPolicyIngressRuleTable.GetID(), expectedCalls: func(mockClient *openflowtesting.MockClientMockRecorder) { mockClient.GetPolicyInfoFromConjunction(gomock.Any()).Return( - testANPRef, testPriority, testRule, testLogLabel) + true, testANPRef, testPriority, testRule, testLogLabel) }, dispositionData: dropCNPDispositionData, wantOb: &logInfo{ - tableName: openflow.AntreaPolicyIngressRuleTable.GetName(), - disposition: actionDrop, - npRef: testANPRef, - ofPriority: testPriority, - ruleName: testRule, - logLabel: testLogLabel, + tableName: openflow.AntreaPolicyIngressRuleTable.GetName(), + disposition: actionDrop, + npRef: testANPRef.ToString(), + ofPriority: testPriority, + ruleName: testRule, + direction: "Ingress", + appliedToRef: "default/destPod", + logLabel: testLogLabel, }, }, { @@ -277,12 +341,14 @@ func TestGetNetworkPolicyInfo(t *testing.T) { tableID: openflow.IngressDefaultTable.GetID(), dispositionData: dropK8sDispositionData, wantOb: &logInfo{ - tableName: openflow.IngressDefaultTable.GetName(), - disposition: actionDrop, - npRef: "K8sNetworkPolicy", - ofPriority: defaultLog, - ruleName: defaultLog, - logLabel: defaultLog, + tableName: openflow.IngressDefaultTable.GetName(), + disposition: actionDrop, + npRef: "K8sNetworkPolicy", + ofPriority: nullPlaceholder, + ruleName: nullPlaceholder, + direction: "Ingress", + appliedToRef: "default/destPod", + logLabel: nullPlaceholder, }, }, { @@ -290,16 +356,18 @@ func TestGetNetworkPolicyInfo(t *testing.T) { tableID: openflow.AntreaPolicyIngressRuleTable.GetID(), expectedCalls: func(mockClient *openflowtesting.MockClientMockRecorder) { mockClient.GetPolicyInfoFromConjunction(gomock.Any()).Return( - testANPRef, testPriority, testRule, testLogLabel) + true, testANPRef, testPriority, testRule, testLogLabel) }, dispositionData: redirectDispositionData, wantOb: &logInfo{ - tableName: openflow.AntreaPolicyIngressRuleTable.GetName(), - disposition: actionRedirect, - npRef: testANPRef, - ofPriority: testPriority, - ruleName: testRule, - logLabel: testLogLabel, + tableName: openflow.AntreaPolicyIngressRuleTable.GetName(), + disposition: actionRedirect, + npRef: testANPRef.ToString(), + ofPriority: testPriority, + ruleName: testRule, + direction: "Ingress", + appliedToRef: "default/destPod", + logLabel: testLogLabel, }, }, } @@ -311,11 +379,15 @@ func TestGetNetworkPolicyInfo(t *testing.T) { matchers := []openflow15.MatchField{dispositionMatch} // Inject ingress/egress match when case is not K8s default drop. if tc.expectedCalls != nil { - regID := openflow.TFIngressConjIDField.GetRegID() + var regID int if tc.wantOb.disposition == actionDrop { regID = openflow.APConjIDField.GetRegID() + } else if tc.wantOb.direction == "Ingress" { + regID = openflow.TFIngressConjIDField.GetRegID() + } else { + regID = openflow.TFEgressConjIDField.GetRegID() } - ingressMatch := generateMatch(regID, ingressData) + ingressMatch := generateMatch(regID, conjunctionData) matchers = append(matchers, ingressMatch) } pktIn := &ofctrl.PacketIn{ @@ -331,9 +403,12 @@ func TestGetNetworkPolicyInfo(t *testing.T) { if tc.expectedCalls != nil { tc.expectedCalls(testClientInterface.EXPECT()) } - c := &Controller{ofClient: testClientInterface} + c := &Controller{ + ofClient: testClientInterface, + ifaceStore: ifaceStore, + } tc.ob = new(logInfo) - gotErr := getNetworkPolicyInfo(pktIn, c, tc.ob) + gotErr := getNetworkPolicyInfo(pktIn, testPacket, c, tc.ob) assert.Equal(t, tc.wantOb, tc.ob) assert.Equal(t, tc.wantErr, gotErr) }) @@ -363,7 +438,7 @@ func TestGetPacketInfo(t *testing.T) { destIP: "1.1.1.1", destPort: "80", protocolStr: "TCP", - pktLength: 60, + pktLength: "60", }, }, { @@ -380,7 +455,7 @@ func TestGetPacketInfo(t *testing.T) { destIP: "1.1.1.1", destPort: "", protocolStr: "ICMP", - pktLength: 60, + pktLength: "60", }, }, } diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 5ac8d30de6d..60c0df4b66c 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -29,7 +29,8 @@ import ( "antrea.io/antrea/pkg/agent/openflow/cookie" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" - "antrea.io/antrea/pkg/apis/crd/v1alpha2" + "antrea.io/antrea/pkg/apis/controlplane/v1beta2" + crdv1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" binding "antrea.io/antrea/pkg/ovs/openflow" utilip "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/third_party/proxy" @@ -210,8 +211,10 @@ type Client interface { // UninstallTraceflowFlows uninstalls flows for a Traceflow request. UninstallTraceflowFlows(dataplaneTag uint8) error - // Find Network Policy reference and OFpriority by conjunction ID. - GetPolicyInfoFromConjunction(ruleID uint32) (string, string, string, string) + // GetPolicyInfoFromConjunction returns the following policy information for the provided conjunction ID: + // NetworkPolicy reference, OF priority, rule name, label + // The boolean return value indicates whether the policy information was found. + GetPolicyInfoFromConjunction(ruleID uint32) (bool, *v1beta2.NetworkPolicyReference, string, string, string) // RegisterPacketInHandler uses SubscribePacketIn to get PacketIn message and process received // packets through registered handler. @@ -307,7 +310,7 @@ type Client interface { igmp ofutil.Message) error // InstallTrafficControlMarkFlows installs the flows to mark the packets for a traffic control rule. - InstallTrafficControlMarkFlows(name string, sourceOFPorts []uint32, targetOFPort uint32, direction v1alpha2.Direction, action v1alpha2.TrafficControlAction) error + InstallTrafficControlMarkFlows(name string, sourceOFPorts []uint32, targetOFPort uint32, direction crdv1alpha2.Direction, action crdv1alpha2.TrafficControlAction) error // UninstallTrafficControlMarkFlows removes the flows for a traffic control rule. UninstallTrafficControlMarkFlows(name string) error @@ -1358,7 +1361,7 @@ func (c *client) SendIGMPQueryPacketOut( return c.bridge.SendPacketOut(packetOutObj) } -func (c *client) InstallTrafficControlMarkFlows(name string, sourceOFPorts []uint32, targetOFPort uint32, direction v1alpha2.Direction, action v1alpha2.TrafficControlAction) error { +func (c *client) InstallTrafficControlMarkFlows(name string, sourceOFPorts []uint32, targetOFPort uint32, direction crdv1alpha2.Direction, action crdv1alpha2.TrafficControlAction) error { flows := c.featurePodConnectivity.trafficControlMarkFlows(sourceOFPorts, targetOFPort, direction, action) cacheKey := fmt.Sprintf("tc_%s", name) c.replayMutex.RLock() diff --git a/pkg/agent/openflow/network_policy.go b/pkg/agent/openflow/network_policy.go index 23e3723cb5a..d43c70af2c3 100644 --- a/pkg/agent/openflow/network_policy.go +++ b/pkg/agent/openflow/network_policy.go @@ -1537,16 +1537,16 @@ func (f *featureNetworkPolicy) getPolicyRuleConjunction(ruleID uint32) *policyRu return conj.(*policyRuleConjunction) } -func (c *client) GetPolicyInfoFromConjunction(ruleID uint32) (string, string, string, string) { +func (c *client) GetPolicyInfoFromConjunction(ruleID uint32) (bool, *v1beta2.NetworkPolicyReference, string, string, string) { conjunction := c.featureNetworkPolicy.getPolicyRuleConjunction(ruleID) - if conjunction == nil { - return "", "", "", "" + if conjunction == nil || conjunction.npRef == nil { + return false, nil, "", "", "" } priorities := conjunction.ActionFlowPriorities() if len(priorities) == 0 { - return "", "", "", "" + return false, nil, "", "", "" } - return conjunction.npRef.ToString(), priorities[0], conjunction.ruleName, conjunction.ruleLogLabel + return true, conjunction.npRef, priorities[0], conjunction.ruleName, conjunction.ruleLogLabel } // UninstallPolicyRuleFlows removes the Openflow entry relevant to the specified NetworkPolicy rule. diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index 4a95f43f587..ed803b7002a 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -1320,6 +1320,7 @@ func TestClient_GetPolicyInfoFromConjunction(t *testing.T) { tests := []struct { name string ruleID uint32 + valid bool wantNpRef string wantPriority string wantRuleName string @@ -1328,14 +1329,17 @@ func TestClient_GetPolicyInfoFromConjunction(t *testing.T) { { name: "conjunction not found", ruleID: uint32(100), + valid: false, }, { name: "conjunction empty priorities", ruleID: ruleID1, + valid: false, }, { name: "conjunction no error", ruleID: ruleID2, + valid: true, wantNpRef: "K8sNetworkPolicy:ns1/np1", wantPriority: "100", wantRuleName: fmt.Sprint(ruleID2), @@ -1345,11 +1349,14 @@ func TestClient_GetPolicyInfoFromConjunction(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - gotNpRef, gotPriority, gotRuleName, gotRuleLogLabel := c.GetPolicyInfoFromConjunction(tc.ruleID) - assert.Equal(t, tc.wantNpRef, gotNpRef) - assert.Equal(t, tc.wantPriority, gotPriority) - assert.Equal(t, tc.wantRuleName, gotRuleName) - assert.Equal(t, tc.wantRuleLogLabel, gotRuleLogLabel) + ok, gotNpRef, gotPriority, gotRuleName, gotRuleLogLabel := c.GetPolicyInfoFromConjunction(tc.ruleID) + require.Equal(t, tc.valid, ok) + if tc.valid { + assert.Equal(t, tc.wantNpRef, gotNpRef) + assert.Equal(t, tc.wantPriority, gotPriority) + assert.Equal(t, tc.wantRuleName, gotRuleName) + assert.Equal(t, tc.wantRuleLogLabel, gotRuleLogLabel) + } }) } } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 96a3de9e144..19845fa3764 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -22,6 +22,7 @@ package testing import ( config "antrea.io/antrea/pkg/agent/config" types "antrea.io/antrea/pkg/agent/types" + v1beta2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" v1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" openflow "antrea.io/antrea/pkg/ovs/openflow" ip "antrea.io/antrea/pkg/util/ip" @@ -199,14 +200,15 @@ func (mr *MockClientMockRecorder) GetPodFlowKeys(arg0 interface{}) *gomock.Call } // GetPolicyInfoFromConjunction mocks base method -func (m *MockClient) GetPolicyInfoFromConjunction(arg0 uint32) (string, string, string, string) { +func (m *MockClient) GetPolicyInfoFromConjunction(arg0 uint32) (bool, *v1beta2.NetworkPolicyReference, string, string, string) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetPolicyInfoFromConjunction", arg0) - ret0, _ := ret[0].(string) - ret1, _ := ret[1].(string) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(*v1beta2.NetworkPolicyReference) ret2, _ := ret[2].(string) ret3, _ := ret[3].(string) - return ret0, ret1, ret2, ret3 + ret4, _ := ret[4].(string) + return ret0, ret1, ret2, ret3, ret4 } // GetPolicyInfoFromConjunction indicates an expected call of GetPolicyInfoFromConjunction diff --git a/test/e2e/antreaipam_anp_test.go b/test/e2e/antreaipam_anp_test.go index db51a45d1cd..ebd61081635 100644 --- a/test/e2e/antreaipam_anp_test.go +++ b/test/e2e/antreaipam_anp_test.go @@ -65,7 +65,7 @@ func initializeAntreaIPAM(t *testing.T, data *TestData) { failOnError(err, t) ips, err := k8sUtils.Bootstrap(namespaces, pods, false) failOnError(err, t) - podIPs = *ips + podIPs = ips } func TestAntreaIPAMAntreaPolicy(t *testing.T) { diff --git a/test/e2e/antreapolicy_test.go b/test/e2e/antreapolicy_test.go index 7659b449ad7..7f69d2547f7 100644 --- a/test/e2e/antreapolicy_test.go +++ b/test/e2e/antreapolicy_test.go @@ -35,7 +35,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/utils/strings/slices" "antrea.io/antrea/pkg/agent/apiserver/handlers/podinterface" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" @@ -142,7 +141,7 @@ func initialize(t *testing.T, data *TestData) { failOnError(err, t) ips, err := k8sUtils.Bootstrap(namespaces, pods, true) failOnError(err, t) - podIPs = *ips + podIPs = ips } func skipIfAntreaPolicyDisabled(tb testing.TB) { @@ -2625,26 +2624,119 @@ func testANPMultipleAppliedTo(t *testing.T, data *TestData, singleRule bool) { failOnError(k8sUtils.DeleteANP(builder.Namespace, builder.Name), t) } +// auditLogMatcher is used to validate that audit logs are as expected. It converts input parameters +// provided by test cases into regexes that are used to match the content of the audit logs file. +type auditLogMatcher struct { + npRef string + ruleName string + direction string + disposition string + logLabel string + priorityRe string + + matchers []*regexp.Regexp +} + +func NewAuditLogMatcher(npRef, ruleName, direction, disposition string) *auditLogMatcher { + priorityRe := `[0-9]+` + if npRef == "K8sNetworkPolicy" { + // K8s NP default drop (isolated behavior): there is no priority + priorityRe = "" + } + return &auditLogMatcher{ + npRef: npRef, + ruleName: ruleName, + direction: direction, + disposition: disposition, + logLabel: "", + priorityRe: priorityRe, + matchers: make([]*regexp.Regexp, 0), + } +} + +func (m *auditLogMatcher) WithLogLabel(logLabel string) *auditLogMatcher { + m.logLabel = logLabel + return m +} + +func (m *auditLogMatcher) add(appliedToRef, srcIP, destIP string, destPort int32) { + re := regexp.MustCompile(strings.Join([]string{ + m.npRef, + m.ruleName, + m.direction, + m.disposition, + m.priorityRe, + appliedToRef, + srcIP, + `[0-9]+`, // srcPort + destIP, + strconv.Itoa(int(destPort)), + "TCP", // all AuditLogging tests use TCP + `[0-9]+`, // pktLength + m.logLabel, + }, " ")) + m.matchers = append(m.matchers, re) +} + +func (m *auditLogMatcher) AddProbe(appliedToRef, ns1, pod1, ns2, pod2 string, destPort int32) { + srcIPs, _ := podIPs[fmt.Sprintf("%s/%s", ns1, pod1)] + destIPs, _ := podIPs[fmt.Sprintf("%s/%s", ns2, pod2)] + for _, srcIP := range srcIPs { + for _, destIP := range destIPs { + // only look for an entry in the audit log file if srcIP and dstIP are of the same family + if IPFamily(srcIP) != IPFamily(destIP) { + continue + } + m.add(appliedToRef, srcIP, destIP, destPort) + } + } +} + +func (m *auditLogMatcher) AddProbeAddr(appliedToRef, ns, pod, destIP string, destPort int32) { + srcIPs, _ := podIPs[fmt.Sprintf("%s/%s", ns, pod)] + for _, srcIP := range srcIPs { + // only look for an entry in the audit log file if srcIP and dstIP are of the same family + if IPFamily(srcIP) != IPFamily(destIP) { + continue + } + m.add(appliedToRef, srcIP, destIP, destPort) + } +} + +func (m *auditLogMatcher) Matchers() []*regexp.Regexp { + return m.matchers +} + // testAuditLoggingBasic tests that audit logs are generated when egress drop applied func testAuditLoggingBasic(t *testing.T, data *TestData) { - npRef := "test-log-acnp-deny" + npName := "test-log-acnp-deny" ruleName := "DropToZ" logLabel := "testLogLabel" builder := &ClusterNetworkPolicySpecBuilder{} - builder = builder.SetName(npRef). + builder = builder.SetName(npName). SetPriority(1.0). SetAppliedToGroup([]ACNPAppliedToSpec{{PodSelector: map[string]string{"pod": "a"}, NSSelector: map[string]string{"ns": namespaces["x"]}}}) builder.AddEgress(ProtocolTCP, &p80, nil, nil, nil, nil, nil, nil, nil, nil, map[string]string{"ns": namespaces["z"]}, nil, nil, false, nil, crdv1alpha1.RuleActionDrop, "", ruleName, nil) builder.AddEgressLogging(logLabel) + npRef := fmt.Sprintf("AntreaClusterNetworkPolicy:%s", npName) acnp, err := k8sUtils.CreateOrUpdateACNP(builder.Get()) failOnError(err, t) failOnError(data.waitForACNPRealized(t, acnp.Name, policyRealizedTimeout), t) + podXA, err := k8sUtils.GetPodByLabel(namespaces["x"], "a") + if err != nil { + t.Errorf("Failed to get Pod in Namespace x with label 'pod=a': %v", err) + } + + matcher := NewAuditLogMatcher(npRef, ruleName, "Egress", "Drop").WithLogLabel(logLabel) + appliedToRef := fmt.Sprintf("%s/%s", podXA.Namespace, podXA.Name) + // generate some traffic that will be dropped by test-log-acnp-deny var wg sync.WaitGroup oneProbe := func(ns1, pod1, ns2, pod2 string) { + matcher.AddProbe(appliedToRef, ns1, pod1, ns2, pod2, p80) wg.Add(1) go func() { defer wg.Done() @@ -2656,18 +2748,9 @@ func testAuditLoggingBasic(t *testing.T, data *TestData) { oneProbe(namespaces["x"], "a", namespaces["z"], "c") wg.Wait() - podXA, err := k8sUtils.GetPodByLabel(namespaces["x"], "a") - if err != nil { - t.Errorf("Failed to get Pod in Namespace x with label 'pod=a': %v", err) - } // nodeName is guaranteed to be set at this stage, since the framework waits for all Pods to be in Running phase nodeName := podXA.Spec.NodeName - srcIPs, _ := podIPs[namespaces["x"]+"/a"] - destIPs := append(podIPs[namespaces["z"]+"/a"], append(podIPs[namespaces["z"]+"/b"], podIPs[namespaces["z"]+"/c"]...)...) - expectedLogPrefix := func(_ string) string { - return npRef + ` ` + ruleName + ` Drop [0-9]+ ` - } - checkAuditLoggingResult(t, data, nodeName, npRef, srcIPs, destIPs, expectedLogPrefix) + checkAuditLoggingResult(t, data, nodeName, npRef, matcher.Matchers()) failOnError(k8sUtils.CleanACNPs(), t) } @@ -2678,48 +2761,48 @@ func testAuditLoggingEnableK8s(t *testing.T, data *TestData) { failOnError(data.updateNamespaceWithAnnotations(namespaces["x"], map[string]string{networkpolicy.EnableNPLoggingAnnotationKey: "true"}), t) // Add a K8s namespaced NetworkPolicy in ns x that allow ingress traffic from // Pod x/b to x/a which default denies other ingress including from Pod x/c to x/a - npRef := "allow-x-b-to-x-a" + npName := "allow-x-b-to-x-a" k8sNPBuilder := &NetworkPolicySpecBuilder{} - k8sNPBuilder = k8sNPBuilder.SetName(namespaces["x"], npRef). + k8sNPBuilder = k8sNPBuilder.SetName(namespaces["x"], npName). SetPodSelector(map[string]string{"pod": "a"}). SetTypeIngress(). AddIngress(v1.ProtocolTCP, &p80, nil, nil, nil, map[string]string{"pod": "b"}, nil, nil, nil) + npRef := fmt.Sprintf("K8sNetworkPolicy:%s/%s", namespaces["x"], npName) knp, err := k8sUtils.CreateOrUpdateNetworkPolicy(k8sNPBuilder.Get()) failOnError(err, t) failOnError(waitForResourceReady(t, timeout, knp), t) + podXA, err := k8sUtils.GetPodByLabel(namespaces["x"], "a") + if err != nil { + t.Errorf("Failed to get Pod in Namespace x with label 'pod=a': %v", err) + } + + // matcher1 is for connections allowed by the K8s NP + matcher1 := NewAuditLogMatcher(npRef, "", "Ingress", "Allow") + // matcher2 is for connections dropped by the isolated behavior of the K8s NP + matcher2 := NewAuditLogMatcher("K8sNetworkPolicy", "", "Ingress", "Drop") + + appliedToRef := fmt.Sprintf("%s/%s", podXA.Namespace, podXA.Name) + // generate some traffic that will be dropped by implicit K8s policy drop var wg sync.WaitGroup - oneProbe := func(ns1, pod1, ns2, pod2 string) { + oneProbe := func(ns1, pod1, ns2, pod2 string, matcher *auditLogMatcher) { + matcher.AddProbe(appliedToRef, ns1, pod1, ns2, pod2, p80) wg.Add(1) go func() { defer wg.Done() k8sUtils.Probe(ns1, pod1, ns2, pod2, p80, ProtocolTCP, nil, nil) }() } - oneProbe(namespaces["x"], "b", namespaces["x"], "a") - oneProbe(namespaces["x"], "c", namespaces["x"], "a") + oneProbe(namespaces["x"], "b", namespaces["x"], "a", matcher1) + oneProbe(namespaces["x"], "c", namespaces["x"], "a", matcher2) wg.Wait() - podXA, err := k8sUtils.GetPodByLabel(namespaces["x"], "a") - if err != nil { - t.Errorf("Failed to get Pod in Namespace x with label 'pod=a': %v", err) - } // nodeName is guaranteed to be set at this stage, since the framework waits for all Pods to be in Running phase nodeName := podXA.Spec.NodeName - srcIPs := append(podIPs[namespaces["x"]+"/b"], podIPs[namespaces["x"]+"/c"]...) - destIPs, _ := podIPs[namespaces["x"]+"/a"] - expectedLogPrefix := func(srcIP string) string { - if slices.Contains(podIPs[namespaces["x"]+"/b"], srcIP) { - return npRef + " Allow [0-9]+ " - } else if slices.Contains(podIPs[namespaces["x"]+"/c"], srcIP) { - return "K8sNetworkPolicy Drop " - } - return "" - } - checkAuditLoggingResult(t, data, nodeName, "K8sNetworkPolicy", srcIPs, destIPs, expectedLogPrefix) + checkAuditLoggingResult(t, data, nodeName, "K8sNetworkPolicy", append(matcher1.Matchers(), matcher2.Matchers()...)) failOnError(k8sUtils.DeleteNetworkPolicy(namespaces["x"], "allow-x-b-to-x-a"), t) failOnError(data.UpdateNamespace(namespaces["x"], func(namespace *v1.Namespace) { @@ -2739,11 +2822,11 @@ func testAuditLoggingK8sService(t *testing.T, data *TestData) { } serverNode := podXA.Spec.NodeName serviceName := "nginx" - _, serverIP, nginxCleanupFunc := createAndWaitForPod(t, data, data.createNginxPodOnNode, "test-server-", serverNode, namespaces["x"], false) + serverPodName, serverIP, nginxCleanupFunc := createAndWaitForPod(t, data, data.createNginxPodOnNode, "test-server-", serverNode, namespaces["x"], false) defer nginxCleanupFunc() serverPort := int32(80) ipFamily := v1.IPv4Protocol - if !strings.Contains(podIPs[namespaces["x"]+"/a"][0], ".") { + if IPFamily(podIPs[namespaces["x"]+"/a"][0]) == "v6" { ipFamily = v1.IPv6Protocol } service, err := data.CreateService(serviceName, namespaces["x"], serverPort, serverPort, map[string]string{"app": "nginx"}, false, false, v1.ServiceTypeClusterIP, &ipFamily) @@ -2754,48 +2837,46 @@ func testAuditLoggingK8sService(t *testing.T, data *TestData) { // Add a K8s namespaced NetworkPolicy in ns x that allow ingress traffic from // Pod x/a to service nginx which default denies other ingress including from Pod x/b to service nginx - npRef := "allow-xa-to-service" + npName := "allow-xa-to-service" k8sNPBuilder := &NetworkPolicySpecBuilder{} - k8sNPBuilder = k8sNPBuilder.SetName(namespaces["x"], npRef). + k8sNPBuilder = k8sNPBuilder.SetName(namespaces["x"], npName). SetPodSelector(map[string]string{"app": serviceName}). SetTypeIngress(). AddIngress(v1.ProtocolTCP, &p80, nil, nil, nil, map[string]string{"pod": "a"}, nil, nil, nil) + npRef := fmt.Sprintf("K8sNetworkPolicy:%s/%s", namespaces["x"], npName) knp, err := k8sUtils.CreateOrUpdateNetworkPolicy(k8sNPBuilder.Get()) failOnError(err, t) failOnError(waitForResourceReady(t, timeout, knp), t) + // matcher1 is for connections allowed by the K8s NP + matcher1 := NewAuditLogMatcher(npRef, "", "Ingress", "Allow") + // matcher2 is for connections dropped by the isolated behavior of the K8s NP + matcher2 := NewAuditLogMatcher("K8sNetworkPolicy", "", "Ingress", "Drop") + + appliedToRef := fmt.Sprintf("%s/%s", namespaces["x"], serverPodName) + // generate some traffic that wget the nginx service var wg sync.WaitGroup - oneProbe := func(pod *v1.Pod) { - wg.Add(1) - go func() { - defer wg.Done() - data.runWgetCommandFromTestPodWithRetry(pod.Name, pod.Namespace, pod.Spec.Containers[0].Name, serviceName, 5) - }() - } - oneProbe(podXA) - podXB, err := k8sUtils.GetPodByLabel(namespaces["x"], "b") - if err != nil { - t.Errorf("Failed to get Pod in Namespace x with label 'pod=b': %v", err) + oneProbe := func(ns, pod string, matcher *auditLogMatcher) { + for _, ip := range serverIP.ipStrings { + ip := ip + matcher.AddProbeAddr(appliedToRef, ns, pod, ip, serverPort) + wg.Add(1) + go func() { + defer wg.Done() + k8sUtils.ProbeAddr(ns, "pod", pod, ip, serverPort, ProtocolTCP, nil) + }() + } } - oneProbe(podXB) + oneProbe(namespaces["x"], "a", matcher1) + oneProbe(namespaces["x"], "b", matcher2) wg.Wait() - srcIPs := []string{podIPs[namespaces["x"]+"/a"][0], podIPs[namespaces["x"]+"/b"][0]} - destIPs := serverIP.ipStrings - expectedLogPrefix := func(srcIP string) string { - if slices.Contains(podIPs[namespaces["x"]+"/a"], srcIP) { - return npRef + " Allow [0-9]+ " - } else if slices.Contains(podIPs[namespaces["x"]+"/b"], srcIP) { - return "K8sNetworkPolicy Drop " - } - return "" - } - checkAuditLoggingResult(t, data, serverNode, "K8sNetworkPolicy", srcIPs, destIPs, expectedLogPrefix) + checkAuditLoggingResult(t, data, serverNode, "K8sNetworkPolicy", append(matcher1.Matchers(), matcher2.Matchers()...)) - failOnError(k8sUtils.DeleteNetworkPolicy(namespaces["x"], npRef), t) + failOnError(k8sUtils.DeleteNetworkPolicy(namespaces["x"], npName), t) failOnError(data.UpdateNamespace(namespaces["x"], func(namespace *v1.Namespace) { delete(namespace.Annotations, networkpolicy.EnableNPLoggingAnnotationKey) }), t) @@ -3046,10 +3127,14 @@ func testACNPNestedIPBlockClusterGroupCreateAndUpdate(t *testing.T) { podXAIP, _ := podIPs[namespaces["x"]+"/a"] podXBIP, _ := podIPs[namespaces["x"]+"/b"] genCIDR := func(ip string) string { - if strings.Contains(ip, ".") { + switch IPFamily(ip) { + case "v4": return ip + "/32" + case "v6": + return ip + "/128" + default: + return "" } - return ip + "/128" } cg1Name, cg2Name, cg3Name := "cg-x-a-ipb", "cg-x-b-ipb", "cg-select-x-c" cgParentName := "cg-parent" @@ -4055,7 +4140,8 @@ func testACNPMulticastEgress(t *testing.T, data *TestData, acnpName, caseName, g } } -func checkAuditLoggingResult(t *testing.T, data *TestData, nodeName, logLocator string, srcIPs, destIPs []string, expectedLogPrefix func(string) string) { +// the logMatcher parameter takes as input the srcIP and destIP of the connection and returns a regex that should match the log entry +func checkAuditLoggingResult(t *testing.T, data *TestData, nodeName, logLocator string, matchers []*regexp.Regexp) { antreaPodName, err := data.getAntreaPodOnNode(nodeName) if err != nil { t.Errorf("Error occurred when trying to get the Antrea Agent Pod running on Node %s: %v", nodeName, err) @@ -4074,27 +4160,17 @@ func checkAuditLoggingResult(t *testing.T, data *TestData, nodeName, logLocator return false, nil } - var expectedNumEntries, actualNumEntries int - for _, srcIP := range srcIPs { - for _, destIP := range destIPs { - // only look for an entry in the audit log file if srcIP and - // dstIP are of the same family - if strings.Contains(srcIP, ".") != strings.Contains(destIP, ".") { - continue - } - expectedNumEntries += 1 - // The audit log should contain log entry `... ...` - re := regexp.MustCompile(expectedLogPrefix(srcIP) + srcIP + ` [0-9]+ ` + destIP + ` ` + strconv.Itoa(int(p80))) - if re.MatchString(stdout) { - actualNumEntries += 1 - } else { - t.Logf("Audit log does not contain expected entry from client (%s) to server (%s)", srcIP, destIP) - } - break + var numEntries int + for _, re := range matchers { + t.Logf("Checking for expected entry: %s", re.String()) + if re.MatchString(stdout) { + numEntries += 1 + } else { + t.Logf("Audit log does not contain expected entry: %s", re.String()) } } - if actualNumEntries != expectedNumEntries { - t.Logf("Missing entries in audit log: expected %d but found %d", expectedNumEntries, actualNumEntries) + if numEntries != len(matchers) { + t.Logf("Missing entries in audit log: expected %d but found %d", len(matchers), numEntries) return false, nil } return true, nil diff --git a/test/e2e/k8s_util.go b/test/e2e/k8s_util.go index ee1696146ee..4fa016392f0 100644 --- a/test/e2e/k8s_util.go +++ b/test/e2e/k8s_util.go @@ -1202,7 +1202,7 @@ func (k *KubernetesUtils) ValidateRemoteCluster(remoteCluster *KubernetesUtils, } } -func (k *KubernetesUtils) Bootstrap(namespaces map[string]string, pods []string, createNamespaces bool) (*map[string][]string, error) { +func (k *KubernetesUtils) Bootstrap(namespaces map[string]string, pods []string, createNamespaces bool) (map[string][]string, error) { for _, ns := range namespaces { if createNamespaces { _, err := k.CreateOrUpdateNamespace(ns, map[string]string{"ns": ns}) @@ -1240,7 +1240,7 @@ func (k *KubernetesUtils) Bootstrap(namespaces map[string]string, pods []string, return nil, err } - return &podIPs, nil + return podIPs, nil } func (k *KubernetesUtils) Cleanup(namespaces map[string]string) { diff --git a/test/e2e/util.go b/test/e2e/util.go index 1f3c69b045d..c26693b0555 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -16,6 +16,7 @@ package e2e import ( "io" + "net" "os" "time" @@ -44,3 +45,14 @@ func timeCost() func(string) { klog.Infof("Confirming %s status costs %v", status, tc) } } + +func IPFamily(ip string) string { + switch { + case net.ParseIP(ip).To4() != nil: + return "v4" + case net.ParseIP(ip).To16() != nil: + return "v6" + default: + return "" + } +}