diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index dfd5e74897d..490a08a96e4 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -187,8 +187,7 @@ func run(o *Options) error { statsCollector = stats.NewCollector(antreaClientProvider, ofClient, networkPolicyController) } - var proxier k8sproxy.Provider - + var proxier proxy.Proxier if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode) v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode) @@ -287,6 +286,10 @@ func run(o *Options) error { go traceflowController.Run(stopCh) } + if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { + go proxier.GetProxyProvider().Run(stopCh) + } + agentQuerier := querier.NewAgentQuerier( nodeConfig, networkConfig, @@ -294,6 +297,7 @@ func run(o *Options) error { k8sClient, ofClient, ovsBridgeClient, + proxier, networkPolicyController, o.config.APIPort) @@ -301,10 +305,6 @@ func run(o *Options) error { go agentMonitor.Run(stopCh) - if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { - go proxier.Run(stopCh) - } - cipherSuites, err := cipher.GenerateCipherSuitesList(o.config.TLSCipherSuites) if err != nil { return fmt.Errorf("error generating Cipher Suite list: %v", err) @@ -339,12 +339,16 @@ func run(o *Options) error { v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode) v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode) + var proxyProvider k8sproxy.Provider + if proxier != nil { + proxyProvider = proxier.GetProxyProvider() + } connStore := connections.NewConnectionStore( connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, ovsDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)), ifaceStore, v4Enabled, v6Enabled, - proxier, + proxyProvider, networkPolicyController, o.pollInterval) pollDone := make(chan struct{}) diff --git a/docs/antctl.md b/docs/antctl.md index dd05e3ee041..027c3bd8591 100644 --- a/docs/antctl.md +++ b/docs/antctl.md @@ -235,13 +235,15 @@ antctl get podinterface [NAME] [-n NAMESPACE] Starting from version 0.6.0, Antrea Agent supports dumping Antrea OVS flows. The `antctl` `get ovsflows` (or `get of`) command can dump all OVS flows, flows -added for a specified Pod, or flows added to realize a specified NetworkPolicy, -or flows in a specified OVS flow table. +added for a specified Pod, or flows added for Service load-balancing of a +specified Service, or flows added to realize a specified NetworkPolicy, or flows +in a specified OVS flow table. ```bash antctl get ovsflows antctl get ovsflows -p POD -n NAMESPACE -antctl get ovsflows --networkpolicy NETWORKPOLICY -n NAMESPACE +antctl get ovsflows -S SERVICE -n NAMESPACE +antctl get ovsflows -N NETWORKPOLICY -n NAMESPACE antctl get ovsflows -T TABLE_A,TABLE_B antctl get ovsflows -T TABLE_A,TABLE_B_NUM antctl get ovsflows -T TABLE_A_NUM,TABLE_B_NUM @@ -269,7 +271,7 @@ NAMESPACE NAME APPLIED-TO RULES kube-system kube-dns 160ea6d7-0234-5d1d-8ea0-b703d0aa3b46 1 # Dump OVS flows of NetworkPolicy "kube-dns" -$ antctl get of --networkpolicy kube-dns -n kube-system +$ antctl get of -N kube-dns -n kube-system FLOW table=90, n_packets=0, n_bytes=0, priority=190,conj_id=1,ip actions=resubmit(,105) table=90, n_packets=0, n_bytes=0, priority=200,ip actions=conjunction(1,1/3) @@ -293,7 +295,7 @@ few trace-packet command examples. # Trace an IP packet between two Pods antctl trace-packet -S ns1/pod1 -D ns2/pod2 # Trace a Service request from a local Pod -antctl trace-packet -S ns1/pod1 -D ns2/srv2 -f "tcp,tcp_dst=80" +antctl trace-packet -S ns1/pod1 -D ns2/svc2 -f "tcp,tcp_dst=80" # Trace the Service reply packet (assuming "ns2/pod2" is the Service backend Pod) antctl trace-packet -D ns1/pod1 -S ns2/pod2 -f "tcp,tcp_src=80" # Trace an IP packet from a Pod to gateway port diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index 62fbc7767b8..f4ae76d4f23 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -96,20 +96,21 @@ $GOPATH/bin/openapi-gen \ # Generate mocks for testing with mockgen. MOCKGEN_TARGETS=( "pkg/agent/cniserver/ipam IPAMDriver testing" + "pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack testing" "pkg/agent/interfacestore InterfaceStore testing" + "pkg/agent/nodeportlocal/rules PodPortRules testing" "pkg/agent/openflow Client,OFEntryOperations testing" + "pkg/agent/proxy Proxier" + "pkg/agent/querier AgentQuerier testing" "pkg/agent/route Interface testing" "pkg/antctl AntctlClient ." + "pkg/controller/networkpolicy EndpointQuerier testing" + "pkg/controller/querier ControllerQuerier testing" + "pkg/ipfix IPFIXExportingProcess,IPFIXSet,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess testing" "pkg/ovs/openflow Bridge,Table,Flow,Action,CTAction,FlowBuilder testing" "pkg/ovs/ovsconfig OVSBridgeClient testing" "pkg/ovs/ovsctl OVSCtlClient testing" - "pkg/agent/querier AgentQuerier testing" - "pkg/controller/networkpolicy EndpointQuerier testing" - "pkg/controller/querier ControllerQuerier testing" "pkg/querier AgentNetworkPolicyInfoQuerier testing" - "pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack testing" - "pkg/ipfix IPFIXExportingProcess,IPFIXSet,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess testing" - "pkg/agent/nodeportlocal/rules PodPortRules testing" "third_party/proxy Provider testing" ) diff --git a/pkg/agent/apiserver/handlers/ovsflows/handler.go b/pkg/agent/apiserver/handlers/ovsflows/handler.go index 94d89e50336..8ed2700e5d8 100644 --- a/pkg/agent/apiserver/handlers/ovsflows/handler.go +++ b/pkg/agent/apiserver/handlers/ovsflows/handler.go @@ -25,6 +25,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/openflow" agentquerier "github.com/vmware-tanzu/antrea/pkg/agent/querier" "github.com/vmware-tanzu/antrea/pkg/antctl/transform/common" + "github.com/vmware-tanzu/antrea/pkg/features" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" "github.com/vmware-tanzu/antrea/pkg/querier" ) @@ -67,6 +68,21 @@ func dumpFlows(aq agentquerier.AgentQuerier, table binding.TableIDType) ([]Respo return resps, nil } +func dumpMatchedGroups(aq agentquerier.AgentQuerier, groupIDs []binding.GroupIDType) ([]Response, error) { + resps := []Response{} + for _, g := range groupIDs { + groupStr, err := aq.GetOVSCtlClient().DumpGroup(int(g)) + if err != nil { + klog.Errorf("Failed to dump group %d: %v", g, err) + return nil, err + } + if groupStr != "" { + resps = append(resps, Response{groupStr}) + } + } + return resps, nil +} + // nil is returned if the flow table can not be found (the passed table name or // number is invalid). func getTableFlows(aq agentquerier.AgentQuerier, table string) ([]Response, error) { @@ -107,6 +123,22 @@ func getPodFlows(aq agentquerier.AgentQuerier, podName, namespace string) ([]Res } +func getServiceFlows(aq agentquerier.AgentQuerier, serviceName, namespace string) ([]Response, error) { + flowKeys, groupIDs := aq.GetProxier().GetServiceFlowKeys(serviceName, namespace) + if flowKeys == nil { + return nil, nil + } + resps, err := dumpMatchedFlows(aq, flowKeys) + if err != nil { + return nil, err + } + groupResps, err := dumpMatchedGroups(aq, groupIDs) + if err != nil { + return nil, err + } + return append(resps, groupResps...), nil +} + func getNetworkPolicyFlows(aq agentquerier.AgentQuerier, npName, namespace string) ([]Response, error) { if len(aq.GetNetworkPolicyInfoQuerier().GetNetworkPolicies(&querier.NetworkPolicyQueryFilter{SourceName: npName, Namespace: namespace})) == 0 { // NetworkPolicy not found. @@ -123,20 +155,27 @@ func HandleFunc(aq agentquerier.AgentQuerier) http.HandlerFunc { var err error var resps []Response pod := r.URL.Query().Get("pod") + service := r.URL.Query().Get("service") networkPolicy := r.URL.Query().Get("networkpolicy") namespace := r.URL.Query().Get("namespace") table := r.URL.Query().Get("table") - if (pod != "" || networkPolicy != "") && namespace == "" { + if (pod != "" || service != "" || networkPolicy != "") && namespace == "" { http.Error(w, "namespace must be provided", http.StatusBadRequest) return } - if pod == "" && networkPolicy == "" && namespace == "" && table == "" { + if pod == "" && service == "" && networkPolicy == "" && namespace == "" && table == "" { resps, err = dumpFlows(aq, binding.TableIDAll) } else if pod != "" { // Pod Namespace must be provided to dump flows of a Pod. resps, err = getPodFlows(aq, pod, namespace) + } else if service != "" { + if !features.DefaultFeatureGate.Enabled(features.AntreaProxy) { + http.Error(w, "AntreaProxy is not enabled", http.StatusServiceUnavailable) + return + } + resps, err = getServiceFlows(aq, service, namespace) } else if networkPolicy != "" { resps, err = getNetworkPolicyFlows(aq, networkPolicy, namespace) } else if table != "" { diff --git a/pkg/agent/apiserver/handlers/ovsflows/handler_test.go b/pkg/agent/apiserver/handlers/ovsflows/handler_test.go index 0995d1574a5..3c19ca85a4f 100644 --- a/pkg/agent/apiserver/handlers/ovsflows/handler_test.go +++ b/pkg/agent/apiserver/handlers/ovsflows/handler_test.go @@ -26,18 +26,23 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing" oftest "github.com/vmware-tanzu/antrea/pkg/agent/openflow/testing" + proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing" agentquerier "github.com/vmware-tanzu/antrea/pkg/agent/querier" aqtest "github.com/vmware-tanzu/antrea/pkg/agent/querier/testing" cpv1beta "github.com/vmware-tanzu/antrea/pkg/apis/controlplane/v1beta2" + binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" ovsctltest "github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl/testing" "github.com/vmware-tanzu/antrea/pkg/querier" queriertest "github.com/vmware-tanzu/antrea/pkg/querier/testing" ) var ( - testFlowKeys = []string{"flowKey1", "flowKey2"} - testDumpResults = []string{"flow1", "flow2"} - testResponses = []Response{{"flow1"}, {"flow2"}} + testFlowKeys = []string{"flowKey1", "flowKey2"} + testDumpFlows = []string{"flow1", "flow2"} + testGroupIDs = []binding.GroupIDType{1, 2} + testDumpGroups = []string{"group1", "group2"} + testResponses = []Response{{"flow1"}, {"flow2"}} + testGroupResponses = []Response{{"group1"}, {"group2"}} ) type testCase struct { @@ -46,11 +51,13 @@ type testCase struct { namespace string query string expectedStatus int + dumpGroups bool } func TestBadRequests(t *testing.T) { badRequests := map[string]string{ "Pod only": "?pod=pod1", + "Service only": "?service=svc1", "NetworkPolicy only": "?networkpolicy=np1", "Namespace only": "?namespace=ns1", "Pod and NetworkPolicy": "?pod=pod1&&networkpolicy=np1", @@ -77,7 +84,6 @@ func TestPodFlows(t *testing.T) { defer ctrl.Finish() testInterface := &interfacestore.InterfaceConfig{InterfaceName: "interface0"} - testcases := []testCase{ { test: "Existing Pod", @@ -108,7 +114,7 @@ func TestPodFlows(t *testing.T) { q.EXPECT().GetOpenflowClient().Return(ofc).Times(1) q.EXPECT().GetOVSCtlClient().Return(ovsctl).Times(len(testFlowKeys)) for i := range testFlowKeys { - ovsctl.EXPECT().DumpMatchedFlow(testFlowKeys[i]).Return(testDumpResults[i], nil).Times(1) + ovsctl.EXPECT().DumpMatchedFlow(testFlowKeys[i]).Return(testDumpFlows[i], nil).Times(1) } } else { i.EXPECT().GetContainerInterfacesByPod(tc.name, tc.namespace).Return(nil).Times(1) @@ -118,12 +124,56 @@ func TestPodFlows(t *testing.T) { } } +func TestServiceFlows(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + testcases := []testCase{ + { + test: "Existing Service", + name: "svc1", + namespace: "ns1", + query: "?service=svc1&&namespace=ns1", + expectedStatus: http.StatusOK, + dumpGroups: true, + }, + { + test: "Non-existing Service", + name: "svc2", + namespace: "ns2", + query: "?service=svc2&&namespace=ns2", + expectedStatus: http.StatusNotFound, + }, + } + for i := range testcases { + tc := testcases[i] + p := proxytest.NewMockProxier(ctrl) + q := aqtest.NewMockAgentQuerier(ctrl) + q.EXPECT().GetProxier().Return(p).Times(1) + + if tc.expectedStatus != http.StatusNotFound { + ovsctl := ovsctltest.NewMockOVSCtlClient(ctrl) + p.EXPECT().GetServiceFlowKeys(tc.name, tc.namespace).Return(testFlowKeys, testGroupIDs).Times(1) + q.EXPECT().GetOVSCtlClient().Return(ovsctl).Times(len(testFlowKeys) + len(testGroupIDs)) + for i, f := range testFlowKeys { + ovsctl.EXPECT().DumpMatchedFlow(f).Return(testDumpFlows[i], nil).Times(1) + } + for i, g := range testGroupIDs { + ovsctl.EXPECT().DumpGroup(int(g)).Return(testDumpGroups[i], nil).Times(1) + } + } else { + p.EXPECT().GetServiceFlowKeys(tc.name, tc.namespace).Return(nil, nil).Times(1) + } + + runHTTPTest(t, &tc, q) + } +} + func TestNetworkPolicyFlows(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() testNetworkPolicy := &cpv1beta.NetworkPolicy{} - testcases := []testCase{ { test: "Existing NetworkPolicy", @@ -154,7 +204,7 @@ func TestNetworkPolicyFlows(t *testing.T) { q.EXPECT().GetOpenflowClient().Return(ofc).Times(1) q.EXPECT().GetOVSCtlClient().Return(ovsctl).Times(len(testFlowKeys)) for i := range testFlowKeys { - ovsctl.EXPECT().DumpMatchedFlow(testFlowKeys[i]).Return(testDumpResults[i], nil).Times(1) + ovsctl.EXPECT().DumpMatchedFlow(testFlowKeys[i]).Return(testDumpFlows[i], nil).Times(1) } } else { npq.EXPECT().GetNetworkPolicies(&querier.NetworkPolicyQueryFilter{SourceName: tc.name, Namespace: tc.namespace}).Return(nil).Times(1) @@ -186,7 +236,7 @@ func TestTableFlows(t *testing.T) { ovsctl := ovsctltest.NewMockOVSCtlClient(ctrl) q := aqtest.NewMockAgentQuerier(ctrl) q.EXPECT().GetOVSCtlClient().Return(ovsctl).Times(1) - ovsctl.EXPECT().DumpTableFlows(gomock.Any()).Return(testDumpResults, nil).Times(1) + ovsctl.EXPECT().DumpTableFlows(gomock.Any()).Return(testDumpFlows, nil).Times(1) runHTTPTest(t, &tc, q) } @@ -206,6 +256,10 @@ func runHTTPTest(t *testing.T, tc *testCase, aq agentquerier.AgentQuerier) { var received []Response err = json.Unmarshal(recorder.Body.Bytes(), &received) assert.Nil(t, err) - assert.Equal(t, testResponses, received) + if tc.dumpGroups { + assert.Equal(t, append(testResponses, testGroupResponses...), received) + } else { + assert.Equal(t, testResponses, received) + } } } diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 4848b8682b8..c4d8564a01f 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -98,7 +98,7 @@ type Client interface { // InstallEndpointFlows installs flows for accessing Endpoints. // If an Endpoint is on the current Node, then flows for hairpin and endpoint // L2 forwarding should also be installed. - InstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint, isIPv6 bool) error + InstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error // UninstallEndpointFlows removes flows of the Endpoint installed by // InstallEndpointFlows. UninstallEndpointFlows(protocol binding.Protocol, endpoint proxy.Endpoint) error @@ -180,6 +180,10 @@ type Client interface { // Pod. GetPodFlowKeys(interfaceName string) []string + // GetServiceFlowKeys returns the keys (match strings) of the cached + // flows for a Service (port) and its endpoints. + GetServiceFlowKeys(svcIP net.IP, svcPort uint16, protocol binding.Protocol, endpoints []proxy.Endpoint) []string + // GetNetworkPolicyFlowKeys returns the keys (match strings) of the cached // flows for a NetworkPolicy. Flows are grouped by policy rules, and duplicated // entries can be added due to conjunctive match flows shared by multiple @@ -368,14 +372,14 @@ func (c *client) UninstallPodFlows(interfaceName string) error { return c.deleteFlows(c.podFlowCache, interfaceName) } -func (c *client) GetPodFlowKeys(interfaceName string) []string { - fCacheI, ok := c.podFlowCache.Load(interfaceName) +func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string) []string { + fCacheI, ok := cache.Load(cacheKey) if !ok { return nil } - fCache := fCacheI.(flowCache) flowKeys := make([]string, 0, len(fCache)) + // ReplayFlows() could change Flow internal state. Although its current // implementation does not impact Flow match string generation, we still // acquire read lock of replayMutex here for logic cleanliness. @@ -387,6 +391,10 @@ func (c *client) GetPodFlowKeys(interfaceName string) []string { return flowKeys } +func (c *client) GetPodFlowKeys(interfaceName string) []string { + return c.getFlowKeysFromCache(c.podFlowCache, interfaceName) +} + func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() @@ -408,20 +416,24 @@ func (c *client) UninstallServiceGroup(groupID binding.GroupIDType) error { return nil } -func (c *client) InstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint, isIPv6 bool) error { +func generateEndpointFlowCacheKey(endpointIP string, endpointPort int, protocol binding.Protocol) string { + return fmt.Sprintf("Endpoints_%s_%d_%s", endpointIP, endpointPort, protocol) +} + +func generateServicePortFlowCacheKey(svcIP net.IP, svcPort uint16, protocol binding.Protocol) string { + return fmt.Sprintf("Service_%s_%d_%s", svcIP, svcPort, protocol) +} + +func (c *client) InstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() - parser := func(ipStr string) net.IP { return net.ParseIP(ipStr).To4() } - if isIPv6 { - parser = func(ipStr string) net.IP { return net.ParseIP(ipStr).To16() } - } for _, endpoint := range endpoints { var flows []binding.Flow endpointPort, _ := endpoint.Port() - endpointIP := parser(endpoint.IP()) + endpointIP := net.ParseIP(endpoint.IP()) portVal := portToUint16(endpointPort) - cacheKey := fmt.Sprintf("Endpoints_%s_%d_%s", endpointIP, endpointPort, protocol) + cacheKey := generateEndpointFlowCacheKey(endpoint.IP(), endpointPort, protocol) flows = append(flows, c.endpointDNATFlow(endpointIP, portVal, protocol)) if endpoint.GetIsLocal() { flows = append(flows, c.hairpinSNATFlow(endpointIP)) @@ -441,7 +453,7 @@ func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoint prox if err != nil { return fmt.Errorf("error when getting port: %w", err) } - cacheKey := fmt.Sprintf("Endpoints_%s_%d_%s", endpoint.IP(), port, protocol) + cacheKey := generateEndpointFlowCacheKey(endpoint.IP(), port, protocol) return c.deleteFlows(c.serviceFlowCache, cacheKey) } @@ -453,14 +465,14 @@ func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, if affinityTimeout != 0 { flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout)) } - cacheKey := fmt.Sprintf("Service_%s_%d_%s", svcIP, svcPort, protocol) + cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol) return c.addFlows(c.serviceFlowCache, cacheKey, flows) } func (c *client) UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() - cacheKey := fmt.Sprintf("Service_%s_%d_%s", svcIP, svcPort, protocol) + cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol) return c.deleteFlows(c.serviceFlowCache, cacheKey) } @@ -480,6 +492,17 @@ func (c *client) UninstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcP return c.deleteFlows(c.serviceFlowCache, cacheKey) } +func (c *client) GetServiceFlowKeys(svcIP net.IP, svcPort uint16, protocol binding.Protocol, endpoints []proxy.Endpoint) []string { + cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol) + flowKeys := c.getFlowKeysFromCache(c.serviceFlowCache, cacheKey) + for _, ep := range endpoints { + epPort, _ := ep.Port() + cacheKey = generateEndpointFlowCacheKey(ep.IP(), epPort, protocol) + flowKeys = append(flowKeys, c.getFlowKeysFromCache(c.serviceFlowCache, cacheKey)...) + } + return flowKeys +} + func (c *client) InstallClusterServiceFlows() error { flows := []binding.Flow{ c.serviceNeedLBFlow(), diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index a87a5004a81..b7f8e1e41ba 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -1,4 +1,4 @@ -// Copyright 2020 Antrea Authors +// Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -180,6 +180,20 @@ func (mr *MockClientMockRecorder) GetPolicyInfoFromConjunction(arg0 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPolicyInfoFromConjunction", reflect.TypeOf((*MockClient)(nil).GetPolicyInfoFromConjunction), arg0) } +// GetServiceFlowKeys mocks base method +func (m *MockClient) GetServiceFlowKeys(arg0 net.IP, arg1 uint16, arg2 openflow.Protocol, arg3 []proxy.Endpoint) []string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetServiceFlowKeys", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].([]string) + return ret0 +} + +// GetServiceFlowKeys indicates an expected call of GetServiceFlowKeys +func (mr *MockClientMockRecorder) GetServiceFlowKeys(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServiceFlowKeys", reflect.TypeOf((*MockClient)(nil).GetServiceFlowKeys), arg0, arg1, arg2, arg3) +} + // GetTunnelVirtualMAC mocks base method func (m *MockClient) GetTunnelVirtualMAC() net.HardwareAddr { m.ctrl.T.Helper() @@ -280,17 +294,17 @@ func (mr *MockClientMockRecorder) InstallDefaultTunnelFlows() *gomock.Call { } // InstallEndpointFlows mocks base method -func (m *MockClient) InstallEndpointFlows(arg0 openflow.Protocol, arg1 []proxy.Endpoint, arg2 bool) error { +func (m *MockClient) InstallEndpointFlows(arg0 openflow.Protocol, arg1 []proxy.Endpoint) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallEndpointFlows", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "InstallEndpointFlows", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // InstallEndpointFlows indicates an expected call of InstallEndpointFlows -func (mr *MockClientMockRecorder) InstallEndpointFlows(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallEndpointFlows(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallEndpointFlows", reflect.TypeOf((*MockClient)(nil).InstallEndpointFlows), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallEndpointFlows", reflect.TypeOf((*MockClient)(nil).InstallEndpointFlows), arg0, arg1) } // InstallExternalFlows mocks base method diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 744f5134e68..b54e6ff2f25 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/runtime" + k8sapitypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/tools/record" "k8s.io/klog" @@ -31,7 +32,6 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/openflow" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/metrics" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" - "github.com/vmware-tanzu/antrea/pkg/agent/querier" "github.com/vmware-tanzu/antrea/pkg/features" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" @@ -43,6 +43,18 @@ const ( componentName = "antrea-agent-proxy" ) +// Proxier wraps proxy.Provider and adds extra methods. It is introduced for +// extending the proxy.Provider implementations with extra methods, without +// modifying the proxy.Provider interface. +type Proxier interface { + // GetProxyProvider returns the real proxy Provider. + GetProxyProvider() k8sproxy.Provider + // GetServiceFlowKeys returns the keys (match strings) of the cached OVS + // flows and the OVS group IDs for a Service. Nil is returned if the + // Service is not found. + GetServiceFlowKeys(serviceName, namespace string) ([]string, []binding.GroupIDType) +} + type proxier struct { once sync.Once endpointSliceConfig *config.EndpointSliceConfig @@ -62,6 +74,10 @@ type proxier struct { endpointsMap types.EndpointsMap // endpointsInstalledMap stores endpoints we actually installed. endpointsInstalledMap types.EndpointsMap + // serviceEndpointsMapsMutex protects serviceMap, serviceInstalledMap, + // endpointsMap, and endpointsInstalledMap, which can be read by + // GetServiceFlowKeys() called by the "/ovsflows" API handler. + serviceEndpointsMapsMutex sync.Mutex // endpointReferenceCounter stores the number of times an Endpoint is referenced by Services. endpointReferenceCounter map[string]int // groupCounter is used to allocate groupID. @@ -73,7 +89,6 @@ type proxier struct { runner *k8sproxy.BoundedFrequencyRunner stopChan <-chan struct{} - agentQuerier querier.AgentQuerier ofClient openflow.Client isIPv6 bool enableEndpointSlice bool @@ -217,8 +232,8 @@ func (p *proxier) installServices() { groupID, _ := p.groupCounter.Get(svcPortName) endpointsInstalled, ok := p.endpointsInstalledMap[svcPortName] if !ok { - p.endpointsInstalledMap[svcPortName] = map[string]k8sproxy.Endpoint{} - endpointsInstalled = p.endpointsInstalledMap[svcPortName] + endpointsInstalled = map[string]k8sproxy.Endpoint{} + p.endpointsInstalledMap[svcPortName] = endpointsInstalled } endpoints := p.endpointsMap[svcPortName] // If both expected Endpoints number and installed Endpoints number are 0, we don't need to take care of this Service. @@ -274,7 +289,7 @@ func (p *proxier) installServices() { } if needUpdateEndpoints { - err := p.ofClient.InstallEndpointFlows(svcInfo.OFProtocol, endpointUpdateList, p.isIPv6) + err := p.ofClient.InstallEndpointFlows(svcInfo.OFProtocol, endpointUpdateList) if err != nil { klog.Errorf("Error when installing Endpoints flows: %v", err) continue @@ -345,11 +360,17 @@ func (p *proxier) installServices() { // syncProxyRules applies current changes in change trackers and then updates // flows for services and endpoints. It will return immediately if either // endpoints or services resources are not synced. syncProxyRules is only called -// through the Run method of the runner object, and all calls are -// serialized. Since this method is the only one accessing internal state -// (e.g. serviceMap), no synchronization mechanism, such as a mutex, is -// required. +// through the Run method of the runner object, and all calls are serialized. +// This method is the only one that changes internal state, but +// GetServiceFlowKeys(), which is called by the the "/ovsflows" API handler, +// also reads service and endpoints maps, so serviceEndpointsMapsMutex is used +// to protect these two maps. func (p *proxier) syncProxyRules() { + if !p.isInitialized() { + klog.V(4).Info("Not syncing rules until both Services and Endpoints have been synced") + return + } + start := time.Now() defer func() { delta := time.Since(start) @@ -360,11 +381,11 @@ func (p *proxier) syncProxyRules() { } klog.V(4).Infof("syncProxyRules took %v", time.Since(start)) }() - if !p.isInitialized() { - klog.V(4).Info("Not syncing rules until both Services and Endpoints have been synced") - return - } + // Protect Service and endpoints maps, which can be read by + // GetServiceFlowKeys(). + p.serviceEndpointsMapsMutex.Lock() + defer p.serviceEndpointsMapsMutex.Unlock() p.endpointsChanges.Update(p.endpointsMap) p.serviceChanges.Update(p.serviceMap) @@ -509,6 +530,55 @@ func (p *proxier) Run(stopCh <-chan struct{}) { }) } +func (p *proxier) GetProxyProvider() k8sproxy.Provider { + // Return myself. + return p +} + +func (p *proxier) GetServiceFlowKeys(serviceName, namespace string) ([]string, []binding.GroupIDType) { + var flows []string + var groups []binding.GroupIDType + namespacedName := k8sapitypes.NamespacedName{Namespace: namespace, Name: serviceName} + + p.serviceEndpointsMapsMutex.Lock() + defer p.serviceEndpointsMapsMutex.Unlock() + + for svcPortName := range p.serviceMap { + if namespacedName != svcPortName.NamespacedName { + continue + } + if flows == nil { + // The Service is found. nil should not be returned. + flows = []string{} + groups = []binding.GroupIDType{} + } + + installedSvcPort, ok := p.serviceInstalledMap[svcPortName] + if !ok { + // Service flows not installed. + continue + } + svcInfo := installedSvcPort.(*types.ServiceInfo) + + var epList []k8sproxy.Endpoint + endpoints, ok := p.endpointsMap[svcPortName] + if ok && len(endpoints) > 0 { + epList = make([]k8sproxy.Endpoint, 0, len(endpoints)) + for _, ep := range endpoints { + epList = append(epList, ep) + } + } + + svcFlows := p.ofClient.GetServiceFlowKeys(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, epList) + flows = append(flows, svcFlows...) + + groupID, _ := p.groupCounter.Get(svcPortName) + groups = append(groups, groupID) + } + + return flows, groups +} + func NewProxier( hostname string, informerFactory informers.SharedInformerFactory, @@ -553,8 +623,35 @@ func NewProxier( return p } +// metaProxierWrapper wraps metaProxier, and implements the extra methods added +// in interface Proxier. +type metaProxierWrapper struct { + ipv4Proxier *proxier + ipv6Proxier *proxier + metaProxier k8sproxy.Provider +} + +func (p *metaProxierWrapper) GetProxyProvider() k8sproxy.Provider { + return p.metaProxier +} + +func (p *metaProxierWrapper) GetServiceFlowKeys(serviceName, namespace string) ([]string, []binding.GroupIDType) { + v4Flows, v4Groups := p.ipv4Proxier.GetServiceFlowKeys(serviceName, namespace) + v6Flows, v6Groups := p.ipv6Proxier.GetServiceFlowKeys(serviceName, namespace) + + if v4Flows == nil { + if v6Flows == nil { + return nil, nil + } + // Not to return nil, even v6Flows is empty. + v4Flows = make([]string, 0, len(v6Flows)) + } + // Return the unions of IPv4 and IPv6 flows and groups. + return append(v4Flows, v6Flows...), append(v4Groups, v6Groups...) +} + func NewDualStackProxier( - hostname string, informerFactory informers.SharedInformerFactory, ofClient openflow.Client) k8sproxy.Provider { + hostname string, informerFactory informers.SharedInformerFactory, ofClient openflow.Client) *metaProxierWrapper { // Create an ipv4 instance of the single-stack proxier ipv4Proxier := NewProxier(hostname, informerFactory, ofClient, false) @@ -562,8 +659,9 @@ func NewDualStackProxier( // Create an ipv6 instance of the single-stack proxier ipv6Proxier := NewProxier(hostname, informerFactory, ofClient, true) - // Return a meta-proxier that dispatch calls between the two - // single-stack proxier instances + // Create a meta-proxier that dispatch calls between the two + // single-stack proxier instances. metaProxier := k8sproxy.NewMetaProxier(ipv4Proxier, ipv6Proxier) - return metaProxier + + return &metaProxierWrapper{ipv4Proxier, ipv6Proxier, metaProxier} } diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 4f37b7dfbb9..e9c0113a6c7 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -147,7 +147,7 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { if isIPv6 { bindingProtocol = binding.ProtocolTCPv6 } - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any(), isIPv6).Times(1) + mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) fp.syncProxyRules() @@ -200,7 +200,7 @@ func TestLoadbalancer(t *testing.T) { groupID, _ := fp.groupCounter.Get(svcPortName) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any(), false).Times(1) + mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0)).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0)).Times(1) mockOFClient.EXPECT().InstallLoadBalancerServiceFromOutsideFlows(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() @@ -259,7 +259,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) makeEndpointsMap(fp, ep) groupID, _ := fp.groupCounter.Get(svcPortName) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any(), isIPv6).Times(1) + mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) @@ -389,8 +389,8 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP groupIDUDP, _ := fp.groupCounter.Get(svcPortNameUDP) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(2) - mockOFClient.EXPECT().InstallEndpointFlows(protocolTCP, gomock.Any(), isIPv6).Times(1) - mockOFClient.EXPECT().InstallEndpointFlows(protocolUDP, gomock.Any(), isIPv6).Times(2) + mockOFClient.EXPECT().InstallEndpointFlows(protocolTCP, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallEndpointFlows(protocolUDP, gomock.Any()).Times(2) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), protocolTCP, uint16(0)).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDUDP, svcIP, uint16(svcPort), protocolUDP, uint16(0)).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(protocolUDP, gomock.Any()).Times(1) @@ -450,7 +450,7 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv makeEndpointsMap(fp, ep) groupID, _ := fp.groupCounter.Get(svcPortName) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(2) - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any(), isIPv6).Times(2) + mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(2) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) fp.syncProxyRules() @@ -521,7 +521,7 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne } groupID, _ := fp.groupCounter.Get(svcPortName) mockOFClient.EXPECT().InstallServiceGroup(groupID, true, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any(), isIPv6).Times(1) + mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(corev1.DefaultClientIPServiceAffinitySeconds)).Times(1) fp.syncProxyRules() @@ -627,7 +627,7 @@ func testPortChange(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { makeEndpointsMap(fp, ep) groupID, _ := fp.groupCounter.Get(svcPortName) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any(), isIPv6).Times(1) + mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort1), bindingProtocol, uint16(0)) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort1), bindingProtocol) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort2), bindingProtocol, uint16(0)) @@ -715,7 +715,7 @@ func TestServicesWithSameEndpoints(t *testing.T) { mockOFClient.EXPECT().InstallServiceGroup(groupID1, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID2, false, gomock.Any()).Times(1) bindingProtocol := binding.ProtocolTCP - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any(), false).Times(2) + mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(2) mockOFClient.EXPECT().InstallServiceFlows(groupID1, svcIP1, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID2, svcIP2, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP1, uint16(svcPort), bindingProtocol).Times(1) diff --git a/pkg/agent/proxy/testing/mock_proxy.go b/pkg/agent/proxy/testing/mock_proxy.go new file mode 100644 index 00000000000..a3bc4003ba5 --- /dev/null +++ b/pkg/agent/proxy/testing/mock_proxy.go @@ -0,0 +1,79 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/vmware-tanzu/antrea/pkg/agent/proxy (interfaces: Proxier) + +// Package testing is a generated GoMock package. +package testing + +import ( + gomock "github.com/golang/mock/gomock" + openflow "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" + proxy "github.com/vmware-tanzu/antrea/third_party/proxy" + reflect "reflect" +) + +// MockProxier is a mock of Proxier interface +type MockProxier struct { + ctrl *gomock.Controller + recorder *MockProxierMockRecorder +} + +// MockProxierMockRecorder is the mock recorder for MockProxier +type MockProxierMockRecorder struct { + mock *MockProxier +} + +// NewMockProxier creates a new mock instance +func NewMockProxier(ctrl *gomock.Controller) *MockProxier { + mock := &MockProxier{ctrl: ctrl} + mock.recorder = &MockProxierMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockProxier) EXPECT() *MockProxierMockRecorder { + return m.recorder +} + +// GetProxyProvider mocks base method +func (m *MockProxier) GetProxyProvider() proxy.Provider { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetProxyProvider") + ret0, _ := ret[0].(proxy.Provider) + return ret0 +} + +// GetProxyProvider indicates an expected call of GetProxyProvider +func (mr *MockProxierMockRecorder) GetProxyProvider() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProxyProvider", reflect.TypeOf((*MockProxier)(nil).GetProxyProvider)) +} + +// GetServiceFlowKeys mocks base method +func (m *MockProxier) GetServiceFlowKeys(arg0, arg1 string) ([]string, []openflow.GroupIDType) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetServiceFlowKeys", arg0, arg1) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].([]openflow.GroupIDType) + return ret0, ret1 +} + +// GetServiceFlowKeys indicates an expected call of GetServiceFlowKeys +func (mr *MockProxierMockRecorder) GetServiceFlowKeys(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServiceFlowKeys", reflect.TypeOf((*MockProxier)(nil).GetServiceFlowKeys), arg0, arg1) +} diff --git a/pkg/agent/querier/querier.go b/pkg/agent/querier/querier.go index 45025b42e21..1ae546f802e 100644 --- a/pkg/agent/querier/querier.go +++ b/pkg/agent/querier/querier.go @@ -25,6 +25,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/config" "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" "github.com/vmware-tanzu/antrea/pkg/agent/openflow" + "github.com/vmware-tanzu/antrea/pkg/agent/proxy" "github.com/vmware-tanzu/antrea/pkg/apis/clusterinformation/v1beta1" "github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig" "github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl" @@ -41,6 +42,7 @@ type AgentQuerier interface { GetAgentInfo(agentInfo *v1beta1.AntreaAgentInfo, partial bool) GetOpenflowClient() openflow.Client GetOVSCtlClient() ovsctl.OVSCtlClient + GetProxier() proxy.Proxier GetNetworkPolicyInfoQuerier() querier.AgentNetworkPolicyInfoQuerier } @@ -51,6 +53,7 @@ type agentQuerier struct { k8sClient clientset.Interface ofClient openflow.Client ovsBridgeClient ovsconfig.OVSBridgeClient + proxier proxy.Proxier networkPolicyInfoQuerier querier.AgentNetworkPolicyInfoQuerier apiPort int } @@ -62,6 +65,7 @@ func NewAgentQuerier( k8sClient clientset.Interface, ofClient openflow.Client, ovsBridgeClient ovsconfig.OVSBridgeClient, + proxier proxy.Proxier, networkPolicyInfoQuerier querier.AgentNetworkPolicyInfoQuerier, apiPort int, ) *agentQuerier { @@ -72,6 +76,7 @@ func NewAgentQuerier( k8sClient: k8sClient, ofClient: ofClient, ovsBridgeClient: ovsBridgeClient, + proxier: proxier, networkPolicyInfoQuerier: networkPolicyInfoQuerier, apiPort: apiPort} } @@ -106,6 +111,11 @@ func (aq *agentQuerier) GetOVSCtlClient() ovsctl.OVSCtlClient { return ovsctl.NewClient(aq.nodeConfig.OVSBridge) } +// GetProxier returns proxy.Proxier. +func (aq *agentQuerier) GetProxier() proxy.Proxier { + return aq.proxier +} + // GetNetworkPolicyInfoQuerier returns AgentNetworkPolicyInfoQuerier. func (aq agentQuerier) GetNetworkPolicyInfoQuerier() querier.AgentNetworkPolicyInfoQuerier { return aq.networkPolicyInfoQuerier diff --git a/pkg/agent/querier/testing/mock_querier.go b/pkg/agent/querier/testing/mock_querier.go index 8d97cf19e33..3a77b8f9538 100644 --- a/pkg/agent/querier/testing/mock_querier.go +++ b/pkg/agent/querier/testing/mock_querier.go @@ -1,4 +1,4 @@ -// Copyright 2020 Antrea Authors +// Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import ( config "github.com/vmware-tanzu/antrea/pkg/agent/config" interfacestore "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" openflow "github.com/vmware-tanzu/antrea/pkg/agent/openflow" + proxy "github.com/vmware-tanzu/antrea/pkg/agent/proxy" v1beta1 "github.com/vmware-tanzu/antrea/pkg/apis/clusterinformation/v1beta1" ovsctl "github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl" querier "github.com/vmware-tanzu/antrea/pkg/querier" @@ -163,3 +164,17 @@ func (mr *MockAgentQuerierMockRecorder) GetOpenflowClient() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOpenflowClient", reflect.TypeOf((*MockAgentQuerier)(nil).GetOpenflowClient)) } + +// GetProxier mocks base method +func (m *MockAgentQuerier) GetProxier() proxy.Proxier { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetProxier") + ret0, _ := ret[0].(proxy.Proxier) + return ret0 +} + +// GetProxier indicates an expected call of GetProxier +func (mr *MockAgentQuerierMockRecorder) GetProxier() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProxier", reflect.TypeOf((*MockAgentQuerier)(nil).GetProxier)) +} diff --git a/pkg/antctl/antctl.go b/pkg/antctl/antctl.go index c2352acfdf3..ebb70ce81bf 100644 --- a/pkg/antctl/antctl.go +++ b/pkg/antctl/antctl.go @@ -293,8 +293,10 @@ var CommandList = &commandList{ $ antctl get ovsflows Dump OVS flows of a local Pod $ antctl get ovsflows -p pod1 -n ns1 + Dump OVS flows of a Service + $ antctl get ovsflows -S svc1 -n ns1 Dump OVS flows of a NetworkPolicy - $ antctl get ovsflows --networkpolicy np1 -n ns1 + $ antctl get ovsflows -N np1 -n ns1 Dump OVS flows of a flow Table $ antctl get ovsflows -T IngressRule @@ -314,8 +316,14 @@ var CommandList = &commandList{ shorthand: "p", }, { - name: "networkpolicy", - usage: "NetworkPolicy name. If present, Namespace must be provided.", + name: "service", + usage: "Name of a Service. If present, Namespace must be provided.", + shorthand: "S", + }, + { + name: "networkpolicy", + usage: "NetworkPolicy name. If present, Namespace must be provided.", + shorthand: "N", }, { name: "table", @@ -336,7 +344,7 @@ var CommandList = &commandList{ example: ` Trace an IP packet between two Pods $ antctl trace-packet -S ns1/pod1 -D ns2/pod2 Trace a TCP packet from a local Pod to a Service - $ antctl trace-packet -S ns1/pod1 -D ns2/srv2 -f tcp,tcp_dst=80 + $ antctl trace-packet -S ns1/pod1 -D ns2/svc2 -f tcp,tcp_dst=80 Trace a UDP packet from a Pod to an IP address $ antctl trace-packet -S ns1/pod1 -D 10.1.2.3 -f udp,udp_dst=1234 Trace an IP packet from a Pod to gateway port diff --git a/pkg/ovs/openflow/ofctrl_builder.go b/pkg/ovs/openflow/ofctrl_builder.go index bc6a919c221..77d932d12f2 100644 --- a/pkg/ovs/openflow/ofctrl_builder.go +++ b/pkg/ovs/openflow/ofctrl_builder.go @@ -85,6 +85,8 @@ func (b *ofFlowBuilder) MatchXXReg(regID int, data []byte) FlowBuilder { // MatchRegRange adds match condition for matching data in the target register at specified range. func (b *ofFlowBuilder) MatchRegRange(regID int, data uint32, rng Range) FlowBuilder { + s := fmt.Sprintf("reg%d[%d..%d]=0x%x", regID, rng[0], rng[1], data) + b.matchers = append(b.matchers, s) if rng[0] > 0 { data <<= rng[0] } diff --git a/pkg/ovs/ovsctl/interface.go b/pkg/ovs/ovsctl/interface.go index 2a5693943ef..4226517a640 100644 --- a/pkg/ovs/ovsctl/interface.go +++ b/pkg/ovs/ovsctl/interface.go @@ -39,6 +39,8 @@ type OVSCtlClient interface { DumpMatchedFlow(matchStr string) (string, error) // DumpTableFlows returns all flows in the table. DumpTableFlows(table uint8) ([]string, error) + // DumpGroup returns the OpenFlow group if it exists on the bridge. + DumpGroup(groupID int) (string, error) // DumpGroups returns OpenFlow groups of the bridge. DumpGroups(args ...string) ([][]string, error) // DumpPortsDesc returns OpenFlow ports descriptions of the bridge. diff --git a/pkg/ovs/ovsctl/ofctl.go b/pkg/ovs/ovsctl/ofctl.go index 7aa1e07c885..72fe0290bca 100644 --- a/pkg/ovs/ovsctl/ofctl.go +++ b/pkg/ovs/ovsctl/ofctl.go @@ -18,6 +18,7 @@ import ( "bufio" "bytes" "fmt" + "strconv" "strings" ) @@ -63,6 +64,27 @@ func (c *ovsCtlClient) DumpTableFlows(table uint8) ([]string, error) { return c.DumpFlows(fmt.Sprintf("table=%d", table)) } +func (c *ovsCtlClient) DumpGroup(groupID int) (string, error) { + // There seems a bug in ovs-ofctl that dump-groups always returns all + // the groups when using Openflow13, even the group ID is provided. As + // a workaround, we do not specify Openflow13 to run the command. + groupDump, err := c.runOfctlCmd(false, "dump-groups", strconv.Itoa(groupID)) + if err != nil { + return "", err + } + + scanner := bufio.NewScanner(strings.NewReader(string(groupDump))) + scanner.Split(bufio.ScanLines) + // Skip the first line. + scanner.Scan() + if !scanner.Scan() { + // No group found. + return "", nil + } + // Should have at most one line (group) returned. + return strings.TrimSpace(scanner.Text()), nil +} + func (c *ovsCtlClient) DumpGroups(args ...string) ([][]string, error) { groupsDump, err := c.RunOfctlCmd("dump-groups", args...) if err != nil { @@ -132,9 +154,12 @@ func (c *ovsCtlClient) SetPortNoFlood(ofport int) error { return nil } -func (c *ovsCtlClient) RunOfctlCmd(cmd string, args ...string) ([]byte, error) { - cmdStr := fmt.Sprintf("ovs-ofctl -O Openflow13 %s %s", cmd, c.bridge) +func (c *ovsCtlClient) runOfctlCmd(openflow13 bool, cmd string, args ...string) ([]byte, error) { + cmdStr := fmt.Sprintf("ovs-ofctl %s %s", cmd, c.bridge) cmdStr = cmdStr + " " + strings.Join(args, " ") + if openflow13 { + cmdStr += " -O Openflow13" + } out, err := getOVSCommand(cmdStr).Output() if err != nil { return nil, err @@ -142,6 +167,11 @@ func (c *ovsCtlClient) RunOfctlCmd(cmd string, args ...string) ([]byte, error) { return out, nil } +func (c *ovsCtlClient) RunOfctlCmd(cmd string, args ...string) ([]byte, error) { + // Default to use Openflow13. + return c.runOfctlCmd(true, cmd, args...) +} + // trimFlowStr removes undesirable fields from the flow string. func trimFlowStr(flowStr string) string { return flowStr[strings.Index(flowStr, " table")+1:] @@ -156,9 +186,8 @@ func flowExactMatch(matchStr, flowStr string) bool { if i == 0 { continue } - if strings.HasPrefix(m, "in_port=") { - // in_port can be formatted as port name. - m = "in_port=" + if i := strings.Index(m, "="); i != -1 { + m = m[:i] } if !strings.Contains(matchStr, m) { // The match condition is not included in matchStr. diff --git a/pkg/ovs/ovsctl/testing/mock_ovsctl.go b/pkg/ovs/ovsctl/testing/mock_ovsctl.go index f31d39d70e6..f0f88a34a71 100644 --- a/pkg/ovs/ovsctl/testing/mock_ovsctl.go +++ b/pkg/ovs/ovsctl/testing/mock_ovsctl.go @@ -1,4 +1,4 @@ -// Copyright 2020 Antrea Authors +// Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -67,6 +67,21 @@ func (mr *MockOVSCtlClientMockRecorder) DumpFlows(arg0 ...interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DumpFlows", reflect.TypeOf((*MockOVSCtlClient)(nil).DumpFlows), arg0...) } +// DumpGroup mocks base method +func (m *MockOVSCtlClient) DumpGroup(arg0 int) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DumpGroup", arg0) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DumpGroup indicates an expected call of DumpGroup +func (mr *MockOVSCtlClientMockRecorder) DumpGroup(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DumpGroup", reflect.TypeOf((*MockOVSCtlClient)(nil).DumpGroup), arg0) +} + // DumpGroups mocks base method func (m *MockOVSCtlClient) DumpGroups(arg0 ...string) ([][]string, error) { m.ctrl.T.Helper() diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index e5c1e890ab3..452d0c35ed6 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -561,7 +561,7 @@ func TestProxyServiceFlows(t *testing.T) { func installServiceFlows(t *testing.T, gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyMaxAgeSeconds uint16) { groupID := ofconfig.GroupIDType(gid) - err := c.InstallEndpointFlows(svc.protocol, endpointList, false) + err := c.InstallEndpointFlows(svc.protocol, endpointList) assert.NoError(t, err, "no error should return when installing flows for Endpoints") err = c.InstallServiceGroup(groupID, svc.withSessionAffinity, endpointList) assert.NoError(t, err, "no error should return when installing groups for Service") diff --git a/test/integration/ovs/ofctrl_test.go b/test/integration/ovs/ofctrl_test.go index 3b9a80096ce..4ab743d0b3d 100644 --- a/test/integration/ovs/ofctrl_test.go +++ b/test/integration/ovs/ofctrl_test.go @@ -289,7 +289,7 @@ func TestOFctrlGroup(t *testing.T) { } // Check if the group could be added. require.Nil(t, group.Add()) - groups, err := ovsCtlClient.DumpGroups(brName) + groups, err := ovsCtlClient.DumpGroups() require.Nil(t, err) require.Len(t, groups, 1) dumpedGroup := groups[0] @@ -311,7 +311,7 @@ func TestOFctrlGroup(t *testing.T) { } // Check if the group could be deleted. require.Nil(t, group.Delete()) - groups, err = ovsCtlClient.DumpGroups(brName) + groups, err = ovsCtlClient.DumpGroups() require.Nil(t, err) require.Len(t, groups, 0) })