Skip to content

Commit b736007

Browse files
Merge pull request rancher#35539 from ibuildthecloud/move-endpoint
Move endpoint controller to agent
2 parents c5147eb + cf6a72c commit b736007

File tree

9 files changed

+73
-104
lines changed

9 files changed

+73
-104
lines changed

pkg/controllers/managementagent/controllers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/rancher/norman/types"
99
projectclient "github.com/rancher/rancher/pkg/client/generated/project/v3"
1010
"github.com/rancher/rancher/pkg/controllers/managementagent/dnsrecord"
11+
"github.com/rancher/rancher/pkg/controllers/managementagent/endpoints"
1112
"github.com/rancher/rancher/pkg/controllers/managementagent/externalservice"
1213
"github.com/rancher/rancher/pkg/controllers/managementagent/ingress"
1314
"github.com/rancher/rancher/pkg/controllers/managementagent/ingresshostgen"
@@ -26,6 +27,7 @@ import (
2627
func Register(ctx context.Context, cluster *config.UserOnlyContext) error {
2728
dnsrecord.Register(ctx, cluster)
2829
externalservice.Register(ctx, cluster)
30+
endpoints.Register(ctx, cluster)
2931
ingress.Register(ctx, cluster)
3032
ingresshostgen.Register(ctx, cluster)
3133
nslabels.Register(ctx, cluster)

pkg/controllers/managementuserlegacy/endpoints/endpoints.go renamed to pkg/controllers/managementagent/endpoints/endpoints.go

Lines changed: 30 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,12 @@ import (
1515
v1 "github.com/rancher/rancher/pkg/generated/norman/core/v1"
1616
managementv3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
1717
"github.com/rancher/rancher/pkg/ingresswrapper"
18-
"github.com/rancher/rancher/pkg/namespace"
1918
nodehelper "github.com/rancher/rancher/pkg/node"
2019
"github.com/rancher/rancher/pkg/settings"
2120
"github.com/rancher/rancher/pkg/types/config"
2221
"github.com/sirupsen/logrus"
2322
corev1 "k8s.io/api/core/v1"
24-
kextv1beta1 "k8s.io/api/extensions/v1beta1"
25-
knetworkingv1 "k8s.io/api/networking/v1"
26-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2723
"k8s.io/apimachinery/pkg/labels"
28-
"k8s.io/apimachinery/pkg/runtime"
2924
"k8s.io/apimachinery/pkg/util/sets"
3025
"k8s.io/apimachinery/pkg/util/validation"
3126
)
@@ -35,87 +30,38 @@ const (
3530
endpointsAnnotation = "field.cattle.io/publicEndpoints"
3631
)
3732

38-
func Register(ctx context.Context, workload *config.UserContext) {
39-
isRKE := false
40-
cluster, err := workload.Management.Management.Clusters("").Get(workload.ClusterName, metav1.GetOptions{})
41-
if err != nil {
42-
logrus.WithError(err).Warnf("Can not get cluster %s when registering endpoint controller", workload.ClusterName)
43-
}
44-
if cluster != nil {
45-
//assume that cluster always has a spec
46-
isRKE = cluster.Spec.RancherKubernetesEngineConfig != nil
47-
}
48-
49-
ignore := func() bool {
50-
return false
51-
}
52-
if cluster.Spec.Internal {
53-
deployments := workload.Apps.Deployments("").Controller().Lister()
54-
ignore = func() bool {
55-
_, err := deployments.Get(namespace.System, "cattle-cluster-agent")
56-
return err == nil
57-
}
58-
}
59-
33+
func Register(ctx context.Context, workload *config.UserOnlyContext) {
6034
s := &ServicesController{
6135
services: workload.Core.Services(""),
62-
workloadController: workloadUtil.NewWorkloadController(ctx, workload.UserOnlyContext(), nil),
63-
machinesLister: workload.Management.Management.Nodes(workload.ClusterName).Controller().Lister(),
36+
workloadController: workloadUtil.NewWorkloadController(ctx, workload, nil),
37+
nodesLister: workload.Core.Nodes("").Controller().Lister(),
6438
clusterName: workload.ClusterName,
6539
}
66-
workload.Core.Services("").AddHandler(ctx, "servicesEndpointsController", func(key string, obj *corev1.Service) (runtime.Object, error) {
67-
if ignore() {
68-
return obj, nil
69-
}
70-
return s.sync(key, obj)
71-
})
40+
workload.Core.Services("").AddHandler(ctx, "servicesEndpointsController", s.sync)
7241

7342
p := &PodsController{
7443
podLister: workload.Core.Pods("").Controller().Lister(),
75-
workloadController: workloadUtil.NewWorkloadController(ctx, workload.UserOnlyContext(), nil),
44+
workloadController: workloadUtil.NewWorkloadController(ctx, workload, nil),
7645
}
77-
workload.Core.Pods("").AddHandler(ctx, "hostPortEndpointsController", func(key string, obj *corev1.Pod) (runtime.Object, error) {
78-
if ignore() {
79-
return obj, nil
80-
}
81-
return p.sync(key, obj)
82-
})
46+
workload.Core.Pods("").AddHandler(ctx, "hostPortEndpointsController", p.sync)
8347

8448
w := &WorkloadEndpointsController{
85-
ingressLister: ingresswrapper.NewCompatLister(workload.Networking, workload.Extensions, workload.K8sClient),
86-
serviceLister: workload.Core.Services("").Controller().Lister(),
87-
podLister: workload.Core.Pods("").Controller().Lister(),
88-
machinesLister: workload.Management.Management.Nodes(workload.ClusterName).Controller().Lister(),
89-
nodeLister: workload.Core.Nodes("").Controller().Lister(),
90-
clusterName: workload.ClusterName,
91-
isRKE: isRKE,
92-
}
93-
w.WorkloadController = workloadUtil.NewWorkloadController(ctx, workload.UserOnlyContext(), func(key string, workload *workloadUtil.Workload) error {
94-
if ignore() {
95-
return nil
96-
}
97-
return w.UpdateEndpoints(key, workload)
98-
})
49+
ingressLister: ingresswrapper.NewCompatLister(workload.Networking, workload.Extensions, workload.K8sClient),
50+
serviceLister: workload.Core.Services("").Controller().Lister(),
51+
podLister: workload.Core.Pods("").Controller().Lister(),
52+
nodeLister: workload.Core.Nodes("").Controller().Lister(),
53+
clusterName: workload.ClusterName,
54+
}
55+
w.WorkloadController = workloadUtil.NewWorkloadController(ctx, workload, w.UpdateEndpoints)
9956

10057
i := &IngressEndpointsController{
101-
workloadController: workloadUtil.NewWorkloadController(ctx, workload.UserOnlyContext(), nil),
58+
workloadController: workloadUtil.NewWorkloadController(ctx, workload, nil),
10259
ingressInterface: ingresswrapper.NewCompatInterface(workload.Networking, workload.Extensions, workload.K8sClient),
103-
isRKE: isRKE,
10460
}
10561
if i.ingressInterface.ServerSupportsIngressV1 {
106-
workload.Networking.Ingresses("").AddHandler(ctx, "ingressEndpointsController", func(key string, obj *knetworkingv1.Ingress) (runtime.Object, error) {
107-
if ignore() {
108-
return obj, nil
109-
}
110-
return ingresswrapper.CompatSyncV1(i.sync)(key, obj)
111-
})
62+
workload.Networking.Ingresses("").AddHandler(ctx, "ingressEndpointsController", ingresswrapper.CompatSyncV1(i.sync))
11263
} else {
113-
workload.Extensions.Ingresses("").AddHandler(ctx, "ingressEndpointsController", func(key string, obj *kextv1beta1.Ingress) (runtime.Object, error) {
114-
if ignore() {
115-
return obj, nil
116-
}
117-
return ingresswrapper.CompatSyncV1Beta1(i.sync)(key, obj)
118-
})
64+
workload.Extensions.Ingresses("").AddHandler(ctx, "ingressEndpointsController", ingresswrapper.CompatSyncV1Beta1(i.sync))
11965
}
12066
}
12167

@@ -220,7 +166,7 @@ func convertServiceToPublicEndpoints(svc *corev1.Service, clusterName string, no
220166
return eps, nil
221167
}
222168

223-
func convertHostPortToEndpoint(pod *corev1.Pod, clusterName string, node *managementv3.Node) ([]v32.PublicEndpoint, error) {
169+
func convertHostPortToEndpoint(pod *corev1.Pod, clusterName string, node *v1.Node) ([]v32.PublicEndpoint, error) {
224170
var eps []v32.PublicEndpoint
225171
if pod.DeletionTimestamp != nil {
226172
return eps, nil
@@ -240,7 +186,7 @@ func convertHostPortToEndpoint(pod *corev1.Pod, clusterName string, node *manage
240186
if p.HostIP != "" {
241187
address = p.HostIP
242188
} else {
243-
address = nodehelper.GetEndpointNodeIP(node)
189+
address = nodehelper.GetEndpointV1NodeIP(node)
244190
}
245191
p := v32.PublicEndpoint{
246192
NodeName: fmt.Sprintf("%s:%s", clusterName, node.Name),
@@ -262,21 +208,14 @@ func publicEndpointToString(p v32.PublicEndpoint) string {
262208
return fmt.Sprintf("%s_%v_%v_%s_%s_%s_%s_%s_%s", p.NodeName, p.Addresses, p.Port, p.Protocol, p.ServiceName, p.PodName, p.IngressName, p.Hostname, p.Path)
263209
}
264210

265-
func getNodeNameToMachine(clusterName string, machineLister managementv3.NodeLister, nodeLister v1.NodeLister) (map[string]*managementv3.Node, error) {
266-
machines, err := machineLister.List(clusterName, labels.NewSelector())
211+
func getNodeNameToMachine(nodeLister v1.NodeLister) (map[string]*v1.Node, error) {
212+
nodes, err := nodeLister.List("", labels.NewSelector())
267213
if err != nil {
268214
return nil, err
269215
}
270-
machineMap := map[string]*managementv3.Node{}
271-
for _, machine := range machines {
272-
var node *corev1.Node
273-
node, err = nodehelper.GetNodeForMachine(machine, nodeLister)
274-
if err != nil {
275-
return nil, err
276-
}
277-
if node != nil {
278-
machineMap[node.Name] = machine
279-
}
216+
machineMap := map[string]*v1.Node{}
217+
for _, node := range nodes {
218+
machineMap[node.Name] = node
280219
}
281220
return machineMap, nil
282221
}
@@ -290,15 +229,15 @@ func isMachineReady(machine *managementv3.Node) bool {
290229
return false
291230
}
292231

293-
func getAllNodesPublicEndpointIP(machineLister managementv3.NodeLister, clusterName string) (string, error) {
232+
func getAllNodesPublicEndpointIP(nodesLister v1.NodeLister, clusterName string) (string, error) {
294233
var addresses []string
295-
machines, err := machineLister.List(clusterName, labels.NewSelector())
234+
nodes, err := nodesLister.List(clusterName, labels.NewSelector())
296235
if err != nil {
297236
return "", err
298237
}
299-
for _, machine := range machines {
300-
if machine.Spec.Worker && nodehelper.IsMachineReady(machine) {
301-
nodePublicIP := getEndpointNodeAddress(machine)
238+
for _, node := range nodes {
239+
if node.Labels["node-role.kubernetes.io/worker"] == "true" && nodehelper.IsNodeReady(node) {
240+
nodePublicIP := getEndpointNodeAddress(node)
302241
if nodePublicIP != "" {
303242
addresses = append(addresses, nodePublicIP)
304243
}
@@ -396,8 +335,8 @@ func convertIngressToPublicEndpoints(obj ingresswrapper.Ingress, isRKE bool) ([]
396335
return eps, nil
397336
}
398337

399-
func getEndpointNodeAddress(machine *managementv3.Node) string {
400-
endpointAddress := nodehelper.GetEndpointNodeIP(machine)
338+
func getEndpointNodeAddress(machine *v1.Node) string {
339+
endpointAddress := nodehelper.GetEndpointV1NodeIP(machine)
401340
if endpointAddress == "" {
402341
return ""
403342
}

pkg/controllers/managementuserlegacy/endpoints/ingress_endpoints.go renamed to pkg/controllers/managementagent/endpoints/ingress_endpoints.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ import (
66

77
workloadutil "github.com/rancher/rancher/pkg/controllers/managementagent/workload"
88
"github.com/rancher/rancher/pkg/ingresswrapper"
9+
"github.com/rancher/rancher/pkg/settings"
910
"github.com/sirupsen/logrus"
1011
"k8s.io/apimachinery/pkg/runtime"
1112
)
1213

1314
type IngressEndpointsController struct {
1415
workloadController workloadutil.CommonController
1516
ingressInterface ingresswrapper.CompatInterface
16-
isRKE bool
1717
}
1818

1919
func (c *IngressEndpointsController) sync(key string, obj ingresswrapper.Ingress) (runtime.Object, error) {
@@ -39,7 +39,7 @@ func (c *IngressEndpointsController) sync(key string, obj ingresswrapper.Ingress
3939
}
4040

4141
func (c *IngressEndpointsController) reconcileEndpointsForIngress(obj ingresswrapper.Ingress) (bool, error) {
42-
fromObj, err := convertIngressToPublicEndpoints(obj, c.isRKE)
42+
fromObj, err := convertIngressToPublicEndpoints(obj, settings.IsRKE.Get() == "true")
4343
if err != nil {
4444
return false, err
4545
}

pkg/controllers/managementuserlegacy/endpoints/service_endpoints.go renamed to pkg/controllers/managementagent/endpoints/service_endpoints.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package endpoints
33
import (
44
workloadutil "github.com/rancher/rancher/pkg/controllers/managementagent/workload"
55
v1 "github.com/rancher/rancher/pkg/generated/norman/core/v1"
6-
v3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
76
"github.com/sirupsen/logrus"
87
corev1 "k8s.io/api/core/v1"
98
"k8s.io/apimachinery/pkg/runtime"
@@ -15,7 +14,7 @@ import (
1514
type ServicesController struct {
1615
services v1.ServiceInterface
1716
workloadController workloadutil.CommonController
18-
machinesLister v3.NodeLister
17+
nodesLister v1.NodeLister
1918
clusterName string
2019
}
2120

@@ -41,7 +40,7 @@ func (s *ServicesController) sync(key string, obj *corev1.Service) (runtime.Obje
4140

4241
func (s *ServicesController) reconcileEndpointsForService(svc *corev1.Service) (bool, error) {
4342
// 1. update service with endpoints
44-
allNodesIP, err := getAllNodesPublicEndpointIP(s.machinesLister, s.clusterName)
43+
allNodesIP, err := getAllNodesPublicEndpointIP(s.nodesLister, s.clusterName)
4544
if err != nil {
4645
return false, err
4746
}

pkg/controllers/managementuserlegacy/endpoints/workload_endpoints.go renamed to pkg/controllers/managementagent/endpoints/workload_endpoints.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import (
66
"strings"
77

88
v32 "github.com/rancher/rancher/pkg/apis/project.cattle.io/v3"
9+
"github.com/rancher/rancher/pkg/settings"
910

1011
workloadutil "github.com/rancher/rancher/pkg/controllers/managementagent/workload"
1112
v1 "github.com/rancher/rancher/pkg/generated/norman/core/v1"
12-
managementv3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
1313
"github.com/rancher/rancher/pkg/ingresswrapper"
1414
"github.com/sirupsen/logrus"
1515
corev1 "k8s.io/api/core/v1"
@@ -25,10 +25,8 @@ type WorkloadEndpointsController struct {
2525
serviceLister v1.ServiceLister
2626
podLister v1.PodLister
2727
WorkloadController workloadutil.CommonController
28-
machinesLister managementv3.NodeLister
2928
nodeLister v1.NodeLister
3029
clusterName string
31-
isRKE bool
3230
serverSupportsIngressV1 bool
3331
}
3432

@@ -77,18 +75,18 @@ func (c *WorkloadEndpointsController) UpdateEndpoints(key string, obj *workloadu
7775
return err
7876
}
7977

80-
nodeNameToMachine, err := getNodeNameToMachine(c.clusterName, c.machinesLister, c.nodeLister)
78+
nodeNameToMachine, err := getNodeNameToMachine(c.nodeLister)
8179
if err != nil {
8280
return err
8381
}
84-
allNodesIP, err := getAllNodesPublicEndpointIP(c.machinesLister, c.clusterName)
82+
allNodesIP, err := getAllNodesPublicEndpointIP(c.nodeLister, c.clusterName)
8583
if err != nil {
8684
return err
8785
}
8886
// get ingress endpoint group by service
8987
serviceToIngressEndpoints := make(map[string][]v32.PublicEndpoint)
9088
for _, ingress := range ingresses {
91-
epsMap, err := convertIngressToServicePublicEndpointsMap(ingress, c.isRKE)
89+
epsMap, err := convertIngressToServicePublicEndpointsMap(ingress, settings.IsRKE.Get() == "true")
9290
if err != nil {
9391
return err
9492
}

pkg/controllers/managementuserlegacy/controllers.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/rancher/rancher/pkg/controllers/managementuserlegacy/alert"
88
"github.com/rancher/rancher/pkg/controllers/managementuserlegacy/approuter"
99
"github.com/rancher/rancher/pkg/controllers/managementuserlegacy/cis"
10-
"github.com/rancher/rancher/pkg/controllers/managementuserlegacy/endpoints"
1110
"github.com/rancher/rancher/pkg/controllers/managementuserlegacy/globaldns"
1211
"github.com/rancher/rancher/pkg/controllers/managementuserlegacy/helm"
1312
"github.com/rancher/rancher/pkg/controllers/managementuserlegacy/istio"
@@ -25,7 +24,6 @@ func Register(ctx context.Context, cluster *config.UserContext, clusterRec *mana
2524
cis.Register(ctx, cluster)
2625
pipeline.Register(ctx, cluster)
2726
systemimage.Register(ctx, cluster)
28-
endpoints.Register(ctx, cluster)
2927
approuter.Register(ctx, cluster)
3028
alert.Register(ctx, cluster)
3129
globaldns.Register(ctx, cluster)

pkg/node/node.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,29 @@ func GetNodeInternalAddress(node *corev1.Node) string {
8383
return ""
8484
}
8585

86+
func GetEndpointV1NodeIP(node *v1.Node) string {
87+
externalIP := ""
88+
internalIP := ""
89+
for _, ip := range node.Status.Addresses {
90+
if ip.Type == "ExternalIP" && ip.Address != "" {
91+
externalIP = ip.Address
92+
break
93+
} else if ip.Type == "InternalIP" && ip.Address != "" {
94+
internalIP = ip.Address
95+
}
96+
}
97+
if externalIP != "" {
98+
return externalIP
99+
}
100+
if node.Annotations != nil {
101+
externalIP = node.Annotations[externalAddressAnnotation]
102+
if externalIP != "" {
103+
return externalIP
104+
}
105+
}
106+
return internalIP
107+
}
108+
86109
func GetEndpointNodeIP(node *v3.Node) string {
87110
externalIP := ""
88111
internalIP := ""
@@ -170,6 +193,15 @@ func GetMachineForNode(node *corev1.Node, clusterNamespace string, machineLister
170193
return nil, nil
171194
}
172195

196+
func IsNodeReady(node *v1.Node) bool {
197+
for _, cond := range node.Status.Conditions {
198+
if cond.Type == corev1.NodeReady {
199+
return cond.Status == corev1.ConditionTrue
200+
}
201+
}
202+
return false
203+
}
204+
173205
func IsMachineReady(machine *v3.Node) bool {
174206
for _, cond := range machine.Status.InternalNodeStatus.Conditions {
175207
if cond.Type == corev1.NodeReady {

pkg/settings/setting.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ var (
4747
InstallUUID = NewSetting("install-uuid", "")
4848
InternalServerURL = NewSetting("internal-server-url", "")
4949
InternalCACerts = NewSetting("internal-cacerts", "")
50+
IsRKE = NewSetting("is-rke", "")
5051
JailerTimeout = NewSetting("jailer-timeout", "60")
5152
KubeconfigGenerateToken = NewSetting("kubeconfig-generate-token", "true")
5253
KubeconfigTokenTTLMinutes = NewSetting("kubeconfig-token-ttl-minutes", "960") // 16 hours

0 commit comments

Comments
 (0)