diff --git a/test/e2e/antreapolicy_test.go b/test/e2e/antreapolicy_test.go index d2921216c98..d6c4902396b 100644 --- a/test/e2e/antreapolicy_test.go +++ b/test/e2e/antreapolicy_test.go @@ -2259,7 +2259,7 @@ func testRejectServiceTraffic(t *testing.T, data *TestData, clientNamespace, ser for _, tc := range testcases { log.Tracef("Probing: %s -> %s:%d", tc.clientPod.PodName(), tc.destAddr, tc.destPort) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP) + connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP, &tc.expectedConnectivity) if err != nil { t.Errorf("Failure -- could not complete probe: %v", err) } @@ -2284,7 +2284,7 @@ func testRejectServiceTraffic(t *testing.T, data *TestData, clientNamespace, ser for _, tc := range testcases { log.Tracef("Probing: %s -> %s:%d", tc.clientPod.PodName(), tc.destAddr, tc.destPort) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP) + connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP, &tc.expectedConnectivity) if err != nil { t.Errorf("Failure -- could not complete probe: %v", err) } @@ -2350,7 +2350,7 @@ func testRejectNoInfiniteLoop(t *testing.T, data *TestData, clientNamespace, ser for _, tc := range testcases { log.Tracef("Probing: %s -> %s:%d", tc.clientPod.PodName(), tc.destAddr, tc.destPort) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP) + connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP, &tc.expectedConnectivity) if err != nil { t.Errorf("Failure -- could not complete probe: %v", err) } @@ -2576,7 +2576,7 @@ func testAuditLoggingBasic(t *testing.T, data *TestData) { wg.Add(1) go func() { defer wg.Done() - k8sUtils.Probe(ns1, pod1, ns2, pod2, p80, ProtocolTCP, nil) + k8sUtils.Probe(ns1, pod1, ns2, pod2, p80, ProtocolTCP, nil, nil) }() } oneProbe(namespaces["x"], "a", namespaces["z"], "a") @@ -2624,7 +2624,7 @@ func testAuditLoggingEnableK8s(t *testing.T, data *TestData) { wg.Add(1) go func() { defer wg.Done() - k8sUtils.Probe(ns1, pod1, ns2, pod2, p80, ProtocolTCP, nil) + k8sUtils.Probe(ns1, pod1, ns2, pod2, p80, ProtocolTCP, nil, nil) }() } oneProbe(namespaces["x"], "b", namespaces["x"], "a") @@ -3203,7 +3203,7 @@ func testFQDNPolicy(t *testing.T) { failOnError(waitForResourceReady(t, timeout, acnp), t) for _, tc := range testcases { log.Tracef("Probing: %s -> %s", tc.clientPod.PodName(), tc.destAddr) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "pod", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP) + connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "pod", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP, &tc.expectedConnectivity) if err != nil { t.Errorf("Failure -- could not complete probe: %v", err) } @@ -3294,7 +3294,7 @@ func testFQDNPolicyInClusterService(t *testing.T) { for _, tc := range testcases { log.Tracef("Probing: %s -> %s", tc.clientPod.PodName(), tc.destAddr) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "pod", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP) + connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "pod", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP, &tc.expectedConnectivity) if err != nil { t.Errorf("Failure -- could not complete probe: %v", err) } @@ -3342,7 +3342,7 @@ func testFQDNPolicyTCP(t *testing.T) { continue } log.Tracef("Probing: %s -> %s(%s)", tc.clientPod.PodName(), tc.destAddr, destIP) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "pod", tc.clientPod.PodName(), destIP, tc.destPort, ProtocolTCP) + connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "pod", tc.clientPod.PodName(), destIP, tc.destPort, ProtocolTCP, &tc.expectedConnectivity) if err != nil { t.Errorf("Failure -- could not complete probe: %v", err) } @@ -3413,7 +3413,7 @@ func testToServices(t *testing.T) { for _, tc := range testcases { log.Tracef("Probing: %s -> %s", tc.clientPod.PodName(), tc.destAddr) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "pod", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP) + connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "pod", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP, &tc.expectedConnectivity) if err != nil { t.Errorf("Failure -- could not complete probe: %v", err) } @@ -3500,7 +3500,7 @@ func testServiceAccountSelector(t *testing.T, data *TestData) { for _, tc := range testcases { log.Tracef("Probing: %s -> %s:%d", tc.clientPod.PodName(), tc.destAddr, tc.destPort) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP) + connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP, &tc.expectedConnectivity) if err != nil { t.Errorf("Failure -- could not complete probe: %v", err) } @@ -3562,7 +3562,7 @@ func testACNPNodeSelectorEgress(t *testing.T) { time.Sleep(networkPolicyDelay) for _, tc := range testcases { log.Tracef("Probing: %s -> %s", tc.clientPod.PodName(), tc.destAddr) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "pod", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP) + connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "pod", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP, &tc.expectedConnectivity) if err != nil { t.Errorf("Failure -- could not complete probe: %v", err) } @@ -3637,7 +3637,7 @@ func testACNPNodeSelectorIngress(t *testing.T, data *TestData) { time.Sleep(networkPolicyDelay) for _, tc := range testcases { log.Tracef("Probing: %s -> %s", tc.clientPod.PodName(), tc.destAddr) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP) + connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP, &tc.expectedConnectivity) if err != nil { t.Errorf("Failure -- could not complete probe: %v", err) } @@ -3709,7 +3709,7 @@ func testACNPICMPSupport(t *testing.T, data *TestData) { time.Sleep(networkPolicyDelay) for _, tc := range testcases { log.Tracef("Probing: %s -> %s", tc.clientPod.PodName(), tc.destAddr) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolICMP) + connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolICMP, &tc.expectedConnectivity) if err != nil { t.Errorf("Failure -- could not complete probe: %v", err) } @@ -4097,7 +4097,7 @@ func doProbe(t *testing.T, data *TestData, p *CustomProbe, protocol AntreaPolicy _, _, dstPodCleanupFunc := createAndWaitForPodWithLabels(t, data, data.createServerPodWithLabels, p.DestPod.Pod.PodName(), p.DestPod.Pod.Namespace(), p.Port, p.DestPod.Labels) defer dstPodCleanupFunc() log.Tracef("Probing: %s -> %s", p.SourcePod.Pod.PodName(), p.DestPod.Pod.PodName()) - connectivity, err := k8sUtils.Probe(p.SourcePod.Pod.Namespace(), p.SourcePod.Pod.PodName(), p.DestPod.Pod.Namespace(), p.DestPod.Pod.PodName(), p.Port, protocol, nil) + connectivity, err := k8sUtils.Probe(p.SourcePod.Pod.Namespace(), p.SourcePod.Pod.PodName(), p.DestPod.Pod.Namespace(), p.DestPod.Pod.PodName(), p.Port, protocol, nil, &p.ExpectConnectivity) if err != nil { t.Errorf("Failure -- could not complete probe: %v", err) } diff --git a/test/e2e/k8s_util.go b/test/e2e/k8s_util.go index 7b231b06a90..311d5c52ca2 100644 --- a/test/e2e/k8s_util.go +++ b/test/e2e/k8s_util.go @@ -139,6 +139,7 @@ func (k *KubernetesUtils) probe( dstName string, port int32, protocol utils.AntreaPolicyProtocol, + expectedResult *PodConnectivityMark, ) PodConnectivityMark { protocolStr := map[utils.AntreaPolicyProtocol]string{ utils.ProtocolTCP: "tcp", @@ -159,17 +160,23 @@ func (k *KubernetesUtils) probe( // It needs to check both err and stderr because: // 1. The probe tried 3 times. If it checks err only, failure+failure+success would be considered connected. // 2. There might be an issue in Pod exec API that it sometimes doesn't return error when the probe fails. See #2394. + var actualResult PodConnectivityMark if err != nil || stderr != "" { // log this error as trace since may be an expected failure log.Tracef("%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", podName, dstName, err, stdout, stderr) // If err != nil and stderr == "", then it means this probe failed because of // the command instead of connectivity. For example, container name doesn't exist. if stderr == "" { - return Error + actualResult = Error } - return DecideProbeResult(stderr, 3) + actualResult = DecideProbeResult(stderr, 3) + } else { + actualResult = Connected + } + if expectedResult != nil && *expectedResult != actualResult { + log.Infof("%s -> %s: expected %s but got %s: err - %v /// stdout - %s /// stderr - %s", podName, dstName, *expectedResult, actualResult, err, stdout, stderr) } - return Connected + return actualResult } // DecideProbeResult uses the probe stderr to decide the connectivity. @@ -337,7 +344,7 @@ func (k *KubernetesUtils) digDNS( // installed. The connectivity from source Pod to all IPs of the target Pod // should be consistent. Otherwise, Error PodConnectivityMark will be returned. func (k *KubernetesUtils) Probe(ns1, pod1, ns2, pod2 string, port int32, protocol utils.AntreaPolicyProtocol, - remoteCluster *KubernetesUtils) (PodConnectivityMark, error) { + remoteCluster *KubernetesUtils, expectedResult *PodConnectivityMark) (PodConnectivityMark, error) { fromPods, err := k.GetPodsByLabel(ns1, "pod", pod1) if err != nil { return Error, fmt.Errorf("unable to get Pods from Namespace %s: %v", ns1, err) @@ -364,11 +371,11 @@ func (k *KubernetesUtils) Probe(ns1, pod1, ns2, pod2 string, port int32, protoco } toPod := toPods[0] fromPodName, toPodName := fmt.Sprintf("%s/%s", ns1, pod1), fmt.Sprintf("%s/%s", ns2, pod2) - return k.probeAndDecideConnectivity(fromPod, toPod, fromPodName, toPodName, port, protocol) + return k.probeAndDecideConnectivity(fromPod, toPod, fromPodName, toPodName, port, protocol, expectedResult) } func (k *KubernetesUtils) probeAndDecideConnectivity(fromPod, toPod v1.Pod, - fromPodName, toPodName string, port int32, protocol utils.AntreaPolicyProtocol) (PodConnectivityMark, error) { + fromPodName, toPodName string, port int32, protocol utils.AntreaPolicyProtocol, expectedResult *PodConnectivityMark) (PodConnectivityMark, error) { // Both IPv4 and IPv6 address should be tested. connectivity := Unknown for _, eachIP := range toPod.Status.PodIPs { @@ -379,7 +386,7 @@ func (k *KubernetesUtils) probeAndDecideConnectivity(fromPod, toPod v1.Pod, } // HACK: inferring container name as c80, c81 etc., for simplicity. containerName := fmt.Sprintf("c%v", port) - curConnectivity := k.probe(&fromPod, fromPodName, containerName, toIP, toPodName, port, protocol) + curConnectivity := k.probe(&fromPod, fromPodName, containerName, toIP, toPodName, port, protocol, expectedResult) if connectivity == Unknown { connectivity = curConnectivity } else if connectivity != curConnectivity { @@ -391,7 +398,7 @@ func (k *KubernetesUtils) probeAndDecideConnectivity(fromPod, toPod v1.Pod, // ProbeAddr execs into a Pod and checks its connectivity to an arbitrary destination // address. -func (k *KubernetesUtils) ProbeAddr(ns, podLabelKey, podLabelValue, dstAddr string, port int32, protocol utils.AntreaPolicyProtocol) (PodConnectivityMark, error) { +func (k *KubernetesUtils) ProbeAddr(ns, podLabelKey, podLabelValue, dstAddr string, port int32, protocol utils.AntreaPolicyProtocol, expectedResult *PodConnectivityMark) (PodConnectivityMark, error) { fromPods, err := k.GetPodsByLabel(ns, podLabelKey, podLabelValue) if err != nil { return Error, fmt.Errorf("unable to get Pods from Namespace %s: %v", ns, err) @@ -409,7 +416,7 @@ func (k *KubernetesUtils) ProbeAddr(ns, podLabelKey, podLabelValue, dstAddr stri if strings.Contains(dstAddr, ":") { dstAddr = fmt.Sprintf("[%s]", dstAddr) } - connectivity = k.probe(&fromPod, fmt.Sprintf("%s/%s", ns, podLabelValue), containerName, dstAddr, dstAddr, port, protocol) + connectivity = k.probe(&fromPod, fmt.Sprintf("%s/%s", ns, podLabelValue), containerName, dstAddr, dstAddr, port, protocol, expectedResult) } return connectivity, nil } @@ -1089,7 +1096,8 @@ func (k *KubernetesUtils) validateOnePort(allPods []Pod, reachability *Reachabil // TODO: find better metrics, this is only for POC. oneProbe := func(podFrom, podTo Pod, port int32) { log.Tracef("Probing: %s -> %s", podFrom, podTo) - connectivity, err := k.Probe(podFrom.Namespace(), podFrom.PodName(), podTo.Namespace(), podTo.PodName(), port, protocol, nil) + expectedResult := reachability.Expected.Get(podFrom.String(), podTo.String()) + connectivity, err := k.Probe(podFrom.Namespace(), podFrom.PodName(), podTo.Namespace(), podTo.PodName(), port, protocol, nil, &expectedResult) resultsCh <- &probeResult{podFrom, podTo, connectivity, err} } for _, pod1 := range allPods { @@ -1116,10 +1124,6 @@ func (k *KubernetesUtils) validateOnePort(allPods []Pod, reachability *Reachabil } else if prevConn != r.connectivity { reachability.Observe(r.podFrom, r.podTo, Error) } - - if r.connectivity != Connected && reachability.Expected.Get(r.podFrom.String(), r.podTo.String()) == Connected { - log.Warnf("FAILED CONNECTION FOR ALLOWED PODS %s -> %s:%d:%s !!!! ", r.podFrom, r.podTo, port, protocol) - } } } @@ -1144,7 +1148,8 @@ func (k *KubernetesUtils) ValidateRemoteCluster(remoteCluster *KubernetesUtils, resultsCh := make(chan *probeResult, numProbes) oneProbe := func(podFrom, podTo Pod, port int32) { log.Tracef("Probing: %s -> %s", podFrom, podTo) - connectivity, err := k.Probe(podFrom.Namespace(), podFrom.PodName(), podTo.Namespace(), podTo.PodName(), port, protocol, remoteCluster) + expectedResult := reachability.Expected.Get(podFrom.String(), podTo.String()) + connectivity, err := k.Probe(podFrom.Namespace(), podFrom.PodName(), podTo.Namespace(), podTo.PodName(), port, protocol, remoteCluster, &expectedResult) resultsCh <- &probeResult{podFrom, podTo, connectivity, err} } for _, pod1 := range allPods { @@ -1161,9 +1166,6 @@ func (k *KubernetesUtils) ValidateRemoteCluster(remoteCluster *KubernetesUtils, if prevConn == Unknown { reachability.Observe(r.podFrom, r.podTo, r.connectivity) } - if r.connectivity != Connected && reachability.Expected.Get(r.podFrom.String(), r.podTo.String()) == Connected { - log.Warnf("FAILED CONNECTION FOR ALLOWED PODS %s -> %s:%d:%s in %s !!!! ", r.podFrom, r.podTo, port, protocol, k.ClusterName) - } } }