Skip to content

Commit

Permalink
fix: fix dual stack estimation.
Browse files Browse the repository at this point in the history
Signed-off-by: IRONICBo <boironic@gmail.com>
  • Loading branch information
IRONICBo committed Apr 26, 2024
1 parent 6915393 commit cea5be0
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 52 deletions.
2 changes: 1 addition & 1 deletion build/charts/antrea/crds/nodelatencymonitor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ spec:
format: int32
minimum: 1
description: "Ping interval in seconds, must be at least 1."
default: 10
default: 60
metadata:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4335,7 +4335,7 @@ spec:
format: int32
minimum: 1
description: "Ping interval in seconds, must be at least 1."
default: 10
default: 60
metadata:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion build/yamls/antrea-crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4312,7 +4312,7 @@ spec:
format: int32
minimum: 1
description: "Ping interval in seconds, must be at least 1."
default: 10
default: 60
metadata:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4335,7 +4335,7 @@ spec:
format: int32
minimum: 1
description: "Ping interval in seconds, must be at least 1."
default: 10
default: 60
metadata:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4335,7 +4335,7 @@ spec:
format: int32
minimum: 1
description: "Ping interval in seconds, must be at least 1."
default: 10
default: 60
metadata:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4335,7 +4335,7 @@ spec:
format: int32
minimum: 1
description: "Ping interval in seconds, must be at least 1."
default: 10
default: 60
metadata:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4335,7 +4335,7 @@ spec:
format: int32
minimum: 1
description: "Ping interval in seconds, must be at least 1."
default: 10
default: 60
metadata:
type: object
properties:
Expand Down
4 changes: 2 additions & 2 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,8 +929,8 @@ func run(o *Options) error {
nodeLatencyMonitor := monitortool.NewNodeLatencyMonitor(
nodeInformer,
nodeLatencyMonitorInformer,
nodeConfig.GatewayConfig,
networkConfig.TrafficEncapMode.IsNetworkPolicyOnly(),
nodeConfig,
networkConfig.TrafficEncapMode,
)
go nodeLatencyMonitor.Run(stopCh)
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/agent/monitortool/latency_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type LatencyStore struct {
// Lock for the latency store
mutex sync.RWMutex

// isNetworkPolicyOnly is the flag to indicate if the Antrea Agent is running in network policy only mode.
// Whether the agent is running in network policy only mode
isNetworkPolicyOnly bool
// The map of node ip to latency entry, it will be changed by latency monitor
nodeIPLatencyMap map[string]*NodeIPLatencyEntry
Expand Down Expand Up @@ -120,7 +120,7 @@ func (l *LatencyStore) updateNodeMap(node *corev1.Node) {
if l.isNetworkPolicyOnly {
transportIPs, err := getTransportIPs(node)
if err != nil {
klog.Errorf("Failed to get transport IPs for Node %s: %v", node.Name, err)
klog.ErrorS(err, "Failed to get transport IPs for Node", "nodeName", node.Name)
return
}

Expand All @@ -129,29 +129,27 @@ func (l *LatencyStore) updateNodeMap(node *corev1.Node) {
} else {
gw0IPs, err := getGWIPs(node)
if err != nil {
klog.Errorf("Failed to get gateway IPs for Node %s: %v", node.Name, err)
klog.ErrorS(err, "Failed to get gateway IPs for Node", "nodeName", node.Name)
return
}

// Add the node to the node IP map
l.nodeGatewayMap[node.Name] = gw0IPs
}

klog.Infof("Node %s has gateway IPs %v", node.Name, l.nodeGatewayMap[node.Name])
}

func getTransportIPs(node *corev1.Node) ([]net.IP, error) {
var transportIPs []net.IP
dualIP, err := k8s.GetNodeTransportAddrs(node)
ips, err := k8s.GetNodeTransportAddrs(node)
if err != nil {
return transportIPs, err
}

if dualIP.IPv4 != nil {
transportIPs = append(transportIPs, dualIP.IPv4)
if ips.IPv4 != nil {
transportIPs = append(transportIPs, ips.IPv4)
}
if dualIP.IPv6 != nil {
transportIPs = append(transportIPs, dualIP.IPv6)
if ips.IPv6 != nil {
transportIPs = append(transportIPs, ips.IPv6)
}

return transportIPs, nil
Expand All @@ -163,22 +161,24 @@ func getGWIPs(node *corev1.Node) ([]net.IP, error) {
podCIDRStrs := getPodCIDRsOnNode(node)
if len(podCIDRStrs) == 0 {
// Skip the node if it does not have a PodCIDR.
klog.Errorf("Node %s does not have a PodCIDR", node.Name)
return gwIPs, errors.New("node does not have a PodCIDR")
err := errors.New("node does not have a PodCIDR")
klog.ErrorS(err, "Node does not have a PodCIDR", "nodeName", node.Name)
return gwIPs, err
}

// the 0th entry must match the podCIDR field. It may contain at most 1 value for
// each of IPv4 and IPv6.
// Both podCIDRStrs need to be parsed to get the gateway IP.
for _, podCIDR := range podCIDRStrs {
if podCIDR == "" {
klog.Errorf("PodCIDR is empty for Node %s", node.Name)
return gwIPs, errors.New("PodCIDR is empty")
err := errors.New("PodCIDR is empty")
klog.ErrorS(err, "PodCIDR is empty", "nodeName", node.Name)
return gwIPs, err
}

peerPodCIDRAddr, _, err := net.ParseCIDR(podCIDR)
if err != nil {
klog.Errorf("Failed to parse PodCIDR %s for Node %s", podCIDR, node.Name)
klog.ErrorS(err, "Failed to parse PodCIDR", "nodeName", node.Name)
return gwIPs, err
}

Expand Down
69 changes: 44 additions & 25 deletions pkg/agent/monitortool/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,21 @@ func getICMPSeq() uint32 {

// NodeLatencyMonitor is a tool to monitor the latency of the node.
type NodeLatencyMonitor struct {
// Gateway config
gatewayConfig *config.GatewayConfig
// Node config
nodeConfig *config.NodeConfig
// latencyStore is the cache to store the latency of each node.
latencyStore *LatencyStore
// latencyConfig is the config for the latency monitor.
latencyConfig *LatencyConfig
// latencyConfigChanged is the channel to notify the latency config changed.
latencyConfigChanged chan struct{}
// trafficEncapMode is the traffic encap mode for the latency monitor; it
// indicates whether the Antrea Agent is running in network policy only mode.
trafficEncapMode config.TrafficEncapModeType
// isIPv4Enabled is the flag to indicate if the IPv4 is enabled.
isIPv4Enabled bool
// isIPv6Enabled is the flag to indicate if the IPv6 is enabled.
isIPv6Enabled bool

// The map of node name to node info; it will be changed by the node watcher.
nodeInformer coreinformers.NodeInformer
Expand All @@ -90,17 +97,31 @@ type LatencyConfig struct {

func NewNodeLatencyMonitor(nodeInformer coreinformers.NodeInformer,
nlmInformer crdinformers.NodeLatencyMonitorInformer,
gatewayConfig *config.GatewayConfig,
isNetworkPolicyOnly bool) *NodeLatencyMonitor {
nodeConfig *config.NodeConfig,
trafficEncapMode config.TrafficEncapModeType) *NodeLatencyMonitor {
m := &NodeLatencyMonitor{
gatewayConfig: gatewayConfig,
latencyStore: NewLatencyStore(isNetworkPolicyOnly),
nodeConfig: nodeConfig,
trafficEncapMode: trafficEncapMode,
latencyStore: NewLatencyStore(trafficEncapMode.IsNetworkPolicyOnly()),
latencyConfig: &LatencyConfig{Enable: false},
latencyConfigChanged: make(chan struct{}, 1),
nodeInformer: nodeInformer,
nodeLatencyMonitorInformer: nlmInformer,
}

// Get the IPv4/IPv6 enabled status
isIPv4Enabled, err := config.IsIPv4Enabled(m.nodeConfig, m.trafficEncapMode)
if err != nil {
klog.ErrorS(err, "Failed to get IPv4 enabled status")
}
isIPv6Enabled, err := config.IsIPv6Enabled(m.nodeConfig, m.trafficEncapMode)
if err != nil {
klog.ErrorS(err, "Failed to get IPv6 enabled status")
}
m.isIPv4Enabled = isIPv4Enabled
m.isIPv6Enabled = isIPv6Enabled

// Add node informer event handler for Node
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: m.onNodeAdd,
UpdateFunc: m.onNodeUpdate,
Expand Down Expand Up @@ -188,8 +209,6 @@ func (m *NodeLatencyMonitor) updateLatencyConfig(nlm *v1alpha1.NodeLatencyMonito

// onNodeLatencyMonitorDelete is the event handler for deleting NodeLatencyMonitor.
func (m *NodeLatencyMonitor) onNodeLatencyMonitorDelete(obj interface{}) {
klog.InfoS("NodeLatencyMonitor deleted", "NodeLatencyMonitor")

// Update the latency config
m.latencyConfig = &LatencyConfig{Enable: false}

Expand Down Expand Up @@ -344,14 +363,14 @@ func (m *NodeLatencyMonitor) pingAll(ipv4Socket, ipv6Socket net.PacketConn) {
klog.InfoS("Start to ping node", "Node name", name, "Node IP", toIP)
if toIP.To4() != nil && ipv4Socket != nil {
if err := m.sendPing(ipv4Socket, toIP); err != nil {
klog.Warningf("Failed to send ICMP message to node, err: %#v, Node name: %#v, Node IP: %v", err, name, toIP)
klog.InfoS("Failed to send ICMP message to node", "Node name", name, "Node IP", toIP)
}
} else if toIP.To16() != nil && ipv6Socket != nil {
if err := m.sendPing(ipv6Socket, toIP); err != nil {
klog.Warningf("Failed to send ICMP message to node, err: %#v, Node name: %#v, Node IP: %v", err, name, toIP)
klog.InfoS("Failed to send ICMP message to node", "Node name", name, "Node IP", toIP)
}
} else {
klog.Warningf("Failed to send ICMP message to node, Node name: %#v, Node IP: %v", name, toIP)
klog.InfoS("Failed to send ICMP message to node", "Node name", name, "Node IP", toIP)
}
}
}
Expand Down Expand Up @@ -381,7 +400,7 @@ func (m *NodeLatencyMonitor) Run(stopCh <-chan struct{}) {

func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
// Low level goroutine to handle ping loop
var currentTicker *time.Ticker
var ticker *time.Ticker
var tickerCh <-chan time.Time
var ipv4Socket, ipv6Socket net.PacketConn
var err error
Expand All @@ -394,19 +413,19 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
if ipv6Socket != nil {
ipv6Socket.Close()
}
if currentTicker != nil {
currentTicker.Stop()
if ticker != nil {
ticker.Stop()
}
close(tickerStopCh)
}()

// Update current ticker based on the latencyConfig
updateTicker := func(interval time.Duration) {
if currentTicker != nil {
currentTicker.Stop() // Stop the current ticker
if ticker != nil {
ticker.Stop() // Stop the current ticker
}
currentTicker = time.NewTicker(interval)
tickerCh = currentTicker.C
ticker = time.NewTicker(interval)
tickerCh = ticker.C
}

// Start the pingAll goroutine
Expand All @@ -427,8 +446,8 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
}
case <-stopCh:
// Stop the ticker loop
if currentTicker != nil {
currentTicker.Stop()
if ticker != nil {
ticker.Stop()
}
close(tickerStopCh)
return
Expand All @@ -441,7 +460,7 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
// If the recvPing socket is closed, restart it
// In case of IPv4-only or IPv6-only, we need to check the socket status,
// and restart it if it is closed (CRD is deleted).
if ipv4Socket == nil && m.gatewayConfig.IPv4 != nil {
if ipv4Socket == nil && m.isIPv4Enabled {
// Create a new socket for IPv4 when IPv4 is enabled
ipv4Socket, err = icmp.ListenPacket(IPv4ProtocolICMPRaw, "0.0.0.0")
if err != nil {
Expand All @@ -450,7 +469,7 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
}
go m.recvPing(ipv4Socket, true, tickerStopCh)
}
if ipv6Socket == nil && m.gatewayConfig.IPv6 != nil {
if ipv6Socket == nil && m.isIPv6Enabled {
// Create a new socket for IPv6 when IPv6 is enabled
ipv6Socket, err = icmp.ListenPacket(IPv6ProtocolICMPRaw, "::")
if err != nil {
Expand All @@ -461,9 +480,9 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
}
} else {
// latencyConfig deleted
if currentTicker != nil {
currentTicker.Stop()
currentTicker = nil
if ticker != nil {
ticker.Stop()
ticker = nil
}
close(tickerStopCh)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/monitortool/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func TestNewNodeLatencyMonitor(t *testing.T) {
nodeLatencyMonitor := NewNodeLatencyMonitor(
nodeInformer,
nlmInformer,
&config.GatewayConfig{},
false,
&config.NodeConfig{},
config.TrafficEncapModeNetworkPolicyOnly,
)
assert.NotNil(t, nodeLatencyMonitor)

Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/crd/v1alpha1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&ExternalNodeList{},
&SupportBundleCollection{},
&SupportBundleCollectionList{},
&NodeLatencyMonitor{},
&NodeLatencyMonitorList{},
)

metav1.AddToGroupVersion(
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/crd/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@ type NodeLatencyMonitorSpec struct {
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// NodeLatencyMonitorList is a list of NodeLatencyMonitor objects.
// NodeLatencyMonitor is a singleton resource, so it does not strictly need a list type.
// However, the current k8s client-gen does not support generating an informer client
// for a singleton resource, so a list type has to be defined for the CRD informer.
// It may be removed in the future.
type NodeLatencyMonitorList struct {
metav1.TypeMeta `json:",inline"`
// +optional
Expand Down
2 changes: 1 addition & 1 deletion pkg/features/antrea_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ const (
// Enable layer 7 flow export on Pods and Namespaces
L7FlowExporter featuregate.Feature = "L7FlowExporter"

// alpha: v2.0
// alpha: v2.1
// Enable the NodeLatencyMonitor feature.
NodeLatencyMonitor featuregate.Feature = "NodeLatencyMonitor"
)
Expand Down

0 comments on commit cea5be0

Please sign in to comment.