diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 8253d69ee37..a8954b99528 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -1295,6 +1295,11 @@ spec: name: Live-Traffic priority: 10 type: boolean + - description: Capture only the dropped packet. + jsonPath: .spec.droppedOnly + name: Dropped-Only + priority: 10 + type: boolean - description: Timeout in seconds. jsonPath: .spec.timeout name: Timeout @@ -1323,6 +1328,8 @@ spec: service: type: string type: object + droppedOnly: + type: boolean liveTraffic: type: boolean packet: @@ -1395,6 +1402,50 @@ spec: type: object status: properties: + capturedPacket: + properties: + dstIP: + type: string + ipHeader: + properties: + flags: + type: integer + protocol: + type: integer + ttl: + type: integer + type: object + ipv6Header: + properties: + hopLimit: + type: integer + nextHeader: + type: integer + type: object + length: + type: integer + srcIP: + type: string + transportHeader: + properties: + tcp: + properties: + dstPort: + type: integer + flags: + type: integer + srcPort: + type: integer + type: object + udp: + properties: + dstPort: + type: integer + srcPort: + type: integer + type: object + type: object + type: object dataplaneTag: type: integer phase: diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 696b6e0df99..3657ebf4a8c 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -1295,6 +1295,11 @@ spec: name: Live-Traffic priority: 10 type: boolean + - description: Capture only the dropped packet. + jsonPath: .spec.droppedOnly + name: Dropped-Only + priority: 10 + type: boolean - description: Timeout in seconds. jsonPath: .spec.timeout name: Timeout @@ -1323,6 +1328,8 @@ spec: service: type: string type: object + droppedOnly: + type: boolean liveTraffic: type: boolean packet: @@ -1395,6 +1402,50 @@ spec: type: object status: properties: + capturedPacket: + properties: + dstIP: + type: string + ipHeader: + properties: + flags: + type: integer + protocol: + type: integer + ttl: + type: integer + type: object + ipv6Header: + properties: + hopLimit: + type: integer + nextHeader: + type: integer + type: object + length: + type: integer + srcIP: + type: string + transportHeader: + properties: + tcp: + properties: + dstPort: + type: integer + flags: + type: integer + srcPort: + type: integer + type: object + udp: + properties: + dstPort: + type: integer + srcPort: + type: integer + type: object + type: object + type: object dataplaneTag: type: integer phase: diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index ca3d9e849ae..0102703592c 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -1295,6 +1295,11 @@ spec: name: Live-Traffic priority: 10 type: boolean + - description: Capture only the dropped packet. + jsonPath: .spec.droppedOnly + name: Dropped-Only + priority: 10 + type: boolean - description: Timeout in seconds. jsonPath: .spec.timeout name: Timeout @@ -1323,6 +1328,8 @@ spec: service: type: string type: object + droppedOnly: + type: boolean liveTraffic: type: boolean packet: @@ -1395,6 +1402,50 @@ spec: type: object status: properties: + capturedPacket: + properties: + dstIP: + type: string + ipHeader: + properties: + flags: + type: integer + protocol: + type: integer + ttl: + type: integer + type: object + ipv6Header: + properties: + hopLimit: + type: integer + nextHeader: + type: integer + type: object + length: + type: integer + srcIP: + type: string + transportHeader: + properties: + tcp: + properties: + dstPort: + type: integer + flags: + type: integer + srcPort: + type: integer + type: object + udp: + properties: + dstPort: + type: integer + srcPort: + type: integer + type: object + type: object + type: object dataplaneTag: type: integer phase: diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 14f48fc7891..82f573d490b 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -1295,6 +1295,11 @@ spec: name: Live-Traffic priority: 10 type: boolean + - description: Capture only the dropped packet. + jsonPath: .spec.droppedOnly + name: Dropped-Only + priority: 10 + type: boolean - description: Timeout in seconds. jsonPath: .spec.timeout name: Timeout @@ -1323,6 +1328,8 @@ spec: service: type: string type: object + droppedOnly: + type: boolean liveTraffic: type: boolean packet: @@ -1395,6 +1402,50 @@ spec: type: object status: properties: + capturedPacket: + properties: + dstIP: + type: string + ipHeader: + properties: + flags: + type: integer + protocol: + type: integer + ttl: + type: integer + type: object + ipv6Header: + properties: + hopLimit: + type: integer + nextHeader: + type: integer + type: object + length: + type: integer + srcIP: + type: string + transportHeader: + properties: + tcp: + properties: + dstPort: + type: integer + flags: + type: integer + srcPort: + type: integer + type: object + udp: + properties: + dstPort: + type: integer + srcPort: + type: integer + type: object + type: object + type: object dataplaneTag: type: integer phase: diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index a3c9f677e69..746e2e5ce68 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -1295,6 +1295,11 @@ spec: name: Live-Traffic priority: 10 type: boolean + - description: Capture only the dropped packet. + jsonPath: .spec.droppedOnly + name: Dropped-Only + priority: 10 + type: boolean - description: Timeout in seconds. jsonPath: .spec.timeout name: Timeout @@ -1323,6 +1328,8 @@ spec: service: type: string type: object + droppedOnly: + type: boolean liveTraffic: type: boolean packet: @@ -1395,6 +1402,50 @@ spec: type: object status: properties: + capturedPacket: + properties: + dstIP: + type: string + ipHeader: + properties: + flags: + type: integer + protocol: + type: integer + ttl: + type: integer + type: object + ipv6Header: + properties: + hopLimit: + type: integer + nextHeader: + type: integer + type: object + length: + type: integer + srcIP: + type: string + transportHeader: + properties: + tcp: + properties: + dstPort: + type: integer + flags: + type: integer + srcPort: + type: integer + type: object + udp: + properties: + dstPort: + type: integer + srcPort: + type: integer + type: object + type: object + type: object dataplaneTag: type: integer phase: diff --git a/build/yamls/base/crds.yml b/build/yamls/base/crds.yml index a07b0a12a13..ce3397ee598 100644 --- a/build/yamls/base/crds.yml +++ b/build/yamls/base/crds.yml @@ -128,6 +128,11 @@ spec: name: Live-Traffic type: boolean priority: 10 + - jsonPath: .spec.droppedOnly + description: Capture only the dropped packet. + name: Dropped-Only + type: boolean + priority: 10 - jsonPath: .spec.timeout description: Timeout in seconds. name: Timeout @@ -226,6 +231,8 @@ spec: type: integer liveTraffic: type: boolean + droppedOnly: + type: boolean timeout: type: integer status: @@ -273,6 +280,51 @@ spec: type: string tunnelDstIP: type: string + capturedPacket: + properties: + srcIP: + type: string + dstIP: + type: string + length: + type: integer + ipHeader: + properties: + flags: + type: integer + protocol: + type: integer + ttl: + type: integer + type: object + ipv6Header: + properties: + hopLimit: + type: integer + nextHeader: + type: integer + type: object + transportHeader: + properties: + tcp: + properties: + dstPort: + type: integer + srcPort: + type: integer + flags: + type: integer + type: object + udp: + properties: + dstPort: + type: integer + srcPort: + type: integer + type: object + # ICMP echo is not supported. + type: object + type: object subresources: status: {} scope: Cluster diff --git a/pkg/agent/controller/networkpolicy/packetin.go b/pkg/agent/controller/networkpolicy/packetin.go index bde21dd2fa1..7332d90abbb 100644 --- a/pkg/agent/controller/networkpolicy/packetin.go +++ b/pkg/agent/controller/networkpolicy/packetin.go @@ -23,7 +23,6 @@ import ( "github.com/contiv/libOpenflow/openflow13" "github.com/contiv/libOpenflow/protocol" - "github.com/contiv/libOpenflow/util" "github.com/contiv/ofnet/ofctrl" "gopkg.in/natefinch/lumberjack.v2" "k8s.io/klog" @@ -282,7 +281,7 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { if prot == protocol.Type_TCP { // Get TCP data. - oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, err := getTCPHeaderData(pktIn.Data.Data) + oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, err := binding.GetTCPHeaderData(pktIn.Data.Data) if err != nil { return err } @@ -330,26 +329,3 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { true) } } - -// getTCPHeaderData gets TCP header data from IP packet. -func getTCPHeaderData(ipPkt util.Message) (tcpSrcPort uint16, tcpDstPort uint16, tcpSeqNum uint32, tcpAckNum uint32, err error) { - var tcpBytes []byte - - // Transfer Buffer to TCP - switch typedIPPkt := ipPkt.(type) { - case *protocol.IPv4: - tcpBytes, err = typedIPPkt.Data.(*util.Buffer).MarshalBinary() - case *protocol.IPv6: - tcpBytes, err = typedIPPkt.Data.(*util.Buffer).MarshalBinary() - } - if err != nil { - return 0, 0, 0, 0, err - } - tcpIn := new(protocol.TCP) - err = tcpIn.UnmarshalBinary(tcpBytes) - if err != nil { - return 0, 0, 0, 0, err - } - - return tcpIn.PortSrc, tcpIn.PortDst, tcpIn.SeqNum, tcpIn.AckNum, nil -} diff --git a/pkg/agent/controller/networkpolicy/packetin_test.go b/pkg/agent/controller/networkpolicy/packetin_test.go index 3ae7bf864b9..1d0c3f4394a 100644 --- a/pkg/agent/controller/networkpolicy/packetin_test.go +++ b/pkg/agent/controller/networkpolicy/packetin_test.go @@ -63,55 +63,3 @@ func TestGetPacketInfo(t *testing.T) { }) } } - -func TestGetTCPHeaderData(t *testing.T) { - type args struct { - tcp protocol.TCP - expectTCPSrcPort uint16 - expectTCPDstPort uint16 - expectTCPSeqNum uint32 - expectTCPAckNum uint32 - } - tests := []struct { - name string - args args - wantErr bool - }{ - { - name: "ipv4", - args: args{ - tcp: protocol.TCP{ - PortSrc: 1080, - PortDst: 80, - SeqNum: 0, - AckNum: 0, - }, - expectTCPSrcPort: 1080, - expectTCPDstPort: 80, - expectTCPSeqNum: 0, - expectTCPAckNum: 0, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tcp := tt.args.tcp - pktIn := new(protocol.IPv4) - bytes, _ := tcp.MarshalBinary() - bf := new(util.Buffer) - bf.UnmarshalBinary(bytes) - pktIn.Data = bf - - tcpSrcPort, tcpDstPort, tcpSeqNum, tcpAckNum, err := getTCPHeaderData(pktIn) - - if (err != nil) != tt.wantErr { - t.Errorf("getPacketInfo() error = %v, wantErr %v", err, tt.wantErr) - } - assert.Equal(t, tt.args.expectTCPSrcPort, tcpSrcPort, "Expect to retrieve exact TCP src port while differed") - assert.Equal(t, tt.args.expectTCPDstPort, tcpDstPort, "Expect to retrieve exact TCP dst port while differed") - assert.Equal(t, tt.args.expectTCPSeqNum, tcpSeqNum, "Expect to retrieve exact TCP seq num while differed") - assert.Equal(t, tt.args.expectTCPAckNum, tcpAckNum, "Expect to retrieve exact TCP ack num while differed") - }) - } -} diff --git a/pkg/agent/controller/traceflow/packetin.go b/pkg/agent/controller/traceflow/packetin.go index b294ba3b636..c1220ca8222 100644 --- a/pkg/agent/controller/traceflow/packetin.go +++ b/pkg/agent/controller/traceflow/packetin.go @@ -38,7 +38,7 @@ func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error { if !c.traceflowListerSynced() { return errors.New("traceflow controller is not started") } - oldTf, nodeResult, err := c.parsePacketIn(pktIn) + oldTf, nodeResult, packet, err := c.parsePacketIn(pktIn) if err != nil { klog.Errorf("parsePacketIn error: %+v", err) return err @@ -53,12 +53,15 @@ func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error { } update := tf.DeepCopy() update.Status.Results = append(update.Status.Results, *nodeResult) + if packet != nil { + update.Status.CapturedPacket = packet + } _, err = c.traceflowClient.CrdV1alpha1().Traceflows().UpdateStatus(context.TODO(), update, v1.UpdateOptions{}) if err != nil { klog.Warningf("Update traceflow failed: %+v", err) return err } - klog.Infof("Updated traceflow %s: %+v", tf.Name, nodeResult) + klog.Infof("Updated traceflow %s: %+v", tf.Name, update.Status) return nil }) if err != nil { @@ -67,7 +70,7 @@ func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error { return err } -func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Traceflow, *crdv1alpha1.NodeResult, error) { +func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Traceflow, *crdv1alpha1.NodeResult, *crdv1alpha1.Packet, error) { matchers := pktIn.GetMatches() var match *ofctrl.MatchField @@ -77,17 +80,17 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl if pktIn.Data.Ethertype == protocol.IPv4_MSG { ipPacket, ok := pktIn.Data.Data.(*protocol.IPv4) if !ok { - return nil, nil, errors.New("invalid traceflow IPv4 packet") + return nil, nil, nil, errors.New("invalid traceflow IPv4 packet") } tag = ipPacket.DSCP } else if pktIn.Data.Ethertype == protocol.IPv6_MSG { ipv6Packet, ok := pktIn.Data.Data.(*protocol.IPv6) if !ok { - return nil, nil, errors.New("invalid traceflow IPv6 packet") + return nil, nil, nil, errors.New("invalid traceflow IPv6 packet") } tag = ipv6Packet.TrafficClass >> 2 } else { - return nil, nil, fmt.Errorf("unsupported traceflow packet Ethertype: %d", pktIn.Data.Ethertype) + return nil, nil, nil, fmt.Errorf("unsupported traceflow packet Ethertype: %d", pktIn.Data.Ethertype) } firstPacket := false @@ -99,18 +102,25 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl } c.runningTraceflowsMutex.RUnlock() if !exists { - return nil, nil, fmt.Errorf("Traceflow for dataplane tag %d not found in cache", pktIn.Data.Ethertype) + return nil, nil, nil, fmt.Errorf("Traceflow for dataplane tag %d not found in cache", pktIn.Data.Ethertype) } + var capturedPacket *crdv1alpha1.Packet if tfState.liveTraffic && firstPacket { // Uninstall the OVS flows after receiving the first packet, to // avoid capturing too many matched packets. c.ofClient.UninstallTraceflowFlows(tag) + // Report the captured dropped packet, if the Traceflow is for + // the dropped packet only; otherwise only the sender reports + // the first captured packet. + if tfState.isSender || tfState.droppedOnly { + capturedPacket = parseCapturedPacket(pktIn) + } } tf, err := c.traceflowLister.Get(tfState.name) if err != nil { - return nil, nil, fmt.Errorf("failed to get Traceflow %s CRD: %v", tfState.name, err) + return nil, nil, nil, fmt.Errorf("failed to get Traceflow %s CRD: %v", tfState.name, err) } obs := []crdv1alpha1.Observation{} @@ -135,25 +145,25 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl case protocol.IPv4_MSG: ipPacket, ok := pktIn.Data.Data.(*protocol.IPv4) if !ok { - return nil, nil, errors.New("invalid traceflow IPv4 packet") + return nil, nil, nil, errors.New("invalid traceflow IPv4 packet") } ctNwDst, err = getCTDstValue(matchers, false) if err != nil { - return nil, nil, err + return nil, nil, nil, err } ipDst = ipPacket.NWDst.String() case protocol.IPv6_MSG: ipPacket, ok := pktIn.Data.Data.(*protocol.IPv6) if !ok { - return nil, nil, errors.New("invalid traceflow IPv6 packet") + return nil, nil, nil, errors.New("invalid traceflow IPv6 packet") } ctNwDst, err = getCTDstValue(matchers, true) if err != nil { - return nil, nil, err + return nil, nil, nil, err } ipDst = ipPacket.NWDst.String() default: - return nil, nil, fmt.Errorf("unsupported traceflow packet ether type %d", pktIn.Data.Ethertype) + return nil, nil, nil, fmt.Errorf("unsupported traceflow packet ether type %d", pktIn.Data.Ethertype) } if isValidCtNw(ctNwDst) && ipDst != ctNwDst { ob := &crdv1alpha1.Observation{ @@ -168,7 +178,7 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl if match = getMatchRegField(matchers, uint32(openflow.EgressReg)); match != nil { egressInfo, err := getRegValue(match, nil) if err != nil { - return nil, nil, err + return nil, nil, nil, err } ob := getNetworkPolicyObservation(tableID, false) npRef := c.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(egressInfo) @@ -182,7 +192,7 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl if match = getMatchRegField(matchers, uint32(openflow.IngressReg)); match != nil { ingressInfo, err := getRegValue(match, nil) if err != nil { - return nil, nil, err + return nil, nil, nil, err } ob := getNetworkPolicyObservation(tableID, true) npRef := c.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(ingressInfo) @@ -198,7 +208,7 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl if match = getMatchRegField(matchers, uint32(openflow.CNPDenyConjIDReg)); match != nil { notAllowConjInfo, err := getRegValue(match, nil) if err != nil { - return nil, nil, err + return nil, nil, nil, err } npRef := c.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(notAllowConjInfo) if npRef != nil { @@ -219,14 +229,14 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl if match = getMatchTunnelDstField(matchers, isIPv6); match != nil { tunnelDstIP, err = getTunnelDstValue(match) if err != nil { - return nil, nil, err + return nil, nil, nil, err } } var outputPort uint32 if match = getMatchRegField(matchers, uint32(openflow.PortCacheReg)); match != nil { outputPort, err = getRegValue(match, nil) if err != nil { - return nil, nil, err + return nil, nil, nil, err } } gatewayIP := c.nodeConfig.GatewayConfig.IPv4 @@ -252,7 +262,7 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl } nodeResult := crdv1alpha1.NodeResult{Node: c.nodeConfig.Name, Timestamp: time.Now().Unix(), Observations: obs} - return tf, &nodeResult, nil + return tf, &nodeResult, capturedPacket, nil } func getMatchRegField(matchers *ofctrl.Matchers, regNum uint32) *ofctrl.MatchField { @@ -341,3 +351,22 @@ func isValidCtNw(ipStr string) bool { } return true } + +func parseCapturedPacket(pktIn *ofctrl.PacketIn) *crdv1alpha1.Packet { + pkt, _ := binding.ParsePacketIn(pktIn) + capturedPacket := crdv1alpha1.Packet{SrcIP: pkt.SourceIP.String(), DstIP: pkt.DestinationIP.String(), Length: pkt.IPLength} + if pkt.IsIPv6 { + ipProto := int32(pkt.IPProto) + capturedPacket.IPv6Header = &crdv1alpha1.IPv6Header{NextHeader: &ipProto, HopLimit: int32(pkt.TTL)} + } else { + capturedPacket.IPHeader.Protocol = int32(pkt.IPProto) + capturedPacket.IPHeader.TTL = int32(pkt.TTL) + capturedPacket.IPHeader.Flags = int32(pkt.IPFlags) + } + if pkt.IPProto == protocol.Type_TCP { + capturedPacket.TransportHeader.TCP = &crdv1alpha1.TCPHeader{SrcPort: int32(pkt.SourcePort), DstPort: int32(pkt.DestinationPort), Flags: int32(pkt.TCPFlags)} + } else if pkt.IPProto == protocol.Type_UDP { + capturedPacket.TransportHeader.UDP = &crdv1alpha1.UDPHeader{SrcPort: int32(pkt.SourcePort), DstPort: int32(pkt.DestinationPort)} + } + return &capturedPacket +} diff --git a/pkg/agent/controller/traceflow/packetin_test.go b/pkg/agent/controller/traceflow/packetin_test.go index 5a9f8779638..41fedb4c755 100644 --- a/pkg/agent/controller/traceflow/packetin_test.go +++ b/pkg/agent/controller/traceflow/packetin_test.go @@ -15,9 +15,15 @@ package traceflow import ( + "net" "reflect" "testing" + "github.com/contiv/libOpenflow/protocol" + "github.com/contiv/libOpenflow/util" + "github.com/contiv/ofnet/ofctrl" + "github.com/stretchr/testify/assert" + "github.com/vmware-tanzu/antrea/pkg/agent/openflow" crdv1alpha1 "github.com/vmware-tanzu/antrea/pkg/apis/crd/v1alpha1" ) @@ -89,3 +95,67 @@ func Test_getNetworkPolicyObservation(t *testing.T) { }) } } + +func TestParseCapturedPacket(t *testing.T) { + srcIPv4 := net.ParseIP("10.1.1.11") + dstIPv4 := net.ParseIP("10.1.1.12") + srcIPv6 := net.ParseIP("fd12:ab:34:a001::11") + dstIPv6 := net.ParseIP("fd12:ab:34:a001::12") + + tcpPktIn := protocol.IPv4{Length: 1000, Flags: 1, TTL: 64, NWSrc: srcIPv4, NWDst: dstIPv4, Protocol: protocol.Type_TCP} + tcp := protocol.TCP{PortSrc: 1080, PortDst: 80, SeqNum: 1234, Code: 2} + bytes, _ := tcp.MarshalBinary() + bf := new(util.Buffer) + bf.UnmarshalBinary(bytes) + tcpPktIn.Data = bf + tcpPktCap := crdv1alpha1.Packet{ + SrcIP: tcpPktIn.NWSrc.String(), DstIP: tcpPktIn.NWDst.String(), Length: tcpPktIn.Length, + IPHeader: crdv1alpha1.IPHeader{Protocol: int32(tcpPktIn.Protocol), TTL: int32(tcpPktIn.TTL), Flags: int32(tcpPktIn.Flags)}, + TransportHeader: crdv1alpha1.TransportHeader{ + TCP: &crdv1alpha1.TCPHeader{SrcPort: int32(tcp.PortSrc), DstPort: int32(tcp.PortDst), Flags: int32(tcp.Code)}, + }, + } + + udpPktIn := protocol.IPv4{Length: 50, Flags: 0, TTL: 128, NWSrc: srcIPv4, NWDst: dstIPv4, Protocol: protocol.Type_UDP} + udp := protocol.UDP{PortSrc: 1080, PortDst: 80} + bytes, _ = udp.MarshalBinary() + bf = new(util.Buffer) + bf.UnmarshalBinary(bytes) + udpPktIn.Data = bf + udpPktCap := crdv1alpha1.Packet{ + SrcIP: udpPktIn.NWSrc.String(), DstIP: udpPktIn.NWDst.String(), Length: udpPktIn.Length, + IPHeader: crdv1alpha1.IPHeader{Protocol: int32(udpPktIn.Protocol), TTL: int32(udpPktIn.TTL), Flags: int32(udpPktIn.Flags)}, + TransportHeader: crdv1alpha1.TransportHeader{ + UDP: &crdv1alpha1.UDPHeader{SrcPort: int32(udp.PortSrc), DstPort: int32(udp.PortDst)}, + }, + } + + icmpv6PktIn := protocol.IPv6{Length: 960, HopLimit: 8, NWSrc: srcIPv6, NWDst: dstIPv6, NextHeader: protocol.Type_IPv6ICMP} + nextHdr := int32(icmpv6PktIn.NextHeader) + icmpv6PktCap := crdv1alpha1.Packet{ + SrcIP: icmpv6PktIn.NWSrc.String(), DstIP: icmpv6PktIn.NWDst.String(), Length: icmpv6PktIn.Length + 40, + IPv6Header: &crdv1alpha1.IPv6Header{NextHeader: &nextHdr, HopLimit: int32(icmpv6PktIn.HopLimit)}, + } + + tests := []struct { + name string + pktInData util.Message + pktCap *crdv1alpha1.Packet + isIPv6 bool + }{ + {"tcp", &tcpPktIn, &tcpPktCap, false}, + {"udp", &udpPktIn, &udpPktCap, false}, + {"icmpv6", &icmpv6PktIn, &icmpv6PktCap, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ethType := uint16(protocol.IPv4_MSG) + if tt.isIPv6 { + ethType = uint16(protocol.IPv6_MSG) + } + pktIn := ofctrl.PacketIn{Data: protocol.Ethernet{Ethertype: ethType, Data: tt.pktInData}} + packet := parseCapturedPacket(&pktIn) + assert.True(t, reflect.DeepEqual(packet, tt.pktCap), "parsed packet does not match the expected") + }) + } +} diff --git a/pkg/agent/controller/traceflow/traceflow_controller.go b/pkg/agent/controller/traceflow/traceflow_controller.go index 6d542552294..423df3e54b2 100644 --- a/pkg/agent/controller/traceflow/traceflow_controller.go +++ b/pkg/agent/controller/traceflow/traceflow_controller.go @@ -74,6 +74,7 @@ type traceflowState struct { name string tag uint8 liveTraffic bool + droppedOnly bool isSender bool // Agent received the first Traceflow packet from OVS. receivedPacket bool @@ -308,6 +309,9 @@ func (c *Controller) startTraceflow(tf *crdv1alpha1.Traceflow) error { srcOFPort = uint32(podInterfaces[0].OFPort) // On the source Node, trace the first packet of the first // connection that matches the Traceflow spec. + // TODO: support specifying only the Destination Pod for + // live-traffic Traceflow, which will trace the matched traffic + // to the destination Pod from any source. if liveTraffic { matchPacket = packet } @@ -316,7 +320,10 @@ func (c *Controller) startTraceflow(tf *crdv1alpha1.Traceflow) error { // Store Traceflow to cache. c.runningTraceflowsMutex.Lock() - tfState := traceflowState{name: tf.Name, tag: tf.Status.DataplaneTag, liveTraffic: tf.Spec.LiveTraffic, isSender: isSender} + tfState := traceflowState{ + name: tf.Name, tag: tf.Status.DataplaneTag, + liveTraffic: liveTraffic, droppedOnly: tf.Spec.DroppedOnly && liveTraffic, + isSender: isSender} c.runningTraceflows[tfState.tag] = &tfState c.runningTraceflowsMutex.Unlock() @@ -326,7 +333,7 @@ func (c *Controller) startTraceflow(tf *crdv1alpha1.Traceflow) error { if timeout == 0 { timeout = crdv1alpha1.DefaultTraceflowTimeout } - err = c.ofClient.InstallTraceflowFlows(tfState.tag, liveTraffic, matchPacket, srcOFPort, timeout) + err = c.ofClient.InstallTraceflowFlows(tfState.tag, liveTraffic, tfState.droppedOnly, matchPacket, srcOFPort, timeout) if err != nil { return err } @@ -506,8 +513,8 @@ func (c *Controller) preparePacket(tf *crdv1alpha1.Traceflow, intf *interfacesto } } - if packet.IPProto == 0 || packet.IPProto == protocol.Type_ICMP || packet.IPProto == protocol.Type_IPv6ICMP { - // IPProto defaults to ICMP. + // Defaults to ICMP if not live-traffic Traceflow. + if packet.IPProto == 0 && !liveTraffic || packet.IPProto == protocol.Type_ICMP || packet.IPProto == protocol.Type_IPv6ICMP { isICMP = true } if isICMP { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 650fba6616f..15889c7e28a 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -227,7 +227,7 @@ type Client interface { SendTraceflowPacket(dataplaneTag uint8, packet *binding.Packet, inPort uint32, outPort int32) error // InstallTraceflowFlows installs flows for a Traceflow request. - InstallTraceflowFlows(dataplaneTag uint8, liveTraffic bool, packet *binding.Packet, srcOFPort uint32, timeoutSeconds uint16) error + InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedOnly bool, packet *binding.Packet, srcOFPort uint32, timeoutSeconds uint16) error // UninstallTraceflowFlows uninstalls flows for a Traceflow request. UninstallTraceflowFlows(dataplaneTag uint8) error @@ -860,11 +860,11 @@ func (c *client) SendTraceflowPacket(dataplaneTag uint8, packet *binding.Packet, return c.bridge.SendPacketOut(packetOutObj) } -func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic bool, packet *binding.Packet, srcOFPort uint32, timeoutSeconds uint16) error { +func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedOnly bool, packet *binding.Packet, srcOFPort uint32, timeoutSeconds uint16) error { cacheKey := fmt.Sprintf("%x", dataplaneTag) flows := []binding.Flow{} flows = append(flows, c.traceflowConnectionTrackFlows(dataplaneTag, packet, srcOFPort, timeoutSeconds, cookie.Default)...) - flows = append(flows, c.traceflowL2ForwardOutputFlows(dataplaneTag, liveTraffic, timeoutSeconds, cookie.Default)...) + flows = append(flows, c.traceflowL2ForwardOutputFlows(dataplaneTag, liveTraffic, droppedOnly, timeoutSeconds, cookie.Default)...) flows = append(flows, c.traceflowNetworkPolicyFlows(dataplaneTag, timeoutSeconds, cookie.Default)...) return c.addFlows(c.tfFlowCache, cacheKey, flows) } diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index a4bb9b4b293..d0aba4cc9f6 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -280,7 +280,7 @@ func Test_client_InstallTraceflowFlows(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() c := tt.prepareFunc(ctrl) - if err := c.InstallTraceflowFlows(tt.args.dataplaneTag, false, nil, 0, 300); (err != nil) != tt.wantErr { + if err := c.InstallTraceflowFlows(tt.args.dataplaneTag, false, false, nil, 0, 300); (err != nil) != tt.wantErr { t.Errorf("InstallTraceflowFlows() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index ae475e940f4..c8c207c278c 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -934,54 +934,59 @@ func (c *client) l2ForwardCalcFlow(dstMAC net.HardwareAddr, ofPort uint32, skipI // traceflowL2ForwardOutputFlows generates Traceflow specific flows that outputs traceflow packets // to OVS port and Antrea Agent after L2forwarding calculation. -func (c *client) traceflowL2ForwardOutputFlows(dataplaneTag uint8, liveTraffic bool, timeout uint16, category cookie.Category) []binding.Flow { +func (c *client) traceflowL2ForwardOutputFlows(dataplaneTag uint8, liveTraffic, droppedOnly bool, timeout uint16, category cookie.Category) []binding.Flow { flows := []binding.Flow{} l2FwdOutTable := c.pipeline[L2ForwardingOutTable] for _, ipProtocol := range []binding.Protocol{binding.ProtocolIP, binding.ProtocolIPv6} { if c.encapMode.SupportsEncap() { // SendToController and Output if output port is tunnel port. - flows = append(flows, l2FwdOutTable.BuildFlow(priorityNormal+3). + fb1 := l2FwdOutTable.BuildFlow(priorityNormal+3). MatchReg(int(PortCacheReg), config.DefaultTunOFPort). MatchIPDSCP(dataplaneTag). SetHardTimeout(timeout). MatchProtocol(ipProtocol). MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange). Action().OutputRegRange(int(PortCacheReg), ofPortRegRange). - Action().SendToController(uint8(PacketInReasonTF)). - Cookie(c.cookieAllocator.Request(category).Raw()). - Done()) + Cookie(c.cookieAllocator.Request(category).Raw()) // For injected packets, only SendToController if output port is local // gateway. In encapMode, a Traceflow packet going out of the gateway // port (i.e. exiting the overlay) essentially means that the Traceflow // request is complete. - flowBuilder := l2FwdOutTable.BuildFlow(priorityNormal+2). + fb2 := l2FwdOutTable.BuildFlow(priorityNormal+2). MatchReg(int(PortCacheReg), config.HostGatewayOFPort). MatchIPDSCP(dataplaneTag). SetHardTimeout(timeout). MatchProtocol(ipProtocol). MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange). - Action().SendToController(uint8(PacketInReasonTF)). Cookie(c.cookieAllocator.Request(category).Raw()) + + // Do not send to controller if captures only dropped packet. + if !droppedOnly { + fb1 = fb1.Action().SendToController(uint8(PacketInReasonTF)) + fb2 = fb2.Action().SendToController(uint8(PacketInReasonTF)) + } if liveTraffic { // Clear the loaded DSCP bits before output. - flowBuilder = flowBuilder.Action().LoadIPDSCP(0). + fb2 = fb2.Action().LoadIPDSCP(0). Action().OutputRegRange(int(PortCacheReg), ofPortRegRange) } - flows = append(flows, flowBuilder.Done()) + flows = append(flows, fb1.Done(), fb2.Done()) } else { // SendToController and Output if output port is local gateway. Unlike in // encapMode, inter-Node Pod-to-Pod traffic is expected to go out of the // gateway port on the way to its destination. - flows = append(flows, l2FwdOutTable.BuildFlow(priorityNormal+2). + fb1 := l2FwdOutTable.BuildFlow(priorityNormal+2). MatchReg(int(PortCacheReg), config.HostGatewayOFPort). MatchIPDSCP(dataplaneTag). SetHardTimeout(timeout). MatchProtocol(ipProtocol). MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange). Action().OutputRegRange(int(PortCacheReg), ofPortRegRange). - Action().SendToController(uint8(PacketInReasonTF)). - Cookie(c.cookieAllocator.Request(category).Raw()). - Done()) + Cookie(c.cookieAllocator.Request(category).Raw()) + if !droppedOnly { + fb1 = fb1.Action().SendToController(uint8(PacketInReasonTF)) + } + flows = append(flows, fb1.Done()) } // Only SendToController if output port is local gateway and destination IP is gateway. gatewayIP := c.nodeConfig.GatewayConfig.IPv4 @@ -989,34 +994,38 @@ func (c *client) traceflowL2ForwardOutputFlows(dataplaneTag uint8, liveTraffic b gatewayIP = c.nodeConfig.GatewayConfig.IPv6 } if gatewayIP != nil { - flowBuilder := l2FwdOutTable.BuildFlow(priorityNormal+3). + fb := l2FwdOutTable.BuildFlow(priorityNormal+3). MatchReg(int(PortCacheReg), config.HostGatewayOFPort). MatchDstIP(gatewayIP). MatchIPDSCP(dataplaneTag). SetHardTimeout(timeout). MatchProtocol(ipProtocol). MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange). - Action().SendToController(uint8(PacketInReasonTF)). Cookie(c.cookieAllocator.Request(category).Raw()) + if !droppedOnly { + fb = fb.Action().SendToController(uint8(PacketInReasonTF)) + } if liveTraffic { - flowBuilder = flowBuilder.Action().LoadIPDSCP(0). + fb = fb.Action().LoadIPDSCP(0). Action().OutputRegRange(int(PortCacheReg), ofPortRegRange) } - flows = append(flows, flowBuilder.Done()) + flows = append(flows, fb.Done()) } // Only SendToController if output port is Pod port. - flowBuilder := l2FwdOutTable.BuildFlow(priorityNormal+2). + fb := l2FwdOutTable.BuildFlow(priorityNormal+2). MatchIPDSCP(dataplaneTag). SetHardTimeout(timeout). MatchProtocol(ipProtocol). MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange). - Action().SendToController(uint8(PacketInReasonTF)). Cookie(c.cookieAllocator.Request(category).Raw()) + if !droppedOnly { + fb = fb.Action().SendToController(uint8(PacketInReasonTF)) + } if liveTraffic { - flowBuilder = flowBuilder.Action().LoadIPDSCP(0). + fb = fb.Action().LoadIPDSCP(0). Action().OutputRegRange(int(PortCacheReg), ofPortRegRange) } - flows = append(flows, flowBuilder.Done()) + flows = append(flows, fb.Done()) } return flows } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 6197350eacb..7022d37ec74 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -447,17 +447,17 @@ func (mr *MockClientMockRecorder) InstallServiceGroup(arg0, arg1, arg2 interface } // InstallTraceflowFlows mocks base method -func (m *MockClient) InstallTraceflowFlows(arg0 byte, arg1 bool, arg2 *openflow.Packet, arg3 uint32, arg4 uint16) error { +func (m *MockClient) InstallTraceflowFlows(arg0 byte, arg1, arg2 bool, arg3 *openflow.Packet, arg4 uint32, arg5 uint16) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallTraceflowFlows", arg0, arg1, arg2, arg3, arg4) + ret := m.ctrl.Call(m, "InstallTraceflowFlows", arg0, arg1, arg2, arg3, arg4, arg5) ret0, _ := ret[0].(error) return ret0 } // InstallTraceflowFlows indicates an expected call of InstallTraceflowFlows -func (mr *MockClientMockRecorder) InstallTraceflowFlows(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallTraceflowFlows(arg0, arg1, arg2, arg3, arg4, arg5 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallTraceflowFlows", reflect.TypeOf((*MockClient)(nil).InstallTraceflowFlows), arg0, arg1, arg2, arg3, arg4) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallTraceflowFlows", reflect.TypeOf((*MockClient)(nil).InstallTraceflowFlows), arg0, arg1, arg2, arg3, arg4, arg5) } // IsConnected mocks base method diff --git a/pkg/antctl/raw/traceflow/command.go b/pkg/antctl/raw/traceflow/command.go index eebdb5bd845..f65ee65d828 100644 --- a/pkg/antctl/raw/traceflow/command.go +++ b/pkg/antctl/raw/traceflow/command.go @@ -49,8 +49,9 @@ var ( outputType string flow string liveTraffic bool - nowait bool + droppedOnly bool timeout time.Duration + nowait bool }{} ) @@ -60,13 +61,23 @@ var protocols = map[string]int32{ "udp": 17, } +type CapturedPacket struct { + SrcIP string `json:"srcIP" yaml:"srcIP"` + DstIP string `json:"dstIP" yaml:"dstIP"` + Length uint16 `json:"length" yaml:"length"` + IPHeader *v1alpha1.IPHeader `json:"ipHeader,omitempty" yaml:"ipHeader,omitempty"` + IPv6Header *v1alpha1.IPv6Header `json:"ipv6Header,omitempty" yaml:"ipv6Header,omitempty"` + TransportHeader *v1alpha1.TransportHeader `json:"transportHeader,omitempty" yaml:"tranportHeader,omitempty"` +} + // Response is the response of antctl Traceflow. type Response struct { - Name string `json:"name" yaml:"name"` // Traceflow name - Phase v1alpha1.TraceflowPhase `json:"phase,omitempty" yaml:"phase,omitempty"` // Traceflow phase - Source string `json:"source,omitempty" yaml:"source,omitempty"` // Traceflow source, e.g. "default/pod0" - Destination string `json:"destination,omitempty" yaml:"destination,omitempty"` // Traceflow destination, e.g. "default/pod1" - NodeResults []v1alpha1.NodeResult `json:"results,omitempty" yaml:"results,omitempty"` // Traceflow node results + Name string `json:"name" yaml:"name"` // Traceflow name + Phase v1alpha1.TraceflowPhase `json:"phase,omitempty" yaml:"phase,omitempty"` // Traceflow phase + Source string `json:"source,omitempty" yaml:"source,omitempty"` // Traceflow source, e.g. "default/pod0" + Destination string `json:"destination,omitempty" yaml:"destination,omitempty"` // Traceflow destination, e.g. "default/pod1" + NodeResults []v1alpha1.NodeResult `json:"results,omitempty" yaml:"results,omitempty"` // Traceflow node results + CapturedPacket *CapturedPacket `json:"capturedPacket,omitempty" yaml:"capturedPacket,omitempty"` // Captured packet in live-traffic Traceflow } func init() { @@ -75,18 +86,18 @@ func init() { Short: "Start a Traceflows", Long: "Start a Traceflows from one Pod to another Pod/Service/IP.", Aliases: []string{"tf", "traceflows"}, - Example: ` Start a Traceflow from busybox0 to busybox1, both Pods are in Namespace default - $antctl traceflow -S busybox0 -D busybox1 - Start a Traceflow from busybox0 to destination IP, source is in Namespace default - $antctl traceflow -S busybox0 -D 123.123.123.123 - Start a Traceflow from busybox0 to destination Service, source and destination are in Namespace default - $antctl traceflow -S busybox0 -D svc0 -f tcp,tcp_dst=80,tcp_flags=2 - Start a Traceflow from busybox0 in Namespace ns0 to busybox1 in Namespace ns1, output type is json - $antctl traceflow -S ns0/busybox0 -D ns1/busybox1 -o json - Start a Traceflow from busybox0 to busybox1, with a TCP packet to destination port 80 - $antctl traceflow -S busybox0 -D busybox1 -f tcp,tcp_dst=80 - Start a Traceflow for live TCP traffic from busybox0 to TCP port 80, with 1 minute timeout - $antctl traceflow -S busybox0 -f tcp,tcp_dst=80 --live-traffic -t 1m + Example: ` Start a Traceflow from pod1 to pod2, both Pods are in Namespace default + $antctl traceflow -S pod1 -D pod2 + Start a Traceflow from pod1 in Namepace ns1 to a destination IP + $antctl traceflow -S ns1/pod1 -D 123.123.123.123 + Start a Traceflow from pod1 to Service svc1 in Namespace ns1 + $antctl traceflow -S pod1 -D ns1/svc1 -f tcp,tcp_dst=80 + Start a Traceflow from pod1 to pod2, with a UDP packet to destination port 1234 + $antctl traceflow -S pod1 -D pod2 -f udp,udp_dst=1234 + Start a Traceflow for live TCP traffic from pod1 to svc1, with 1 minute timeout + $antctl traceflow -S pod1 -D svc1 -f tcp --live-traffic -t 1m + Start a Traceflow to capture the first dropped TCP packet from pod1 to port 80 within 10 minutes + $antctl traceflow -S pod1 -f tcp,tcp_dst=80 --live-traffic --dropped-only -t 10m `, RunE: runE, Args: cobra.NoArgs, @@ -97,6 +108,7 @@ func init() { Command.Flags().StringVarP(&option.outputType, "output", "o", "yaml", "output type: yaml (default), json") Command.Flags().StringVarP(&option.flow, "flow", "f", "", "specify the flow (packet headers) of the Traceflow packet, including tcp_src, tcp_dst, tcp_flags, udp_src, udp_dst, ipv6") Command.Flags().BoolVarP(&option.liveTraffic, "live-traffic", "L", false, "if set, the Traceflow will trace the first packet of the matched live traffic flow") + Command.Flags().BoolVarP(&option.droppedOnly, "dropped-only", "", false, "if set, capture only the dropped packet in a live-traffic Traceflow") Command.Flags().BoolVarP(&option.nowait, "nowait", "", false, "if set, command returns without retrieving results") } @@ -120,6 +132,11 @@ func runE(cmd *cobra.Command, _ []string) error { return nil } + if !option.liveTraffic && option.droppedOnly { + fmt.Println("--dropped-only works only with live-traffic Traceflow") + return nil + } + kubeconfigPath, err := cmd.Flags().GetString("kubeconfig") if err != nil { return err @@ -248,8 +265,9 @@ func newTraceflow(client kubernetes.Interface) (*v1alpha1.Traceflow, error) { Source: src, Destination: dst, Packet: *pkt, - Timeout: uint16(option.timeout.Seconds()), LiveTraffic: option.liveTraffic, + DroppedOnly: option.droppedOnly, + Timeout: uint16(option.timeout.Seconds()), }, } return tf, nil @@ -358,6 +376,18 @@ func output(tf *v1alpha1.Traceflow) error { } else if len(tf.Spec.Destination.Service) != 0 { r.Destination = fmt.Sprintf("%s/%s", tf.Spec.Destination.Namespace, tf.Spec.Destination.Service) } + + pkt := tf.Status.CapturedPacket + if pkt != nil { + r.CapturedPacket = &CapturedPacket{SrcIP: pkt.SrcIP, DstIP: pkt.DstIP, Length: pkt.Length, IPv6Header: pkt.IPv6Header} + if pkt.IPv6Header == nil { + r.CapturedPacket.IPHeader = &pkt.IPHeader + } + if pkt.TransportHeader.TCP != nil || pkt.TransportHeader.UDP != nil || pkt.TransportHeader.ICMP != nil { + r.CapturedPacket.TransportHeader = &pkt.TransportHeader + } + } + if option.outputType == "json" { if err := jsonOutput(&r); err != nil { return fmt.Errorf("error when converting output to json: %w", err) diff --git a/pkg/apis/crd/v1alpha1/types.go b/pkg/apis/crd/v1alpha1/types.go index ed1d49c1917..db5d75e2b1b 100644 --- a/pkg/apis/crd/v1alpha1/types.go +++ b/pkg/apis/crd/v1alpha1/types.go @@ -115,6 +115,9 @@ type TraceflowSpec struct { // rather than an injected packet, when set to true. The first packet of // the first connection that matches the packet spec will be traced. LiveTraffic bool `json:"liveTraffic,omitempty"` + // DroppedOnly indicates only the dropped packet should be captured in a + // live-traffic Traceflow. + DroppedOnly bool `json:"droppedOnly,omitempty"` // Timeout specifies the timeout of the Traceflow in seconds. Defaults // to 20 seconds if not set. Timeout uint16 `json:"timeout,omitempty"` @@ -143,30 +146,30 @@ type Destination struct { // IPHeader describes spec of an IPv4 header. type IPHeader struct { // SrcIP is the source IP. - SrcIP string `json:"srcIP,omitempty"` + SrcIP string `json:"srcIP,omitempty" yaml:"srcIP,omitempty"` // Protocol is the IP protocol. - Protocol int32 `json:"protocol,omitempty"` + Protocol int32 `json:"protocol,omitempty" yaml:"protocol,omitempty"` // TTL is the IP TTL. - TTL int32 `json:"ttl,omitempty"` + TTL int32 `json:"ttl,omitempty" yaml:"ttl,omitempty"` // Flags is the flags for IP. - Flags int32 `json:"flags,omitempty"` + Flags int32 `json:"flags,omitempty" yaml:"flags,omitempty"` } // IPv6Header describes spec of an IPv6 header. type IPv6Header struct { // SrcIP is the source IPv6. - SrcIP string `json:"srcIP,omitempty"` + SrcIP string `json:"srcIP,omitempty" yaml:"srcIP,omitempty"` // NextHeader is the IPv6 protocol. - NextHeader *int32 `json:"nextHeader,omitempty"` + NextHeader *int32 `json:"nextHeader,omitempty" yaml:"nextHeader,omitempty"` // HopLimit is the IPv6 Hop Limit. - HopLimit int32 `json:"hopLimit,omitempty"` + HopLimit int32 `json:"hopLimit,omitempty" yaml:"hopLimit,omitempty"` } // TransportHeader describes spec of a TransportHeader. type TransportHeader struct { - ICMP *ICMPEchoRequestHeader `json:"icmp,omitempty"` - UDP *UDPHeader `json:"udp,omitempty"` - TCP *TCPHeader `json:"tcp,omitempty"` + ICMP *ICMPEchoRequestHeader `json:"icmp,omitempty" yaml:"icmp,omitempty"` + UDP *UDPHeader `json:"udp,omitempty" yaml:"udp,omitempty"` + TCP *TCPHeader `json:"tcp,omitempty" yaml:"tcp,omitempty"` } // ICMPEchoRequestHeader describes spec of an ICMP echo request header. @@ -197,6 +200,10 @@ type TCPHeader struct { // Packet includes header info. type Packet struct { + SrcIP string `json:"srcIP,omitempty"` + DstIP string `json:"dstIP,omitempty"` + // Length is the IP packet length (includes the IPv4 or IPv6 header length). + Length uint16 `json:"length,omitempty"` // TODO: change type IPHeader to *IPHeader and correct all internal references IPHeader IPHeader `json:"ipHeader,omitempty"` IPv6Header *IPv6Header `json:"ipv6Header,omitempty"` @@ -213,6 +220,8 @@ type TraceflowStatus struct { DataplaneTag uint8 `json:"dataplaneTag,omitempty"` // Results is the collection of all observations on different nodes. Results []NodeResult `json:"results,omitempty"` + // CapturedPacket is the captured packet in live-traffic Traceflow. + CapturedPacket *Packet `json:"capturedPacket,omitempty"` } type NodeResult struct { diff --git a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go index 7cc82efef09..9da75781396 100644 --- a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go @@ -696,6 +696,11 @@ func (in *TraceflowStatus) DeepCopyInto(out *TraceflowStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.CapturedPacket != nil { + in, out := &in.CapturedPacket, &out.CapturedPacket + *out = new(Packet) + (*in).DeepCopyInto(*out) + } return } diff --git a/pkg/controller/traceflow/controller.go b/pkg/controller/traceflow/controller.go index 8aa5f140295..2e074d3b6bb 100644 --- a/pkg/controller/traceflow/controller.go +++ b/pkg/controller/traceflow/controller.go @@ -282,33 +282,45 @@ func (c *Controller) startTraceflow(tf *crdv1alpha1.Traceflow) error { } func (c *Controller) checkTraceflowStatus(tf *crdv1alpha1.Traceflow) error { - sender := false - receiver := false - for i, nodeResult := range tf.Status.Results { - for j, ob := range nodeResult.Observations { - if ob.Component == crdv1alpha1.ComponentSpoofGuard { - sender = true - } - if ob.Action == crdv1alpha1.ActionDelivered || ob.Action == crdv1alpha1.ActionDropped || ob.Action == crdv1alpha1.ActionForwardedOutOfOverlay { - receiver = true - } - if ob.TranslatedDstIP != "" { - // Add Pod ns/name to observation if TranslatedDstIP (a.k.a. Service Endpoint address) is Pod IP. - pods, err := c.podInformer.Informer().GetIndexer().ByIndex(podIPsIndex, ob.TranslatedDstIP) - if err != nil { - klog.Infof("Unable to find Pod from IP, error: %+v", err) - } else if len(pods) > 0 { - pod, ok := pods[0].(*corev1.Pod) - if !ok { - klog.Warningf("Invalid Pod obj in cache") - } else { - tf.Status.Results[i].Observations[j].Pod = fmt.Sprintf("%s/%s", pod.Namespace, pod.Name) + succeeded := false + if tf.Spec.LiveTraffic && tf.Spec.DroppedOnly { + // There should be only one reported NodeResult for droppedOnly + // Traceflow. + if len(tf.Status.Results) > 0 { + succeeded = true + } + } else { + sender := false + receiver := false + for i, nodeResult := range tf.Status.Results { + for j, ob := range nodeResult.Observations { + if ob.Component == crdv1alpha1.ComponentSpoofGuard { + sender = true + } + if ob.Action == crdv1alpha1.ActionDelivered || + ob.Action == crdv1alpha1.ActionDropped || + ob.Action == crdv1alpha1.ActionForwardedOutOfOverlay { + receiver = true + } + if ob.TranslatedDstIP != "" { + // Add Pod ns/name to observation if TranslatedDstIP (a.k.a. Service Endpoint address) is Pod IP. + pods, err := c.podInformer.Informer().GetIndexer().ByIndex(podIPsIndex, ob.TranslatedDstIP) + if err != nil { + klog.Infof("Unable to find Pod from IP, error: %+v", err) + } else if len(pods) > 0 { + pod, ok := pods[0].(*corev1.Pod) + if !ok { + klog.Warningf("Invalid Pod obj in cache") + } else { + tf.Status.Results[i].Observations[j].Pod = fmt.Sprintf("%s/%s", pod.Namespace, pod.Name) + } } } } } + succeeded = sender && receiver } - if sender && receiver { + if succeeded { c.deallocateTagForTF(tf) return c.updateTraceflowStatus(tf, crdv1alpha1.Succeeded, "", 0) } diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 0744978ea20..1a16765d5a5 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -370,6 +370,7 @@ type Packet struct { SourceMAC net.HardwareAddr DestinationIP net.IP SourceIP net.IP + IPLength uint16 IPProto uint8 IPFlags uint16 TTL uint8 diff --git a/pkg/ovs/openflow/ofctrl_packetin.go b/pkg/ovs/openflow/ofctrl_packetin.go new file mode 100644 index 00000000000..21a6963b08c --- /dev/null +++ b/pkg/ovs/openflow/ofctrl_packetin.go @@ -0,0 +1,108 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package openflow + +import ( + "github.com/contiv/libOpenflow/protocol" + "github.com/contiv/libOpenflow/util" + "github.com/contiv/ofnet/ofctrl" +) + +// GetTCPHeaderData gets TCP header data from IP packet. +func GetTCPHeaderData(ipPkt util.Message) (tcpSrcPort uint16, tcpDstPort uint16, tcpSeqNum uint32, tcpAckNum uint32, tcpFlags uint8, err error) { + var tcpBytes []byte + + // Transfer Buffer to TCP + switch typedIPPkt := ipPkt.(type) { + case *protocol.IPv4: + tcpBytes, err = typedIPPkt.Data.(*util.Buffer).MarshalBinary() + case *protocol.IPv6: + tcpBytes, err = typedIPPkt.Data.(*util.Buffer).MarshalBinary() + } + if err != nil { + return 0, 0, 0, 0, 0, err + } + tcpIn := new(protocol.TCP) + err = tcpIn.UnmarshalBinary(tcpBytes) + if err != nil { + return 0, 0, 0, 0, 0, err + } + + return tcpIn.PortSrc, tcpIn.PortDst, tcpIn.SeqNum, tcpIn.AckNum, tcpIn.Code, nil +} + +func getUDPHeaderData(ipPkt util.Message) (udpSrcPort uint16, udpDstPort uint16, err error) { + var udpBytes []byte + + switch typedIPPkt := ipPkt.(type) { + case *protocol.IPv4: + udpBytes, err = typedIPPkt.Data.(*util.Buffer).MarshalBinary() + case *protocol.IPv6: + udpBytes, err = typedIPPkt.Data.(*util.Buffer).MarshalBinary() + } + if err != nil { + return 0, 0, err + } + udpIn := new(protocol.UDP) + err = udpIn.UnmarshalBinary(udpBytes) + if err != nil { + return 0, 0, err + } + + return udpIn.PortSrc, udpIn.PortDst, nil +} + +func ParsePacketIn(pktIn *ofctrl.PacketIn) (*Packet, error) { + packet := Packet{} + packet.DestinationMAC = pktIn.Data.HWDst + packet.SourceMAC = pktIn.Data.HWSrc + + if pktIn.Data.Ethertype == protocol.IPv4_MSG { + ipPkt := pktIn.Data.Data.(*protocol.IPv4) + packet.DestinationIP = ipPkt.NWDst + packet.SourceIP = ipPkt.NWSrc + packet.TTL = ipPkt.TTL + packet.IPProto = ipPkt.Protocol + packet.IPFlags = ipPkt.Flags + packet.IPLength = ipPkt.Length + } else if pktIn.Data.Ethertype == protocol.IPv6_MSG { + ipPkt := pktIn.Data.Data.(*protocol.IPv6) + packet.DestinationIP = ipPkt.NWDst + packet.SourceIP = ipPkt.NWSrc + packet.TTL = ipPkt.HopLimit + packet.IPProto = ipPkt.NextHeader + // IPv6 header includes only playload length. Add 40 to count in + // the IPv6 header length. + packet.IPLength = ipPkt.Length + 40 + packet.IsIPv6 = true + } else { + // Not an IP packet. + return &packet, nil + } + + var err error + if packet.IPProto == protocol.Type_TCP { + packet.SourcePort, packet.DestinationPort, _, _, packet.TCPFlags, err = GetTCPHeaderData(pktIn.Data.Data) + if err != nil { + return nil, err + } + } else if packet.IPProto == protocol.Type_UDP { + packet.SourcePort, packet.DestinationPort, err = getUDPHeaderData(pktIn.Data.Data) + if err != nil { + return nil, err + } + } + return &packet, nil +} diff --git a/pkg/ovs/openflow/ofctrl_packetin_test.go b/pkg/ovs/openflow/ofctrl_packetin_test.go new file mode 100644 index 00000000000..c358464f264 --- /dev/null +++ b/pkg/ovs/openflow/ofctrl_packetin_test.go @@ -0,0 +1,75 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package openflow + +import ( + "testing" + + "github.com/contiv/libOpenflow/protocol" + "github.com/contiv/libOpenflow/util" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGetTCPHeaderData(t *testing.T) { + type args struct { + tcp protocol.TCP + expectTCPSrcPort uint16 + expectTCPDstPort uint16 + expectTCPSeqNum uint32 + expectTCPAckNum uint32 + expectTCPCode uint8 + } + tests := []struct { + name string + args args + }{ + { + name: "ipv4", + args: args{ + tcp: protocol.TCP{ + PortSrc: 1080, + PortDst: 80, + SeqNum: 0, + AckNum: 0, + Code: 2, + }, + expectTCPSrcPort: 1080, + expectTCPDstPort: 80, + expectTCPSeqNum: 0, + expectTCPAckNum: 0, + expectTCPCode: 2, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tcp := tt.args.tcp + pktIn := new(protocol.IPv4) + bytes, _ := tcp.MarshalBinary() + bf := new(util.Buffer) + bf.UnmarshalBinary(bytes) + pktIn.Data = bf + + tcpSrcPort, tcpDstPort, tcpSeqNum, tcpAckNum, tcpCode, err := GetTCPHeaderData(pktIn) + require.NoError(t, err, "GetTCPHeaderData() returned an error") + assert.Equal(t, tt.args.expectTCPSrcPort, tcpSrcPort) + assert.Equal(t, tt.args.expectTCPDstPort, tcpDstPort) + assert.Equal(t, tt.args.expectTCPSeqNum, tcpSeqNum) + assert.Equal(t, tt.args.expectTCPAckNum, tcpAckNum) + assert.Equal(t, tt.args.expectTCPCode, tcpCode) + }) + } +} diff --git a/test/e2e/traceflow_test.go b/test/e2e/traceflow_test.go index d210563d041..ee1465350e6 100644 --- a/test/e2e/traceflow_test.go +++ b/test/e2e/traceflow_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "net" + "reflect" "strings" "testing" "time" @@ -40,6 +41,7 @@ type testcase struct { tf *v1alpha1.Traceflow expectedPhase v1alpha1.TraceflowPhase expectedResults []v1alpha1.NodeResult + expectedPktCap *v1alpha1.Packet // required IP version, skip if not match, default is 0 (no restrict) ipVersion int } @@ -516,6 +518,12 @@ func TestTraceflowIntraNode(t *testing.T) { }, }, }, + expectedPktCap: &v1alpha1.Packet{ + SrcIP: node1IPs[0].ipv4.String(), + DstIP: dstPodIPv4Str, + Length: 84, // default ping packet length. + IPHeader: v1alpha1.IPHeader{Protocol: 1, TTL: 64, Flags: 2}, + }, }, { name: "intraNodeTraceflowIPv6", @@ -1916,4 +1924,7 @@ func runTestTraceflow(t *testing.T, data *TestData, tc testcase) { } } } + if tc.expectedPktCap != nil && !reflect.DeepEqual(tc.expectedPktCap, tf.Status.CapturedPacket) { + t.Fatalf("Captured packet should be: %+v, but got: %+v", tc.expectedPktCap, tf.Status.CapturedPacket) + } }