Skip to content

Commit

Permalink
Add ServiceLocalToRemote reject type (#4547)
Browse files Browse the repository at this point in the history
For single cluster rejection, there is no difference for Service traffic or Pod-to-Pod traffic in localToRemote rejection. Service LB won't be executed on the Node where we generate reject responses. So we didn't define podLocalToRemote and serviceLocalToRemote and just let the reject response packet start from L3/L2Forwarding table based on the Node type.

But in multi-cluster situations, when the server Endpoint Pod is on the gateway Node of its cluster, where Service LB is executed, we need reject response packets to go thru the whole pipeline to be correctly UnDNATed and forwarded.
  • Loading branch information
GraysonWu authored Mar 2, 2023
1 parent 6339388 commit 4e5fba7
Show file tree
Hide file tree
Showing 15 changed files with 380 additions and 31 deletions.
7 changes: 7 additions & 0 deletions multicluster/test/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ func deletePodWrapper(tb testing.TB, data *MCTestData, clusterName string, names
}
}

func deletePodAndWaitWrapper(tb testing.TB, data *MCTestData, clusterName string, namespace string, name string) {
tb.Logf("Deleting Pod '%s' in Namespace %s of cluster %s", name, namespace, clusterName)
if err := data.deletePodAndWait(clusterName, namespace, name); err != nil {
tb.Logf("Error when deleting Pod: %v", err)
}
}

