Skip to content

Commit

Permalink
Support dumping OVS flows for a Service
Browse files Browse the repository at this point in the history
Extend Agent /ovsflows API handler to support returning OVS flows
for a Service.
Extend agent/proxy/proxier to query OVS flow keys and group IDs for a
Service.
Extend ovs/ovsctl/ofctl to support dumping a specific OVS group.
Extend "antctl get ovsflows" command to dump OVS flows for a Service.
  • Loading branch information
jianjuns committed Feb 24, 2021
1 parent f389d2e commit 9464b73
Show file tree
Hide file tree
Showing 19 changed files with 483 additions and 88 deletions.
18 changes: 11 additions & 7 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ func run(o *Options) error {
statsCollector = stats.NewCollector(antreaClientProvider, ofClient, networkPolicyController)
}

var proxier k8sproxy.Provider

var proxier proxy.Proxier
if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode)
Expand Down Expand Up @@ -287,24 +286,25 @@ func run(o *Options) error {
go traceflowController.Run(stopCh)
}

if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
go proxier.GetProxyProvider().Run(stopCh)
}

agentQuerier := querier.NewAgentQuerier(
nodeConfig,
networkConfig,
ifaceStore,
k8sClient,
ofClient,
ovsBridgeClient,
proxier,
networkPolicyController,
o.config.APIPort)

agentMonitor := monitor.NewAgentMonitor(crdClient, agentQuerier)

go agentMonitor.Run(stopCh)

if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
go proxier.Run(stopCh)
}

cipherSuites, err := cipher.GenerateCipherSuitesList(o.config.TLSCipherSuites)
if err != nil {
return fmt.Errorf("error generating Cipher Suite list: %v", err)
Expand Down Expand Up @@ -339,12 +339,16 @@ func run(o *Options) error {
v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode)

var proxyProvider k8sproxy.Provider
if proxier != nil {
proxyProvider = proxier.GetProxyProvider()
}
connStore := connections.NewConnectionStore(
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, ovsDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)),
ifaceStore,
v4Enabled,
v6Enabled,
proxier,
proxyProvider,
networkPolicyController,
o.pollInterval)
pollDone := make(chan struct{})
Expand Down
12 changes: 7 additions & 5 deletions docs/antctl.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,15 @@ antctl get podinterface [NAME] [-n NAMESPACE]

Starting from version 0.6.0, Antrea Agent supports dumping Antrea OVS flows. The
`antctl` `get ovsflows` (or `get of`) command can dump all OVS flows, flows
added for a specified Pod, or flows added to realize a specified NetworkPolicy,
or flows in a specified OVS flow table.
added for a specified Pod, or flows added for Service load-balancing of a
specified Service, or flows added to realize a specified NetworkPolicy, or flows
in a specified OVS flow table.

