From c3103a9314c1c4795e313c62903218f80d79e7f2 Mon Sep 17 00:00:00 2001 From: Asklv <47499836+IRONICBo@users.noreply.github.com> Date: Fri, 31 May 2024 08:42:58 +0800 Subject: [PATCH] Support simple ping mesh in Agent (#6120) We introduce a new feature to measure inter-Node latency in a K8s cluster running Antrea. The feature is currently Alpha and uses the NodeLatencyMonitor FeatureGate. In addition to the FeatureGate, enablement of the feature is controlled by a new CRD, called NodeLatencyMonitor. This CRD supports at most one CR instance, which must be named "default". When the CR exists, Antrea Agents will start "pinging" each other to take latency measurements. Each Agent only stores the latest measured value (at least at the moment), we do not store time series data. We support both IPv4 and IPv6. When an oberlay is used by Antrea, the ping is sent over the tunnel (by using the gateway IP as the destination). This change does not add any functionality besides collecting latency data at each Agent. A follow-up change will take care of reporting the latency data to the Antrea Controller, so it can be consumed via an APIService. For #5514 Signed-off-by: IRONICBo Signed-off-by: Asklv --- build/charts/antrea/conf/antrea-agent.conf | 3 + .../antrea/crds/nodelatencymonitor.yaml | 48 ++ .../antrea/templates/agent/clusterrole.yaml | 1 + build/yamls/antrea-aks.yml | 59 ++- build/yamls/antrea-crds.yml | 49 ++ build/yamls/antrea-eks.yml | 59 ++- build/yamls/antrea-gke.yml | 59 ++- build/yamls/antrea-ipsec.yml | 59 ++- build/yamls/antrea.yml | 59 ++- cmd/antrea-agent/agent.go | 13 + pkg/agent/monitortool/latency_store.go | 235 +++++++++ pkg/agent/monitortool/latency_store_test.go | 175 +++++++ pkg/agent/monitortool/monitor.go | 469 ++++++++++++++++++ pkg/apis/crd/v1alpha1/register.go | 2 + pkg/apis/crd/v1alpha1/types.go | 34 ++ .../crd/v1alpha1/zz_generated.deepcopy.go | 76 +++ .../handlers/featuregates/handler_test.go | 1 + .../typed/crd/v1alpha1/crd_client.go | 5 + .../crd/v1alpha1/fake/fake_crd_client.go | 4 + .../v1alpha1/fake/fake_nodelatencymonitor.go | 119 +++++ .../typed/crd/v1alpha1/generated_expansion.go | 2 + .../typed/crd/v1alpha1/nodelatencymonitor.go | 166 +++++++ .../crd/v1alpha1/interface.go | 7 + .../crd/v1alpha1/nodelatencymonitor.go | 87 ++++ .../informers/externalversions/generic.go | 2 + .../crd/v1alpha1/expansion_generated.go | 4 + .../crd/v1alpha1/nodelatencymonitor.go | 66 +++ pkg/features/antrea_features.go | 7 + 28 files changed, 1860 insertions(+), 10 deletions(-) create mode 100644 build/charts/antrea/crds/nodelatencymonitor.yaml create mode 100644 pkg/agent/monitortool/latency_store.go create mode 100644 pkg/agent/monitortool/latency_store_test.go create mode 100644 pkg/agent/monitortool/monitor.go create mode 100644 pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_nodelatencymonitor.go create mode 100644 pkg/client/clientset/versioned/typed/crd/v1alpha1/nodelatencymonitor.go create mode 100644 pkg/client/informers/externalversions/crd/v1alpha1/nodelatencymonitor.go create mode 100644 pkg/client/listers/crd/v1alpha1/nodelatencymonitor.go diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 11ee1f54b0e..71896837a62 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -88,6 +88,9 @@ featureGates: # Enable L7FlowExporter on Pods and Namespaces to export the application layer flows such as HTTP flows. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "L7FlowExporter" "default" false) }} +# Enable NodeLatencyMonitor to monitor the latency between Nodes. +{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "NodeLatencyMonitor" "default" false) }} + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: {{ .Values.ovs.bridgeName | quote }} diff --git a/build/charts/antrea/crds/nodelatencymonitor.yaml b/build/charts/antrea/crds/nodelatencymonitor.yaml new file mode 100644 index 00000000000..3a90d6acd48 --- /dev/null +++ b/build/charts/antrea/crds/nodelatencymonitor.yaml @@ -0,0 +1,48 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index 05ca58c1ab6..25bce70e3d5 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -174,6 +174,7 @@ rules: - externalippools - ippools - trafficcontrols + - nodelatencymonitors verbs: - get - watch diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 5581d58a19a..73dcb502d4a 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2686,6 +2686,57 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/nodelatencymonitor.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -3624,6 +3675,9 @@ data: # Enable L7FlowExporter on Pods and Namespaces to export the application layer flows such as HTTP flows. # L7FlowExporter: false + # Enable NodeLatencyMonitor to monitor the latency between Nodes. + # NodeLatencyMonitor: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4259,6 +4313,7 @@ rules: - externalippools - ippools - trafficcontrols + - nodelatencymonitors verbs: - get - watch @@ -4920,7 +4975,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 30843b57762c91dfcffb560917191e3bc7e662c06552759bac2a173bc060b82c + checksum/config: 47a8888bb99a5b1a08dea61e9315bacf613d869d718712ad0eb9964bb73dc0ec labels: app: antrea component: antrea-agent @@ -5158,7 +5213,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 30843b57762c91dfcffb560917191e3bc7e662c06552759bac2a173bc060b82c + checksum/config: 47a8888bb99a5b1a08dea61e9315bacf613d869d718712ad0eb9964bb73dc0ec labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-crds.yml b/build/yamls/antrea-crds.yml index b41fc8e8e16..c449e2ced17 100644 --- a/build/yamls/antrea-crds.yml +++ b/build/yamls/antrea-crds.yml @@ -2667,6 +2667,55 @@ spec: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: name: supportbundlecollections.crd.antrea.io spec: diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index e60b8f5c8f9..d0435aef30f 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2686,6 +2686,57 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/nodelatencymonitor.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -3624,6 +3675,9 @@ data: # Enable L7FlowExporter on Pods and Namespaces to export the application layer flows such as HTTP flows. # L7FlowExporter: false + # Enable NodeLatencyMonitor to monitor the latency between Nodes. + # NodeLatencyMonitor: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4259,6 +4313,7 @@ rules: - externalippools - ippools - trafficcontrols + - nodelatencymonitors verbs: - get - watch @@ -4920,7 +4975,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 30843b57762c91dfcffb560917191e3bc7e662c06552759bac2a173bc060b82c + checksum/config: 47a8888bb99a5b1a08dea61e9315bacf613d869d718712ad0eb9964bb73dc0ec labels: app: antrea component: antrea-agent @@ -5159,7 +5214,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 30843b57762c91dfcffb560917191e3bc7e662c06552759bac2a173bc060b82c + checksum/config: 47a8888bb99a5b1a08dea61e9315bacf613d869d718712ad0eb9964bb73dc0ec labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 89bc22a6b17..8401b0a94b0 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2686,6 +2686,57 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/nodelatencymonitor.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -3624,6 +3675,9 @@ data: # Enable L7FlowExporter on Pods and Namespaces to export the application layer flows such as HTTP flows. # L7FlowExporter: false + # Enable NodeLatencyMonitor to monitor the latency between Nodes. + # NodeLatencyMonitor: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4259,6 +4313,7 @@ rules: - externalippools - ippools - trafficcontrols + - nodelatencymonitors verbs: - get - watch @@ -4920,7 +4975,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: d5cdb5356795c44a69c66fad1b4d67f7c00cdcbe837f3b3b50260e4d9dfd1e7e + checksum/config: 47c353c05a3d0d6da5534e0cd1202fefb175fff41421eb3517bc9f3dd084e2ce labels: app: antrea component: antrea-agent @@ -5156,7 +5211,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: d5cdb5356795c44a69c66fad1b4d67f7c00cdcbe837f3b3b50260e4d9dfd1e7e + checksum/config: 47c353c05a3d0d6da5534e0cd1202fefb175fff41421eb3517bc9f3dd084e2ce labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 8fd77921733..91964dd82a3 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2686,6 +2686,57 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/nodelatencymonitor.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -3637,6 +3688,9 @@ data: # Enable L7FlowExporter on Pods and Namespaces to export the application layer flows such as HTTP flows. # L7FlowExporter: false + # Enable NodeLatencyMonitor to monitor the latency between Nodes. + # NodeLatencyMonitor: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4272,6 +4326,7 @@ rules: - externalippools - ippools - trafficcontrols + - nodelatencymonitors verbs: - get - watch @@ -4933,7 +4988,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 50f2864cf09e4732327b963130bd59a9fc06c560784b161c94e813c000367615 + checksum/config: ce4457f4d8a5bda332dbdd877c15ad152e3bbbcb0c9626c57ccd17518e407562 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -5215,7 +5270,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 50f2864cf09e4732327b963130bd59a9fc06c560784b161c94e813c000367615 + checksum/config: ce4457f4d8a5bda332dbdd877c15ad152e3bbbcb0c9626c57ccd17518e407562 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index b7d129b0c88..53b32787a9b 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2686,6 +2686,57 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/nodelatencymonitor.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -3624,6 +3675,9 @@ data: # Enable L7FlowExporter on Pods and Namespaces to export the application layer flows such as HTTP flows. # L7FlowExporter: false + # Enable NodeLatencyMonitor to monitor the latency between Nodes. + # NodeLatencyMonitor: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4259,6 +4313,7 @@ rules: - externalippools - ippools - trafficcontrols + - nodelatencymonitors verbs: - get - watch @@ -4920,7 +4975,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ac3c14eed7ca0dc28bf2d659cd2c4e4a39d55278fb9a8759c30ea12eff89e518 + checksum/config: 7c86f86246f3b203e46613dacbd724e9ca8eae3428451acdc7bfe54123deb534 labels: app: antrea component: antrea-agent @@ -5156,7 +5211,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ac3c14eed7ca0dc28bf2d659cd2c4e4a39d55278fb9a8759c30ea12eff89e518 + checksum/config: 7c86f86246f3b203e46613dacbd724e9ca8eae3428451acdc7bfe54123deb534 labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index ea3a0e5fec5..246a1f19a0c 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -52,6 +52,7 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/metrics" + "antrea.io/antrea/pkg/agent/monitortool" "antrea.io/antrea/pkg/agent/multicast" mcroute "antrea.io/antrea/pkg/agent/multicluster" "antrea.io/antrea/pkg/agent/nodeip" @@ -123,6 +124,7 @@ func run(o *Options) error { endpointsInformer := informerFactory.Core().V1().Endpoints() endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices() namespaceInformer := informerFactory.Core().V1().Namespaces() + nodeLatencyMonitorInformer := crdInformerFactory.Crd().V1alpha1().NodeLatencyMonitors() // Create Antrea Clientset for the given config. antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient) @@ -929,6 +931,17 @@ func run(o *Options) error { go flowExporter.Run(stopCh) } + // Start the node latency monitor. + if features.DefaultFeatureGate.Enabled(features.NodeLatencyMonitor) && o.nodeType == config.K8sNode { + nodeLatencyMonitor := monitortool.NewNodeLatencyMonitor( + nodeInformer, + nodeLatencyMonitorInformer, + nodeConfig, + networkConfig.TrafficEncapMode, + ) + go nodeLatencyMonitor.Run(stopCh) + } + <-stopCh klog.Info("Stopping Antrea agent") return nil diff --git a/pkg/agent/monitortool/latency_store.go b/pkg/agent/monitortool/latency_store.go new file mode 100644 index 00000000000..78882a72798 --- /dev/null +++ b/pkg/agent/monitortool/latency_store.go @@ -0,0 +1,235 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitortool + +import ( + "errors" + "net" + "sync" + "time" + + "github.com/containernetworking/plugins/pkg/ip" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/util/k8s" +) + +// LatencyStore is a store for latency information of connections between Nodes. +type LatencyStore struct { + // Lock for the latency store + mutex sync.RWMutex + + // Whether the agent is running in networkPolicyOnly mode + isNetworkPolicyOnly bool + // The map of Node IP to latency entry, it will be changed by latency monitor + nodeIPLatencyMap map[string]*NodeIPLatencyEntry + // The map of Node name to Node IP(s), it will be changed by Node watcher + // If the agent is running in networkPolicyOnly mode, the value will be the transport IP of the Node. + // Otherwise, the value will be the gateway IP of the Node + nodeTargetIPsMap map[string][]net.IP +} + +// NodeIPLatencyEntry is the entry of the latency map. +type NodeIPLatencyEntry struct { + // The timestamp of the last sent packet + LastSendTime time.Time + // The timestamp of the last received packet + LastRecvTime time.Time + // The last valid rtt of the connection + LastMeasuredRTT time.Duration +} + +// NewLatencyStore creates a new LatencyStore. +func NewLatencyStore(isNetworkPolicyOnly bool) *LatencyStore { + store := &LatencyStore{ + nodeIPLatencyMap: make(map[string]*NodeIPLatencyEntry), + nodeTargetIPsMap: make(map[string][]net.IP), + isNetworkPolicyOnly: isNetworkPolicyOnly, + } + + return store +} + +// getNodeIPLatencyEntry returns the NodeIPLatencyEntry for the given Node IP +// For now, it is only used for testing purposes. +func (s *LatencyStore) getNodeIPLatencyEntry(nodeIP string) (NodeIPLatencyEntry, bool) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + entry, ok := s.nodeIPLatencyMap[nodeIP] + if !ok { + return NodeIPLatencyEntry{}, ok + } + + return *entry, ok +} + +// SetNodeIPLatencyEntry sets the NodeIPLatencyEntry for the given Node IP +func (s *LatencyStore) SetNodeIPLatencyEntry(nodeIP string, mutator func(entry *NodeIPLatencyEntry)) { + s.mutex.Lock() + defer s.mutex.Unlock() + + entry, ok := s.nodeIPLatencyMap[nodeIP] + if !ok { + entry = &NodeIPLatencyEntry{} + s.nodeIPLatencyMap[nodeIP] = entry + } + + mutator(entry) +} + +// addNode adds a Node to the latency store +func (s *LatencyStore) addNode(node *corev1.Node) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.updateNodeMap(node) +} + +// deleteNode deletes a Node from the latency store +func (s *LatencyStore) deleteNode(node *corev1.Node) { + s.mutex.Lock() + defer s.mutex.Unlock() + + delete(s.nodeTargetIPsMap, node.Name) +} + +// updateNode updates a Node name in the latency store +func (s *LatencyStore) updateNode(new *corev1.Node) { + s.mutex.Lock() + defer s.mutex.Unlock() + + // Node name will not be changed in the same Node update operation. + s.updateNodeMap(new) +} + +// updateNodeMap updates the nodeTargetIPsMap with the IPs of the given Node. +func (s *LatencyStore) updateNodeMap(node *corev1.Node) { + nodeIPs, err := s.getNodeIPs(node) + if err != nil { + klog.ErrorS(err, "Failed to get IPs for Node", "nodeName", node.Name) + return + } + + s.nodeTargetIPsMap[node.Name] = nodeIPs +} + +// getNodeIPs returns the target IPs of the given Node based on the agent mode. +func (s *LatencyStore) getNodeIPs(node *corev1.Node) ([]net.IP, error) { + if s.isNetworkPolicyOnly { + transportIPs, err := getTransportIPs(node) + if err != nil { + return nil, err + } + + return transportIPs, nil + } else { + gw0IPs, err := getGWIPs(node) + if err != nil { + return nil, err + } + + return gw0IPs, nil + } +} + +// getTransportIPs returns the transport IPs of the given Node. +func getTransportIPs(node *corev1.Node) ([]net.IP, error) { + var transportIPs []net.IP + ips, err := k8s.GetNodeTransportAddrs(node) + if err != nil { + return transportIPs, err + } + + if ips.IPv4 != nil { + transportIPs = append(transportIPs, ips.IPv4) + } + if ips.IPv6 != nil { + transportIPs = append(transportIPs, ips.IPv6) + } + + return transportIPs, nil +} + +// getGWIPs returns the gateway IPs of the given Node. +func getGWIPs(node *corev1.Node) ([]net.IP, error) { + var gwIPs []net.IP + + podCIDRStrs := getPodCIDRsOnNode(node) + if len(podCIDRStrs) == 0 { + return nil, errors.New("node does not have a PodCIDR") + } + + for _, podCIDR := range podCIDRStrs { + peerPodCIDRAddr, _, err := net.ParseCIDR(podCIDR) + if err != nil { + return nil, err + } + + // Add first IP in CIDR to the map + peerGatewayIP := ip.NextIP(peerPodCIDRAddr) + gwIPs = append(gwIPs, peerGatewayIP) + } + + return gwIPs, nil +} + +// getPodCIDRsOnNode returns the PodCIDRs of the given Node. +func getPodCIDRsOnNode(node *corev1.Node) []string { + if node.Spec.PodCIDRs != nil { + return node.Spec.PodCIDRs + } + + if node.Spec.PodCIDR == "" { + return nil + } + return []string{node.Spec.PodCIDR} +} + +// ListNodeIPs returns the list of all Node IPs in the latency store. +func (s *LatencyStore) ListNodeIPs() []net.IP { + s.mutex.RLock() + defer s.mutex.RUnlock() + + // Allocate a slice with a capacity equal to twice the size of the map, + // as we can have up to 2 IP addresses per Node in dual-stack case. + nodeIPs := make([]net.IP, 0, 2*len(s.nodeTargetIPsMap)) + for _, ips := range s.nodeTargetIPsMap { + nodeIPs = append(nodeIPs, ips...) + } + + return nodeIPs +} + +// DeleteStaleNodeIPs deletes the stale Node IPs from the nodeIPLatencyMap. +func (s *LatencyStore) DeleteStaleNodeIPs() { + s.mutex.Lock() + defer s.mutex.Unlock() + + nodeIPSet := sets.New[string]() + for _, ips := range s.nodeTargetIPsMap { + for _, ip := range ips { + nodeIPSet.Insert(ip.String()) + } + } + + for nodeIP := range s.nodeIPLatencyMap { + if !nodeIPSet.Has(nodeIP) { + delete(s.nodeIPLatencyMap, nodeIP) + } + } +} diff --git a/pkg/agent/monitortool/latency_store_test.go b/pkg/agent/monitortool/latency_store_test.go new file mode 100644 index 00000000000..8ffdd96a7d5 --- /dev/null +++ b/pkg/agent/monitortool/latency_store_test.go @@ -0,0 +1,175 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitortool + +import ( + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + entry = NodeIPLatencyEntry{ + LastSendTime: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + LastRecvTime: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + LastMeasuredRTT: 1 * time.Second, + } + entry2 = NodeIPLatencyEntry{ + LastSendTime: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + LastRecvTime: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + LastMeasuredRTT: 2 * time.Second, + } +) + +func TestLatencyStore_getNodeIPLatencyEntry(t *testing.T) { + tests := []struct { + key string + expectedEntry NodeIPLatencyEntry + }{ + { + key: "10.244.2.1", + expectedEntry: entry, + }, + { + key: "10.244.2.2", + expectedEntry: NodeIPLatencyEntry{}, + }, + } + + for _, tt := range tests { + t.Run(tt.key, func(t *testing.T) { + latencyStore := &LatencyStore{ + isNetworkPolicyOnly: false, + nodeIPLatencyMap: map[string]*NodeIPLatencyEntry{ + "10.244.2.1": &entry, + }, + nodeTargetIPsMap: map[string][]net.IP{ + "Node1": {net.ParseIP("10.244.2.1")}, + }, + } + + entry, _ := latencyStore.getNodeIPLatencyEntry(tt.key) + assert.Equal(t, tt.expectedEntry.LastMeasuredRTT, entry.LastMeasuredRTT) + assert.Equal(t, tt.expectedEntry.LastSendTime, entry.LastSendTime) + assert.Equal(t, tt.expectedEntry.LastRecvTime, entry.LastRecvTime) + }) + } +} + +func TestLatencyStore_SetNodeIPLatencyEntry(t *testing.T) { + tests := []struct { + key string + updatedEntry NodeIPLatencyEntry + expectedEntry NodeIPLatencyEntry + }{ + { + key: "10.244.2.1", + updatedEntry: entry, + expectedEntry: entry, + }, + { + key: "10.244.2.2", + updatedEntry: entry2, + expectedEntry: entry2, + }, + } + + for _, tt := range tests { + t.Run(tt.key, func(t *testing.T) { + latencyStore := &LatencyStore{ + isNetworkPolicyOnly: false, + nodeIPLatencyMap: map[string]*NodeIPLatencyEntry{ + "10.244.2.1": &entry, + }, + nodeTargetIPsMap: map[string][]net.IP{ + "Node1": {net.ParseIP("10.244.2.1")}, + }, + } + + mutator := func(entry *NodeIPLatencyEntry) { + entry.LastSendTime = tt.updatedEntry.LastSendTime + entry.LastRecvTime = tt.updatedEntry.LastRecvTime + entry.LastMeasuredRTT = tt.updatedEntry.LastMeasuredRTT + } + latencyStore.SetNodeIPLatencyEntry(tt.key, mutator) + entry, ok := latencyStore.getNodeIPLatencyEntry(tt.key) + assert.Equal(t, tt.expectedEntry, entry) + assert.True(t, ok) + }) + } +} + +func TestLatencyStore_DeleteStaleNodeIPs(t *testing.T) { + testKey := "10.244.2.1" + + latencyStore := &LatencyStore{ + isNetworkPolicyOnly: false, + nodeIPLatencyMap: map[string]*NodeIPLatencyEntry{ + testKey: &entry, + }, + nodeTargetIPsMap: map[string][]net.IP{ + "Node1": {net.ParseIP(testKey)}, + }, + } + + // Remove Node + latencyStore.deleteNode(&corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Node1", + }, + }) + + // Check that the entry is still present + _, ok := latencyStore.getNodeIPLatencyEntry(testKey) + assert.True(t, ok) + + // Check if that the entry has been deleted + latencyStore.DeleteStaleNodeIPs() + _, ok = latencyStore.getNodeIPLatencyEntry(testKey) + assert.False(t, ok) +} + +func TestLatencyStore_ListNodeIPs(t *testing.T) { + tests := []struct { + latentStore *LatencyStore + expectedList []net.IP + }{ + { + latentStore: &LatencyStore{ + isNetworkPolicyOnly: false, + nodeIPLatencyMap: map[string]*NodeIPLatencyEntry{ + "10.244.2.1": &entry, + }, + nodeTargetIPsMap: map[string][]net.IP{ + "Node1": {net.ParseIP("10.244.2.1")}, + }, + }, + expectedList: []net.IP{ + net.ParseIP("10.244.2.1"), + }, + }, + } + + for _, tt := range tests { + t.Run("List Node IPs", func(t *testing.T) { + nodeIPs := tt.latentStore.ListNodeIPs() + assert.Equal(t, tt.expectedList, nodeIPs) + }) + } +} diff --git a/pkg/agent/monitortool/monitor.go b/pkg/agent/monitortool/monitor.go new file mode 100644 index 00000000000..efd6693535d --- /dev/null +++ b/pkg/agent/monitortool/monitor.go @@ -0,0 +1,469 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitortool + +import ( + "math/rand" + "net" + "sync" + "sync/atomic" + "time" + + "golang.org/x/net/icmp" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" + corev1 "k8s.io/api/core/v1" + coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" +) + +var ( + icmpSeq uint32 + // #nosec G404: random number generator not used for security purposes. + icmpEchoID = rand.Int31n(1 << 16) +) + +const ( + ipv4ProtocolICMPRaw = "ip4:icmp" + ipv6ProtocolICMPRaw = "ip6:ipv6-icmp" + protocolICMP = 1 + protocolICMPv6 = 58 +) + +// getICMPSeq returns the next sequence number as uint16, +// wrapping around to 0 after reaching the maximum value of uint16. +func getICMPSeq() uint16 { + // Increment the sequence number atomically and get the new value. + // We use atomic.AddUint32 and pass 1 as the increment. + // The returned value is the new value post-increment. + newVal := atomic.AddUint32(&icmpSeq, 1) + + return uint16(newVal) +} + +// NodeLatencyMonitor is a tool to monitor the latency of the Node. +type NodeLatencyMonitor struct { + // latencyStore is the cache to store the latency of each Nodes. + latencyStore *LatencyStore + // latencyConfigChanged is the channel to notify the latency config changed. + latencyConfigChanged chan latencyConfig + // isIPv4Enabled is the flag to indicate whether the IPv4 is enabled. + isIPv4Enabled bool + // isIPv6Enabled is the flag to indicate whether the IPv6 is enabled. + isIPv6Enabled bool + + // nodeName is the name of the current Node, used to filter out the current Node from the latency monitor. + nodeName string + nodeInformer coreinformers.NodeInformer + nodeLatencyMonitorInformer crdinformers.NodeLatencyMonitorInformer +} + +// latencyConfig is the config for the latency monitor. +type latencyConfig struct { + // Enable is the flag to enable the latency monitor. + Enable bool + // Interval is the interval time to ping all Nodes. + Interval time.Duration +} + +// NewNodeLatencyMonitor creates a new NodeLatencyMonitor. +func NewNodeLatencyMonitor(nodeInformer coreinformers.NodeInformer, + nlmInformer crdinformers.NodeLatencyMonitorInformer, + nodeConfig *config.NodeConfig, + trafficEncapMode config.TrafficEncapModeType) *NodeLatencyMonitor { + m := &NodeLatencyMonitor{ + latencyStore: NewLatencyStore(trafficEncapMode.IsNetworkPolicyOnly()), + latencyConfigChanged: make(chan latencyConfig), + nodeInformer: nodeInformer, + nodeLatencyMonitorInformer: nlmInformer, + nodeName: nodeConfig.Name, + } + + m.isIPv4Enabled, _ = config.IsIPv4Enabled(nodeConfig, trafficEncapMode) + m.isIPv6Enabled, _ = config.IsIPv6Enabled(nodeConfig, trafficEncapMode) + + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: m.onNodeAdd, + UpdateFunc: m.onNodeUpdate, + DeleteFunc: m.onNodeDelete, + }) + + nlmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: m.onNodeLatencyMonitorAdd, + UpdateFunc: m.onNodeLatencyMonitorUpdate, + DeleteFunc: m.onNodeLatencyMonitorDelete, + }) + + return m +} + +// Is current node +func (m *NodeLatencyMonitor) isCurrentNode(node *corev1.Node) bool { + return node.Name == m.nodeName +} + +// onNodeAdd is the event handler for adding Node. +func (m *NodeLatencyMonitor) onNodeAdd(obj interface{}) { + node := obj.(*corev1.Node) + if m.isCurrentNode(node) { + return + } + + m.latencyStore.addNode(node) + + klog.V(4).InfoS("Node added", "Node", klog.KObj(node)) +} + +// onNodeUpdate is the event handler for updating Node. +func (m *NodeLatencyMonitor) onNodeUpdate(oldObj, newObj interface{}) { + node := newObj.(*corev1.Node) + if m.isCurrentNode(node) { + return + } + + m.latencyStore.updateNode(node) + + klog.V(4).InfoS("Node updated", "Node", klog.KObj(node)) +} + +// onNodeDelete is the event handler for deleting Node. +func (m *NodeLatencyMonitor) onNodeDelete(obj interface{}) { + node, ok := obj.(*corev1.Node) + if !ok { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.ErrorS(nil, "Received unexpected object", "obj", obj) + return + } + node, ok = deletedState.Obj.(*corev1.Node) + if !ok { + klog.ErrorS(nil, "DeletedFinalStateUnknown contains non-Node object", "obj", deletedState.Obj) + return + } + } + + if m.isCurrentNode(node) { + return + } + + m.latencyStore.deleteNode(node) +} + +// onNodeLatencyMonitorAdd is the event handler for adding NodeLatencyMonitor. +func (m *NodeLatencyMonitor) onNodeLatencyMonitorAdd(obj interface{}) { + nlm := obj.(*v1alpha1.NodeLatencyMonitor) + klog.V(4).InfoS("NodeLatencyMonitor added", "NodeLatencyMonitor", klog.KObj(nlm)) + + m.updateLatencyConfig(nlm) +} + +// onNodeLatencyMonitorUpdate is the event handler for updating NodeLatencyMonitor. +func (m *NodeLatencyMonitor) onNodeLatencyMonitorUpdate(oldObj, newObj interface{}) { + oldNLM := oldObj.(*v1alpha1.NodeLatencyMonitor) + newNLM := newObj.(*v1alpha1.NodeLatencyMonitor) + klog.V(4).InfoS("NodeLatencyMonitor updated", "NodeLatencyMonitor", klog.KObj(newNLM)) + + if oldNLM.GetGeneration() == newNLM.GetGeneration() { + return + } + + m.updateLatencyConfig(newNLM) +} + +// updateLatencyConfig updates the latency config based on the NodeLatencyMonitor CRD. +func (m *NodeLatencyMonitor) updateLatencyConfig(nlm *v1alpha1.NodeLatencyMonitor) { + pingInterval := time.Duration(nlm.Spec.PingIntervalSeconds) * time.Second + + latencyConfig := latencyConfig{ + Enable: true, + Interval: pingInterval, + } + + m.latencyConfigChanged <- latencyConfig +} + +// onNodeLatencyMonitorDelete is the event handler for deleting NodeLatencyMonitor. +func (m *NodeLatencyMonitor) onNodeLatencyMonitorDelete(obj interface{}) { + klog.V(4).InfoS("NodeLatencyMonitor deleted") + latencyConfig := latencyConfig{Enable: false} + + m.latencyConfigChanged <- latencyConfig +} + +// sendPing sends an ICMP message to the target IP address. +func (m *NodeLatencyMonitor) sendPing(socket net.PacketConn, addr net.IP) error { + var requestType icmp.Type + + ip := &net.IPAddr{IP: addr} + + if addr.To4() == nil { + requestType = ipv6.ICMPTypeEchoRequest + } else { + requestType = ipv4.ICMPTypeEcho + } + + timeStart := time.Now() + seqID := getICMPSeq() + body := &icmp.Echo{ + ID: int(icmpEchoID), + Seq: int(seqID), + Data: []byte(timeStart.Format(time.RFC3339Nano)), + } + msg := icmp.Message{ + Type: requestType, + Code: 0, + Body: body, + } + klog.V(4).InfoS("Sending ICMP message", "IP", ip, "SeqID", seqID, "body", body) + + // Serialize the ICMP message + msgBytes, err := msg.Marshal(nil) + if err != nil { + return err + } + + // Send the ICMP message + _, err = socket.WriteTo(msgBytes, ip) + if err != nil { + return err + } + + // Create or update the latency store + mutator := func(entry *NodeIPLatencyEntry) { + entry.LastSendTime = timeStart + } + m.latencyStore.SetNodeIPLatencyEntry(addr.String(), mutator) + + return nil +} + +// recvPing receives an ICMP message from the target IP address. +func (m *NodeLatencyMonitor) recvPing(socket net.PacketConn, isIPv4 bool) { + // We only expect small packets, if we receive a larger packet, we will drop the extra data. + readBuffer := make([]byte, 128) + for { + n, peer, err := socket.ReadFrom(readBuffer) + if err != nil { + // When the socket is closed in the Run method, this error will be logged, which is not ideal. + // In the future, we may try setting a ReadDeadline on the socket before each ReadFrom and using + // a channel to signal that the loop should terminate. + klog.ErrorS(err, "Failed to read ICMP message") + return + } + + destIP := peer.String() + + // Parse the ICMP message + var msg *icmp.Message + if isIPv4 { + msg, err = icmp.ParseMessage(protocolICMP, readBuffer[:n]) + if err != nil { + klog.ErrorS(err, "Failed to parse ICMP message") + continue + } + if msg.Type != ipv4.ICMPTypeEcho && msg.Type != ipv4.ICMPTypeEchoReply { + klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg) + continue + } + // Ignore ICMP echo messages received from other Nodes (they will be answered by the system) + if msg.Type == ipv4.ICMPTypeEcho { + klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg) + continue + } + } else { + msg, err = icmp.ParseMessage(protocolICMPv6, readBuffer[:n]) + if err != nil { + klog.ErrorS(err, "Failed to parse ICMP message") + continue + } + if msg.Type != ipv6.ICMPTypeEchoRequest && msg.Type != ipv6.ICMPTypeEchoReply { + klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg) + continue + } + // Ignore ICMP echo messages received from other Nodes (they will be answered by the system) + if msg.Type == ipv6.ICMPTypeEchoRequest { + klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg) + continue + } + } + + echo, ok := msg.Body.(*icmp.Echo) + if !ok { + klog.ErrorS(nil, "Failed to assert type as *icmp.Echo") + continue + } + if echo.ID != int(icmpEchoID) { + klog.V(4).InfoS("Ignoring ICMP message with wrong echo ID", "msg", msg) + continue + } + + klog.V(4).InfoS("Received ICMP message", "IP", destIP, "msg", msg) + + // Parse the time from the ICMP data + sentTime, err := time.Parse(time.RFC3339Nano, string(echo.Data)) + if err != nil { + klog.ErrorS(err, "Failed to parse time from ICMP data") + continue + } + + // Calculate the round-trip time + end := time.Now() + rtt := end.Sub(sentTime) + klog.V(4).InfoS("Updating latency entry for Node IP", "IP", destIP, "lastSendTime", sentTime, "lastRecvTime", end, "RTT", rtt) + + // Update the latency store + mutator := func(entry *NodeIPLatencyEntry) { + entry.LastRecvTime = end + entry.LastMeasuredRTT = rtt + } + m.latencyStore.SetNodeIPLatencyEntry(destIP, mutator) + } +} + +// pingAll sends ICMP messages to all the Nodes. +func (m *NodeLatencyMonitor) pingAll(ipv4Socket, ipv6Socket net.PacketConn) { + klog.V(4).InfoS("Pinging all Nodes") + nodeIPs := m.latencyStore.ListNodeIPs() + for _, toIP := range nodeIPs { + if toIP.To4() != nil && ipv4Socket != nil { + if err := m.sendPing(ipv4Socket, toIP); err != nil { + klog.ErrorS(err, "Cannot send ICMP message to Node IP", "IP", toIP) + } + } else if toIP.To16() != nil && ipv6Socket != nil { + if err := m.sendPing(ipv6Socket, toIP); err != nil { + klog.ErrorS(err, "Cannot send ICMP message to Node IP", "IP", toIP) + } + } else { + klog.ErrorS(nil, "Cannot send ICMP message to Node IP because socket is not initialized for IP family", "IP", toIP) + } + } + klog.V(4).InfoS("Done pinging all Nodes") +} + +// Run starts the NodeLatencyMonitor. +func (m *NodeLatencyMonitor) Run(stopCh <-chan struct{}) { + go m.monitorLoop(stopCh) + + <-stopCh +} + +// monitorLoop is the main loop to monitor the latency of the Node. +func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { + klog.InfoS("NodeLatencyMonitor is running") + // Low level goroutine to handle ping loop + var ticker *time.Ticker + var tickerCh <-chan time.Time + var ipv4Socket, ipv6Socket net.PacketConn + var err error + + defer func() { + if ipv4Socket != nil { + ipv4Socket.Close() + } + if ipv6Socket != nil { + ipv6Socket.Close() + } + if ticker != nil { + ticker.Stop() + } + }() + + // Update current ticker based on the latencyConfig + updateTicker := func(interval time.Duration) { + if ticker != nil { + ticker.Stop() // Stop the current ticker + } + ticker = time.NewTicker(interval) + tickerCh = ticker.C + } + + wg := sync.WaitGroup{} + // Start the pingAll goroutine + for { + select { + case <-tickerCh: + // Try to send pingAll signal + m.pingAll(ipv4Socket, ipv6Socket) + // We no not delete IPs from nodeIPLatencyMap as part of the Node delete event handler + // to avoid consistency issues and because it would not be sufficient to avoid stale entries completely. + // This means that we have to periodically invoke DeleteStaleNodeIPs to avoid stale entries in the map. + m.latencyStore.DeleteStaleNodeIPs() + case <-stopCh: + return + case latencyConfig := <-m.latencyConfigChanged: + // Start or stop the pingAll goroutine based on the latencyConfig + if latencyConfig.Enable { + // latencyConfig changed + updateTicker(latencyConfig.Interval) + + // If the recvPing socket is closed, + // recreate it if it is closed(CRD is deleted). + if ipv4Socket == nil && m.isIPv4Enabled { + // Create a new socket for IPv4 when it is IPv4-only + ipv4Socket, err = icmp.ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0") + if err != nil { + klog.ErrorS(err, "Failed to create ICMP socket for IPv4") + return + } + wg.Add(1) + go func() { + defer wg.Done() + m.recvPing(ipv4Socket, true) + }() + } + if ipv6Socket == nil && m.isIPv6Enabled { + // Create a new socket for IPv6 when it is IPv6-only + ipv6Socket, err = icmp.ListenPacket(ipv6ProtocolICMPRaw, "::") + if err != nil { + klog.ErrorS(err, "Failed to create ICMP socket for IPv6") + return + } + wg.Add(1) + go func() { + defer wg.Done() + m.recvPing(ipv6Socket, false) + }() + } + } else { + // latencyConfig deleted + if ticker != nil { + ticker.Stop() + ticker = nil + } + tickerCh = nil + + // We close the sockets as a signal to recvPing that it needs to stop. + // Note that at that point, we are guaranteed that there is no ongoing Write + // to the socket, because pingAll runs in the same goroutine as this code. + if ipv4Socket != nil { + ipv4Socket.Close() + } + if ipv6Socket != nil { + ipv6Socket.Close() + } + + // After closing the sockets, wait for the recvPing goroutines to return + wg.Wait() + ipv4Socket = nil + ipv6Socket = nil + } + } + } +} diff --git a/pkg/apis/crd/v1alpha1/register.go b/pkg/apis/crd/v1alpha1/register.go index 5e5b95b4983..75b6c69cde6 100644 --- a/pkg/apis/crd/v1alpha1/register.go +++ b/pkg/apis/crd/v1alpha1/register.go @@ -53,6 +53,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ExternalNodeList{}, &SupportBundleCollection{}, &SupportBundleCollectionList{}, + &NodeLatencyMonitor{}, + &NodeLatencyMonitorList{}, ) metav1.AddToGroupVersion( diff --git a/pkg/apis/crd/v1alpha1/types.go b/pkg/apis/crd/v1alpha1/types.go index 726c0b7c935..ac7a7557055 100644 --- a/pkg/apis/crd/v1alpha1/types.go +++ b/pkg/apis/crd/v1alpha1/types.go @@ -291,3 +291,37 @@ type TLSProtocol struct { // SNI (Server Name Indication) indicates the server domain name in the TLS/SSL hello message. SNI string `json:"sni,omitempty"` } + +// +genclient +// +genclient:nonNamespaced +// +genclient:noStatus +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// NodeLatencyMonitor is used to monitor the latency between nodes in a Kubernetes cluster. It is a singleton resource, +// meaning only one instance of it can exist in the cluster. +type NodeLatencyMonitor struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec NodeLatencyMonitorSpec `json:"spec"` +} + +type NodeLatencyMonitorSpec struct { + // PingInterval specifies the interval in seconds between ping requests. + // Ping interval should be greater than or equal to 1s. + PingIntervalSeconds int32 `json:"pingIntervalSeconds"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// NodeLatencyMonitor is only a singleton resource, so it does not use a list type. +// But current k8s client-gen does not support generating client for singleton informer resource, +// so we have to define a list type for CRD Informer. +// Maybe we will remove it in the future. +type NodeLatencyMonitorList struct { + metav1.TypeMeta `json:",inline"` + // +optional + metav1.ListMeta `json:"metadata,omitempty"` + + Items []NodeLatencyMonitor `json:"items"` +} diff --git a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go index 820d4be306a..f06755a80a3 100644 --- a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go @@ -292,6 +292,82 @@ func (in *NetworkInterface) DeepCopy() *NetworkInterface { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeLatencyMonitor) DeepCopyInto(out *NodeLatencyMonitor) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeLatencyMonitor. +func (in *NodeLatencyMonitor) DeepCopy() *NodeLatencyMonitor { + if in == nil { + return nil + } + out := new(NodeLatencyMonitor) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeLatencyMonitor) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeLatencyMonitorList) DeepCopyInto(out *NodeLatencyMonitorList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]NodeLatencyMonitor, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeLatencyMonitorList. +func (in *NodeLatencyMonitorList) DeepCopy() *NodeLatencyMonitorList { + if in == nil { + return nil + } + out := new(NodeLatencyMonitorList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeLatencyMonitorList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeLatencyMonitorSpec) DeepCopyInto(out *NodeLatencyMonitorSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeLatencyMonitorSpec. +func (in *NodeLatencyMonitorSpec) DeepCopy() *NodeLatencyMonitorSpec { + if in == nil { + return nil + } + out := new(NodeLatencyMonitorSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SupportBundleCollection) DeepCopyInto(out *SupportBundleCollection) { *out = *in diff --git a/pkg/apiserver/handlers/featuregates/handler_test.go b/pkg/apiserver/handlers/featuregates/handler_test.go index 51f49a68ff5..3a8f3d78494 100644 --- a/pkg/apiserver/handlers/featuregates/handler_test.go +++ b/pkg/apiserver/handlers/featuregates/handler_test.go @@ -68,6 +68,7 @@ func Test_getGatesResponse(t *testing.T) { {Component: "agent", Name: "Multicast", Status: multicastStatus, Version: "BETA"}, {Component: "agent", Name: "Multicluster", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "NetworkPolicyStats", Status: "Enabled", Version: "BETA"}, + {Component: "agent", Name: "NodeLatencyMonitor", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "NodeNetworkPolicy", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "NodePortLocal", Status: "Enabled", Version: "GA"}, {Component: "agent", Name: "SecondaryNetwork", Status: "Disabled", Version: "ALPHA"}, diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go index 64e4f3bd836..64b9ed11579 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go @@ -27,6 +27,7 @@ import ( type CrdV1alpha1Interface interface { RESTClient() rest.Interface ExternalNodesGetter + NodeLatencyMonitorsGetter SupportBundleCollectionsGetter } @@ -39,6 +40,10 @@ func (c *CrdV1alpha1Client) ExternalNodes(namespace string) ExternalNodeInterfac return newExternalNodes(c, namespace) } +func (c *CrdV1alpha1Client) NodeLatencyMonitors() NodeLatencyMonitorInterface { + return newNodeLatencyMonitors(c) +} + func (c *CrdV1alpha1Client) SupportBundleCollections() SupportBundleCollectionInterface { return newSupportBundleCollections(c) } diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go index e442aae39df..036a1f768af 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go @@ -30,6 +30,10 @@ func (c *FakeCrdV1alpha1) ExternalNodes(namespace string) v1alpha1.ExternalNodeI return &FakeExternalNodes{c, namespace} } +func (c *FakeCrdV1alpha1) NodeLatencyMonitors() v1alpha1.NodeLatencyMonitorInterface { + return &FakeNodeLatencyMonitors{c} +} + func (c *FakeCrdV1alpha1) SupportBundleCollections() v1alpha1.SupportBundleCollectionInterface { return &FakeSupportBundleCollections{c} } diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_nodelatencymonitor.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_nodelatencymonitor.go new file mode 100644 index 00000000000..76a415f8958 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_nodelatencymonitor.go @@ -0,0 +1,119 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeNodeLatencyMonitors implements NodeLatencyMonitorInterface +type FakeNodeLatencyMonitors struct { + Fake *FakeCrdV1alpha1 +} + +var nodelatencymonitorsResource = v1alpha1.SchemeGroupVersion.WithResource("nodelatencymonitors") + +var nodelatencymonitorsKind = v1alpha1.SchemeGroupVersion.WithKind("NodeLatencyMonitor") + +// Get takes name of the nodeLatencyMonitor, and returns the corresponding nodeLatencyMonitor object, and an error if there is any. +func (c *FakeNodeLatencyMonitors) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.NodeLatencyMonitor, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootGetAction(nodelatencymonitorsResource, name), &v1alpha1.NodeLatencyMonitor{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NodeLatencyMonitor), err +} + +// List takes label and field selectors, and returns the list of NodeLatencyMonitors that match those selectors. +func (c *FakeNodeLatencyMonitors) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.NodeLatencyMonitorList, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootListAction(nodelatencymonitorsResource, nodelatencymonitorsKind, opts), &v1alpha1.NodeLatencyMonitorList{}) + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.NodeLatencyMonitorList{ListMeta: obj.(*v1alpha1.NodeLatencyMonitorList).ListMeta} + for _, item := range obj.(*v1alpha1.NodeLatencyMonitorList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested nodeLatencyMonitors. +func (c *FakeNodeLatencyMonitors) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewRootWatchAction(nodelatencymonitorsResource, opts)) +} + +// Create takes the representation of a nodeLatencyMonitor and creates it. Returns the server's representation of the nodeLatencyMonitor, and an error, if there is any. +func (c *FakeNodeLatencyMonitors) Create(ctx context.Context, nodeLatencyMonitor *v1alpha1.NodeLatencyMonitor, opts v1.CreateOptions) (result *v1alpha1.NodeLatencyMonitor, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootCreateAction(nodelatencymonitorsResource, nodeLatencyMonitor), &v1alpha1.NodeLatencyMonitor{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NodeLatencyMonitor), err +} + +// Update takes the representation of a nodeLatencyMonitor and updates it. Returns the server's representation of the nodeLatencyMonitor, and an error, if there is any. +func (c *FakeNodeLatencyMonitors) Update(ctx context.Context, nodeLatencyMonitor *v1alpha1.NodeLatencyMonitor, opts v1.UpdateOptions) (result *v1alpha1.NodeLatencyMonitor, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateAction(nodelatencymonitorsResource, nodeLatencyMonitor), &v1alpha1.NodeLatencyMonitor{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NodeLatencyMonitor), err +} + +// Delete takes name of the nodeLatencyMonitor and deletes it. Returns an error if one occurs. +func (c *FakeNodeLatencyMonitors) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewRootDeleteActionWithOptions(nodelatencymonitorsResource, name, opts), &v1alpha1.NodeLatencyMonitor{}) + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeNodeLatencyMonitors) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewRootDeleteCollectionAction(nodelatencymonitorsResource, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.NodeLatencyMonitorList{}) + return err +} + +// Patch applies the patch and returns the patched nodeLatencyMonitor. +func (c *FakeNodeLatencyMonitors) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.NodeLatencyMonitor, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootPatchSubresourceAction(nodelatencymonitorsResource, name, pt, data, subresources...), &v1alpha1.NodeLatencyMonitor{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NodeLatencyMonitor), err +} diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go index 05db7473c1b..34fa36922dd 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go @@ -18,4 +18,6 @@ package v1alpha1 type ExternalNodeExpansion interface{} +type NodeLatencyMonitorExpansion interface{} + type SupportBundleCollectionExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/nodelatencymonitor.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/nodelatencymonitor.go new file mode 100644 index 00000000000..cfaef7eebd1 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/nodelatencymonitor.go @@ -0,0 +1,166 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + scheme "antrea.io/antrea/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// NodeLatencyMonitorsGetter has a method to return a NodeLatencyMonitorInterface. +// A group's client should implement this interface. +type NodeLatencyMonitorsGetter interface { + NodeLatencyMonitors() NodeLatencyMonitorInterface +} + +// NodeLatencyMonitorInterface has methods to work with NodeLatencyMonitor resources. +type NodeLatencyMonitorInterface interface { + Create(ctx context.Context, nodeLatencyMonitor *v1alpha1.NodeLatencyMonitor, opts v1.CreateOptions) (*v1alpha1.NodeLatencyMonitor, error) + Update(ctx context.Context, nodeLatencyMonitor *v1alpha1.NodeLatencyMonitor, opts v1.UpdateOptions) (*v1alpha1.NodeLatencyMonitor, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.NodeLatencyMonitor, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.NodeLatencyMonitorList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.NodeLatencyMonitor, err error) + NodeLatencyMonitorExpansion +} + +// nodeLatencyMonitors implements NodeLatencyMonitorInterface +type nodeLatencyMonitors struct { + client rest.Interface +} + +// newNodeLatencyMonitors returns a NodeLatencyMonitors +func newNodeLatencyMonitors(c *CrdV1alpha1Client) *nodeLatencyMonitors { + return &nodeLatencyMonitors{ + client: c.RESTClient(), + } +} + +// Get takes name of the nodeLatencyMonitor, and returns the corresponding nodeLatencyMonitor object, and an error if there is any. +func (c *nodeLatencyMonitors) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.NodeLatencyMonitor, err error) { + result = &v1alpha1.NodeLatencyMonitor{} + err = c.client.Get(). + Resource("nodelatencymonitors"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of NodeLatencyMonitors that match those selectors. +func (c *nodeLatencyMonitors) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.NodeLatencyMonitorList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.NodeLatencyMonitorList{} + err = c.client.Get(). + Resource("nodelatencymonitors"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested nodeLatencyMonitors. +func (c *nodeLatencyMonitors) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Resource("nodelatencymonitors"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a nodeLatencyMonitor and creates it. Returns the server's representation of the nodeLatencyMonitor, and an error, if there is any. +func (c *nodeLatencyMonitors) Create(ctx context.Context, nodeLatencyMonitor *v1alpha1.NodeLatencyMonitor, opts v1.CreateOptions) (result *v1alpha1.NodeLatencyMonitor, err error) { + result = &v1alpha1.NodeLatencyMonitor{} + err = c.client.Post(). + Resource("nodelatencymonitors"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(nodeLatencyMonitor). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a nodeLatencyMonitor and updates it. Returns the server's representation of the nodeLatencyMonitor, and an error, if there is any. +func (c *nodeLatencyMonitors) Update(ctx context.Context, nodeLatencyMonitor *v1alpha1.NodeLatencyMonitor, opts v1.UpdateOptions) (result *v1alpha1.NodeLatencyMonitor, err error) { + result = &v1alpha1.NodeLatencyMonitor{} + err = c.client.Put(). + Resource("nodelatencymonitors"). + Name(nodeLatencyMonitor.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(nodeLatencyMonitor). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the nodeLatencyMonitor and deletes it. Returns an error if one occurs. +func (c *nodeLatencyMonitors) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Resource("nodelatencymonitors"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *nodeLatencyMonitors) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Resource("nodelatencymonitors"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched nodeLatencyMonitor. +func (c *nodeLatencyMonitors) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.NodeLatencyMonitor, err error) { + result = &v1alpha1.NodeLatencyMonitor{} + err = c.client.Patch(pt). + Resource("nodelatencymonitors"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/client/informers/externalversions/crd/v1alpha1/interface.go b/pkg/client/informers/externalversions/crd/v1alpha1/interface.go index 89897d70a68..c83d379324d 100644 --- a/pkg/client/informers/externalversions/crd/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/crd/v1alpha1/interface.go @@ -24,6 +24,8 @@ import ( type Interface interface { // ExternalNodes returns a ExternalNodeInformer. ExternalNodes() ExternalNodeInformer + // NodeLatencyMonitors returns a NodeLatencyMonitorInformer. + NodeLatencyMonitors() NodeLatencyMonitorInformer // SupportBundleCollections returns a SupportBundleCollectionInformer. SupportBundleCollections() SupportBundleCollectionInformer } @@ -44,6 +46,11 @@ func (v *version) ExternalNodes() ExternalNodeInformer { return &externalNodeInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// NodeLatencyMonitors returns a NodeLatencyMonitorInformer. +func (v *version) NodeLatencyMonitors() NodeLatencyMonitorInformer { + return &nodeLatencyMonitorInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} +} + // SupportBundleCollections returns a SupportBundleCollectionInformer. func (v *version) SupportBundleCollections() SupportBundleCollectionInformer { return &supportBundleCollectionInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/informers/externalversions/crd/v1alpha1/nodelatencymonitor.go b/pkg/client/informers/externalversions/crd/v1alpha1/nodelatencymonitor.go new file mode 100644 index 00000000000..69cb82ef0b3 --- /dev/null +++ b/pkg/client/informers/externalversions/crd/v1alpha1/nodelatencymonitor.go @@ -0,0 +1,87 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + versioned "antrea.io/antrea/pkg/client/clientset/versioned" + internalinterfaces "antrea.io/antrea/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// NodeLatencyMonitorInformer provides access to a shared informer and lister for +// NodeLatencyMonitors. +type NodeLatencyMonitorInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.NodeLatencyMonitorLister +} + +type nodeLatencyMonitorInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// NewNodeLatencyMonitorInformer constructs a new informer for NodeLatencyMonitor type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewNodeLatencyMonitorInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredNodeLatencyMonitorInformer(client, resyncPeriod, indexers, nil) +} + +// NewFilteredNodeLatencyMonitorInformer constructs a new informer for NodeLatencyMonitor type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredNodeLatencyMonitorInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.CrdV1alpha1().NodeLatencyMonitors().List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.CrdV1alpha1().NodeLatencyMonitors().Watch(context.TODO(), options) + }, + }, + &crdv1alpha1.NodeLatencyMonitor{}, + resyncPeriod, + indexers, + ) +} + +func (f *nodeLatencyMonitorInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredNodeLatencyMonitorInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *nodeLatencyMonitorInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&crdv1alpha1.NodeLatencyMonitor{}, f.defaultInformer) +} + +func (f *nodeLatencyMonitorInformer) Lister() v1alpha1.NodeLatencyMonitorLister { + return v1alpha1.NewNodeLatencyMonitorLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index f5bef826e02..e692c359dd2 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -55,6 +55,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource // Group=crd.antrea.io, Version=v1alpha1 case v1alpha1.SchemeGroupVersion.WithResource("externalnodes"): return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().ExternalNodes().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("nodelatencymonitors"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().NodeLatencyMonitors().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("supportbundlecollections"): return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().SupportBundleCollections().Informer()}, nil diff --git a/pkg/client/listers/crd/v1alpha1/expansion_generated.go b/pkg/client/listers/crd/v1alpha1/expansion_generated.go index fe8777f36ed..e3a50a984f1 100644 --- a/pkg/client/listers/crd/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/crd/v1alpha1/expansion_generated.go @@ -24,6 +24,10 @@ type ExternalNodeListerExpansion interface{} // ExternalNodeNamespaceLister. type ExternalNodeNamespaceListerExpansion interface{} +// NodeLatencyMonitorListerExpansion allows custom methods to be added to +// NodeLatencyMonitorLister. +type NodeLatencyMonitorListerExpansion interface{} + // SupportBundleCollectionListerExpansion allows custom methods to be added to // SupportBundleCollectionLister. type SupportBundleCollectionListerExpansion interface{} diff --git a/pkg/client/listers/crd/v1alpha1/nodelatencymonitor.go b/pkg/client/listers/crd/v1alpha1/nodelatencymonitor.go new file mode 100644 index 00000000000..214da973843 --- /dev/null +++ b/pkg/client/listers/crd/v1alpha1/nodelatencymonitor.go @@ -0,0 +1,66 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// NodeLatencyMonitorLister helps list NodeLatencyMonitors. +// All objects returned here must be treated as read-only. +type NodeLatencyMonitorLister interface { + // List lists all NodeLatencyMonitors in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.NodeLatencyMonitor, err error) + // Get retrieves the NodeLatencyMonitor from the index for a given name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.NodeLatencyMonitor, error) + NodeLatencyMonitorListerExpansion +} + +// nodeLatencyMonitorLister implements the NodeLatencyMonitorLister interface. +type nodeLatencyMonitorLister struct { + indexer cache.Indexer +} + +// NewNodeLatencyMonitorLister returns a new NodeLatencyMonitorLister. +func NewNodeLatencyMonitorLister(indexer cache.Indexer) NodeLatencyMonitorLister { + return &nodeLatencyMonitorLister{indexer: indexer} +} + +// List lists all NodeLatencyMonitors in the indexer. +func (s *nodeLatencyMonitorLister) List(selector labels.Selector) (ret []*v1alpha1.NodeLatencyMonitor, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.NodeLatencyMonitor)) + }) + return ret, err +} + +// Get retrieves the NodeLatencyMonitor from the index for a given name. +func (s *nodeLatencyMonitorLister) Get(name string) (*v1alpha1.NodeLatencyMonitor, error) { + obj, exists, err := s.indexer.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("nodelatencymonitor"), name) + } + return obj.(*v1alpha1.NodeLatencyMonitor), nil +} diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 36fb997ffdc..fe673654552 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -158,6 +158,10 @@ const ( // alpha: v1.15 // Enable layer 7 flow export on Pods and Namespaces L7FlowExporter featuregate.Feature = "L7FlowExporter" + + // alpha: v2.1 + // Enable the NodeLatencyMonitor feature. + NodeLatencyMonitor featuregate.Feature = "NodeLatencyMonitor" ) var ( @@ -199,6 +203,7 @@ var ( EgressSeparateSubnet: {Default: false, PreRelease: featuregate.Alpha}, NodeNetworkPolicy: {Default: false, PreRelease: featuregate.Alpha}, L7FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, + NodeLatencyMonitor: {Default: false, PreRelease: featuregate.Alpha}, } // AgentGates consists of all known feature gates for the Antrea Agent. @@ -229,6 +234,7 @@ var ( EgressSeparateSubnet, NodeNetworkPolicy, L7FlowExporter, + NodeLatencyMonitor, ) // ControllerGates consists of all known feature gates for the Antrea Controller. @@ -276,6 +282,7 @@ var ( EgressSeparateSubnet: {}, NodeNetworkPolicy: {}, L7FlowExporter: {}, + NodeLatencyMonitor: {}, } // supportedFeaturesOnExternalNode records the features supported on an external // Node. Antrea Agent checks the enabled features if it is running on an