func deleteServiceWrapper(tb testing.TB, data *MCTestData, clusterName string, namespace string, name string) {
tb.Logf("Deleting Service '%s' in Namespace %s of cluster %s", name, namespace, clusterName)
if err := data.deleteService(clusterName, namespace, name); err != nil {
Expand Down
9 changes: 9 additions & 0 deletions multicluster/test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ func (data *MCTestData) deletePod(clusterName, namespace, name string) error {
return nil
}

func (data *MCTestData) deletePodAndWait(clusterName, namespace, name string) error {
if d, ok := data.clusterTestDataMap[clusterName]; ok {
if err := d.DeletePodAndWait(defaultTimeout, namespace, name); err != nil {
return err
}
}
return nil
}

func (data *MCTestData) deleteService(clusterName, namespace, name string) error {
if d, ok := data.clusterTestDataMap[clusterName]; ok {
if err := d.DeleteService(namespace, name); err != nil {
Expand Down
1 change: 1 addition & 0 deletions multicluster/test/e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func TestConnectivity(t *testing.T) {
t.Run("Case=ScaleDownMCServiceEndpoints", func(t *testing.T) { testScaleDownMCServiceEndpoints(t, data) })
t.Run("Case=ANPToServices", func(t *testing.T) { testANPToServices(t, data) })
t.Run("Case=StretchedNetworkPolicy", func(t *testing.T) { testStretchedNetworkPolicy(t, data) })
t.Run("Case=StretchedNetworkPolicyReject", func(t *testing.T) { testStretchedNetworkPolicyReject(t, data) })
t.Run("Case=StretchedNetworkPolicyUpdatePod", func(t *testing.T) { testStretchedNetworkPolicyUpdatePod(t, data) })
t.Run("Case=StretchedNetworkPolicyUpdateNS", func(t *testing.T) { testStretchedNetworkPolicyUpdateNS(t, data) })
t.Run("Case=StretchedNetworkPolicyUpdatePolicy", func(t *testing.T) { testStretchedNetworkPolicyUpdatePolicy(t, data) })
Expand Down
55 changes: 51 additions & 4 deletions multicluster/test/e2e/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func testStretchedNetworkPolicy(t *testing.T, data *MCTestData) {
data.testStretchedNetworkPolicy(t)
}

func testStretchedNetworkPolicyReject(t *testing.T, data *MCTestData) {
data.testStretchedNetworkPolicyReject(t)
}

func testStretchedNetworkPolicyUpdatePod(t *testing.T, data *MCTestData) {
data.testStretchedNetworkPolicyUpdatePod(t)
}
Expand Down Expand Up @@ -233,17 +237,17 @@ func (data *MCTestData) testStretchedNetworkPolicy(t *testing.T) {
acnpBuilder1 = acnpBuilder1.SetName("drop-client-pod-sel").
SetPriority(1.0).
SetAppliedToGroup([]e2euttils.ACNPAppliedToSpec{{PodSelector: map[string]string{"app": "nginx"}}}).
AddStretchedIngressRule(map[string]string{"antrea-e2e": eastGwClientName}, nil, "", nil, crdv1alpha1.RuleActionReject).
AddStretchedIngressRule(map[string]string{"antrea-e2e": eastRegularClientName}, map[string]string{"kubernetes.io/metadata.name": multiClusterTestNamespace}, "", nil, crdv1alpha1.RuleActionReject)
AddStretchedIngressRule(map[string]string{"antrea-e2e": eastGwClientName}, nil, "", nil, crdv1alpha1.RuleActionDrop).
AddStretchedIngressRule(map[string]string{"antrea-e2e": eastRegularClientName}, map[string]string{"kubernetes.io/metadata.name": multiClusterTestNamespace}, "", nil, crdv1alpha1.RuleActionDrop)
if _, err := data.createOrUpdateACNP(westCluster, acnpBuilder1.Get()); err != nil {
t.Fatalf("Error creating ACNP %s: %v", acnpBuilder1.Name, err)
}

connectivity := data.probeFromPodInCluster(eastCluster, multiClusterTestNamespace, eastGwClientName, "client", westExpSvcIP, mcWestClusterTestService, 80, corev1.ProtocolTCP)
assert.Equal(t, antreae2e.Rejected, connectivity, getStretchedNetworkPolicyErrorMessage(eastGwClientName))
assert.Equal(t, antreae2e.Dropped, connectivity, getStretchedNetworkPolicyErrorMessage(eastGwClientName))

connectivity = data.probeFromPodInCluster(eastCluster, multiClusterTestNamespace, eastRegularClientName, "client", westExpSvcIP, mcWestClusterTestService, 80, corev1.ProtocolTCP)
assert.Equal(t, antreae2e.Rejected, connectivity, getStretchedNetworkPolicyErrorMessage(eastRegularClientName))
assert.Equal(t, antreae2e.Dropped, connectivity, getStretchedNetworkPolicyErrorMessage(eastRegularClientName))
data.deleteACNP(westCluster, acnpBuilder1.Name)

// Verify that Stretched NetworkPolicy works fine with nsSelect.
Expand All @@ -265,6 +269,49 @@ func (data *MCTestData) testStretchedNetworkPolicy(t *testing.T) {
assert.Equal(t, antreae2e.Dropped, connectivity, getStretchedNetworkPolicyErrorMessage(eastRegularClientName))
}

func (data *MCTestData) testStretchedNetworkPolicyReject(t *testing.T) {
westExpSvcInEast, err := data.getService(eastCluster, multiClusterTestNamespace, mcWestClusterTestService)
if err != nil {
t.Fatalf("Error when getting the imported Service %s: %v", mcWestClusterTestService, err)
}
westExpSvcInEastIP := westExpSvcInEast.Spec.ClusterIP

eastGwClientName := getClusterGatewayClientPodName(eastCluster)
eastRegularClientName := getClusterRegularClientPodName(eastCluster)

acnpBuilder := &e2euttils.ClusterNetworkPolicySpecBuilder{}
acnpBuilder = acnpBuilder.SetName("drop-client-pod-sel").
SetPriority(1.0).
SetAppliedToGroup([]e2euttils.ACNPAppliedToSpec{{PodSelector: map[string]string{"app": "nginx"}}}).
AddStretchedIngressRule(map[string]string{"app": "client"}, nil, "", nil, crdv1alpha1.RuleActionReject)
if _, err := data.createOrUpdateACNP(westCluster, acnpBuilder.Get()); err != nil {
t.Fatalf("Error creating ACNP %s: %v", acnpBuilder.Name, err)
}
defer data.deleteACNP(westCluster, acnpBuilder.Name)

testConnectivity := func() {
connectivity := data.probeFromPodInCluster(eastCluster, multiClusterTestNamespace, eastGwClientName, "client", westExpSvcInEastIP, mcWestClusterTestService, 80, corev1.ProtocolTCP)
assert.Equal(t, antreae2e.Rejected, connectivity, getStretchedNetworkPolicyErrorMessage(eastGwClientName))

connectivity = data.probeFromPodInCluster(eastCluster, multiClusterTestNamespace, eastRegularClientName, "client", westExpSvcInEastIP, mcWestClusterTestService, 80, corev1.ProtocolTCP)
assert.Equal(t, antreae2e.Rejected, connectivity, getStretchedNetworkPolicyErrorMessage(eastRegularClientName))
}

// Test when the server Pod is created and running on the Gateway Node.
deletePodAndWaitWrapper(t, data, westCluster, multiClusterTestNamespace, testServerPod)
if err := createPodWrapper(t, data, westCluster, multiClusterTestNamespace, testServerPod+"-gw", data.clusterGateways[westCluster], nginxImage, "nginx", nil, nil, nil, nil, false, nil); err != nil {
t.Fatalf("Error when creating nginx Pod in cluster %s: %v", westCluster, err)
}
testConnectivity()

// Test when the server Pod is created and running on the regular Node.
deletePodAndWaitWrapper(t, data, westCluster, multiClusterTestNamespace, testServerPod+"-gw")
if err := createPodWrapper(t, data, westCluster, multiClusterTestNamespace, testServerPod+"-regular", data.clusterRegularNodes[westCluster], nginxImage, "nginx", nil, nil, nil, nil, false, nil); err != nil {
t.Fatalf("Error when creating nginx Pod in cluster %s: %v", westCluster, err)
}
testConnectivity()
}

func (data *MCTestData) testStretchedNetworkPolicyUpdatePod(t *testing.T) {
westExpSvc, err := data.getService(eastCluster, multiClusterTestNamespace, mcWestClusterTestService)
if err != nil {
Expand Down
28 changes: 19 additions & 9 deletions pkg/agent/controller/networkpolicy/reject.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ const (
// traffic and for this response, the srcPod is on a remote Node and the dstPod is
// on the local Node.
RejectPodRemoteToLocal
// RejectLocalToRemote represents this packetOut is used to reject traffic and for
// this response, the srcPod is on the local Node and the dstPod is on a remote Node.
// While generating rejection from local to remote, there is no difference between
// Service traffic and Pod traffic.
RejectLocalToRemote
// RejectPodLocalToRemote represents this packetOut is used to reject Pod-to-Pod
// traffic and for this response, the srcPod is on the local Node and the dstPod is
// on a remote Node.
RejectPodLocalToRemote
// RejectServiceLocal represents this packetOut is used to reject Service traffic,
// when AntreaProxy is enabled. The EndpointPod and the dstPod of the reject
// response are on the same Node.
Expand All @@ -66,6 +65,10 @@ const (
// traffic, when AntreaProxy is enabled. The EndpointPod is on a remote Node and
// the dstPod of the reject response is on the local Node.
RejectServiceRemoteToLocal
// RejectServiceLocalToRemote represents this packetOut is used to reject Service
// traffic, when AntreaProxy is enabled. The EndpointPod is on the local Node and
// the dstPod of the reject response is on a remote Node.
RejectServiceLocalToRemote
// RejectNoAPServiceLocal represents this packetOut is used to reject Service
// traffic, when AntreaProxy is disabled. The EndpointPod and the dstPod of the
// reject response are on the same Node.
Expand Down Expand Up @@ -225,7 +228,7 @@ func getRejectType(isServiceTraffic, antreaProxyEnabled, srcIsLocal, dstIsLocal
if dstIsLocal {
return RejectPodLocal
}
return RejectLocalToRemote
return RejectPodLocalToRemote
}
if dstIsLocal {
return RejectPodRemoteToLocal
Expand All @@ -245,7 +248,7 @@ func getRejectType(isServiceTraffic, antreaProxyEnabled, srcIsLocal, dstIsLocal
if dstIsLocal {
return RejectServiceLocal
}
return RejectLocalToRemote
return RejectServiceLocalToRemote
}
if dstIsLocal {
return RejectServiceRemoteToLocal
Expand All @@ -262,6 +265,13 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte
inPort = uint32(sIface.OFPort)
outPort = uint32(dIface.OFPort)
case RejectServiceLocal:
fallthrough
case RejectServiceLocalToRemote:
// For RejectServiceLocal and RejectServiceLocalToRemote, we set inPort as the
// OFPort of the srcPod to simulate its rejection. And we don't set outPort, since
// it's Service traffic load-balanced by AntreaProxy. The reject response packet
// needs to be UnDNATed by the pipeline, instead of directly sending it out
// through outPort.
inPort = uint32(sIface.OFPort)
case RejectPodRemoteToLocal:
if dIface.Type == interfacestore.ExternalEntityInterface {
Expand All @@ -272,7 +282,7 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte
outPort = uint32(dIface.OFPort)
case RejectServiceRemoteToLocal:
inPort = gwOFPort
case RejectLocalToRemote:
case RejectPodLocalToRemote:
inPort = uint32(sIface.OFPort)
if sIface.Type == interfacestore.ExternalEntityInterface {
outPort = uint32(sIface.EntityInterfaceConfig.UplinkPort.OFPort)
Expand Down Expand Up @@ -312,7 +322,7 @@ func getRejectPacketOutMutateFunc(rejectType RejectType, nodeType config.NodeTyp
return packetOutBuilder.AddLoadRegMark(openflow.CustomReasonRejectRegMark).
AddResubmitAction(nil, &tableID)
}
case RejectLocalToRemote:
case RejectPodLocalToRemote:
tableID := openflow.L3ForwardingTable.GetID()
// L3ForwardingTable is not initialized for ExternalNode case since layer 3 is not needed.
if nodeType == config.ExternalNode {
Expand Down
Loading

0 comments on commit 4e5fba7

Please sign in to comment.