Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid duplicate Node Results in Live Traceflow Status #4715

Merged
merged 2 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions pkg/agent/controller/traceflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ import (
binding "antrea.io/antrea/pkg/ovs/openflow"
)

var skipTraceflowUpdateErr = errors.New("skip Traceflow update")

func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
if !c.traceflowListerSynced() {
return errors.New("traceflow controller is not started")
return errors.New("Traceflow controller is not started")
}
oldTf, nodeResult, packet, err := c.parsePacketIn(pktIn)
if err == skipTraceflowUpdateErr {
return nil
}
if err != nil {
return fmt.Errorf("parsePacketIn error: %v", err)
}
Expand All @@ -47,8 +52,7 @@ func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
tf, err := c.traceflowInformer.Lister().Get(oldTf.Name)
if err != nil {
klog.Warningf("Get traceflow failed: %+v", err)
return err
return fmt.Errorf("get Traceflow failed: %w", err)
}
update := tf.DeepCopy()
update.Status.Results = append(update.Status.Results, *nodeResult)
Expand All @@ -57,14 +61,13 @@ func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
}
_, err = c.traceflowClient.CrdV1alpha1().Traceflows().UpdateStatus(context.TODO(), update, v1.UpdateOptions{})
if err != nil {
klog.Warningf("Update traceflow failed: %+v", err)
return err
return fmt.Errorf("update Traceflow failed: %w", err)
}
klog.Infof("Updated traceflow %s: %+v", tf.Name, update.Status)
klog.InfoS("Updated Traceflow", "tf", klog.KObj(tf), "status", update.Status)
return nil
})
if err != nil {
return fmt.Errorf("update traceflow error: %v", err)
return fmt.Errorf("Traceflow update error: %w", err)
}
return nil
}
Expand Down Expand Up @@ -130,7 +133,19 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl
}

var capturedPacket *crdv1alpha1.Packet
if tfState.liveTraffic && firstPacket {
if tfState.liveTraffic {
// Live Traceflow only considers the first packet of each
// connection. However, it is possible for 2 connections to
// match the Live Traceflow flows in OVS (before the flows can
// be uninstalled below), leading to 2 Packet In messages being
// processed. If we don't ignore all additional Packet Ins, we
// can end up with duplicate Node observations in the Traceflow
// Status. This situation is more likely when the Live TraceFlow
// request does not specify source / destination ports.
if !firstPacket {
klog.InfoS("An additional Traceflow packet was received unexpectedly for Live Traceflow, ignoring it")
return nil, nil, nil, skipTraceflowUpdateErr
}
// Uninstall the OVS flows after receiving the first packet, to
// avoid capturing too many matched packets.
c.ofClient.UninstallTraceflowFlows(tag)
Expand Down
64 changes: 48 additions & 16 deletions pkg/agent/controller/traceflow/packetin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,24 @@ func TestParseCapturedPacket(t *testing.T) {
}
}

func getTestPacketBytes() []byte {
ipPacket := &protocol.IPv4{
Version: 0x4,
IHL: 5,
Protocol: uint8(8),
DSCP: 1,
Length: 20,
NWSrc: net.IP(pod1IPv4),
NWDst: net.IP(dstIPv4),
}
ethernetPkt := protocol.NewEthernet()
ethernetPkt.HWSrc = pod1MAC
ethernetPkt.Ethertype = protocol.IPv4_MSG
ethernetPkt.Data = ipPacket
pktBytes, _ := ethernetPkt.MarshalBinary()
return pktBytes
}

func TestParsePacketIn(t *testing.T) {
xreg0 := make([]byte, 8)
binary.BigEndian.PutUint32(xreg0[0:4], 262144) // RemoteSNATRegMark in 32bit reg0
Expand All @@ -226,20 +244,7 @@ func TestParsePacketIn(t *testing.T) {
}
matchTunDst := openflow15.NewTunnelIpv4DstField(net.ParseIP(egressIP), nil)

ipPacket := &protocol.IPv4{
Version: 0x4,
IHL: 5,
Protocol: uint8(8),
DSCP: 1,
Length: 20,
NWSrc: net.IP(pod1IPv4),
NWDst: net.IP(dstIPv4),
}
ethernetPkt := protocol.NewEthernet()
ethernetPkt.HWSrc = pod1MAC
ethernetPkt.Ethertype = protocol.IPv4_MSG
ethernetPkt.Data = ipPacket
pktBytes, _ := ethernetPkt.MarshalBinary()
pktBytes := getTestPacketBytes()

egressQuerier := &fakeEgressQuerier{
egressName: egressName,
Expand Down Expand Up @@ -441,9 +446,7 @@ func TestParsePacketIn(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

tfc := newFakeTraceflowController(t, []runtime.Object{tt.expectedTf}, tt.networkConfig, tt.nodeConfig, nil, egressQuerier)
defer tfc.mockController.Finish()
stopCh := make(chan struct{})
defer close(stopCh)
tfc.crdInformerFactory.Start(stopCh)
Expand All @@ -457,3 +460,32 @@ func TestParsePacketIn(t *testing.T) {
})
}
}

func TestParsePacketInLiveDuplicates(t *testing.T) {
networkConfig := &config.NetworkConfig{
TrafficEncapMode: 0,
}
nodeConfig := &config.NodeConfig{
TunnelOFPort: 1,
GatewayConfig: &config.GatewayConfig{
OFPort: 2,
},
}
tfState := &traceflowState{
name: "dummy-traceflow-pod-to-ipv4",
tag: 1,
isSender: true,
liveTraffic: true,
receivedPacket: true, // assume we have already received a packet
}
pktIn := &ofctrl.PacketIn{
TableId: openflow.L2ForwardingOutTable.GetID(),
Data: util.NewBuffer(getTestPacketBytes()),
}

tfc := newFakeTraceflowController(t, nil, networkConfig, nodeConfig, nil, nil)
tfc.runningTraceflows[tfState.tag] = tfState

_, _, _, err := tfc.parsePacketIn(pktIn)
assert.ErrorIs(t, err, skipTraceflowUpdateErr)
}
5 changes: 2 additions & 3 deletions pkg/agent/openflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,8 @@ func (c *client) parsePacketIn(featurePacketIn *featureStartPacketIn) {
// Use corresponding handlers subscribed to the reason to handle PacketIn
for name, handler := range c.packetInHandlers[featurePacketIn.reason] {
klog.V(2).InfoS("Received packetIn", "reason", featurePacketIn.reason, "handler", name)
err := handler.HandlePacketIn(pktIn)
if err != nil {
klog.Errorf("PacketIn handler %s failed to process packet: %+v", name, err)
if err := handler.HandlePacketIn(pktIn); err != nil {
klog.ErrorS(err, "PacketIn handler failed to process packet", "handler", name)
}
}
}
Expand Down