From 8c6a4a0253c9d09850f3afe258e3a33bc6454323 Mon Sep 17 00:00:00 2001 From: graysonwu Date: Fri, 30 Sep 2022 15:32:55 -0700 Subject: [PATCH] Address comment Signed-off-by: graysonwu --- test/e2e/antreapolicy_test.go | 109 +++++++++++++----------- test/integration/agent/openflow_test.go | 87 +++++++++++++++++-- 2 files changed, 142 insertions(+), 54 deletions(-) diff --git a/test/e2e/antreapolicy_test.go b/test/e2e/antreapolicy_test.go index ac1bc654429..b45f37227b8 100644 --- a/test/e2e/antreapolicy_test.go +++ b/test/e2e/antreapolicy_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "antrea.io/antrea/pkg/agent/apiserver/handlers/podinterface" + "antrea.io/antrea/pkg/agent/config" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" crdv1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" crdv1alpha3 "antrea.io/antrea/pkg/apis/crd/v1alpha3" @@ -3686,9 +3687,35 @@ func testACNPICMPSupport(t *testing.T, data *TestData) { func testACNPNodePortServiceSupport(t *testing.T, data *TestData) { skipIfProxyAllDisabled(t, data) + testEndpointOnDiffNode := false + currentEncapMode, err := data.GetEncapMode() + if err != nil { + t.Fatalf("Failed to get encap mode: %v", err) + } + if currentEncapMode == config.TrafficEncapModeEncap { + testEndpointOnDiffNode = true + } + + // Create a NodePort Service. + ipProtocol := v1.IPv4Protocol + var nodePort int32 + nodePortSvc, err := data.createNginxNodePortService("test-nodeport-svc", false, false, &ipProtocol) + failOnError(err, t) + for _, port := range nodePortSvc.Spec.Ports { + if port.NodePort != 0 { + nodePort = port.NodePort + break + } + } + + backendPodName := "test-nodeport-backend-pod" + require.NoError(t, data.createNginxPodOnNode(backendPodName, data.testNamespace, nodeName(0), false)) + if err := data.podWaitForRunning(defaultTimeout, backendPodName, data.testNamespace); err != nil { + t.Fatalf("Error when waiting for Pod '%s' to be in the Running state", backendPodName) + } + defer deletePodWrapper(t, data, data.testNamespace, backendPodName) // Create another netns to fake an external network on the host network Pod. - clientName := "test-client" testNetns := "test-ns" cmd := fmt.Sprintf(`ip netns add %[1]s && \ ip link add dev %[1]s-a type veth peer name %[1]s-b && \ @@ -3700,77 +3727,61 @@ ip netns exec %[1]s ip link set dev %[1]s-a up && \ ip netns exec %[1]s ip route replace default via %[3]s && \ sleep 3600 `, testNetns, "1.1.1.1", "1.1.1.254", 24) - if err := NewPodBuilder(clientName, data.testNamespace, agnhostImage).OnNode(controlPlaneNodeName()).WithCommand([]string{"sh", "-c", cmd}).InHostNetwork().Privileged().Create(data); err != nil { + clientNames := []string{"client0"} + if err := NewPodBuilder(clientNames[0], data.testNamespace, agnhostImage).OnNode(nodeName(0)).WithCommand([]string{"sh", "-c", cmd}).InHostNetwork().Privileged().Create(data); err != nil { t.Fatalf("Failed to create client Pod: %v", err) } - defer data.deletePodAndWait(defaultTimeout, clientName, data.testNamespace) - ips, err := data.podWaitForIPs(defaultTimeout, clientName, data.testNamespace) + defer data.deletePodAndWait(defaultTimeout, clientNames[0], data.testNamespace) + err = data.podWaitForRunning(defaultTimeout, clientNames[0], data.testNamespace) failOnError(err, t) - - var cidr string - if clusterInfo.podV4NetworkCIDR != "" { - cidr = ips.ipv4.String() - } else { - cidr = ips.ipv6.String() + if testEndpointOnDiffNode { + clientNames = append(clientNames, "client1") + if err := NewPodBuilder(clientNames[1], data.testNamespace, agnhostImage).OnNode(nodeName(1)).WithCommand([]string{"sh", "-c", cmd}).InHostNetwork().Privileged().Create(data); err != nil { + t.Fatalf("Failed to create client Pod: %v", err) + } + defer data.deletePodAndWait(defaultTimeout, clientNames[1], data.testNamespace) + err = data.podWaitForRunning(defaultTimeout, clientNames[1], data.testNamespace) + failOnError(err, t) } - cidr += "/32" - - svc1, cleanup1 := data.createAgnhostServiceAndBackendPods(t, "svc1", data.testNamespace, nodeName(0), v1.ServiceTypeNodePort) - defer cleanup1() - - svc2, cleanup2 := data.createAgnhostServiceAndBackendPods(t, "svc2", data.testNamespace, nodeName(1), v1.ServiceTypeNodePort) - defer cleanup2() + cidr := "1.1.1.1/24" builder := &ClusterNetworkPolicySpecBuilder{} builder = builder.SetName("test-acnp-nodeport-svc"). SetPriority(1.0). SetAppliedToGroup([]ACNPAppliedToSpec{ { Service: &crdv1alpha1.NamespacedName{ - Name: svc1.Name, - Namespace: svc1.Namespace, - }, - }, - { - Service: &crdv1alpha1.NamespacedName{ - Name: svc2.Name, - Namespace: svc2.Namespace, + Name: nodePortSvc.Name, + Namespace: nodePortSvc.Namespace, }, }, }) builder.AddIngress(ProtocolTCP, nil, nil, nil, nil, nil, nil, nil, &cidr, nil, nil, nil, nil, false, nil, crdv1alpha1.RuleActionReject, "", "", nil) - testcases := []podToAddrTestStep{ - { - Pod(fmt.Sprintf("%s/%s", data.testNamespace, clientName)), - nodeIP(1), - svc1.Spec.Ports[0].NodePort, - Rejected, - }, - { - Pod(fmt.Sprintf("%s/%s", data.testNamespace, clientName)), - nodeIP(1), - svc2.Spec.Ports[0].NodePort, - Rejected, - }, - } - acnp, err := k8sUtils.CreateOrUpdateACNP(builder.Get()) failOnError(err, t) failOnError(waitForResourceReady(t, timeout, acnp), t) - for _, tc := range testcases { - log.Tracef("Probing: %s -> %s:%d", cidr, tc.destAddr, tc.destPort) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP) - if err != nil { - t.Errorf("failure -- could not complete probe: %v", err) + for idx, clientName := range clientNames { + log.Infof("Probing: 1.1.1.1 -> %s:%d", nodeIP(idx), nodePort) + // Connect to NodePort in the fake external network. + cmd = fmt.Sprintf("for i in $(seq 1 3); do ip netns exec %s /agnhost connect %s:%d --timeout=1s --protocol=tcp; done;", testNetns, nodeIP(idx), nodePort) + stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, clientName, agnhostContainerName, []string{"sh", "-c", cmd}) + connectivity := Connected + if err != nil || stderr != "" { + // log this error as trace since may be an expected failure + log.Tracef("1.1.1.1 -> %s:%d: error when running command: err - %v /// stdout - %s /// stderr - %s", nodeIP(idx), nodePort, err, stdout, stderr) + // If err != nil and stderr == "", then it means this probe failed because of + // the command instead of connectivity. For example, container name doesn't exist. + if stderr == "" { + connectivity = Error + } + connectivity = DecideProbeResult(stderr, 3) } - if connectivity != tc.expectedConnectivity { - t.Errorf("failure -- wrong results for probe: Source %s --> Dest %s:%d connectivity: %v, expected: %v", - cidr, tc.destAddr, tc.destPort, connectivity, tc.expectedConnectivity) + if connectivity != Rejected { + t.Errorf("failure -- wrong results for probe: Source 1.1.1.1 --> Dest %s:%d connectivity: %v, expected: Rej", nodeIP(idx), nodePort, connectivity) } } - // cleanup test resources failOnError(k8sUtils.DeleteACNP(builder.Name), t) } diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index 7a59344e3b2..0803c84837a 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -604,7 +604,7 @@ type svcConfig struct { withSessionAffinity bool } -func TestProxyServiceFlows(t *testing.T) { +func TestProxyServiceFlowsAntreaPolicyDisabled(t *testing.T) { // Reset OVS metrics (Prometheus) and reinitialize them to test. legacyregistry.Reset() metrics.InitializeOVSMetrics() @@ -679,7 +679,77 @@ func TestProxyServiceFlows(t *testing.T) { for _, tc := range tcs { groupID := ofconfig.GroupIDType(tc.gid) - expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge) + expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge, false) + installServiceFlows(t, tc.gid, tc.svc, tc.endpoints, tc.stickyAge) + for _, tableFlow := range expTableFlows { + ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows) + } + ofTestUtils.CheckGroupExists(t, ovsCtlClient, groupID, "select", expGroupBuckets, true) + + uninstallServiceFlowsFunc(t, tc.gid, tc.svc, tc.endpoints) + for _, tableFlow := range expTableFlows { + ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, false, tableFlow.flows) + } + ofTestUtils.CheckGroupExists(t, ovsCtlClient, groupID, "select", expGroupBuckets, false) + } +} + +func TestProxyServiceFlowsAntreaPoilcyEnabled(t *testing.T) { + // Reset OVS metrics (Prometheus) and reinitialize them to test. + legacyregistry.Reset() + metrics.InitializeOVSMetrics() + + c = ofClient.NewClient(br, bridgeMgmtAddr, true, true, false, false, false, false, false, false, false) + err := ofTestUtils.PrepareOVSBridge(br) + require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) + + config := prepareConfiguration(true, false) + _, err = c.Initialize(roundInfo, config.nodeConfig, &agentconfig.NetworkConfig{TrafficEncapMode: agentconfig.TrafficEncapModeEncap, IPv4Enabled: true}, &agentconfig.EgressConfig{}, &agentconfig.ServiceConfig{}) + require.Nil(t, err, "Failed to initialize OFClient") + + defer func() { + err = c.Disconnect() + assert.Nil(t, err, fmt.Sprintf("Error while disconnecting from OVS bridge: %v", err)) + err = ofTestUtils.DeleteOVSBridge(br) + assert.Nil(t, err, fmt.Sprintf("Error while deleting OVS bridge: %v", err)) + ofClient.CleanOFTableCache() + ofClient.ResetOFTable() + }() + + endpoints := []k8sproxy.Endpoint{ + k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{ + Endpoint: net.JoinHostPort("192.168.1.2", "8081"), + IsLocal: true, + }), + k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{ + Endpoint: net.JoinHostPort("10.20.1.11", "8081"), + IsLocal: false, + }), + } + + stickyMaxAgeSeconds := uint16(30) + + tcs := []struct { + svc svcConfig + gid uint32 + endpoints []k8sproxy.Endpoint + stickyAge uint16 + }{ + { + svc: svcConfig{ + protocol: ofconfig.ProtocolTCP, + ip: net.ParseIP("10.20.30.41"), + port: uint16(8000), + }, + gid: 2, + endpoints: endpoints, + stickyAge: stickyMaxAgeSeconds, + }, + } + + for _, tc := range tcs { + groupID := ofconfig.GroupIDType(tc.gid) + expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge, true) installServiceFlows(t, tc.gid, tc.svc, tc.endpoints, tc.stickyAge) for _, tableFlow := range expTableFlows { ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows) @@ -716,7 +786,7 @@ func uninstallServiceFlowsFunc(t *testing.T, gid uint32, svc svcConfig, endpoint } } -func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16) (tableFlows []expectTableFlows, groupBuckets []string) { +func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16, antreaPolicyEnabled bool) (tableFlows []expectTableFlows, groupBuckets []string) { nw_proto := 6 learnProtoField := "NXM_OF_TCP_DST[]" if svc.protocol == ofconfig.ProtocolUDP { @@ -732,10 +802,17 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [ serviceLearnReg = 3 } cookieAllocator := cookie.NewAllocator(roundInfo.RoundNum) + + loadGourpID := "" + ctTable := "EgressRule" + if antreaPolicyEnabled { + loadGourpID = fmt.Sprintf("set_field:0x%x->reg7,", gid) + ctTable = "AntreaPolicyEgressRule" + } svcFlows := expectTableFlows{tableName: "ServiceLB", flows: []*ofTestUtils.ExpectFlow{ { MatchStr: fmt.Sprintf("priority=200,%s,reg4=0x10000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port), - ActStr: fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x%x->reg7,group:%d", serviceLearnReg<<16, gid, gid), + ActStr: fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,%sgroup:%d", serviceLearnReg<<16, loadGourpID, gid), }, { MatchStr: fmt.Sprintf("priority=190,%s,reg4=0x30000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port), @@ -754,7 +831,7 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [ unionVal := (0b010 << 16) + uint32(epPort) epDNATFlows.flows = append(epDNATFlows.flows, &ofTestUtils.ExpectFlow{ MatchStr: fmt.Sprintf("priority=200,%s,reg3=%s,reg4=0x%x/0x7ffff", string(svc.protocol), epIP, unionVal), - ActStr: fmt.Sprintf("ct(commit,table=EgressRule,zone=65520,nat(dst=%s:%d),exec(set_field:0x10/0x10->ct_mark,move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ep.IP(), epPort), + ActStr: fmt.Sprintf("ct(commit,table=%s,zone=65520,nat(dst=%s:%d),exec(set_field:0x10/0x10->ct_mark,move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ctTable, ep.IP(), epPort), }) if ep.GetIsLocal() {