```bash
antctl get ovsflows
antctl get ovsflows -p POD -n NAMESPACE
antctl get ovsflows --networkpolicy NETWORKPOLICY -n NAMESPACE
antctl get ovsflows -S SERVICE -n NAMESPACE
antctl get ovsflows -N NETWORKPOLICY -n NAMESPACE
antctl get ovsflows -T TABLE_A,TABLE_B
antctl get ovsflows -T TABLE_A,TABLE_B_NUM
antctl get ovsflows -T TABLE_A_NUM,TABLE_B_NUM
Expand Down Expand Up @@ -269,7 +271,7 @@ NAMESPACE NAME APPLIED-TO RULES
kube-system kube-dns 160ea6d7-0234-5d1d-8ea0-b703d0aa3b46 1
# Dump OVS flows of NetworkPolicy "kube-dns"
$ antctl get of --networkpolicy kube-dns -n kube-system
$ antctl get of -N kube-dns -n kube-system
FLOW
table=90, n_packets=0, n_bytes=0, priority=190,conj_id=1,ip actions=resubmit(,105)
table=90, n_packets=0, n_bytes=0, priority=200,ip actions=conjunction(1,1/3)
Expand All @@ -293,7 +295,7 @@ few trace-packet command examples.
# Trace an IP packet between two Pods
antctl trace-packet -S ns1/pod1 -D ns2/pod2
# Trace a Service request from a local Pod
antctl trace-packet -S ns1/pod1 -D ns2/srv2 -f "tcp,tcp_dst=80"
antctl trace-packet -S ns1/pod1 -D ns2/svc2 -f "tcp,tcp_dst=80"
# Trace the Service reply packet (assuming "ns2/pod2" is the Service backend Pod)
antctl trace-packet -D ns1/pod1 -S ns2/pod2 -f "tcp,tcp_src=80"
# Trace an IP packet from a Pod to gateway port
Expand Down
13 changes: 7 additions & 6 deletions hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,21 @@ $GOPATH/bin/openapi-gen \
# Generate mocks for testing with mockgen.
MOCKGEN_TARGETS=(
"pkg/agent/cniserver/ipam IPAMDriver testing"
"pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack testing"
"pkg/agent/interfacestore InterfaceStore testing"
"pkg/agent/nodeportlocal/rules PodPortRules testing"
"pkg/agent/openflow Client,OFEntryOperations testing"
"pkg/agent/proxy Proxier"
"pkg/agent/querier AgentQuerier testing"
"pkg/agent/route Interface testing"
"pkg/antctl AntctlClient ."
"pkg/controller/networkpolicy EndpointQuerier testing"
"pkg/controller/querier ControllerQuerier testing"
"pkg/ipfix IPFIXExportingProcess,IPFIXSet,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess testing"
"pkg/ovs/openflow Bridge,Table,Flow,Action,CTAction,FlowBuilder testing"
"pkg/ovs/ovsconfig OVSBridgeClient testing"
"pkg/ovs/ovsctl OVSCtlClient testing"
"pkg/agent/querier AgentQuerier testing"
"pkg/controller/networkpolicy EndpointQuerier testing"
"pkg/controller/querier ControllerQuerier testing"
"pkg/querier AgentNetworkPolicyInfoQuerier testing"
"pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack testing"
"pkg/ipfix IPFIXExportingProcess,IPFIXSet,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess testing"
"pkg/agent/nodeportlocal/rules PodPortRules testing"
"third_party/proxy Provider testing"
)

Expand Down
43 changes: 41 additions & 2 deletions pkg/agent/apiserver/handlers/ovsflows/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
agentquerier "github.com/vmware-tanzu/antrea/pkg/agent/querier"
"github.com/vmware-tanzu/antrea/pkg/antctl/transform/common"
"github.com/vmware-tanzu/antrea/pkg/features"
binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow"
"github.com/vmware-tanzu/antrea/pkg/querier"
)
Expand Down Expand Up @@ -67,6 +68,21 @@ func dumpFlows(aq agentquerier.AgentQuerier, table binding.TableIDType) ([]Respo
return resps, nil
}

func dumpMatchedGroups(aq agentquerier.AgentQuerier, groupIDs []binding.GroupIDType) ([]Response, error) {
resps := []Response{}
for _, g := range groupIDs {
groupStr, err := aq.GetOVSCtlClient().DumpGroup(int(g))
if err != nil {
klog.Errorf("Failed to dump group %d: %v", g, err)
return nil, err
}
if groupStr != "" {
resps = append(resps, Response{groupStr})
}
}
return resps, nil
}

// nil is returned if the flow table can not be found (the passed table name or
// number is invalid).
func getTableFlows(aq agentquerier.AgentQuerier, table string) ([]Response, error) {
Expand Down Expand Up @@ -107,6 +123,22 @@ func getPodFlows(aq agentquerier.AgentQuerier, podName, namespace string) ([]Res

}

func getServiceFlows(aq agentquerier.AgentQuerier, serviceName, namespace string) ([]Response, error) {
flowKeys, groupIDs := aq.GetProxier().GetServiceFlowKeys(serviceName, namespace)
if flowKeys == nil {
return nil, nil
}
resps, err := dumpMatchedFlows(aq, flowKeys)
if err != nil {
return nil, err
}
groupResps, err := dumpMatchedGroups(aq, groupIDs)
if err != nil {
return nil, err
}
return append(resps, groupResps...), nil
}

func getNetworkPolicyFlows(aq agentquerier.AgentQuerier, npName, namespace string) ([]Response, error) {
if len(aq.GetNetworkPolicyInfoQuerier().GetNetworkPolicies(&querier.NetworkPolicyQueryFilter{SourceName: npName, Namespace: namespace})) == 0 {
// NetworkPolicy not found.
Expand All @@ -123,20 +155,27 @@ func HandleFunc(aq agentquerier.AgentQuerier) http.HandlerFunc {
var err error
var resps []Response
pod := r.URL.Query().Get("pod")
service := r.URL.Query().Get("service")
networkPolicy := r.URL.Query().Get("networkpolicy")
namespace := r.URL.Query().Get("namespace")
table := r.URL.Query().Get("table")

if (pod != "" || networkPolicy != "") && namespace == "" {
if (pod != "" || service != "" || networkPolicy != "") && namespace == "" {
http.Error(w, "namespace must be provided", http.StatusBadRequest)
return
}

if pod == "" && networkPolicy == "" && namespace == "" && table == "" {
if pod == "" && service == "" && networkPolicy == "" && namespace == "" && table == "" {
resps, err = dumpFlows(aq, binding.TableIDAll)
} else if pod != "" {
// Pod Namespace must be provided to dump flows of a Pod.
resps, err = getPodFlows(aq, pod, namespace)
} else if service != "" {
if !features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
http.Error(w, "AntreaProxy is not enabled", http.StatusServiceUnavailable)
return
}
resps, err = getServiceFlows(aq, service, namespace)
} else if networkPolicy != "" {
resps, err = getNetworkPolicyFlows(aq, networkPolicy, namespace)
} else if table != "" {
Expand Down
72 changes: 63 additions & 9 deletions pkg/agent/apiserver/handlers/ovsflows/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing"
oftest "github.com/vmware-tanzu/antrea/pkg/agent/openflow/testing"
proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing"
agentquerier "github.com/vmware-tanzu/antrea/pkg/agent/querier"
aqtest "github.com/vmware-tanzu/antrea/pkg/agent/querier/testing"
cpv1beta "github.com/vmware-tanzu/antrea/pkg/apis/controlplane/v1beta2"
binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow"
ovsctltest "github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl/testing"
"github.com/vmware-tanzu/antrea/pkg/querier"
queriertest "github.com/vmware-tanzu/antrea/pkg/querier/testing"
)

var (
testFlowKeys = []string{"flowKey1", "flowKey2"}
testDumpResults = []string{"flow1", "flow2"}
testResponses = []Response{{"flow1"}, {"flow2"}}
testFlowKeys = []string{"flowKey1", "flowKey2"}
testDumpFlows = []string{"flow1", "flow2"}
testGroupIDs = []binding.GroupIDType{1, 2}
testDumpGroups = []string{"group1", "group2"}
testResponses = []Response{{"flow1"}, {"flow2"}}
testGroupResponses = []Response{{"group1"}, {"group2"}}
)

type testCase struct {
Expand All @@ -46,11 +51,13 @@ type testCase struct {
namespace string
query string
expectedStatus int
dumpGroups bool
}

func TestBadRequests(t *testing.T) {
badRequests := map[string]string{
"Pod only": "?pod=pod1",
"Service only": "?service=svc1",
"NetworkPolicy only": "?networkpolicy=np1",
"Namespace only": "?namespace=ns1",
"Pod and NetworkPolicy": "?pod=pod1&&networkpolicy=np1",
Expand All @@ -77,7 +84,6 @@ func TestPodFlows(t *testing.T) {
defer ctrl.Finish()

testInterface := &interfacestore.InterfaceConfig{InterfaceName: "interface0"}

testcases := []testCase{
{
test: "Existing Pod",
Expand Down Expand Up @@ -108,7 +114,7 @@ func TestPodFlows(t *testing.T) {
q.EXPECT().GetOpenflowClient().Return(ofc).Times(1)
q.EXPECT().GetOVSCtlClient().Return(ovsctl).Times(len(testFlowKeys))
for i := range testFlowKeys {
ovsctl.EXPECT().DumpMatchedFlow(testFlowKeys[i]).Return(testDumpResults[i], nil).Times(1)
ovsctl.EXPECT().DumpMatchedFlow(testFlowKeys[i]).Return(testDumpFlows[i], nil).Times(1)
}
} else {
i.EXPECT().GetContainerInterfacesByPod(tc.name, tc.namespace).Return(nil).Times(1)
Expand All @@ -118,12 +124,56 @@ func TestPodFlows(t *testing.T) {
}
}

func TestServiceFlows(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

testcases := []testCase{
{
test: "Existing Service",
name: "svc1",
namespace: "ns1",
query: "?service=svc1&&namespace=ns1",
expectedStatus: http.StatusOK,
dumpGroups: true,
},
{
test: "Non-existing Service",
name: "svc2",
namespace: "ns2",
query: "?service=svc2&&namespace=ns2",
expectedStatus: http.StatusNotFound,
},
}
for i := range testcases {
tc := testcases[i]
p := proxytest.NewMockProxier(ctrl)
q := aqtest.NewMockAgentQuerier(ctrl)
q.EXPECT().GetProxier().Return(p).Times(1)

if tc.expectedStatus != http.StatusNotFound {
ovsctl := ovsctltest.NewMockOVSCtlClient(ctrl)
p.EXPECT().GetServiceFlowKeys(tc.name, tc.namespace).Return(testFlowKeys, testGroupIDs).Times(1)
q.EXPECT().GetOVSCtlClient().Return(ovsctl).Times(len(testFlowKeys) + len(testGroupIDs))
for i, f := range testFlowKeys {
ovsctl.EXPECT().DumpMatchedFlow(f).Return(testDumpFlows[i], nil).Times(1)
}
for i, g := range testGroupIDs {
ovsctl.EXPECT().DumpGroup(int(g)).Return(testDumpGroups[i], nil).Times(1)
}
} else {
p.EXPECT().GetServiceFlowKeys(tc.name, tc.namespace).Return(nil, nil).Times(1)
}

runHTTPTest(t, &tc, q)
}
}

func TestNetworkPolicyFlows(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

testNetworkPolicy := &cpv1beta.NetworkPolicy{}

testcases := []testCase{
{
test: "Existing NetworkPolicy",
Expand Down Expand Up @@ -154,7 +204,7 @@ func TestNetworkPolicyFlows(t *testing.T) {
q.EXPECT().GetOpenflowClient().Return(ofc).Times(1)
q.EXPECT().GetOVSCtlClient().Return(ovsctl).Times(len(testFlowKeys))
for i := range testFlowKeys {
ovsctl.EXPECT().DumpMatchedFlow(testFlowKeys[i]).Return(testDumpResults[i], nil).Times(1)
ovsctl.EXPECT().DumpMatchedFlow(testFlowKeys[i]).Return(testDumpFlows[i], nil).Times(1)
}
} else {
npq.EXPECT().GetNetworkPolicies(&querier.NetworkPolicyQueryFilter{SourceName: tc.name, Namespace: tc.namespace}).Return(nil).Times(1)
Expand Down Expand Up @@ -186,7 +236,7 @@ func TestTableFlows(t *testing.T) {
ovsctl := ovsctltest.NewMockOVSCtlClient(ctrl)
q := aqtest.NewMockAgentQuerier(ctrl)
q.EXPECT().GetOVSCtlClient().Return(ovsctl).Times(1)
ovsctl.EXPECT().DumpTableFlows(gomock.Any()).Return(testDumpResults, nil).Times(1)
ovsctl.EXPECT().DumpTableFlows(gomock.Any()).Return(testDumpFlows, nil).Times(1)

runHTTPTest(t, &tc, q)
}
Expand All @@ -206,6 +256,10 @@ func runHTTPTest(t *testing.T, tc *testCase, aq agentquerier.AgentQuerier) {
var received []Response
err = json.Unmarshal(recorder.Body.Bytes(), &received)
assert.Nil(t, err)
assert.Equal(t, testResponses, received)
if tc.dumpGroups {
assert.Equal(t, append(testResponses, testGroupResponses...), received)
} else {
assert.Equal(t, testResponses, received)
}
}
}
Loading

0 comments on commit 9464b73

Please sign in to comment.