add externalTrafficPolicy support for services;
add backend label support for services;
add endpoint controller and cluster node controller;
Signed-off-by: gangqiangwang <gangqiangwang@yunify.com>;
wanggangqiang authored and wanggangqiang committed Aug 19, 2022
1 parent b6a663f commit 29e947b
Showing 11 changed files with 619 additions and 18 deletions.
14 changes: 12 additions & 2 deletions cmd/main.go
@@ -27,7 +27,7 @@ import (

"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cloud-provider"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/cloud-provider/app"
"k8s.io/cloud-provider/app/config"
"k8s.io/cloud-provider/options"
@@ -36,8 +36,11 @@ import (
_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
"k8s.io/klog/v2"

// For existing cloud providers, the option to import legacy providers is still available.
// e.g. _"k8s.io/legacy-cloud-providers/<provider>"
"github.com/yunify/qingcloud-cloud-controller-manager/pkg/controllers/clusternode"
"github.com/yunify/qingcloud-cloud-controller-manager/pkg/controllers/endpoint"
_ "github.com/yunify/qingcloud-cloud-controller-manager/pkg/qingcloud"
)

@@ -50,7 +53,7 @@ func main() {
}

fss := cliflag.NamedFlagSets{}
command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, app.DefaultInitFuncConstructors, fss, wait.NeverStop)
command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers(), fss, wait.NeverStop)

// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
@@ -87,3 +90,10 @@ func cloudInitializer(config *config.CompletedConfig) cloudprovider.Interface {
}
return cloud
}

// controllerInitializers extends the default cloud-provider controllers with the
// custom endpoint and clusternode controllers added in this change.
func controllerInitializers() map[string]app.InitFuncConstructor {
controllerInitializers := app.DefaultInitFuncConstructors
controllerInitializers["endpoint"] = endpoint.StartEndpointControllerWrapper
controllerInitializers["clusternode"] = clusternode.StartClusterNodeControllerWrapper
return controllerInitializers
}
3 changes: 2 additions & 1 deletion deploy/kube-cloud-controller-manager.yaml
@@ -48,6 +48,7 @@ rules:
resources:
- services
verbs:
- get
- list
- patch
- update
@@ -80,7 +81,7 @@ rules:
- apiGroups:
- ""
resources:
- endpoints
- pods
verbs:
- create
- get
16 changes: 14 additions & 2 deletions docs/configure.md
@@ -111,9 +111,9 @@ spec:
targetPort: 80
```

## IV. Configure LB listener properties
## IV. Configure LB listeners

### How to configure
### Configure LB listener properties
1. Set the listener's health check method via `service.beta.kubernetes.io/qingcloud-lb-listener-healthycheckmethod`; the default is tcp for the tcp protocol and udp for the udp protocol
2. Set the listener's health check parameters via `service.beta.kubernetes.io/qingcloud-lb-listener-healthycheckoption`; the default is "10|5|2|5"
3. Three balance modes are supported (roundrobin/leastconn/source) via `service.beta.kubernetes.io/qingcloud-lb-listener-balancemode`; the default is roundrobin
@@ -122,6 +122,17 @@

Because one LB can have multiple listeners, service annotations use the format `80:xxx,443:xxx` to address individual listeners by port.
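
For example (the ports and modes below are illustrative), two listeners on the same LB can be configured independently within a single annotation:

```yaml
metadata:
  annotations:
    # illustrative values: the port-80 listener uses roundrobin, the port-443 listener uses leastconn
    service.beta.kubernetes.io/qingcloud-lb-listener-balancemode: "80:roundrobin,443:leastconn"
```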

### Configure LB listener backends

The value of the `service`'s `externalTrafficPolicy` field determines how backends are added to the LB listener:
- `Local`: only the nodes hosting the serving pods are added as LB listener backends (a minimal sketch follows this list);
- `Cluster`: the default when the `externalTrafficPolicy` field is not explicitly set on the `service`; in this mode, annotations on the service specify the rules for adding LB listener backends.
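
A minimal sketch of a `Local` mode Service (the name and ports are illustrative):

```yaml
kind: Service
apiVersion: v1
metadata:
  name: mylbapp   # illustrative name
spec:
  type: LoadBalancer
  externalTrafficPolicy: Local   # only nodes running the serving pods become LB backends
  selector:
    app: mylbapp
  ports:
    - protocol: TCP
      port: 80
      targetPort: 80
```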


In `Cluster` mode, the currently supported `service` annotations are:
- Use worker nodes carrying the specified labels as backend servers via `service.beta.kubernetes.io/qingcloud-lb-backend-label`; multiple labels may be given, separated by commas, e.g. `key1=value1,key2=value2`; multiple labels are ANDed (see the example after this list).
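
For instance (the label keys and values are illustrative), restricting backends to worker nodes that carry both labels:

```yaml
metadata:
  annotations:
    # illustrative: only worker nodes labeled gpu=true AND ssd=true become LB backends
    service.beta.kubernetes.io/qingcloud-lb-backend-label: "gpu=true,ssd=true"
```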


### Reference Service
```yaml
kind: Service
@@ -136,6 +147,7 @@ metadata:
service.beta.kubernetes.io/qingcloud-lb-listener-balancemode: "8090:source"
service.beta.kubernetes.io/qingcloud-lb-listener-protocol: "8090:https"
service.beta.kubernetes.io/qingcloud-lb-listener-cert: "8090:sc-77oko7zj"
service.beta.kubernetes.io/qingcloud-lb-backend-label: "gpu=true"
spec:
selector:
app: mylbapp
1 change: 1 addition & 0 deletions go.mod
@@ -12,6 +12,7 @@ require (
k8s.io/client-go v0.21.1
k8s.io/cloud-provider v0.21.1
k8s.io/component-base v0.21.1
k8s.io/controller-manager v0.21.1
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.9.0
)
1 change: 1 addition & 0 deletions pkg/apis/types.go
@@ -84,6 +84,7 @@ type LoadBalancerListenerSpec struct {
ListenerPort *int `json:"listener_port" name:"listener_port"`
ListenerProtocol *string `json:"listener_protocol" name:"listener_protocol"`
LoadBalancerListenerName *string `json:"loadbalancer_listener_name" name:"loadbalancer_listener_name"`
LoadBalancerListenerID *string `json:"loadbalancer_listener_id" name:"loadbalancer_listener_id"`
LoadBalancerID *string `json:"loadbalancer_id" name:"loadbalancer_id"`
HealthyCheckMethod *string `json:"healthy_check_method" name:"healthy_check_method"`
HealthyCheckOption *string `json:"healthy_check_option" name:"healthy_check_option"`
212 changes: 212 additions & 0 deletions pkg/controllers/clusternode/cluster_node_controller.go
@@ -0,0 +1,212 @@
package clusternode

import (
"context"
"fmt"
"net/http"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
cloudproviderapp "k8s.io/cloud-provider/app"
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/klog"

"github.com/yunify/qingcloud-cloud-controller-manager/pkg/qingcloud"
)

const (
clusterNodeSyncPeriod = 30 * time.Second
clusterNodeWorkers = 10
)

type ClusterNodeController struct {
cloud cloudprovider.Interface

// svc
serviceLister corelisters.ServiceLister
serviceListerSynced cache.InformerSynced

// clusternode
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced
nodeQueue workqueue.RateLimitingInterface
}

// StartClusterNodeControllerWrapper wraps controller construction in a cloud-provider
// InitFunc so it can be registered (as "clusternode") in cmd/main.go.
func StartClusterNodeControllerWrapper(completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) cloudproviderapp.InitFunc {
return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return startClusterNodeController(completedConfig, cloud, ctx.Stop)
}
}

func startClusterNodeController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
// Start the cluster node controller
clusterNodeController, err := New(
cloud,
ctx.ClientBuilder.ClientOrDie("clusternode-controller"),
ctx.SharedInformers.Core().V1().Services(),
ctx.SharedInformers.Core().V1().Nodes(),
)
if err != nil {
// A construction failure should not crash the manager; log it and report the controller as not started.
klog.Errorf("Failed to start cluster node controller: %v", err)
return nil, false, nil
}

go clusterNodeController.Run(stopCh, clusterNodeWorkers)

return nil, true, nil
}

// New constructs the controller and registers the node event handler that feeds the workqueue (the producer side).
func New(
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
serviceInformer coreinformers.ServiceInformer,
nodeInformer coreinformers.NodeInformer,
) (*ClusterNodeController, error) {

cnc := &ClusterNodeController{
cloud: cloud,
serviceLister: serviceInformer.Lister(),
serviceListerSynced: serviceInformer.Informer().HasSynced,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster-node"),
}

nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, cur interface{}) {
oldNode, ok1 := old.(*corev1.Node)
curNode, ok2 := cur.(*corev1.Node)
if ok1 && ok2 && cnc.needsUpdate(oldNode, curNode) {
cnc.enqueueNode(cur)
}
},
},
)

return cnc, nil
}

// needsUpdate reports whether the node's labels changed.
func (cnc *ClusterNodeController) needsUpdate(old, new *corev1.Node) bool {

if len(old.Labels) != len(new.Labels) {
return true
}

for newLabelKey, newLabelValue := range new.Labels {
oldLabelValue, ok := old.Labels[newLabelKey]
if !ok || newLabelValue != oldLabelValue {
return true
}
}

return false
}

func (cnc *ClusterNodeController) enqueueNode(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
return
}
cnc.nodeQueue.Add(key)
}

// Run drains the node queue with the given number of workers (the consumer side).
func (cnc *ClusterNodeController) Run(stopCh <-chan struct{}, workers int) {
defer runtime.HandleCrash()
defer cnc.nodeQueue.ShutDown()

klog.Info("Starting cluster node controller")
defer klog.Info("Shutting down cluster node controller")

if !cache.WaitForCacheSync(stopCh, cnc.serviceListerSynced, cnc.nodeListerSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(cnc.worker, time.Second, stopCh)
}

<-stopCh
}

func (cnc *ClusterNodeController) worker() {
for cnc.processNextWorkItem() {
}
}

func (cnc *ClusterNodeController) processNextWorkItem() bool {
key, quit := cnc.nodeQueue.Get()
if quit {
return false
}
defer cnc.nodeQueue.Done(key)

err := cnc.handleNodesUpdate(key.(string))
if err == nil {
cnc.nodeQueue.Forget(key)
return true
}

runtime.HandleError(fmt.Errorf("error processing cluster node %v (will retry): %v", key, err))
cnc.nodeQueue.AddRateLimited(key)

return true
}

// handleNodesUpdate refreshes LoadBalancer service backends according to the current node labels
func (cnc *ClusterNodeController) handleNodesUpdate(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished handleNodesUpdate %q (%v)", key, time.Since(startTime))
}()

// 1. get node list
var nodes []*corev1.Node
nodeList, err := cnc.nodeLister.List(labels.NewSelector())
if err != nil {
return fmt.Errorf("get node list error: %v", err)
}
for i := range nodeList {
nodes = append(nodes, nodeList[i])
}

// 2. list all service
svcs, err := cnc.serviceLister.List(labels.NewSelector())
if err != nil {
return fmt.Errorf("list service error: %v", err)
}

// 3. filter services of type LoadBalancer whose externalTrafficPolicy is Cluster and which carry the annotation service.beta.kubernetes.io/qingcloud-lb-backend-label
for _, svc := range svcs {
_, ok := svc.Annotations[qingcloud.ServiceAnnotationBackendLabel]
if ok && svc.Spec.Type == corev1.ServiceTypeLoadBalancer &&
svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeCluster {
klog.Infof("service %s serviceType = %s, externalTrafficPolicy = %s, also has backend label annotation , going to update loadbalancer", svc.Name, svc.Spec.Type, svc.Spec.ExternalTrafficPolicy)

// 4. update lb, guarding against a provider without load balancer support
lbInterface, ok := cnc.cloud.LoadBalancer()
if !ok {
return fmt.Errorf("the cloud provider does not support load balancers")
}
err = lbInterface.UpdateLoadBalancer(context.TODO(), "", svc, nodes)
if err != nil {
return fmt.Errorf("update loadbalancer for service %s/%s error: %v", svc.Namespace, svc.Name, err)
}
klog.Infof("update loadbalancer for service %s/%s success", svc.Namespace, svc.Name)
}
}

return nil
}