diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 390858aeba6..6e955ab9aa1 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -628,6 +628,16 @@ spec: name: Destination-IP priority: 10 type: string + - description: Trace live traffic. + jsonPath: .spec.liveTraffic + name: Live-Traffic + priority: 10 + type: boolean + - description: Timeout in seconds. + jsonPath: .spec.timeout + name: Timeout + priority: 10 + type: integer - jsonPath: .metadata.creationTimestamp name: Age type: date @@ -638,15 +648,6 @@ spec: spec: properties: destination: - oneOf: - - required: - - pod - - namespace - - required: - - service - - namespace - - required: - - ip properties: ip: pattern: ^(((([1]?\d)?\d|2[0-4]\d|25[0-5])\.){3}(([1]?\d)?\d|2[0-4]\d|25[0-5]))|([\da-fA-F]{1,4}(\:[\da-fA-F]{1,4}){7})|(([\da-fA-F]{1,4}:){0,5}::([\da-fA-F]{1,4}:){0,5}[\da-fA-F]{1,4})$ @@ -658,6 +659,8 @@ spec: service: type: string type: object + liveTraffic: + type: boolean packet: properties: ipHeader: @@ -719,9 +722,10 @@ spec: - pod - namespace type: object + timeout: + type: integer required: - source - - destination type: object status: properties: diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 6f65c402c9b..6ab530e5977 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -628,6 +628,16 @@ spec: name: Destination-IP priority: 10 type: string + - description: Trace live traffic. + jsonPath: .spec.liveTraffic + name: Live-Traffic + priority: 10 + type: boolean + - description: Timeout in seconds. 
+ jsonPath: .spec.timeout + name: Timeout + priority: 10 + type: integer - jsonPath: .metadata.creationTimestamp name: Age type: date @@ -638,15 +648,6 @@ spec: spec: properties: destination: - oneOf: - - required: - - pod - - namespace - - required: - - service - - namespace - - required: - - ip properties: ip: pattern: ^(((([1]?\d)?\d|2[0-4]\d|25[0-5])\.){3}(([1]?\d)?\d|2[0-4]\d|25[0-5]))|([\da-fA-F]{1,4}(\:[\da-fA-F]{1,4}){7})|(([\da-fA-F]{1,4}:){0,5}::([\da-fA-F]{1,4}:){0,5}[\da-fA-F]{1,4})$ @@ -658,6 +659,8 @@ spec: service: type: string type: object + liveTraffic: + type: boolean packet: properties: ipHeader: @@ -719,9 +722,10 @@ spec: - pod - namespace type: object + timeout: + type: integer required: - source - - destination type: object status: properties: diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 4b52f19c649..5e73e1a7359 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -628,6 +628,16 @@ spec: name: Destination-IP priority: 10 type: string + - description: Trace live traffic. + jsonPath: .spec.liveTraffic + name: Live-Traffic + priority: 10 + type: boolean + - description: Timeout in seconds. 
+ jsonPath: .spec.timeout + name: Timeout + priority: 10 + type: integer - jsonPath: .metadata.creationTimestamp name: Age type: date @@ -638,15 +648,6 @@ spec: spec: properties: destination: - oneOf: - - required: - - pod - - namespace - - required: - - service - - namespace - - required: - - ip properties: ip: pattern: ^(((([1]?\d)?\d|2[0-4]\d|25[0-5])\.){3}(([1]?\d)?\d|2[0-4]\d|25[0-5]))|([\da-fA-F]{1,4}(\:[\da-fA-F]{1,4}){7})|(([\da-fA-F]{1,4}:){0,5}::([\da-fA-F]{1,4}:){0,5}[\da-fA-F]{1,4})$ @@ -658,6 +659,8 @@ spec: service: type: string type: object + liveTraffic: + type: boolean packet: properties: ipHeader: @@ -719,9 +722,10 @@ spec: - pod - namespace type: object + timeout: + type: integer required: - source - - destination type: object status: properties: diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 2d0481e0b67..6f1edf35536 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -628,6 +628,16 @@ spec: name: Destination-IP priority: 10 type: string + - description: Trace live traffic. + jsonPath: .spec.liveTraffic + name: Live-Traffic + priority: 10 + type: boolean + - description: Timeout in seconds. 
+ jsonPath: .spec.timeout + name: Timeout + priority: 10 + type: integer - jsonPath: .metadata.creationTimestamp name: Age type: date @@ -638,15 +648,6 @@ spec: spec: properties: destination: - oneOf: - - required: - - pod - - namespace - - required: - - service - - namespace - - required: - - ip properties: ip: pattern: ^(((([1]?\d)?\d|2[0-4]\d|25[0-5])\.){3}(([1]?\d)?\d|2[0-4]\d|25[0-5]))|([\da-fA-F]{1,4}(\:[\da-fA-F]{1,4}){7})|(([\da-fA-F]{1,4}:){0,5}::([\da-fA-F]{1,4}:){0,5}[\da-fA-F]{1,4})$ @@ -658,6 +659,8 @@ spec: service: type: string type: object + liveTraffic: + type: boolean packet: properties: ipHeader: @@ -719,9 +722,10 @@ spec: - pod - namespace type: object + timeout: + type: integer required: - source - - destination type: object status: properties: diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 8e3e40629c7..ce466b487ba 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -628,6 +628,16 @@ spec: name: Destination-IP priority: 10 type: string + - description: Trace live traffic. + jsonPath: .spec.liveTraffic + name: Live-Traffic + priority: 10 + type: boolean + - description: Timeout in seconds. 
+ jsonPath: .spec.timeout + name: Timeout + priority: 10 + type: integer - jsonPath: .metadata.creationTimestamp name: Age type: date @@ -638,15 +648,6 @@ spec: spec: properties: destination: - oneOf: - - required: - - pod - - namespace - - required: - - service - - namespace - - required: - - ip properties: ip: pattern: ^(((([1]?\d)?\d|2[0-4]\d|25[0-5])\.){3}(([1]?\d)?\d|2[0-4]\d|25[0-5]))|([\da-fA-F]{1,4}(\:[\da-fA-F]{1,4}){7})|(([\da-fA-F]{1,4}:){0,5}::([\da-fA-F]{1,4}:){0,5}[\da-fA-F]{1,4})$ @@ -658,6 +659,8 @@ spec: service: type: string type: object + liveTraffic: + type: boolean packet: properties: ipHeader: @@ -719,9 +722,10 @@ spec: - pod - namespace type: object + timeout: + type: integer required: - source - - destination type: object status: properties: diff --git a/build/yamls/base/crds.yml b/build/yamls/base/crds.yml index cdae4d8aeeb..2af13e2ec4c 100644 --- a/build/yamls/base/crds.yml +++ b/build/yamls/base/crds.yml @@ -73,6 +73,16 @@ spec: name: Destination-IP type: string priority: 10 + - jsonPath: .spec.liveTraffic + description: Trace live traffic. + name: Live-Traffic + type: boolean + priority: 10 + - jsonPath: .spec.timeout + description: Timeout in seconds. 
+ name: Timeout + type: integer + priority: 10 - jsonPath: .metadata.creationTimestamp name: Age type: date @@ -86,7 +96,6 @@ spec: type: object required: - source - - destination properties: source: type: object @@ -110,10 +119,6 @@ spec: ip: type: string pattern: ^(((([1]?\d)?\d|2[0-4]\d|25[0-5])\.){3}(([1]?\d)?\d|2[0-4]\d|25[0-5]))|([\da-fA-F]{1,4}(\:[\da-fA-F]{1,4}){7})|(([\da-fA-F]{1,4}:){0,5}::([\da-fA-F]{1,4}:){0,5}[\da-fA-F]{1,4})$ - oneOf: - - required: ["pod", "namespace"] - - required: ["service", "namespace"] - - required: ["ip"] packet: type: object properties: @@ -165,6 +170,10 @@ spec: type: integer flags: type: integer + liveTraffic: + type: boolean + timeout: + type: integer status: type: object properties: diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 101dbcb7075..0e4a01a6751 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -149,7 +149,6 @@ if [[ "$mode" == "" ]] || [[ "$mode" == 
"noEncap" ]]; then fi if [[ "$mode" == "" ]] || [[ "$mode" == "hybrid" ]]; then echo "======== Test hybrid mode ==========" run_test hybrid "--subnets \"20.20.20.0/24\" --images \"$COMMON_IMAGES\"" fi exit 0 - diff --git a/pkg/agent/controller/traceflow/packetin.go b/pkg/agent/controller/traceflow/packetin.go index 37373a58491..4a2fd0fce82 100644 --- a/pkg/agent/controller/traceflow/packetin.go +++ b/pkg/agent/controller/traceflow/packetin.go @@ -43,6 +43,7 @@ func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error { klog.Errorf("parsePacketIn error: %+v", err) return err } + // Retry when update CRD conflict which caused by multiple agents updating one CRD at same time. err = retry.RetryOnConflict(retry.DefaultRetry, func() error { tf, err := c.traceflowInformer.Lister().Get(oldTf.Name) @@ -89,17 +90,33 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*opsv1alpha1.Tracefl return nil, nil, fmt.Errorf("unsupported traceflow packet Ethertype: %d", pktIn.Data.Ethertype) } - // Get traceflow CRD from cache by data plane tag. - tf, err := c.GetRunningTraceflowCRD(uint8(tag)) + firstPacket := false + // Take the write lock: receivedPacket is updated below. + c.runningTraceflowsMutex.Lock() + tfState, exists := c.runningTraceflows[tag] + if exists { + firstPacket = !tfState.receivedPacket + tfState.receivedPacket = true + } + c.runningTraceflowsMutex.Unlock() + if !exists { + return nil, nil, fmt.Errorf("Traceflow for dataplane tag %d not found in cache", tag) + } + + if tfState.liveTraffic && firstPacket { + // Uninstall the OVS flows after receiving the first packet, to + // avoid capturing too many matched packets. 
+ c.ofClient.UninstallTraceflowFlows(tag) + } + + tf, err := c.traceflowLister.Get(tfState.name) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to get Traceflow %s CRD: %v", tfState.name, err) } - obs := make([]opsv1alpha1.Observation, 0) - isSender := c.isSender(uint8(tag)) + obs := []opsv1alpha1.Observation{} tableID := pktIn.TableId - - if isSender { + if tfState.isSender { ob := new(opsv1alpha1.Observation) ob.Component = opsv1alpha1.SpoofGuard ob.Action = opsv1alpha1.Forwarded diff --git a/pkg/agent/controller/traceflow/traceflow_controller.go b/pkg/agent/controller/traceflow/traceflow_controller.go index e1a2d0b9b93..1bdd711d7bd 100644 --- a/pkg/agent/controller/traceflow/traceflow_controller.go +++ b/pkg/agent/controller/traceflow/traceflow_controller.go @@ -24,6 +24,7 @@ import ( "time" "github.com/contiv/libOpenflow/protocol" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -43,13 +44,11 @@ import ( opsinformers "github.com/vmware-tanzu/antrea/pkg/client/informers/externalversions/ops/v1alpha1" opslisters "github.com/vmware-tanzu/antrea/pkg/client/listers/ops/v1alpha1" "github.com/vmware-tanzu/antrea/pkg/features" + binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" "github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig" "github.com/vmware-tanzu/antrea/pkg/querier" ) -type icmpType uint8 -type icmpCode uint8 - const ( controllerName = "AntreaAgentTraceflowController" // Set resyncPeriod to 0 to disable resyncing. @@ -62,12 +61,24 @@ const ( // Seconds delay before injecting packet into OVS. The time of different nodes may not be completely // synchronized, which requires a delay before inject packet. injectPacketDelay = 5 + // ICMP Echo Request type and code. 
- icmpEchoRequestType icmpType = 8 - icmpv6EchoRequestType icmpType = 128 - icmpEchoRequestCode icmpCode = 0 + icmpEchoRequestType uint8 = 8 + icmpv6EchoRequestType uint8 = 128 + icmpEchoRequestCode uint8 = 0 + + defaultTTL uint8 = 64 ) +type traceflowState struct { + name string + tag uint8 + liveTraffic bool + isSender bool + // Agent received the first Traceflow packet from OVS. + receivedPacket bool +} + // Controller is responsible for setting up Openflow entries and injecting traceflow packet into // the switch for traceflow request. type Controller struct { @@ -87,9 +98,9 @@ type Controller struct { serviceCIDR *net.IPNet // K8s Service ClusterIP CIDR queue workqueue.RateLimitingInterface runningTraceflowsMutex sync.RWMutex - runningTraceflows map[uint8]string // tag->traceflowName if tf.Status.Phase is Running. - injectedTagsMutex sync.RWMutex - injectedTags map[uint8]string // tag->traceflowName if this Node is sender. + // runningTraceflows is a map for storing the running Traceflow state + // with dataplane tag to be the key. + runningTraceflows map[uint8]*traceflowState } // NewTraceflowController instantiates a new Controller object which will process Traceflow @@ -120,8 +131,8 @@ func NewTraceflowController( nodeConfig: nodeConfig, serviceCIDR: serviceCIDR, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "traceflow"), - runningTraceflows: make(map[uint8]string), - injectedTags: make(map[uint8]string)} + runningTraceflows: make(map[uint8]*traceflowState), + } // Add handlers for Traceflow events. 
traceflowInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -184,7 +195,7 @@ func (c *Controller) updateTraceflow(_, curObj interface{}) { func (c *Controller) deleteTraceflow(old interface{}) { tf := old.(*opsv1alpha1.Traceflow) klog.Infof("Processing Traceflow %s DELETE event", tf.Name) - c.deallocateTag(tf) + c.enqueueTraceflow(tf) } // worker is a long-running function that will continually call the processTraceflowItem function @@ -239,15 +250,19 @@ func (c *Controller) syncTraceflow(traceflowName string) error { tf, err := c.traceflowLister.Get(traceflowName) if err != nil { + if apierrors.IsNotFound(err) { + c.cleanupTraceflow(traceflowName) + return nil + } return err } + switch tf.Status.Phase { case opsv1alpha1.Running: if tf.Status.DataplaneTag != 0 { start := false c.runningTraceflowsMutex.Lock() if _, ok := c.runningTraceflows[tf.Status.DataplaneTag]; !ok { - c.runningTraceflows[tf.Status.DataplaneTag] = tf.Name start = true } c.runningTraceflowsMutex.Unlock() @@ -258,7 +273,7 @@ func (c *Controller) syncTraceflow(traceflowName string) error { klog.Warningf("Invalid data plane tag %d for Traceflow %s", tf.Status.DataplaneTag, tf.Name) } default: - c.deallocateTag(tf) + c.cleanupTraceflow(traceflowName) } return err } @@ -269,28 +284,66 @@ func (c *Controller) startTraceflow(tf *opsv1alpha1.Traceflow) error { err := c.validateTraceflow(tf) defer func() { if err != nil { + c.cleanupTraceflow(tf.Name) c.errorTraceflowCRD(tf, fmt.Sprintf("Node: %s, error: %+v", c.nodeConfig.Name, err)) } }() if err != nil { return err } - // Deploy flow entries for traceflow - klog.V(2).Infof("Deploy flow entries for Traceflow %s", tf.Name) - err = c.ofClient.InstallTraceflowFlows(tf.Status.DataplaneTag) - if err != nil { - return err - } // TODO: let controller compute the source Node, and the source Node can just return an error, // if fails to find the Pod. - // Inject packet if this Node is sender. 
podInterfaces := c.interfaceStore.GetContainerInterfacesByPod(tf.Spec.Source.Pod, tf.Spec.Source.Namespace) - // Skip inject packet if Pod not found in current Node. - if len(podInterfaces) == 0 { - return nil + liveTraffic := tf.Spec.LiveTraffic + isSender := len(podInterfaces) > 0 + + var packet, matchPacket *binding.Packet + var srcOFPort uint32 + if isSender { + packet, err = c.preparePacket(tf, podInterfaces[0]) + if err != nil { + return err + } + srcOFPort = uint32(podInterfaces[0].OFPort) + // On the source Node, trace the first packet of the first + // connection that matches the Traceflow spec. + if liveTraffic { + matchPacket = packet + } + // The prepared packet is injected below, or passed to InstallTraceflowFlows for live traffic. + klog.V(2).Infof("Traceflow packet %v", *packet) + } + + // Store Traceflow to cache. + c.runningTraceflowsMutex.Lock() + tfState := traceflowState{name: tf.Name, tag: tf.Status.DataplaneTag, liveTraffic: tf.Spec.LiveTraffic, isSender: isSender} + c.runningTraceflows[tfState.tag] = &tfState + c.runningTraceflowsMutex.Unlock() + + // Install flow entries for traceflow. + klog.V(2).Infof("Installing flow entries for Traceflow %s", tf.Name) + timeout := tf.Spec.Timeout + if timeout == 0 { + timeout = opsv1alpha1.DefaultTraceflowTimeout + } + err = c.ofClient.InstallTraceflowFlows(tfState.tag, liveTraffic, matchPacket, srcOFPort, timeout) + // On error, the deferred handler cleans up the Traceflow state and reports the failure on the CRD. + if err != nil { + return err + } + + // Inject the packet only on the source Node, and never when tracing live traffic. + if !liveTraffic && isSender { + if packet.DestinationMAC == nil { + // If the destination is Service/IP or the packet will + // be sent to remote Node, wait a small period for other + // Nodes. 
+ time.Sleep(time.Duration(injectPacketDelay) * time.Second) + } + klog.V(2).Infof("Injecting packet for Traceflow %s", tf.Name) + err = c.ofClient.SendTraceflowPacket(tfState.tag, packet, srcOFPort, -1) } - err = c.injectPacket(tf) return err } @@ -312,144 +365,171 @@ func (c *Controller) validateTraceflow(tf *opsv1alpha1.Traceflow) error { return nil } -func (c *Controller) injectPacket(tf *opsv1alpha1.Traceflow) error { - podInterfaces := c.interfaceStore.GetContainerInterfacesByPod(tf.Spec.Source.Pod, tf.Spec.Source.Namespace) - // Update Traceflow phase to Running. - klog.V(2).Infof("Injecting packet for Traceflow %s", tf.Name) - c.injectedTagsMutex.Lock() - c.injectedTags[tf.Status.DataplaneTag] = tf.Name - c.injectedTagsMutex.Unlock() - - var srcTCPPort, dstTCPPort, srcUDPPort, dstUDPPort, idICMP, sequenceICMP uint16 - var flagsTCP uint8 - - // Calculate destination MAC/IP. - isIPv6 := tf.Spec.Packet.IPv6Header != nil - srcIP := "" - dstMAC := "" - dstIP := tf.Spec.Destination.IP - if isIPv6 { - srcIP = podInterfaces[0].GetIPv6Addr().String() - } else { - srcIP = podInterfaces[0].GetIPv4Addr().String() - } - if err := validateIPVersion(srcIP, isIPv6); err != nil { - return err +func (c *Controller) preparePacket(tf *opsv1alpha1.Traceflow, intf *interfacestore.InterfaceConfig) (*binding.Packet, error) { + liveTraffic := tf.Spec.LiveTraffic + isICMP := false + packet := new(binding.Packet) + packet.IsIPv6 = tf.Spec.Packet.IPv6Header != nil + if !liveTraffic { + if packet.IsIPv6 { + packet.SourceIP = intf.GetIPv6Addr() + if packet.SourceIP == nil { + return nil, errors.New("source Pod does not have an IPv6 address") + } + } else { + packet.SourceIP = intf.GetIPv4Addr() + if packet.SourceIP == nil { + return nil, errors.New("source Pod does not have an IPv4 address") + } + } + packet.SourceMAC = intf.MAC } - if dstIP != "" { - dstPodInterface, hasInterface := c.interfaceStore.GetInterfaceByIP(dstIP) - if hasInterface { - dstMAC = dstPodInterface.MAC.String() + 
if tf.Spec.Destination.IP != "" { + packet.DestinationIP = net.ParseIP(tf.Spec.Destination.IP) + if packet.DestinationIP == nil { + return nil, errors.New("invalid destination IP address") + } + if !packet.IsIPv6 { + packet.DestinationIP = packet.DestinationIP.To4() + if packet.DestinationIP == nil { + return nil, errors.New("destination IP should be an IPv4 address") + } + } else if packet.DestinationIP.To4() != nil { + return nil, errors.New("destination IP should be an IPv6 address") + } + if !liveTraffic { + dstPodInterface, hasInterface := c.interfaceStore.GetInterfaceByIP(tf.Spec.Destination.IP) + if hasInterface { + packet.DestinationMAC = dstPodInterface.MAC + } } } else if tf.Spec.Destination.Pod != "" { dstPodInterfaces := c.interfaceStore.GetContainerInterfacesByPod(tf.Spec.Destination.Pod, tf.Spec.Destination.Namespace) if len(dstPodInterfaces) > 0 { - dstMAC = dstPodInterfaces[0].MAC.String() - if isIPv6 { - dstIP = dstPodInterfaces[0].GetIPv6Addr().String() + if packet.IsIPv6 { + packet.DestinationIP = dstPodInterfaces[0].GetIPv6Addr() } else { - dstIP = dstPodInterfaces[0].GetIPv4Addr().String() + packet.DestinationIP = dstPodInterfaces[0].GetIPv4Addr() + } + if !liveTraffic { + packet.DestinationMAC = dstPodInterfaces[0].MAC } } else { dstPod, err := c.kubeClient.CoreV1().Pods(tf.Spec.Destination.Namespace).Get(context.TODO(), tf.Spec.Destination.Pod, metav1.GetOptions{}) if err != nil { - return err + return nil, fmt.Errorf("failed to get the destination Pod: %v", err) } - // dstMAC is "" here, will be set to Gateway MAC in ofClient.SendTraceflowPacket + // DestinationMAC is nil here, will be set to gateway + // MAC in ofClient.SendTraceflowPacket() podIPs := make([]net.IP, len(dstPod.Status.PodIPs)) for i, ip := range dstPod.Status.PodIPs { podIPs[i] = net.ParseIP(ip.IP) } - if isIPv6 { - ipv6, _ := util.GetIPWithFamily(podIPs, util.FamilyIPv6) - dstIP = ipv6.String() + if packet.IsIPv6 { + packet.DestinationIP, _ = util.GetIPWithFamily(podIPs, 
util.FamilyIPv6) } else { - dstIP = util.GetIPv4Addr(podIPs).String() + packet.DestinationIP = util.GetIPv4Addr(podIPs) + } + } + if packet.DestinationIP == nil { + if packet.IsIPv6 { + return nil, errors.New("destination Pod does not have an IPv6 address") + } else { + return nil, errors.New("destination Pod does not have an IPv4 address") } } } else if tf.Spec.Destination.Service != "" { dstSvc, err := c.serviceLister.Services(tf.Spec.Destination.Namespace).Get(tf.Spec.Destination.Service) if err != nil { - return err + return nil, fmt.Errorf("failed to get the destination Service: %v", err) + } + if dstSvc.Spec.ClusterIP == "" { + return nil, errors.New("destination Service does not have a ClusterIP") + } + packet.DestinationIP = net.ParseIP(dstSvc.Spec.ClusterIP) + if !packet.IsIPv6 { + packet.DestinationIP = packet.DestinationIP.To4() + if packet.DestinationIP == nil { + return nil, errors.New("destination Service does not have an IPv4 ClusterIP") + } + } else if packet.DestinationIP == nil || packet.DestinationIP.To4() != nil { + return nil, errors.New("destination Service does not have an IPv6 ClusterIP") } - dstIP = dstSvc.Spec.ClusterIP - flagsTCP = 2 - } - if err := validateIPVersion(dstIP, isIPv6); err != nil { - return err - } - if dstMAC == "" { - // If the destination is Service/IP or the packet will be sent to remote Node, wait a small period for other Nodes. - time.Sleep(time.Duration(injectPacketDelay) * time.Second) + if !liveTraffic { + // Set the SYN flag. In encap mode, the SYN flag is only required for + // Service traffic, but probably we should always set it. 
+ packet.TCPFlags = 2 + } + } else if !liveTraffic { + return nil, errors.New("destination is not specified") } - var ipProtocol uint8 - var ttl uint8 - var ipFlags uint16 - if isIPv6 { - if tf.Spec.Packet.IPv6Header.NextHeader == nil { - ipProtocol = protocol.Type_IPv6ICMP - } else { - ipProtocol = uint8(*tf.Spec.Packet.IPv6Header.NextHeader) + if tf.Spec.Packet.IPv6Header != nil { + // IP Protocol 0 (IPv6 Hop-by-Hop Option) is not supported by + // Traceflow. If NextHeader is not provided, protocol ICMPv6 + // will be used as the default. + if tf.Spec.Packet.IPv6Header.NextHeader != nil { + packet.IPProto = uint8(*tf.Spec.Packet.IPv6Header.NextHeader) + } + if !liveTraffic { + packet.TTL = uint8(tf.Spec.Packet.IPv6Header.HopLimit) + packet.IPFlags = 0 } - ttl = uint8(tf.Spec.Packet.IPv6Header.HopLimit) - ipFlags = 0 } else { - ipProtocol = uint8(tf.Spec.Packet.IPHeader.Protocol) - // Protocol is 0 (IPv6 Hop-by-Hop Option) if not set in CRD, which is not supported by Traceflow - // Use Protocol=1 (ICMP) as default. - if ipProtocol == 0 { - ipProtocol = protocol.Type_ICMP + packet.IPProto = uint8(tf.Spec.Packet.IPHeader.Protocol) + if !liveTraffic { + packet.TTL = uint8(tf.Spec.Packet.IPHeader.TTL) + packet.IPFlags = uint16(tf.Spec.Packet.IPHeader.Flags) } - ttl = uint8(tf.Spec.Packet.IPHeader.TTL) - ipFlags = uint16(tf.Spec.Packet.IPHeader.Flags) + } + if !liveTraffic && packet.TTL == 0 { + packet.TTL = defaultTTL } + // TCP > UDP > ICMP > other IP protocol. 
if tf.Spec.Packet.TransportHeader.TCP != nil { - srcTCPPort = uint16(tf.Spec.Packet.TransportHeader.TCP.SrcPort) - dstTCPPort = uint16(tf.Spec.Packet.TransportHeader.TCP.DstPort) + packet.IPProto = protocol.Type_TCP + packet.SourcePort = uint16(tf.Spec.Packet.TransportHeader.TCP.SrcPort) + packet.DestinationPort = uint16(tf.Spec.Packet.TransportHeader.TCP.DstPort) if tf.Spec.Packet.TransportHeader.TCP.Flags != 0 { - flagsTCP = uint8(tf.Spec.Packet.TransportHeader.TCP.Flags) + packet.TCPFlags = uint8(tf.Spec.Packet.TransportHeader.TCP.Flags) + } + } else if tf.Spec.Packet.TransportHeader.UDP != nil { + packet.IPProto = protocol.Type_UDP + packet.SourcePort = uint16(tf.Spec.Packet.TransportHeader.UDP.SrcPort) + packet.DestinationPort = uint16(tf.Spec.Packet.TransportHeader.UDP.DstPort) + } else if tf.Spec.Packet.TransportHeader.ICMP != nil { + isICMP = true + if !liveTraffic { + packet.ICMPEchoID = uint16(tf.Spec.Packet.TransportHeader.ICMP.ID) + packet.ICMPEchoSeq = uint16(tf.Spec.Packet.TransportHeader.ICMP.Sequence) } - } - if tf.Spec.Packet.TransportHeader.UDP != nil { - srcUDPPort = uint16(tf.Spec.Packet.TransportHeader.UDP.SrcPort) - dstUDPPort = uint16(tf.Spec.Packet.TransportHeader.UDP.DstPort) - } - if tf.Spec.Packet.TransportHeader.ICMP != nil { - idICMP = uint16(tf.Spec.Packet.TransportHeader.ICMP.ID) - sequenceICMP = uint16(tf.Spec.Packet.TransportHeader.ICMP.Sequence) } - var packetOutIcmpEchoRequestType icmpType - if isIPv6 { - packetOutIcmpEchoRequestType = icmpv6EchoRequestType - } else { - packetOutIcmpEchoRequestType = icmpEchoRequestType + if packet.IPProto == 0 || packet.IPProto == protocol.Type_ICMP || packet.IPProto == protocol.Type_IPv6ICMP { + // IPProto defaults to ICMP. 
+ isICMP = true + } + if isICMP { + if packet.IsIPv6 { + packet.IPProto = protocol.Type_IPv6ICMP + if !liveTraffic { + packet.ICMPType = icmpv6EchoRequestType + } + } else { + packet.IPProto = protocol.Type_ICMP + if !liveTraffic { + packet.ICMPType = icmpEchoRequestType + } + } + if !liveTraffic { + packet.ICMPCode = icmpEchoRequestCode + } } - return c.ofClient.SendTraceflowPacket( - tf.Status.DataplaneTag, - podInterfaces[0].MAC.String(), - dstMAC, - srcIP, - dstIP, - ipProtocol, - ttl, - ipFlags, - srcTCPPort, - dstTCPPort, - flagsTCP, - srcUDPPort, - dstUDPPort, - uint8(packetOutIcmpEchoRequestType), - uint8(icmpEchoRequestCode), - idICMP, - sequenceICMP, - uint32(podInterfaces[0].OFPort), - -1) + return packet, nil } func (c *Controller) errorTraceflowCRD(tf *opsv1alpha1.Traceflow, reason string) (*opsv1alpha1.Traceflow, error) { @@ -463,68 +543,30 @@ func (c *Controller) errorTraceflowCRD(tf *opsv1alpha1.Traceflow, reason string) return c.traceflowClient.OpsV1alpha1().Traceflows().Patch(context.TODO(), tf.Name, types.MergePatchType, payloads, metav1.PatchOptions{}, "status") } -// Deallocate tag from cache. -func (c *Controller) deallocateTag(tf *opsv1alpha1.Traceflow) { - dataplaneTag := uint8(0) +// Delete Traceflow from cache. +func (c *Controller) deleteTraceflowState(tfName string) *traceflowState { c.runningTraceflowsMutex.Lock() + defer c.runningTraceflowsMutex.Unlock() // Controller could have deallocated the tag and cleared the DataplaneTag // field in the Traceflow Status, so try looking up the tag from the // cache by Traceflow name. 
- for tag, existingTraceflowName := range c.runningTraceflows { - if tf.Name == existingTraceflowName { + for tag, tfState := range c.runningTraceflows { + if tfName == tfState.name { delete(c.runningTraceflows, tag) - dataplaneTag = tag - break + return tfState } } - c.runningTraceflowsMutex.Unlock() - if dataplaneTag == 0 { - return - } - c.injectedTagsMutex.Lock() - if existingTraceflowName, ok := c.injectedTags[dataplaneTag]; ok { - if tf.Name == existingTraceflowName { - delete(c.injectedTags, dataplaneTag) - } else { - klog.Warningf("runningTraceflows cache mismatch tag: %d name: %s existingName: %s", - dataplaneTag, tf.Name, existingTraceflowName) - } - } - c.injectedTagsMutex.Unlock() -} - -func (c *Controller) isSender(tag uint8) bool { - c.injectedTagsMutex.RLock() - defer c.injectedTagsMutex.RUnlock() - if _, ok := c.injectedTags[tag]; ok { - return true - } - return false -} - -// getTraceflowCRD gets traceflow CRD by data plane tag. -func (c *Controller) GetRunningTraceflowCRD(tag uint8) (*opsv1alpha1.Traceflow, error) { - c.runningTraceflowsMutex.RLock() - defer c.runningTraceflowsMutex.RUnlock() - if traceflowName, ok := c.runningTraceflows[tag]; ok { - return c.traceflowLister.Get(traceflowName) - } - return nil, errors.New(fmt.Sprintf("traceflow with the data plane tag %d doesn't exist", tag)) + return nil } -func validateIPVersion(ip string, isIPv6 bool) error { - parsedIP := net.ParseIP(ip) - if parsedIP == nil { - return errors.New(fmt.Sprintf("invalid ip string %s", ip)) - } - if isIPv6 { - if parsedIP.To4() != nil { - return errors.New(fmt.Sprintf("expect IPv6, but got IPv4 %s", ip)) - } - } else { - if parsedIP.To4() == nil { - return errors.New(fmt.Sprintf("expect IPv4, but got IPv6 %s", ip)) +// Delete Traceflow state and OVS flows. 
+func (c *Controller) cleanupTraceflow(tfName string) { + tfState := c.deleteTraceflowState(tfName) + if tfState != nil { + err := c.ofClient.UninstallTraceflowFlows(tfState.tag) + // The cache entry is already removed at this point, so a failed uninstall is only logged. + if err != nil { + klog.Errorf("Failed to uninstall Traceflow %s flows: %v", tfName, err) } } - return nil } diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 82f0e19a84e..7ea6413e20b 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -15,13 +15,13 @@ package openflow import ( - "errors" "fmt" "math/rand" "net" "github.com/contiv/libOpenflow/protocol" "github.com/contiv/ofnet/ofctrl" + "k8s.io/klog" "github.com/vmware-tanzu/antrea/pkg/agent/config" @@ -224,29 +224,13 @@ type Client interface { SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error // SendTraceflowPacket injects packet to specified OVS port for Openflow. - SendTraceflowPacket( - dataplaneTag uint8, - srcMAC string, - dstMAC string, - srcIP string, - dstIP string, - ipProtocol uint8, - ttl uint8, - ipFlags uint16, - tcpSrcPort uint16, - tcpDstPort uint16, - tcpFlags uint8, - udpSrcPort uint16, - udpDstPort uint16, - icmpType uint8, - icmpCode uint8, - icmpID uint16, - icmpSequence uint16, - inPort uint32, - outPort int32) error + SendTraceflowPacket(dataplaneTag uint8, packet *binding.Packet, inPort uint32, outPort int32) error - // InstallTraceflowFlows installs flows for specific traceflow request. - InstallTraceflowFlows(dataplaneTag uint8) error + // InstallTraceflowFlows installs flows for a Traceflow request. + InstallTraceflowFlows(dataplaneTag uint8, liveTraffic bool, packet *binding.Packet, srcOFPort uint32, timeoutSeconds uint16) error + + // UninstallTraceflowFlows uninstalls flows for a Traceflow request. + UninstallTraceflowFlows(dataplaneTag uint8) error // Initial tun_metadata0 in TLV map for Traceflow. 
InitialTLVMap() error @@ -813,108 +797,57 @@ func (c *client) SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error return c.bridge.SubscribePacketIn(reason, ch) } -func (c *client) SendTraceflowPacket( - dataplaneTag uint8, - srcMAC string, - dstMAC string, - srcIP string, - dstIP string, - IPProtocol uint8, - ttl uint8, - ipFlags uint16, - tcpSrcPort uint16, - tcpDstPort uint16, - tcpFlags uint8, - udpSrcPort uint16, - udpDstPort uint16, - icmpType uint8, - icmpCode uint8, - icmpID uint16, - icmpSequence uint16, - inPort uint32, - outPort int32) error { - +func (c *client) SendTraceflowPacket(dataplaneTag uint8, packet *binding.Packet, inPort uint32, outPort int32) error { packetOutBuilder := c.bridge.BuildPacketOut() - parsedSrcMAC, err := net.ParseMAC(srcMAC) - if err != nil { - return err - } - var parsedDstMAC net.HardwareAddr - if dstMAC == "" { - parsedDstMAC = c.nodeConfig.GatewayConfig.MAC - } else { - parsedDstMAC, err = net.ParseMAC(dstMAC) - if err != nil { - return err - } - } - parsedSrcIP := net.ParseIP(srcIP) - parsedDstIP := net.ParseIP(dstIP) - if parsedSrcIP == nil || parsedDstIP == nil { - return errors.New("invalid IP") - } - isIPv6 := parsedSrcIP.To4() == nil - if isIPv6 != (parsedDstIP.To4() == nil) { - return errors.New("IP version mismatch") + if packet.DestinationMAC == nil { + packet.DestinationMAC = c.nodeConfig.GatewayConfig.MAC } - // Set ethernet header - packetOutBuilder = packetOutBuilder.SetSrcMAC(parsedSrcMAC) - packetOutBuilder = packetOutBuilder.SetDstMAC(parsedDstMAC) + packetOutBuilder = packetOutBuilder.SetDstMAC(packet.DestinationMAC).SetSrcMAC(packet.SourceMAC) + // Set IP header - packetOutBuilder = packetOutBuilder.SetSrcIP(parsedSrcIP) - packetOutBuilder = packetOutBuilder.SetDstIP(parsedDstIP) - if ttl == 0 { - packetOutBuilder = packetOutBuilder.SetTTL(128) - } else { - packetOutBuilder = packetOutBuilder.SetTTL(ttl) - } - if !isIPv6 { - packetOutBuilder = packetOutBuilder.SetIPFlags(ipFlags) + 
packetOutBuilder = packetOutBuilder.SetDstIP(packet.DestinationIP).SetSrcIP(packet.SourceIP).SetTTL(packet.TTL) + if !packet.IsIPv6 { + packetOutBuilder = packetOutBuilder.SetIPFlags(packet.IPFlags) } // Set transport header - switch IPProtocol { - case protocol.Type_ICMP: - if isIPv6 { - return errors.New("cannot set protocol ICMP in IPv6 packet") - } - packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolICMP) - packetOutBuilder = packetOutBuilder.SetICMPType(icmpType) - packetOutBuilder = packetOutBuilder.SetICMPCode(icmpCode) - packetOutBuilder = packetOutBuilder.SetICMPID(icmpID) - packetOutBuilder = packetOutBuilder.SetICMPSequence(icmpSequence) - case protocol.Type_IPv6ICMP: - if !isIPv6 { - return errors.New("cannot set protocol ICMPv6 in IPv4 packet") + switch packet.IPProto { + case protocol.Type_ICMP, protocol.Type_IPv6ICMP: + if packet.IPProto == protocol.Type_ICMP { + packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolICMP) + } else { + packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolICMPv6) } - packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolICMPv6) - packetOutBuilder = packetOutBuilder.SetICMPType(icmpType) - packetOutBuilder = packetOutBuilder.SetICMPCode(icmpCode) - packetOutBuilder = packetOutBuilder.SetICMPID(icmpID) - packetOutBuilder = packetOutBuilder.SetICMPSequence(icmpSequence) + packetOutBuilder = packetOutBuilder.SetICMPType(packet.ICMPType). + SetICMPCode(packet.ICMPCode). + SetICMPID(packet.ICMPEchoID). + SetICMPSequence(packet.ICMPEchoSeq) case protocol.Type_TCP: - if isIPv6 { + if packet.IsIPv6 { packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolTCPv6) } else { packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolTCP) } + tcpSrcPort := packet.SourcePort if tcpSrcPort == 0 { // #nosec G404: random number generator not used for security purposes. 
tcpSrcPort = uint16(rand.Uint32()) } - packetOutBuilder = packetOutBuilder.SetTCPSrcPort(tcpSrcPort) - packetOutBuilder = packetOutBuilder.SetTCPDstPort(tcpDstPort) - packetOutBuilder = packetOutBuilder.SetTCPFlags(tcpFlags) + packetOutBuilder = packetOutBuilder.SetTCPDstPort(packet.DestinationPort). + SetTCPSrcPort(tcpSrcPort). + SetTCPFlags(packet.TCPFlags) case protocol.Type_UDP: - if isIPv6 { + if packet.IsIPv6 { packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolUDPv6) } else { packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolUDP) } - packetOutBuilder = packetOutBuilder.SetUDPSrcPort(udpSrcPort) - packetOutBuilder = packetOutBuilder.SetUDPDstPort(udpDstPort) + packetOutBuilder = packetOutBuilder.SetUDPDstPort(packet.DestinationPort). + SetUDPSrcPort(packet.SourcePort) + default: + packetOutBuilder = packetOutBuilder.SetIPProtocolValue(packet.IsIPv6, packet.IPProto) } packetOutBuilder = packetOutBuilder.SetInport(inPort) @@ -927,12 +860,18 @@ func (c *client) SendTraceflowPacket( return c.bridge.SendPacketOut(packetOutObj) } -func (c *client) InstallTraceflowFlows(dataplaneTag uint8) error { +func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic bool, packet *binding.Packet, srcOFPort uint32, timeoutSeconds uint16) error { + cacheKey := fmt.Sprintf("%x", dataplaneTag) flows := []binding.Flow{} - flows = append(flows, c.traceflowL2ForwardOutputFlows(dataplaneTag, cookie.Default)...) - flows = append(flows, c.traceflowConnectionTrackFlows(dataplaneTag, cookie.Default)...) - flows = append(flows, c.traceflowNetworkPolicyFlows(dataplaneTag, cookie.Default)...) - return c.AddAll(flows) + flows = append(flows, c.traceflowConnectionTrackFlows(dataplaneTag, packet, srcOFPort, timeoutSeconds, cookie.Default)...) + flows = append(flows, c.traceflowL2ForwardOutputFlows(dataplaneTag, liveTraffic, timeoutSeconds, cookie.Default)...) 
+ flows = append(flows, c.traceflowNetworkPolicyFlows(dataplaneTag, timeoutSeconds, cookie.Default)...) + return c.addFlows(c.tfFlowCache, cacheKey, flows) +} + +func (c *client) UninstallTraceflowFlows(dataplaneTag uint8) error { + cacheKey := fmt.Sprintf("%x", dataplaneTag) + return c.deleteFlows(c.tfFlowCache, cacheKey) } // Add TLV map optClass 0x0104, optType 0x80 optLength 4 tunMetadataIndex 0 to store data plane tag diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index fa7a7b1e516..a4bb9b4b293 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -31,7 +31,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/config" "github.com/vmware-tanzu/antrea/pkg/agent/openflow/cookie" oftest "github.com/vmware-tanzu/antrea/pkg/agent/openflow/testing" - ofconfig "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" + binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" ovsoftest "github.com/vmware-tanzu/antrea/pkg/ovs/openflow/testing" "github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig" ) @@ -39,7 +39,7 @@ import ( const bridgeName = "dummy-br" var ( - bridgeMgmtAddr = ofconfig.GetMgmtAddress(ovsconfig.DefaultOVSRunDir, bridgeName) + bridgeMgmtAddr = binding.GetMgmtAddress(ovsconfig.DefaultOVSRunDir, bridgeName) gwMAC, _ = net.ParseMAC("AA:BB:CC:DD:EE:EE") gwIP, ipNet, _ = net.ParseCIDR("10.0.1.1/24") gwIPv6, _, _ = net.ParseCIDR("f00d::b00:0:0:0/80") @@ -280,7 +280,7 @@ func Test_client_InstallTraceflowFlows(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() c := tt.prepareFunc(ctrl) - if err := c.InstallTraceflowFlows(tt.args.dataplaneTag); (err != nil) != tt.wantErr { + if err := c.InstallTraceflowFlows(tt.args.dataplaneTag, false, nil, 0, 300); (err != nil) != tt.wantErr { t.Errorf("InstallTraceflowFlows() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -290,168 +290,95 @@ func Test_client_InstallTraceflowFlows(t *testing.T) { func Test_client_SendTraceflowPacket(t 
*testing.T) { type args struct { dataplaneTag uint8 - srcMAC string - dstMAC string - srcIP string - dstIP string - IPProtocol uint8 - ttl uint8 - IPFlags uint16 - TCPSrcPort uint16 - TCPDstPort uint16 - TCPFlags uint8 - UDPSrcPort uint16 - UDPDstPort uint16 - ICMPType uint8 - ICMPCode uint8 - ICMPID uint16 - ICMPSequence uint16 - inPort uint32 - outPort int32 + binding.Packet + inPort uint32 + outPort int32 } + srcMAC, _ := net.ParseMAC("11:22:33:44:55:66") + dstMAC, _ := net.ParseMAC("11:22:33:44:55:77") tests := []struct { name string args args wantErr bool }{ - { - name: "err noSrcMAC", - args: args{}, - wantErr: true, - }, - { - name: "err invalidDstMAC", - args: args{ - srcMAC: "11:22:33:44:55:66", - dstMAC: "invalidMAC", - }, - wantErr: true, - }, - { - name: "err noIP", - args: args{ - srcMAC: "11:22:33:44:55:66", - dstMAC: "11:22:33:44:55:77", - }, - wantErr: true, - }, - { - name: "err IPVersionMismatch", - args: args{ - srcMAC: "11:22:33:44:55:66", - dstMAC: "11:22:33:44:55:77", - srcIP: "1.2.3.4", - dstIP: "1111::5555", - }, - wantErr: true, - }, - { - name: "IPv4 ICMP", - args: args{ - srcMAC: "11:22:33:44:55:66", - dstMAC: "11:22:33:44:55:77", - srcIP: "1.2.3.4", - dstIP: "1.2.3.5", - ttl: 64, - IPProtocol: 1, - }, - wantErr: false, - }, - { - name: "IPv4 ICMP invalid", - args: args{ - srcMAC: "11:22:33:44:55:66", - dstMAC: "", - srcIP: "1.2.3.4", - dstIP: "1.2.3.5", - ttl: 64, - IPProtocol: 58, - outPort: -1, - }, - wantErr: true, - }, { name: "IPv4 ICMP", args: args{ - srcMAC: "11:22:33:44:55:66", - dstMAC: "", - srcIP: "1.2.3.4", - dstIP: "1.2.3.5", - ttl: 64, - IPProtocol: 1, - outPort: -1, + Packet: binding.Packet{ + SourceMAC: srcMAC, + DestinationMAC: dstMAC, + SourceIP: net.ParseIP("1.2.3.4"), + DestinationIP: net.ParseIP("1.2.3.5"), + IPProto: 1, + TTL: 64, + }, }, - wantErr: false, }, { name: "IPv4 TCP", args: args{ - srcMAC: "11:22:33:44:55:66", - dstMAC: "11:22:33:44:55:77", - srcIP: "1.2.3.4", - dstIP: "1.2.3.5", - IPProtocol: 6, + 
Packet: binding.Packet{ + SourceMAC: srcMAC, + DestinationMAC: dstMAC, + SourceIP: net.ParseIP("1.2.3.4"), + DestinationIP: net.ParseIP("1.2.3.5"), + IPProto: 6, + TTL: 64, + }, }, - wantErr: false, }, { name: "IPv4 UDP", args: args{ - srcMAC: "11:22:33:44:55:66", - dstMAC: "11:22:33:44:55:77", - srcIP: "1.2.3.4", - dstIP: "1.2.3.5", - IPProtocol: 17, + Packet: binding.Packet{ + SourceMAC: srcMAC, + DestinationMAC: dstMAC, + SourceIP: net.ParseIP("1.2.3.4"), + DestinationIP: net.ParseIP("1.2.3.5"), + IPProto: 17, + TTL: 64, + }, }, - wantErr: false, - }, - { - name: "IPv6 ICMP invalid", - args: args{ - srcMAC: "11:22:33:44:55:66", - dstMAC: "", - srcIP: "1111::4444", - dstIP: "1111::5555", - ttl: 64, - IPProtocol: 1, - outPort: -1, - }, - wantErr: true, }, { name: "IPv6 ICMPv6", args: args{ - srcMAC: "11:22:33:44:55:66", - dstMAC: "", - srcIP: "1111::4444", - dstIP: "1111::5555", - ttl: 64, - IPProtocol: 58, - outPort: -1, + Packet: binding.Packet{ + SourceMAC: srcMAC, + DestinationMAC: dstMAC, + SourceIP: net.ParseIP("1111::4444"), + DestinationIP: net.ParseIP("1111::5555"), + IPProto: 58, + TTL: 64, + }, + outPort: -1, }, - wantErr: false, }, { name: "IPv6 TCP", args: args{ - srcMAC: "11:22:33:44:55:66", - dstMAC: "11:22:33:44:55:77", - srcIP: "1111::4444", - dstIP: "1111::5555", - IPProtocol: 6, + Packet: binding.Packet{ + SourceMAC: srcMAC, + DestinationMAC: dstMAC, + SourceIP: net.ParseIP("1111::4444"), + DestinationIP: net.ParseIP("1111::5555"), + IPProto: 6, + TTL: 64, + }, }, - wantErr: false, }, { name: "IPv6 UDP", args: args{ - srcMAC: "11:22:33:44:55:66", - dstMAC: "11:22:33:44:55:77", - srcIP: "1111::4444", - dstIP: "1111::5555", - IPProtocol: 17, + Packet: binding.Packet{ + SourceMAC: srcMAC, + DestinationMAC: dstMAC, + SourceIP: net.ParseIP("1111::4444"), + DestinationIP: net.ParseIP("1111::5555"), + IPProto: 17, + TTL: 64, + }, }, - wantErr: false, }, } for _, tt := range tests { @@ -459,7 +386,7 @@ func Test_client_SendTraceflowPacket(t *testing.T) 
{ ctrl := gomock.NewController(t) defer ctrl.Finish() c := prepareSendTraceflowPacket(ctrl, !tt.wantErr) - if err := c.SendTraceflowPacket(tt.args.dataplaneTag, tt.args.srcMAC, tt.args.dstMAC, tt.args.srcIP, tt.args.dstIP, tt.args.IPProtocol, tt.args.ttl, tt.args.IPFlags, tt.args.TCPSrcPort, tt.args.TCPDstPort, tt.args.TCPFlags, tt.args.UDPSrcPort, tt.args.UDPDstPort, tt.args.ICMPType, tt.args.ICMPCode, tt.args.ICMPID, tt.args.ICMPSequence, tt.args.inPort, tt.args.outPort); (err != nil) != tt.wantErr { + if err := c.SendTraceflowPacket(tt.args.dataplaneTag, &tt.args.Packet, tt.args.inPort, tt.args.outPort); (err != nil) != tt.wantErr { t.Errorf("SendTraceflowPacket() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -477,10 +404,10 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client { mFlow := ovsoftest.NewMockFlow(ctrl) ctx := &conjMatchFlowContext{dropFlow: mFlow} - mFlow.EXPECT().FlowProtocol().Return(ofconfig.Protocol("ip")) + mFlow.EXPECT().FlowProtocol().Return(binding.Protocol("ip")) mFlow.EXPECT().CopyToBuilder(priorityNormal+2, false).Return(c.pipeline[EgressDefaultTable].BuildFlow(priorityNormal + 2)).Times(1) c.globalConjMatchFlowCache["mockContext"] = ctx - c.policyCache.Add(&policyRuleConjunction{metricFlows: []ofconfig.Flow{c.denyRuleMetricFlow(123, false)}}) + c.policyCache.Add(&policyRuleConjunction{metricFlows: []binding.Flow{c.denyRuleMetricFlow(123, false)}}) return c } @@ -490,7 +417,7 @@ func prepareSendTraceflowPacket(ctrl *gomock.Controller, success bool) *client { c.nodeConfig = nodeConfig m := ovsoftest.NewMockBridge(ctrl) c.bridge = m - bridge := ofconfig.OFBridge{} + bridge := binding.OFBridge{} m.EXPECT().BuildPacketOut().Return(bridge.BuildPacketOut()).Times(1) if success { m.EXPECT().SendPacketOut(gomock.Any()).Times(1) @@ -577,7 +504,7 @@ func prepareSetBasePacketOutBuilder(ctrl *gomock.Controller, success bool) *clie c := ofClient.(*client) m := ovsoftest.NewMockBridge(ctrl) c.bridge = m - bridge := ofconfig.OFBridge{} + 
bridge := binding.OFBridge{} m.EXPECT().BuildPacketOut().Return(bridge.BuildPacketOut()).Times(1) if success { m.EXPECT().SendPacketOut(gomock.Any()).Times(1) diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 9231d346429..08f1eca0153 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "github.com/contiv/libOpenflow/protocol" + "k8s.io/client-go/tools/cache" "k8s.io/klog" @@ -313,9 +315,9 @@ var ( // metricEgressRuleIDRange takes 32..63 range of ct_label to store the egress rule ID. metricEgressRuleIDRange = binding.Range{32, 63} - // traceflowTagToSRange stores dataplaneTag at range 2-7 in ToS field of IP header. - // IPv4/v6 DSCP (bits 2-7) field supports exact match only. - traceflowTagToSRange = binding.Range{2, 7} + // traceflowTagToSRange stores Traceflow dataplane tag to DSCP bits of + // IP header ToS field. + traceflowTagToSRange = binding.IPDSCPToSRange // snatPktMarkRange takes an 8-bit range of pkt_mark to store the ID of // a SNAT IP. The bit range must match SNATIPMarkMask. @@ -361,7 +363,7 @@ type client struct { ingressEntryTable binding.TableIDType pipeline map[binding.TableIDType]binding.Table // Flow caches for corresponding deletions. - nodeFlowCache, podFlowCache, serviceFlowCache, snatFlowCache *flowCategoryCache + nodeFlowCache, podFlowCache, serviceFlowCache, snatFlowCache, tfFlowCache *flowCategoryCache // "fixed" flows installed by the agent after initialization and which do not change during // the lifetime of the client. gatewayFlows, defaultServiceFlows, defaultTunnelFlows, hostNetworkingFlows []binding.Flow @@ -695,42 +697,98 @@ func (c *client) kubeProxyFlows(category cookie.Category) []binding.Flow { // TODO: Use DuplicateToBuilder or integrate this function into original one to avoid unexpected // difference. 
-// traceflowConnectionTrackFlows generates Traceflow specific flows that bypass the drop flow in -// connectionTrackFlows to avoid unexpected packet drop in Traceflow. It also ensures that if any -// Traceflow packet has ct_state +rpl, it is dropped. This may happen when the Traceflow request -// destination is the Node's IP. -func (c *client) traceflowConnectionTrackFlows(dataplaneTag uint8, category cookie.Category) []binding.Flow { +// traceflowConnectionTrackFlows generates Traceflow specific flows in the +// connectionTrackStateTable. When packet is not provided, the flows bypass the +// drop flow in connectionTrackFlows to avoid unexpected drop of the injected +// Traceflow packet, and to drop any Traceflow packet that has ct_state +rpl, +// which may happen when the Traceflow request destination is the Node's IP. +// When packet is provided, a flow is added to mark the first packet as the +// Traceflow packet, for the first connection that is initiated by srcOFPort +// and matches the provided packet. +func (c *client) traceflowConnectionTrackFlows(dataplaneTag uint8, packet *binding.Packet, srcOFPort uint32, timeout uint16, category cookie.Category) []binding.Flow { connectionTrackStateTable := c.pipeline[conntrackStateTable] var flows []binding.Flow - flowBuilder := connectionTrackStateTable.BuildFlow(priorityLow + 1). - MatchProtocol(binding.ProtocolIP). - MatchIPDscp(dataplaneTag). - SetHardTimeout(300). - Cookie(c.cookieAllocator.Request(category).Raw()) - if c.enableProxy { - flowBuilder = flowBuilder. - Action().ResubmitToTable(sessionAffinityTable). - Action().ResubmitToTable(serviceLBTable) + if packet == nil { + flowBuilder := connectionTrackStateTable.BuildFlow(priorityLow + 1). + MatchProtocol(binding.ProtocolIP). + MatchIPDSCP(dataplaneTag). + SetHardTimeout(timeout). + Cookie(c.cookieAllocator.Request(category).Raw()) + if c.enableProxy { + flowBuilder = flowBuilder. + Action().ResubmitToTable(sessionAffinityTable). 
+ Action().ResubmitToTable(serviceLBTable) + } else { + flowBuilder = flowBuilder. + Action().ResubmitToTable(connectionTrackStateTable.GetNext()) + } + flows = append(flows, flowBuilder.Done()) + + flows = append(flows, connectionTrackStateTable.BuildFlow(priorityLow+2). + MatchProtocol(binding.ProtocolIP). + MatchIPDSCP(dataplaneTag). + MatchCTStateTrk(true).MatchCTStateRpl(true). + SetHardTimeout(timeout). + Cookie(c.cookieAllocator.Request(category).Raw()). + Action().Drop(). + Done()) } else { - flowBuilder = flowBuilder. - Action().ResubmitToTable(connectionTrackStateTable.GetNext()) - } - flows = append(flows, flowBuilder.Done()) + flowBuilder := connectionTrackStateTable.BuildFlow(priorityLow). + MatchInPort(srcOFPort). + MatchCTStateNew(true).MatchCTStateTrk(true). + SetHardTimeout(timeout). + Cookie(c.cookieAllocator.Request(category).Raw()). + Action().LoadIPDSCP(dataplaneTag) - flows = append(flows, connectionTrackStateTable.BuildFlow(priorityLow+2). - MatchProtocol(binding.ProtocolIP). - MatchIPDscp(dataplaneTag). - MatchCTStateTrk(true).MatchCTStateRpl(true). - SetHardTimeout(300). - Cookie(c.cookieAllocator.Request(category).Raw()). - Action().Drop(). 
- Done()) + if packet.DestinationIP != nil { + flowBuilder = flowBuilder.MatchDstIP(packet.DestinationIP) + } + // Match transport header + switch packet.IPProto { + case protocol.Type_ICMP: + flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolICMP) + case protocol.Type_IPv6ICMP: + flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolICMPv6) + case protocol.Type_TCP: + if packet.IsIPv6 { + flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolTCPv6) + } else { + flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolTCP) + } + case protocol.Type_UDP: + if packet.IsIPv6 { + flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolUDPv6) + } else { + flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolUDP) + } + default: + flowBuilder = flowBuilder.MatchIPProtocolValue(packet.IsIPv6, packet.IPProto) + + } + if packet.IPProto == protocol.Type_TCP || packet.IPProto == protocol.Type_UDP { + if packet.DestinationPort != 0 { + flowBuilder = flowBuilder.MatchDstPort(packet.DestinationPort, nil) + } + if packet.SourcePort != 0 { + flowBuilder = flowBuilder.MatchSrcPort(packet.SourcePort, nil) + } + } + if c.enableProxy { + flowBuilder = flowBuilder. + Action().ResubmitToTable(sessionAffinityTable). + Action().ResubmitToTable(serviceLBTable) + } else { + flowBuilder = flowBuilder. 
+ Action().ResubmitToTable(connectionTrackStateTable.GetNext()) + } + flows = []binding.Flow{flowBuilder.Done()} + } return flows } -func (c *client) traceflowNetworkPolicyFlows(dataplaneTag uint8, category cookie.Category) []binding.Flow { +func (c *client) traceflowNetworkPolicyFlows(dataplaneTag uint8, timeout uint16, category cookie.Category) []binding.Flow { flows := []binding.Flow{} c.conjMatchFlowLock.Lock() defer c.conjMatchFlowLock.Unlock() @@ -741,15 +799,15 @@ func (c *client) traceflowNetworkPolicyFlows(dataplaneTag uint8, category cookie if ctx.dropFlow.FlowProtocol() == "" { copyFlowBuilderIPv6 := ctx.dropFlow.CopyToBuilder(priorityNormal+2, false) copyFlowBuilderIPv6 = copyFlowBuilderIPv6.MatchProtocol(binding.ProtocolIPv6) - flows = append(flows, copyFlowBuilderIPv6.MatchIPDscp(dataplaneTag). - SetHardTimeout(300). + flows = append(flows, copyFlowBuilderIPv6.MatchIPDSCP(dataplaneTag). + SetHardTimeout(timeout). Cookie(c.cookieAllocator.Request(category).Raw()). Action().SendToController(uint8(PacketInReasonTF)). Done()) copyFlowBuilder = copyFlowBuilder.MatchProtocol(binding.ProtocolIP) } - flows = append(flows, copyFlowBuilder.MatchIPDscp(dataplaneTag). - SetHardTimeout(300). + flows = append(flows, copyFlowBuilder.MatchIPDSCP(dataplaneTag). + SetHardTimeout(timeout). Cookie(c.cookieAllocator.Request(category).Raw()). Action().SendToController(uint8(PacketInReasonTF)). Done()) @@ -765,15 +823,15 @@ func (c *client) traceflowNetworkPolicyFlows(dataplaneTag uint8, category cookie if flow.FlowProtocol() == "" { copyFlowBuilderIPv6 := flow.CopyToBuilder(priorityNormal+2, false) copyFlowBuilderIPv6 = copyFlowBuilderIPv6.MatchProtocol(binding.ProtocolIPv6) - flows = append(flows, copyFlowBuilderIPv6.MatchIPDscp(dataplaneTag). - SetHardTimeout(300). + flows = append(flows, copyFlowBuilderIPv6.MatchIPDSCP(dataplaneTag). + SetHardTimeout(timeout). Cookie(c.cookieAllocator.Request(category).Raw()). Action().SendToController(uint8(PacketInReasonTF)). 
Done()) copyFlowBuilder = copyFlowBuilder.MatchProtocol(binding.ProtocolIP) } - flows = append(flows, copyFlowBuilder.MatchIPDscp(dataplaneTag). - SetHardTimeout(300). + flows = append(flows, copyFlowBuilder.MatchIPDSCP(dataplaneTag). + SetHardTimeout(timeout). Cookie(c.cookieAllocator.Request(category).Raw()). Action().SendToController(uint8(PacketInReasonTF)). Done()) @@ -879,41 +937,48 @@ func (c *client) l2ForwardCalcFlow(dstMAC net.HardwareAddr, ofPort uint32, skipI // traceflowL2ForwardOutputFlows generates Traceflow specific flows that outputs traceflow packets // to OVS port and Antrea Agent after L2forwarding calculation. -func (c *client) traceflowL2ForwardOutputFlows(dataplaneTag uint8, category cookie.Category) []binding.Flow { +func (c *client) traceflowL2ForwardOutputFlows(dataplaneTag uint8, liveTraffic bool, timeout uint16, category cookie.Category) []binding.Flow { flows := []binding.Flow{} + l2FwdOutTable := c.pipeline[L2ForwardingOutTable] for _, ipProtocol := range []binding.Protocol{binding.ProtocolIP, binding.ProtocolIPv6} { if c.encapMode.SupportsEncap() { // SendToController and Output if output port is tunnel port. - flows = append(flows, c.pipeline[L2ForwardingOutTable].BuildFlow(priorityNormal+3). + flows = append(flows, l2FwdOutTable.BuildFlow(priorityNormal+3). MatchReg(int(PortCacheReg), config.DefaultTunOFPort). - MatchIPDscp(dataplaneTag). - SetHardTimeout(300). + MatchIPDSCP(dataplaneTag). + SetHardTimeout(timeout). MatchProtocol(ipProtocol). MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange). Action().OutputRegRange(int(PortCacheReg), ofPortRegRange). Action().SendToController(uint8(PacketInReasonTF)). Cookie(c.cookieAllocator.Request(category).Raw()). Done()) - // Only SendToController if output port is local gateway. In encapMode, a - // Traceflow packet going out of the gateway port (i.e. exiting the overlay) - // essentially means that the Traceflow request is complete. 
- flows = append(flows, c.pipeline[L2ForwardingOutTable].BuildFlow(priorityNormal+2). + // For injected packets, only SendToController if output port is local + // gateway. In encapMode, a Traceflow packet going out of the gateway + // port (i.e. exiting the overlay) essentially means that the Traceflow + // request is complete. + flowBuilder := l2FwdOutTable.BuildFlow(priorityNormal+2). MatchReg(int(PortCacheReg), config.HostGatewayOFPort). - MatchIPDscp(dataplaneTag). - SetHardTimeout(300). + MatchIPDSCP(dataplaneTag). + SetHardTimeout(timeout). MatchProtocol(ipProtocol). MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange). Action().SendToController(uint8(PacketInReasonTF)). - Cookie(c.cookieAllocator.Request(category).Raw()). - Done()) + Cookie(c.cookieAllocator.Request(category).Raw()) + if liveTraffic { + // Clear the loaded DSCP bits before output. + flowBuilder = flowBuilder.Action().LoadIPDSCP(0). + Action().OutputRegRange(int(PortCacheReg), ofPortRegRange) + } + flows = append(flows, flowBuilder.Done()) } else { // SendToController and Output if output port is local gateway. Unlike in // encapMode, inter-Node Pod-to-Pod traffic is expected to go out of the // gateway port on the way to its destination. - flows = append(flows, c.pipeline[L2ForwardingOutTable].BuildFlow(priorityNormal+2). + flows = append(flows, l2FwdOutTable.BuildFlow(priorityNormal+2). MatchReg(int(PortCacheReg), config.HostGatewayOFPort). - MatchIPDscp(dataplaneTag). - SetHardTimeout(300). + MatchIPDSCP(dataplaneTag). + SetHardTimeout(timeout). MatchProtocol(ipProtocol). MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange). Action().OutputRegRange(int(PortCacheReg), ofPortRegRange). @@ -927,26 +992,34 @@ func (c *client) traceflowL2ForwardOutputFlows(dataplaneTag uint8, category cook gatewayIP = c.nodeConfig.GatewayConfig.IPv6 } if gatewayIP != nil { - flows = append(flows, c.pipeline[L2ForwardingOutTable].BuildFlow(priorityNormal+3). 
+ flowBuilder := l2FwdOutTable.BuildFlow(priorityNormal+3). MatchReg(int(PortCacheReg), config.HostGatewayOFPort). MatchDstIP(gatewayIP). - MatchIPDscp(dataplaneTag). - SetHardTimeout(300). + MatchIPDSCP(dataplaneTag). + SetHardTimeout(timeout). MatchProtocol(ipProtocol). MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange). Action().SendToController(uint8(PacketInReasonTF)). - Cookie(c.cookieAllocator.Request(category).Raw()). - Done()) + Cookie(c.cookieAllocator.Request(category).Raw()) + if liveTraffic { + flowBuilder = flowBuilder.Action().LoadIPDSCP(0). + Action().OutputRegRange(int(PortCacheReg), ofPortRegRange) + } + flows = append(flows, flowBuilder.Done()) } // Only SendToController if output port is Pod port. - flows = append(flows, c.pipeline[L2ForwardingOutTable].BuildFlow(priorityNormal+2). - MatchIPDscp(dataplaneTag). - SetHardTimeout(300). + flowBuilder := l2FwdOutTable.BuildFlow(priorityNormal+2). + MatchIPDSCP(dataplaneTag). + SetHardTimeout(timeout). MatchProtocol(ipProtocol). MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange). Action().SendToController(uint8(PacketInReasonTF)). - Cookie(c.cookieAllocator.Request(category).Raw()). - Done()) + Cookie(c.cookieAllocator.Request(category).Raw()) + if liveTraffic { + flowBuilder = flowBuilder.Action().LoadIPDSCP(0). 
+ Action().OutputRegRange(int(PortCacheReg), ofPortRegRange) + } + flows = append(flows, flowBuilder.Done()) } return flows } @@ -2033,6 +2106,7 @@ func NewClient(bridgeName, mgmtAddr string, ovsDatapathType ovsconfig.OVSDatapat nodeFlowCache: newFlowCategoryCache(), podFlowCache: newFlowCategoryCache(), serviceFlowCache: newFlowCategoryCache(), + tfFlowCache: newFlowCategoryCache(), policyCache: policyCache, groupCache: sync.Map{}, globalConjMatchFlowCache: map[string]*conjMatchFlowContext{}, diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index d9ba62db7a9..5eedd724f02 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -448,17 +448,17 @@ func (mr *MockClientMockRecorder) InstallServiceGroup(arg0, arg1, arg2 interface } // InstallTraceflowFlows mocks base method -func (m *MockClient) InstallTraceflowFlows(arg0 byte) error { +func (m *MockClient) InstallTraceflowFlows(arg0 byte, arg1 bool, arg2 *openflow.Packet, arg3 uint32, arg4 uint16) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallTraceflowFlows", arg0) + ret := m.ctrl.Call(m, "InstallTraceflowFlows", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(error) return ret0 } // InstallTraceflowFlows indicates an expected call of InstallTraceflowFlows -func (mr *MockClientMockRecorder) InstallTraceflowFlows(arg0 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallTraceflowFlows(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallTraceflowFlows", reflect.TypeOf((*MockClient)(nil).InstallTraceflowFlows), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallTraceflowFlows", reflect.TypeOf((*MockClient)(nil).InstallTraceflowFlows), arg0, arg1, arg2, arg3, arg4) } // IsConnected mocks base method @@ -584,17 +584,17 @@ func (mr *MockClientMockRecorder) SendTCPPacketOut(arg0, 
arg1, arg2, arg3, arg4, } // SendTraceflowPacket mocks base method -func (m *MockClient) SendTraceflowPacket(arg0 byte, arg1, arg2, arg3, arg4 string, arg5, arg6 byte, arg7, arg8, arg9 uint16, arg10 byte, arg11, arg12 uint16, arg13, arg14 byte, arg15, arg16 uint16, arg17 uint32, arg18 int32) error { +func (m *MockClient) SendTraceflowPacket(arg0 byte, arg1 *openflow.Packet, arg2 uint32, arg3 int32) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendTraceflowPacket", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16, arg17, arg18) + ret := m.ctrl.Call(m, "SendTraceflowPacket", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // SendTraceflowPacket indicates an expected call of SendTraceflowPacket -func (mr *MockClientMockRecorder) SendTraceflowPacket(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16, arg17, arg18 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) SendTraceflowPacket(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendTraceflowPacket", reflect.TypeOf((*MockClient)(nil).SendTraceflowPacket), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16, arg17, arg18) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendTraceflowPacket", reflect.TypeOf((*MockClient)(nil).SendTraceflowPacket), arg0, arg1, arg2, arg3) } // StartPacketInHandler mocks base method @@ -750,6 +750,20 @@ func (mr *MockClientMockRecorder) UninstallServiceGroup(arg0 interface{}) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallServiceGroup", reflect.TypeOf((*MockClient)(nil).UninstallServiceGroup), arg0) } +// UninstallTraceflowFlows mocks base method +func (m *MockClient) UninstallTraceflowFlows(arg0 byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, 
"UninstallTraceflowFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UninstallTraceflowFlows indicates an expected call of UninstallTraceflowFlows +func (mr *MockClientMockRecorder) UninstallTraceflowFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallTraceflowFlows", reflect.TypeOf((*MockClient)(nil).UninstallTraceflowFlows), arg0) +} + // MockOFEntryOperations is a mock of OFEntryOperations interface type MockOFEntryOperations struct { ctrl *gomock.Controller diff --git a/pkg/antctl/raw/traceflow/command.go b/pkg/antctl/raw/traceflow/command.go index def1ffbb352..ad29b4f11bd 100644 --- a/pkg/antctl/raw/traceflow/command.go +++ b/pkg/antctl/raw/traceflow/command.go @@ -39,6 +39,8 @@ import ( clientset "github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned" ) +const defaultTimeout time.Duration = time.Second * 10 + var ( Command *cobra.Command option = &struct { @@ -46,7 +48,9 @@ var ( destination string outputType string flow string - waiting bool + liveTraffic bool + nowait bool + timeout time.Duration }{} ) @@ -79,8 +83,10 @@ func init() { $antctl traceflow -S busybox0 -D svc0 -f tcp,tcp_dst=80,tcp_flags=2 Start a Traceflow from busybox0 in Namespace ns0 to busybox1 in Namespace ns1, output type is json $antctl traceflow -S ns0/busybox0 -D ns1/busybox1 -o json - Start a Traceflow from busybox0 to busybox1, with TCP header and 80 as destination port + Start a Traceflow from busybox0 to busybox1, with a TCP packet to destination port 80 $antctl traceflow -S busybox0 -D busybox1 -f tcp,tcp_dst=80 + Start a Traceflow for live TCP traffic from busybox0 to TCP port 80, with 1 minute timeout + $antctl traceflow -S busybox0 -f tcp,tcp_dst=80 --live-traffic -t 1m `, RunE: runE, } @@ -88,13 +94,28 @@ func init() { Command.Flags().StringVarP(&option.source, "source", "S", "", "source of the Traceflow: Namespace/Pod or Pod") Command.Flags().StringVarP(&option.destination, 
"destination", "D", "", "destination of the Traceflow: Namespace/Pod, Pod, Namespace/Service, Service or IP") Command.Flags().StringVarP(&option.outputType, "output", "o", "yaml", "output type: yaml (default), json") - Command.Flags().BoolVarP(&option.waiting, "wait", "", true, "if false, command returns without retrieving results") Command.Flags().StringVarP(&option.flow, "flow", "f", "", "specify the flow (packet headers) of the Traceflow packet, including tcp_src, tcp_dst, tcp_flags, udp_src, udp_dst, ipv6") + Command.Flags().BoolVarP(&option.liveTraffic, "live-traffic", "", false, "if set, the Traceflow will trace the first packet of the matched live traffic flow") + Command.Flags().BoolVarP(&option.nowait, "nowait", "", false, "if set, command returns without retrieving results") } func runE(cmd *cobra.Command, _ []string) error { - if len(option.source) == 0 || len(option.destination) == 0 { - fmt.Println("Please provide source and destination.") + option.timeout, _ = cmd.Flags().GetDuration("timeout") + if option.timeout > time.Hour*12 { + fmt.Println("Timeout cannot be longer than 12 hours") + return nil + } + if option.timeout == 0 { + option.timeout = defaultTimeout + } + + if len(option.source) == 0 { + fmt.Println("Please provide source") + return nil + } + + if !option.liveTraffic && len(option.destination) == 0 { + fmt.Println("Please provide destination") return nil } @@ -127,19 +148,19 @@ func runE(cmd *cobra.Command, _ []string) error { return fmt.Errorf("error when creating Traceflow, is Traceflow feature gate enabled? 
%w", err) } defer func() { - if option.waiting { + if !option.nowait { if err = client.OpsV1alpha1().Traceflows().Delete(context.TODO(), tf.Name, metav1.DeleteOptions{}); err != nil { klog.Errorf("error when deleting Traceflow: %+v", err) } } }() - if !option.waiting { + if option.nowait { return nil } var res *v1alpha1.Traceflow - err = wait.Poll(1*time.Second, 15*time.Second, func() (bool, error) { + err = wait.Poll(1*time.Second, option.timeout, func() (bool, error) { res, err = client.OpsV1alpha1().Traceflows().Get(context.TODO(), tf.Name, metav1.GetOptions{}) if err != nil { return false, err @@ -180,33 +201,37 @@ func newTraceflow(client kubernetes.Interface) (*v1alpha1.Traceflow, error) { } var dst v1alpha1.Destination - dstIP := net.ParseIP(option.destination) - if dstIP != nil { - dst.IP = dstIP.String() - name = getTFName(fmt.Sprintf("%s-%s-to-%s", src.Namespace, src.Pod, dst.IP)) - } else { - var isPod bool - var dest string - var err error - split = strings.Split(option.destination, "/") - if len(split) == 1 { - dst.Namespace = "default" - dest = split[0] - } else if len(split) == 2 && len(split[0]) != 0 && len(split[1]) != 0 { - dst.Namespace = split[0] - dest = split[1] - } else { - return nil, fmt.Errorf("destination should be in the format of Namespace/Pod, Pod, Namespace/Service or Service") - } - if isPod, err = dstIsPod(client, dst.Namespace, dest); err != nil { - return nil, fmt.Errorf("failed to check if destination is Pod or Service: %w", err) - } - if isPod { - dst.Pod = dest + if option.destination != "" { + dstIP := net.ParseIP(option.destination) + if dstIP != nil { + dst.IP = dstIP.String() + name = getTFName(fmt.Sprintf("%s-%s-to-%s", src.Namespace, src.Pod, dst.IP)) } else { - dst.Service = dest + var isPod bool + var dest string + var err error + split = strings.Split(option.destination, "/") + if len(split) == 1 { + dst.Namespace = "default" + dest = split[0] + } else if len(split) == 2 && len(split[0]) != 0 && len(split[1]) != 0 { + 
dst.Namespace = split[0] + dest = split[1] + } else { + return nil, fmt.Errorf("destination should be in the format of Namespace/Pod, Pod, Namespace/Service or Service") + } + if isPod, err = dstIsPod(client, dst.Namespace, dest); err != nil { + return nil, fmt.Errorf("failed to check if destination is Pod or Service: %w", err) + } + if isPod { + dst.Pod = dest + } else { + dst.Service = dest + } + name = getTFName(fmt.Sprintf("%s-%s-to-%s-%s", src.Namespace, src.Pod, dst.Namespace, dest)) } - name = getTFName(fmt.Sprintf("%s-%s-to-%s-%s", src.Namespace, src.Pod, dst.Namespace, dest)) + } else { + name = getTFName(fmt.Sprintf("%s-%s-to-any", src.Namespace, src.Pod)) } pkt, err := parseFlow() @@ -222,9 +247,10 @@ func newTraceflow(client kubernetes.Interface) (*v1alpha1.Traceflow, error) { Source: src, Destination: dst, Packet: *pkt, + Timeout: uint16(option.timeout.Seconds()), + LiveTraffic: option.liveTraffic, }, } - return tf, nil } @@ -248,12 +274,12 @@ func parseFlow() (*v1alpha1.Packet, error) { return nil, fmt.Errorf("error when parsing the flow: %w", err) } - pkt := new(v1alpha1.Packet) + var pkt v1alpha1.Packet + _, isIPv6 := fields["ipv6"] if isIPv6 { pkt.IPv6Header = new(v1alpha1.IPv6Header) } - for k, v := range protocols { if _, ok := fields[k]; ok { if isIPv6 { @@ -293,7 +319,7 @@ func parseFlow() (*v1alpha1.Packet, error) { pkt.TransportHeader.UDP.DstPort = int32(r) } - return pkt, nil + return &pkt, nil } func getPortFields(cleanFlow string) (map[string]int, error) { @@ -322,15 +348,14 @@ func output(tf *v1alpha1.Traceflow) error { Name: tf.Name, Phase: tf.Status.Phase, Source: fmt.Sprintf("%s/%s", tf.Spec.Source.Namespace, tf.Spec.Source.Pod), - Destination: tf.Spec.Destination.IP, NodeResults: tf.Status.Results, } - if len(tf.Spec.Destination.IP) == 0 { - if len(tf.Spec.Destination.Service) != 0 { - r.Destination = fmt.Sprintf("%s/%s", tf.Spec.Destination.Namespace, tf.Spec.Destination.Service) - } else { - r.Destination = fmt.Sprintf("%s/%s", 
tf.Spec.Destination.Namespace, tf.Spec.Destination.Pod) - } + if len(tf.Spec.Destination.IP) > 0 { + r.Destination = tf.Spec.Destination.IP + } else if len(tf.Spec.Destination.Pod) != 0 { + r.Destination = fmt.Sprintf("%s/%s", tf.Spec.Destination.Namespace, tf.Spec.Destination.Pod) + } else if len(tf.Spec.Destination.Service) != 0 { + r.Destination = fmt.Sprintf("%s/%s", tf.Spec.Destination.Namespace, tf.Spec.Destination.Service) } if option.outputType == "json" { if err := jsonOutput(&r); err != nil { @@ -369,7 +394,7 @@ func jsonOutput(r *Response) error { } func getTFName(prefix string) string { - if !option.waiting { + if option.nowait { return prefix } return fmt.Sprintf("%s-%s", prefix, rand.String(8)) diff --git a/pkg/antctl/raw/traceflow/command_test.go b/pkg/antctl/raw/traceflow/command_test.go index fff23b81f16..05cd9a81cb0 100644 --- a/pkg/antctl/raw/traceflow/command_test.go +++ b/pkg/antctl/raw/traceflow/command_test.go @@ -98,6 +98,7 @@ func TestParseFlow(t *testing.T) { IPHeader: v1alpha1.IPHeader{ Protocol: 1, }, + TransportHeader: v1alpha1.TransportHeader{}, }, }, }, diff --git a/pkg/apis/ops/v1alpha1/types.go b/pkg/apis/ops/v1alpha1/types.go index 6dc68553b24..2c35914d767 100644 --- a/pkg/apis/ops/v1alpha1/types.go +++ b/pkg/apis/ops/v1alpha1/types.go @@ -90,6 +90,9 @@ const ( EtherTypeIPv6 uint16 = 0x86DD ) +// Default timeout in seconds. +const DefaultTraceflowTimeout uint16 = 20 + // +genclient // +genclient:nonNamespaced // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -106,6 +109,13 @@ type TraceflowSpec struct { Source Source `json:"source,omitempty"` Destination Destination `json:"destination,omitempty"` Packet Packet `json:"packet,omitempty"` + // LiveTraffic indicates the Traceflow is to trace the live traffic + // rather than an injected packet, when set to true. The first packet of + // the first connection that matches the packet spec will be traced. 
+ LiveTraffic bool `json:"liveTraffic,omitempty"` + // Timeout specifies the timeout of the Traceflow in seconds. Defaults + // to 20 seconds if not set. + Timeout uint16 `json:"timeout,omitempty"` } // Source describes the source spec of the traceflow. @@ -130,7 +140,7 @@ type Destination struct { // IPHeader describes spec of an IPv4 header. type IPHeader struct { - // SrcIP is the source IP. + // SrcIP is the source IP address. SrcIP string `json:"srcIP,omitempty"` // Protocol is the IP protocol. Protocol int32 `json:"protocol,omitempty"` @@ -142,7 +152,7 @@ type IPHeader struct { // IPv6Header describes spec of an IPv6 header. type IPv6Header struct { - // SrcIP is the source IPv6. + // SrcIP is the source IPv6 address. SrcIP string `json:"srcIP,omitempty"` // NextHeader is the IPv6 protocol. NextHeader *int32 `json:"nextHeader,omitempty"` @@ -185,7 +195,6 @@ type TCPHeader struct { // Packet includes header info. type Packet struct { - // TODO: change type IPHeader to *IPHeader and correct all internal references IPHeader IPHeader `json:"ipHeader,omitempty"` IPv6Header *IPv6Header `json:"ipv6Header,omitempty"` TransportHeader TransportHeader `json:"transportHeader,omitempty"` diff --git a/pkg/apis/ops/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/ops/v1alpha1/zz_generated.deepcopy.go index 4c6d64e6bbb..a76d7776e2a 100644 --- a/pkg/apis/ops/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/ops/v1alpha1/zz_generated.deepcopy.go @@ -1,6 +1,6 @@ // +build !ignore_autogenerated -// Copyright 2020 Antrea Authors +// Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/controller/traceflow/controller.go b/pkg/controller/traceflow/controller.go index 046fbd55981..606e3aaa0da 100644 --- a/pkg/controller/traceflow/controller.go +++ b/pkg/controller/traceflow/controller.go @@ -62,12 +62,13 @@ const ( // String set to TraceflowStatus.Reason. 
traceflowTimeout = "Traceflow timeout" + + // Traceflow timeout period. + defaultTimeoutDuration = time.Second * time.Duration(opsv1alpha1.DefaultTraceflowTimeout) ) var ( - // Traceflow timeout period. - timeoutDuration = 2 * time.Minute - timeoutCheckInterval = timeoutDuration / 2 + timeoutCheckInterval = 10 * time.Second ) // Controller is for traceflow. @@ -311,8 +312,13 @@ func (c *Controller) checkTraceflowStatus(tf *opsv1alpha1.Traceflow) error { c.deallocateTagForTF(tf) return c.updateTraceflowStatus(tf, opsv1alpha1.Succeeded, "", 0) } + + timeoutSeconds := int64(tf.Spec.Timeout) + if timeoutSeconds == 0 { + timeoutSeconds = int64(defaultTimeoutDuration.Seconds()) + } // CreationTimestamp is of second accuracy. - if time.Now().Unix() > tf.CreationTimestamp.Unix()+int64(timeoutDuration.Seconds()) { + if time.Now().Unix() > tf.CreationTimestamp.Unix()+timeoutSeconds { c.deallocateTagForTF(tf) return c.updateTraceflowStatus(tf, opsv1alpha1.Failed, traceflowTimeout, 0) } diff --git a/pkg/controller/traceflow/controller_test.go b/pkg/controller/traceflow/controller_test.go index f38b9aefa45..1d874817251 100644 --- a/pkg/controller/traceflow/controller_test.go +++ b/pkg/controller/traceflow/controller_test.go @@ -64,9 +64,8 @@ func newController() *traceflowController { } func TestTraceflow(t *testing.T) { - // Use shorter timeout. - timeoutDuration = 2 * time.Second - timeoutCheckInterval = timeoutDuration / 2 + // Check timeout more frequently. 
+ timeoutCheckInterval = time.Second tfc := newController() stopCh := make(chan struct{}) @@ -84,6 +83,7 @@ func TestTraceflow(t *testing.T) { Spec: ops.TraceflowSpec{ Source: ops.Source{Namespace: "ns1", Pod: "pod1"}, Destination: ops.Destination{Namespace: "ns2", Pod: "pod2"}, + Timeout: 2, // 2 seconds timeout }, } @@ -118,9 +118,9 @@ func TestTraceflow(t *testing.T) { tfc.client.OpsV1alpha1().Traceflows().Create(context.TODO(), &tf1, metav1.CreateOptions{}) res, _ = tfc.waitForTraceflow("tf1", ops.Running, time.Second) assert.NotNil(t, res) - res, _ = tfc.waitForTraceflow("tf1", ops.Failed, timeoutDuration*2) + res, _ = tfc.waitForTraceflow("tf1", ops.Failed, defaultTimeoutDuration*2) assert.NotNil(t, res) - assert.True(t, time.Now().Sub(startTime) >= timeoutDuration) + assert.True(t, time.Now().Sub(startTime) >= time.Second*time.Duration(tf1.Spec.Timeout)) assert.Equal(t, res.Status.Reason, traceflowTimeout) assert.True(t, res.Status.DataplaneTag == 0) assert.Equal(t, numRunningTraceflows(), 0) diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 9f7425cfb81..b4519b51da8 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -78,6 +78,9 @@ const ( DeleteMessage ) +// IPDSCPToSRange stores the DSCP bits in ToS field of IP header. +var IPDSCPToSRange = Range{2, 7} + // Bridge defines operations on an openflow bridge. 
type Bridge interface { CreateTable(id, next TableIDType, missAction MissActionType) Table @@ -172,6 +175,7 @@ type Action interface { LoadARPOperation(value uint16) FlowBuilder LoadRegRange(regID int, value uint32, to Range) FlowBuilder LoadPktMarkRange(value uint32, to Range) FlowBuilder + LoadIPDSCP(value uint8) FlowBuilder LoadRange(name string, addr uint64, to Range) FlowBuilder Move(from, to string) FlowBuilder MoveRange(fromName, toName string, from, to Range) FlowBuilder @@ -205,6 +209,7 @@ type Action interface { type FlowBuilder interface { MatchPriority(uint16) FlowBuilder MatchProtocol(name Protocol) FlowBuilder + MatchIPProtocolValue(isIPv6 bool, protoValue uint8) FlowBuilder MatchReg(regID int, data uint32) FlowBuilder MatchXXReg(regID int, data []byte) FlowBuilder MatchRegRange(regID int, data uint32, rng Range) FlowBuilder @@ -220,7 +225,7 @@ type FlowBuilder interface { MatchARPSpa(ip net.IP) FlowBuilder MatchARPTpa(ip net.IP) FlowBuilder MatchARPOp(op uint16) FlowBuilder - MatchIPDscp(dscp uint8) FlowBuilder + MatchIPDSCP(dscp uint8) FlowBuilder MatchCTStateNew(isSet bool) FlowBuilder MatchCTStateRel(isSet bool) FlowBuilder MatchCTStateRpl(isSet bool) FlowBuilder @@ -234,6 +239,7 @@ type FlowBuilder interface { MatchPktMark(value uint32, mask *uint32) FlowBuilder MatchConjID(value uint32) FlowBuilder MatchDstPort(port uint16, portMask *uint16) FlowBuilder + MatchSrcPort(port uint16, portMask *uint16) FlowBuilder MatchICMPv6Type(icmp6Type byte) FlowBuilder MatchICMPv6Code(icmp6Code byte) FlowBuilder MatchTunnelDst(dstIP net.IP) FlowBuilder @@ -320,6 +326,7 @@ type PacketOutBuilder interface { SetSrcIP(ip net.IP) PacketOutBuilder SetDstIP(ip net.IP) PacketOutBuilder SetIPProtocol(protocol Protocol) PacketOutBuilder + SetIPProtocolValue(isIPv6 bool, protoValue uint8) PacketOutBuilder SetTTL(ttl uint8) PacketOutBuilder SetIPFlags(flags uint16) PacketOutBuilder SetTCPSrcPort(port uint16) PacketOutBuilder @@ -356,3 +363,21 @@ type PortRange struct { 
StartPort uint16 EndPort uint16 } + +type Packet struct { + IsIPv6 bool + DestinationMAC net.HardwareAddr + SourceMAC net.HardwareAddr + DestinationIP net.IP + SourceIP net.IP + IPProto uint8 + IPFlags uint16 + TTL uint8 + DestinationPort uint16 + SourcePort uint16 + TCPFlags uint8 + ICMPType uint8 + ICMPCode uint8 + ICMPEchoID uint16 + ICMPEchoSeq uint16 +} diff --git a/pkg/ovs/openflow/ofctrl_action.go b/pkg/ovs/openflow/ofctrl_action.go index 8aaaaf8a3a7..89bb66e711b 100644 --- a/pkg/ovs/openflow/ofctrl_action.go +++ b/pkg/ovs/openflow/ofctrl_action.go @@ -244,6 +244,11 @@ func (a *ofFlowAction) LoadPktMarkRange(value uint32, rng Range) FlowBuilder { return a.LoadRange(NxmFieldPktMark, uint64(value), rng) } +// LoadIPDSCP is an action to load data to IP DSCP bits. +func (a *ofFlowAction) LoadIPDSCP(value uint8) FlowBuilder { + return a.LoadRange(NxmFieldIPToS, uint64(value), IPDSCPToSRange) +} + // Move is an action to copy all data from "fromField" to "toField". Fields with name "fromField" and "fromField" should // have the same data length, otherwise there will be error when realizing the flow on OFSwitch. func (a *ofFlowAction) Move(fromField, toField string) FlowBuilder { diff --git a/pkg/ovs/openflow/ofctrl_builder.go b/pkg/ovs/openflow/ofctrl_builder.go index a3b26551cb2..073a6146441 100644 --- a/pkg/ovs/openflow/ofctrl_builder.go +++ b/pkg/ovs/openflow/ofctrl_builder.go @@ -416,10 +416,10 @@ func (b *ofFlowBuilder) MatchARPOp(op uint16) FlowBuilder { return b } -// MatchIPDscp adds match condition for matching DSCP field in the IP header. Note, OVS use TOS to present DSCP, and +// MatchIPDSCP adds match condition for matching DSCP field in the IP header. Note, OVS use TOS to present DSCP, and // the field name is shown as "nw_tos" with OVS command line, and the value is calculated by shifting the given value // left 2 bits. 
-func (b *ofFlowBuilder) MatchIPDscp(dscp uint8) FlowBuilder { +func (b *ofFlowBuilder) MatchIPDSCP(dscp uint8) FlowBuilder { b.matchers = append(b.matchers, fmt.Sprintf("nw_tos=%d", dscp<<2)) b.Match.IpDscp = dscp return b @@ -475,6 +475,17 @@ func (b *ofFlowBuilder) MatchProtocol(protocol Protocol) FlowBuilder { return b } +// MatchIPProtocolValue adds match condition for IP protocol with the integer value; isIPv6 selects the IPv6 (0x86dd) or IPv4 (0x0800) Ethertype. +func (b *ofFlowBuilder) MatchIPProtocolValue(isIPv6 bool, protoValue uint8) FlowBuilder { + if isIPv6 { + b.Match.Ethertype = 0x86dd + } else { + b.Match.Ethertype = 0x0800 + } + b.Match.IpProto = protoValue + return b +} + // MatchDstPort adds match condition for matching destination port in transport layer. OVS will match the port exactly // if portMask is nil. func (b *ofFlowBuilder) MatchDstPort(port uint16, portMask *uint16) FlowBuilder { @@ -488,6 +499,19 @@ func (b *ofFlowBuilder) MatchDstPort(port uint16, portMask *uint16) FlowBuilder return b } +// MatchSrcPort adds match condition for matching source port in transport layer. OVS will match the port exactly +// if portMask is nil; otherwise the mask value (not the pointer) is appended to the textual matcher in hex. +func (b *ofFlowBuilder) MatchSrcPort(port uint16, portMask *uint16) FlowBuilder { + b.Match.SrcPort = port + b.Match.SrcPortMask = portMask + matchStr := fmt.Sprintf("tp_src=0x%x", port) + if portMask != nil { + matchStr = fmt.Sprintf("%s/0x%x", matchStr, *portMask) + } + b.matchers = append(b.matchers, matchStr) + return b +} + // MatchCTSrcIP matches the source IPv4 address of the connection tracker original direction tuple. This match requires // a match to valid connection tracking state as a prerequisite, and valid connection tracking state matches include // "+new", "+est", "+rel" and "+trk-inv". 
diff --git a/pkg/ovs/openflow/ofctrl_packetout.go b/pkg/ovs/openflow/ofctrl_packetout.go index 9dd07de0869..d0daea0b3fc 100644 --- a/pkg/ovs/openflow/ofctrl_packetout.go +++ b/pkg/ovs/openflow/ofctrl_packetout.go @@ -109,6 +109,23 @@ func (b *ofPacketOutBuilder) SetIPProtocol(proto Protocol) PacketOutBuilder { return b } +// SetIPProtocolValue sets IP protocol in the packet's IP header with the +// integer protocol value. +func (b *ofPacketOutBuilder) SetIPProtocolValue(isIPv6 bool, protoValue uint8) PacketOutBuilder { + if isIPv6 { + if b.pktOut.IPv6Header == nil { + b.pktOut.IPv6Header = new(protocol.IPv6) + } + b.pktOut.IPv6Header.NextHeader = protoValue + } else { + if b.pktOut.IPHeader == nil { + b.pktOut.IPHeader = new(protocol.IPv4) + } + b.pktOut.IPHeader.Protocol = protoValue + } + return b +} + // SetTTL sets TTL in the packet's IP header. func (b *ofPacketOutBuilder) SetTTL(ttl uint8) PacketOutBuilder { if b.pktOut.IPv6Header == nil { diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index bbc8bc39e95..cd21fcb7076 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -693,6 +693,20 @@ func (mr *MockActionMockRecorder) LoadARPOperation(arg0 interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadARPOperation", reflect.TypeOf((*MockAction)(nil).LoadARPOperation), arg0) } +// LoadIPDSCP mocks base method +func (m *MockAction) LoadIPDSCP(arg0 byte) openflow.FlowBuilder { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadIPDSCP", arg0) + ret0, _ := ret[0].(openflow.FlowBuilder) + return ret0 +} + +// LoadIPDSCP indicates an expected call of LoadIPDSCP +func (mr *MockActionMockRecorder) LoadIPDSCP(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadIPDSCP", reflect.TypeOf((*MockAction)(nil).LoadIPDSCP), arg0) +} + // LoadPktMarkRange mocks base method func (m 
*MockAction) LoadPktMarkRange(arg0 uint32, arg1 openflow.Range) openflow.FlowBuilder { m.ctrl.T.Helper() @@ -1607,18 +1621,32 @@ func (mr *MockFlowBuilderMockRecorder) MatchICMPv6Type(arg0 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MatchICMPv6Type", reflect.TypeOf((*MockFlowBuilder)(nil).MatchICMPv6Type), arg0) } -// MatchIPDscp mocks base method -func (m *MockFlowBuilder) MatchIPDscp(arg0 byte) openflow.FlowBuilder { +// MatchIPDSCP mocks base method +func (m *MockFlowBuilder) MatchIPDSCP(arg0 byte) openflow.FlowBuilder { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MatchIPDSCP", arg0) + ret0, _ := ret[0].(openflow.FlowBuilder) + return ret0 +} + +// MatchIPDSCP indicates an expected call of MatchIPDSCP +func (mr *MockFlowBuilderMockRecorder) MatchIPDSCP(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MatchIPDSCP", reflect.TypeOf((*MockFlowBuilder)(nil).MatchIPDSCP), arg0) +} + +// MatchIPProtocolValue mocks base method +func (m *MockFlowBuilder) MatchIPProtocolValue(arg0 bool, arg1 byte) openflow.FlowBuilder { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MatchIPDscp", arg0) + ret := m.ctrl.Call(m, "MatchIPProtocolValue", arg0, arg1) ret0, _ := ret[0].(openflow.FlowBuilder) return ret0 } -// MatchIPDscp indicates an expected call of MatchIPDscp -func (mr *MockFlowBuilderMockRecorder) MatchIPDscp(arg0 interface{}) *gomock.Call { +// MatchIPProtocolValue indicates an expected call of MatchIPProtocolValue +func (mr *MockFlowBuilderMockRecorder) MatchIPProtocolValue(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MatchIPDscp", reflect.TypeOf((*MockFlowBuilder)(nil).MatchIPDscp), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MatchIPProtocolValue", reflect.TypeOf((*MockFlowBuilder)(nil).MatchIPProtocolValue), arg0, arg1) } // MatchInPort mocks base method @@ -1747,6 +1775,20 @@ func 
(mr *MockFlowBuilderMockRecorder) MatchSrcMAC(arg0 interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MatchSrcMAC", reflect.TypeOf((*MockFlowBuilder)(nil).MatchSrcMAC), arg0) } +// MatchSrcPort mocks base method +func (m *MockFlowBuilder) MatchSrcPort(arg0 uint16, arg1 *uint16) openflow.FlowBuilder { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MatchSrcPort", arg0, arg1) + ret0, _ := ret[0].(openflow.FlowBuilder) + return ret0 +} + +// MatchSrcPort indicates an expected call of MatchSrcPort +func (mr *MockFlowBuilderMockRecorder) MatchSrcPort(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MatchSrcPort", reflect.TypeOf((*MockFlowBuilder)(nil).MatchSrcPort), arg0, arg1) +} + // MatchTunMetadata mocks base method func (m *MockFlowBuilder) MatchTunMetadata(arg0 int, arg1 uint32) openflow.FlowBuilder { m.ctrl.T.Helper() diff --git a/test/e2e/traceflow_test.go b/test/e2e/traceflow_test.go index baab1e87e19..22032228315 100644 --- a/test/e2e/traceflow_test.go +++ b/test/e2e/traceflow_test.go @@ -17,6 +17,7 @@ package e2e import ( "context" "fmt" + "net" "strings" "testing" "time" @@ -475,6 +476,47 @@ func TestTraceflowIntraNode(t *testing.T) { }, expectedPhase: v1alpha1.Failed, }, + { + name: "intraNodeICMPDstIPLiveTraceflowIPv4", + ipVersion: 4, + tf: &v1alpha1.Traceflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-%s-", testNamespace, node1Pods[0], dstPodIPv4Str)), + }, + Spec: v1alpha1.TraceflowSpec{ + Source: v1alpha1.Source{ + Namespace: testNamespace, + Pod: node1Pods[0], + }, + Destination: v1alpha1.Destination{ + IP: dstPodIPv4Str, + }, + LiveTraffic: true, + }, + }, + expectedPhase: v1alpha1.Succeeded, + expectedResults: []v1alpha1.NodeResult{ + { + Node: node1, + Observations: []v1alpha1.Observation{ + { + Component: v1alpha1.SpoofGuard, + Action: v1alpha1.Forwarded, + }, + { + Component: v1alpha1.NetworkPolicy, + ComponentInfo: 
"EgressRule", + Action: v1alpha1.Forwarded, + }, + { + Component: v1alpha1.Forwarding, + ComponentInfo: "Output", + Action: v1alpha1.Delivered, + }, + }, + }, + }, + }, { name: "intraNodeTraceflowIPv6", ipVersion: 6, @@ -698,6 +740,47 @@ func TestTraceflowIntraNode(t *testing.T) { }, expectedPhase: v1alpha1.Failed, }, + { + name: "intraNodeICMPDstIPLiveTraceflowIPv6", + ipVersion: 6, + tf: &v1alpha1.Traceflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-%s-", testNamespace, node1Pods[0], dstPodIPv6Str)), + }, + Spec: v1alpha1.TraceflowSpec{ + Source: v1alpha1.Source{ + Namespace: testNamespace, + Pod: node1Pods[0], + }, + Destination: v1alpha1.Destination{ + IP: dstPodIPv6Str, + }, + LiveTraffic: true, + }, + }, + expectedPhase: v1alpha1.Succeeded, + expectedResults: []v1alpha1.NodeResult{ + { + Node: node1, + Observations: []v1alpha1.Observation{ + { + Component: v1alpha1.SpoofGuard, + Action: v1alpha1.Forwarded, + }, + { + Component: v1alpha1.NetworkPolicy, + ComponentInfo: "EgressRule", + Action: v1alpha1.Forwarded, + }, + { + Component: v1alpha1.Forwarding, + ComponentInfo: "Output", + Action: v1alpha1.Delivered, + }, + }, + }, + }, + }, } if gwIPv4Str != "" { @@ -1161,6 +1244,63 @@ func TestTraceflowInterNode(t *testing.T) { }, }, }, + { + name: "interNodeICMPDstPodLiveTraceflowIPv4", + ipVersion: 4, + tf: &v1alpha1.Traceflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-%s-", testNamespace, node1Pods[0], node2Pods[0])), + }, + Spec: v1alpha1.TraceflowSpec{ + Source: v1alpha1.Source{ + Namespace: testNamespace, + Pod: node1Pods[0], + }, + Destination: v1alpha1.Destination{ + Namespace: testNamespace, + Pod: node2Pods[0], + }, + LiveTraffic: true, + }, + }, + expectedPhase: v1alpha1.Succeeded, + expectedResults: []v1alpha1.NodeResult{ + { + Node: node1, + Observations: []v1alpha1.Observation{ + { + Component: v1alpha1.SpoofGuard, + Action: v1alpha1.Forwarded, + }, + { + Component: v1alpha1.NetworkPolicy, + 
ComponentInfo: "EgressRule", + Action: v1alpha1.Forwarded, + }, + { + Component: v1alpha1.Forwarding, + ComponentInfo: "Output", + Action: v1alpha1.Forwarded, + }, + }, + }, + { + Node: node2, + Observations: []v1alpha1.Observation{ + { + Component: v1alpha1.Forwarding, + ComponentInfo: "Classification", + Action: v1alpha1.Received, + }, + { + Component: v1alpha1.Forwarding, + ComponentInfo: "Output", + Action: v1alpha1.Delivered, + }, + }, + }, + }, + }, { name: "interNodeTraceflowIPv6", ipVersion: 6, @@ -1426,6 +1566,63 @@ func TestTraceflowInterNode(t *testing.T) { }, }, }, + { + name: "interNodeICMPDstPodLiveTraceflowIPv6", + ipVersion: 6, + tf: &v1alpha1.Traceflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-%s-", testNamespace, node1Pods[0], node2Pods[0])), + }, + Spec: v1alpha1.TraceflowSpec{ + Source: v1alpha1.Source{ + Namespace: testNamespace, + Pod: node1Pods[0], + }, + Destination: v1alpha1.Destination{ + Namespace: testNamespace, + Pod: node2Pods[0], + }, + LiveTraffic: true, + }, + }, + expectedPhase: v1alpha1.Succeeded, + expectedResults: []v1alpha1.NodeResult{ + { + Node: node1, + Observations: []v1alpha1.Observation{ + { + Component: v1alpha1.SpoofGuard, + Action: v1alpha1.Forwarded, + }, + { + Component: v1alpha1.NetworkPolicy, + ComponentInfo: "EgressRule", + Action: v1alpha1.Forwarded, + }, + { + Component: v1alpha1.Forwarding, + ComponentInfo: "Output", + Action: v1alpha1.Forwarded, + }, + }, + }, + { + Node: node2, + Observations: []v1alpha1.Observation{ + { + Component: v1alpha1.Forwarding, + ComponentInfo: "Classification", + Action: v1alpha1.Received, + }, + { + Component: v1alpha1.Forwarding, + ComponentInfo: "Output", + Action: v1alpha1.Delivered, + }, + }, + }, + }, + }, } t.Run("traceflowGroupTest", func(t *testing.T) { @@ -1648,6 +1845,32 @@ func runTestTraceflow(t *testing.T, data *TestData, tc testcase) { } }() + if tc.tf.Spec.LiveTraffic { + // LiveTraffic Traceflow test supports only ICMP traffic from + 
// the source Pod to an IP or another Pod. + var dstPodIPs *PodIPs + srcPod := tc.tf.Spec.Source.Pod + dstPod := tc.tf.Spec.Destination.Pod + dstIP := tc.tf.Spec.Destination.IP + if dstIP != "" { + ip := net.ParseIP(dstIP) + if ip.To4() != nil { + dstPodIPs = &PodIPs{ipv4: &ip} + } else { + dstPodIPs = &PodIPs{ipv6: &ip} + } + } else { + podIPs := waitForPodIPs(t, data, []string{dstPod}) + dstPodIPs = podIPs[dstPod] + } + // Give some time for Nodes to install OVS flows. + time.Sleep(5 * time.Second) + // Send an ICMP echo packet from the source Pod to the destination. + if err := data.runPingCommandFromTestPod(srcPod, dstPodIPs, 2); err != nil { + t.Logf("Ping '%s' -> '%v' failed: ERROR (%v)", srcPod, *dstPodIPs, err) + } + } + tf, err := data.waitForTraceflow(t, tc.tf.Name, tc.expectedPhase) if err != nil { t.Fatalf("Error: Get Traceflow failed: %v", err) diff --git a/test/integration/ovs/ofctrl_test.go b/test/integration/ovs/ofctrl_test.go index 4ab743d0b3d..9d4bf4137c8 100644 --- a/test/integration/ovs/ofctrl_test.go +++ b/test/integration/ovs/ofctrl_test.go @@ -67,7 +67,7 @@ var ( peerGW = net.ParseIP("192.168.2.1") vMAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:ff") - ipDscp = uint8(10) + ipDSCP = uint8(10) ) func newOFBridge(brName string) binding.Bridge { @@ -974,7 +974,7 @@ func prepareFlows(table binding.Table) ([]binding.Flow, []*ExpectFlow) { Cookie(getCookieID()). MatchProtocol(binding.ProtocolIP). MatchSrcIP(podIP). - MatchIPDscp(ipDscp). + MatchIPDSCP(ipDSCP). Action().GotoTable(table.GetNext()). Done(), table.BuildFlow(priorityNormal+20).MatchProtocol(binding.ProtocolTCP).Cookie(getCookieID()).MatchDstPort(uint16(8080), nil). 
@@ -1015,7 +1015,7 @@ func prepareFlows(table binding.Table) ([]binding.Flow, []*ExpectFlow) { &ExpectFlow{"priority=200,dl_dst=aa:aa:aa:aa:aa:13", fmt.Sprintf("load:0x3->NXM_NX_REG1[],load:0x1->NXM_NX_REG0[16],%s", gotoTableAction)}, &ExpectFlow{"priority=200,ip,reg0=0x10000/0x10000", "output:NXM_NX_REG1[]"}, &ExpectFlow{"priority=200,ip,nw_dst=172.16.0.0/16", "output:1"}, - &ExpectFlow{fmt.Sprintf("priority=200,ip,nw_src=192.168.1.3,nw_tos=%d", ipDscp<<2), gotoTableAction}, + &ExpectFlow{fmt.Sprintf("priority=200,ip,nw_src=192.168.1.3,nw_tos=%d", ipDSCP<<2), gotoTableAction}, &ExpectFlow{"priority=220,tcp,tp_dst=8080", "conjunction(1001,3/3)"}, &ExpectFlow{"priority=220,ip,nw_src=192.168.1.3", "conjunction(1001,1/3)"}, &ExpectFlow{"priority=220,ip,nw_dst=192.168.3.0/24", "conjunction(1001,2/3)"},