Skip to content

Commit

Permalink
support config lb backend nodes count for service;
Browse files Browse the repository at this point in the history
Signed-off-by: gangqiangwang <gangqiangwang@yunify.com>;
  • Loading branch information
qiangzii committed Nov 8, 2022
1 parent cf71afa commit c41b6ab
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 35 deletions.
31 changes: 30 additions & 1 deletion docs/configure.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,13 @@ spec:
- `Cluster`: 如果`service`中不显式指定 `externalTrafficPolicy` 字段的值,则默认为`Cluster`;这种模式下,可以通过给服务添加相关注解来指定LB监听器backend的添加规则

`Cluster`模式下,目前支持的 `service` 注解有:
- 使用指定Label的Worker节点作为后端服务器, `service.beta.kubernetes.io/qingcloud-lb-backend-label`,可以指定多个Label,多个Label以逗号分隔。例如:`key1=value1,key2=value2`,多个Label之间是And关系。同时,在需要成为后端服务器的Worker节点打上`key1=value1,key2=value2`的Label;只有服务指定的所有Label的key和value都和Worker节点匹配时,Worker节点会被选为服务的后端服务器;没有此注解则添加所有Worker节点为backend;通过注解过滤节点后,如果没有满足条件的节点,为了避免服务中断,会添加所有Worker节点为后端服务器;
> 以下各种过滤节点的方式不能结合使用,如果指定了多个,则会按照以下注解的说明顺序选用第一个匹配到的注解,并按照该方法过滤节点;
- 使用指定Label的Worker节点作为后端服务器, `service.beta.kubernetes.io/qingcloud-lb-backend-label`,可以指定多个Label,多个Label以逗号分隔。例如:`key1=value1,key2=value2`,多个Label之间是And关系。同时,在需要成为后端服务器的Worker节点打上`key1=value1,key2=value2`的Label;只有服务指定的所有Label的key和value都和Worker节点匹配时,Worker节点会被选为服务的后端服务器;特殊情况说明:
- 没有此注解则添加所有Worker节点为backend;
- 通过注解过滤节点后,如果没有满足条件的节点,为了避免服务中断,会添加所有Worker节点为后端服务器;
- 使用指定数量的Worker节点作为后端服务器,`service.beta.kubernetes.io/qingcloud-lb-backend-count`,通过此注解指定该服务使用的lb backend节点数量;如果服务添加了该注解,就认为用户想使用该特性;如果指定的数量为0或者不在可用节点数量的范围内,则使用默认值:集群所有节点的1/3;如果集群中节点状态发生变化,但是当前服务的lb后端数量就是用户指定的数量,并且已添加的所有lb后端节点在集群中都是ready状态的,则不会更新lb的backend;默认值特殊情况说明:
- 如果集群总worker节点数少于3个,则添加所有worker节点为backend,不再按照比例计算节点数;
- 如果集群总worker节点数多于3个,若按照比例计算后少于3个,则设置为3个;

> 本章节所说的"所有Worker节点"特指所有 `Ready` 状态的Worker节点;

Expand Down Expand Up @@ -184,6 +190,7 @@ spec:
```

#### Cluster模式
##### 使用指定Label的Worker节点作为后端服务器
将服务的`externalTrafficPolicy`指定为`Cluster`,并在服务的注解`service.beta.kubernetes.io/qingcloud-lb-backend-label`中指定要添加为backend的worker节点的label:

```yaml
Expand Down Expand Up @@ -232,6 +239,28 @@ status:
...
```

##### 使用指定数量的Worker节点作为后端服务器

```yaml
kind: Service
apiVersion: v1
metadata:
name: reuse-lb
annotations:
service.beta.kubernetes.io/qingcloud-load-balancer-eip-strategy: "reuse-lb"
service.beta.kubernetes.io/qingcloud-load-balancer-id: "lb-oglqftju"
service.beta.kubernetes.io/qingcloud-lb-backend-count: "3"
spec:
externalTrafficPolicy: Cluster
selector:
app: mylbapp
type: LoadBalancer
ports:
- name: http
port: 8090
protocol: TCP
targetPort: 80
```

## 配置内网负载均衡器
### 已知问题
Expand Down
15 changes: 14 additions & 1 deletion pkg/qingcloud/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,14 @@ const (
ServiceAnnotationListenerServerCertificate = "service.beta.kubernetes.io/qingcloud-lb-listener-cert"
// port:protocol, such as "443:https,80:http"
ServiceAnnotationListenerProtocol = "service.beta.kubernetes.io/qingcloud-lb-listener-protocol"
	// port:timeout, such as "443:50"; the value must be in the range 10 ~ 86400
	ServiceAnnotationListenerTimeout = "service.beta.kubernetes.io/qingcloud-lb-listener-timeout"

// 5. Configure backend
// backend label, such as "key1=value1,key2=value2"
ServiceAnnotationBackendLabel = "service.beta.kubernetes.io/qingcloud-lb-backend-label"
	// backend count limit; if the value is 0 or greater than the number of ready worker nodes in the cluster, the default value is used: 1/3 of the cluster's ready workers
	ServiceAnnotationBackendCount = "service.beta.kubernetes.io/qingcloud-lb-backend-count"
)

type LoadBalancerConfig struct {
Expand All @@ -105,7 +109,9 @@ type LoadBalancerConfig struct {
Protocol *string

//backend
BackendLabel string
BackendLabel string
BackendCountConfig string
BackendCountResult int

//It's just for defining names, nothing more.
NetworkType string
Expand Down Expand Up @@ -171,6 +177,13 @@ func (qc *QingCloud) ParseServiceLBConfig(cluster string, service *v1.Service) (
if backendLabel, ok := annotation[ServiceAnnotationBackendLabel]; ok {
config.BackendLabel = backendLabel
}
if backendCount, ok := annotation[ServiceAnnotationBackendCount]; ok {
_, err := strconv.Atoi(backendCount)
if err != nil {
return nil, fmt.Errorf("please spec a valid value of loadBalancer backend count")
}
config.BackendCountConfig = backendCount
}

networkType := annotation[ServiceAnnotationLoadBalancerNetworkType]
if config.VxNetID == nil && qc.Config.DefaultVxNetForLB != "" {
Expand Down
2 changes: 1 addition & 1 deletion pkg/qingcloud/loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func TestDiffListeners(t *testing.T) {
}

for _, tc := range testCases {
toDelete, toAdd := diffListeners(tc.listeners, tc.conf, tc.ports)
toDelete, toAdd, _ := diffListeners(tc.listeners, tc.conf, tc.ports)
// fmt.Printf("delete=%s, add=%s", spew.Sdump(toDelete), spew.Sdump(toAdd))
if !reflect.DeepEqual(toDelete, tc.toDelete) || !reflect.DeepEqual(toAdd, tc.toAdd) {
t.Fail()
Expand Down
21 changes: 20 additions & 1 deletion pkg/qingcloud/loadbalancer_utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package qingcloud

import (
"crypto/rand"
"fmt"
"math/big"
"strconv"
"strings"

Expand Down Expand Up @@ -152,7 +154,7 @@ func getProtocol(annotationConf map[int]string, port int) *string {
}
}

func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerConfig, ports []v1.ServicePort) (toDelete []*string, toAdd []v1.ServicePort) {
func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerConfig, ports []v1.ServicePort) (toDelete []*string, toAdd []v1.ServicePort, toKeep []*apis.LoadBalancerListener) {
svcNodePort := make(map[string]int)
for _, listener := range listeners {
if len(listener.Status.LoadBalancerBackends) > 0 {
Expand Down Expand Up @@ -197,6 +199,8 @@ func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerCon
}
if delete {
toDelete = append(toDelete, listener.Status.LoadBalancerListenerID)
} else {
toKeep = append(toKeep, listener)
}
}

Expand Down Expand Up @@ -521,3 +525,18 @@ func equalProtocol(listener *apis.LoadBalancerListener, conf *LoadBalancerConfig
}
return false
}

// getRandomNodes returns up to count distinct nodes picked uniformly at
// random from nodes (order of the result is the pick order, not the input
// order). It is used to choose a subset of worker nodes as LB backends.
//
// Fixes over the naive rejection-sampling loop:
//   - count <= 0 or an empty node list returns nil instead of panicking
//     (rand.Int with a zero max panics);
//   - count >= len(nodes) returns all nodes instead of looping forever
//     waiting for more distinct indices than exist;
//   - a crypto/rand read error falls back to deterministic selection
//     instead of being ignored (which could also spin forever).
func getRandomNodes(nodes []*v1.Node, count int) (result []*v1.Node) {
	length := int64(len(nodes))
	if length == 0 || count <= 0 {
		return nil
	}
	if count >= len(nodes) {
		// Asked for at least everything: return a copy so callers cannot
		// mutate the caller-owned input slice through the result.
		return append(result, nodes...)
	}

	picked := make(map[int64]bool, count)
	for len(result) < count {
		r, err := rand.Int(rand.Reader, big.NewInt(length))
		if err != nil {
			// Extremely unlikely (crypto/rand reader failure). Fill the
			// remainder deterministically so we never loop forever.
			for i := int64(0); i < length && len(result) < count; i++ {
				if !picked[i] {
					picked[i] = true
					result = append(result, nodes[i])
				}
			}
			return
		}
		idx := r.Int64()
		if !picked[idx] {
			picked[idx] = true
			result = append(result, nodes[idx])
		}
	}
	return
}
99 changes: 68 additions & 31 deletions pkg/qingcloud/qingcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"context"
"fmt"
"io"
"strconv"

"github.com/davecgh/go-spew/spew"
yaml "gopkg.in/yaml.v2"
Expand All @@ -27,8 +28,9 @@ import (
)

const (
ProviderName = "qingcloud"
QYConfigPath = "/etc/qingcloud/config.yaml"
ProviderName = "qingcloud"
QYConfigPath = "/etc/qingcloud/config.yaml"
DefaultBackendCount = 3
)

type Config struct {
Expand Down Expand Up @@ -232,7 +234,7 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service *
klog.Infof("The loadbalancer %s has the following listeners %s", *lb.Status.LoadBalancerID, spew.Sdump(listenerIDs))
if len(listenerIDs) <= 0 {
klog.Infof("creating listeners for loadbalancers %s, service ports %s", *lb.Status.LoadBalancerID, spew.Sdump(service.Spec.Ports))
if err = qc.createListenersAndBackends(conf, lb, service.Spec.Ports, nodes); err != nil {
if err = qc.createListenersAndBackends(conf, lb, service.Spec.Ports, nodes, service); err != nil {
klog.Errorf("createListenersAndBackends for loadbalancer %s error: %v", *lb.Status.LoadBalancerID, err)
return nil, err
}
Expand All @@ -244,7 +246,7 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service *
}

//update listerner
toDelete, toAdd := diffListeners(listeners, conf, service.Spec.Ports)
toDelete, toAdd, toKeep := diffListeners(listeners, conf, service.Spec.Ports)
if len(toDelete) > 0 {
klog.Infof("listeners %s will be deleted for lb %s", spew.Sdump(toDelete), *lb.Status.LoadBalancerID)
err = qc.Client.DeleteListener(toDelete)
Expand All @@ -256,38 +258,37 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service *

if len(toAdd) > 0 {
klog.Infof("listeners %s will be added for lb %s", spew.Sdump(toAdd), *lb.Status.LoadBalancerID)
err = qc.createListenersAndBackends(conf, lb, toAdd, nodes)
err = qc.createListenersAndBackends(conf, lb, toAdd, nodes, service)
if err != nil {
return nil, err
}
modify = true
}

//update backend; for example, service annotation for backend label changed
if len(toAdd) == 0 && len(toDelete) == 0 {
for _, listener := range listeners {
toDelete, toAdd := diffBackend(listener, nodes)

if len(toDelete) > 0 {
klog.Infof("backends %s will be deleted for listener %s(%s) of lb %s",
spew.Sdump(toDelete), *listener.Spec.LoadBalancerListenerName, *listener.Spec.LoadBalancerListenerID, *lb.Status.LoadBalancerID)
err = qc.Client.DeleteBackends(toDelete)
if err != nil {
return nil, err
}
modify = true
for _, listener := range toKeep {
// toDelete, toAdd := diffBackend(listener, nodes)
toDelete, toAdd := qc.diffBackend(listener, nodes, conf, service)

if len(toDelete) > 0 {
klog.Infof("backends %s will be deleted for listener %s(%s) of lb %s",
spew.Sdump(toDelete), *listener.Spec.LoadBalancerListenerName, *listener.Spec.LoadBalancerListenerID, *lb.Status.LoadBalancerID)
err = qc.Client.DeleteBackends(toDelete)
if err != nil {
return nil, err
}
modify = true
}

toAddBackends := generateLoadBalancerBackends(toAdd, listener, service.Spec.Ports)
if len(toAddBackends) > 0 {
klog.Infof("backends %s will be added for listener %s(%s) of lb %s",
spew.Sdump(toAddBackends), *listener.Spec.LoadBalancerListenerName, *listener.Spec.LoadBalancerListenerID, *lb.Status.LoadBalancerID)
_, err = qc.Client.CreateBackends(toAddBackends)
if err != nil {
return nil, err
}
modify = true
toAddBackends := generateLoadBalancerBackends(toAdd, listener, service.Spec.Ports)
if len(toAddBackends) > 0 {
klog.Infof("backends %s will be added for listener %s(%s) of lb %s",
spew.Sdump(toAddBackends), *listener.Spec.LoadBalancerListenerName, *listener.Spec.LoadBalancerListenerID, *lb.Status.LoadBalancerID)
_, err = qc.Client.CreateBackends(toAddBackends)
if err != nil {
return nil, err
}
modify = true
}
}

Expand Down Expand Up @@ -316,6 +317,7 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service *
//1.2 prepare sg
//default sg set by Client auto
//1.3 create lb
klog.Infof("creating loadbalance for service %s/%s", service.Namespace, service.Name)
lb, err = qc.Client.CreateLB(&apis.LoadBalancer{
Spec: apis.LoadBalancerSpec{
LoadBalancerName: &conf.LoadBalancerName,
Expand All @@ -331,7 +333,7 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service *
}

//create listener
if err = qc.createListenersAndBackends(conf, lb, service.Spec.Ports, nodes); err != nil {
if err = qc.createListenersAndBackends(conf, lb, service.Spec.Ports, nodes, service); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -382,7 +384,8 @@ func (qc *QingCloud) UpdateLoadBalancer(ctx context.Context, _ string, service *
}

for _, listener := range listeners {
toDelete, toAdd := diffBackend(listener, nodes)
// toDelete, toAdd := diffBackend(listener, nodes)
toDelete, toAdd := qc.diffBackend(listener, nodes, conf, service)

if len(toDelete) > 0 {
klog.Infof("backends %s will be deleted for listener %s(%s) of lb %s",
Expand Down Expand Up @@ -414,7 +417,7 @@ func (qc *QingCloud) UpdateLoadBalancer(ctx context.Context, _ string, service *
return qc.Client.UpdateLB(lb.Status.LoadBalancerID)
}

func (qc *QingCloud) createListenersAndBackends(conf *LoadBalancerConfig, status *apis.LoadBalancer, ports []v1.ServicePort, nodes []*v1.Node) error {
func (qc *QingCloud) createListenersAndBackends(conf *LoadBalancerConfig, status *apis.LoadBalancer, ports []v1.ServicePort, nodes []*v1.Node, svc *v1.Service) error {
listeners, err := generateLoadBalancerListeners(conf, status, ports)
if err != nil {
klog.Errorf("generateLoadBalancerListeners for loadbalancer %s error: %v", *status.Status.LoadBalancerID, err)
Expand All @@ -426,7 +429,19 @@ func (qc *QingCloud) createListenersAndBackends(conf *LoadBalancerConfig, status
return err
}

//create backend
// filter backend nodes by count
if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeCluster && conf.BackendCountConfig != "" {
klog.Infof("service %s/%s has lb backend count annotation, try to get %d random nodes as backend", svc.Namespace, svc.Name, conf.BackendCountResult)
nodes = getRandomNodes(nodes, conf.BackendCountResult)

var resultNames []string
for _, node := range nodes {
resultNames = append(resultNames, node.Name)
}
klog.Infof("get random nodes result for service %s/%s: %v", svc.Namespace, svc.Name, resultNames)
}

// create backend
for _, listener := range listeners {
backends := generateLoadBalancerBackends(nodes, listener, ports)
_, err = qc.Client.CreateBackends(backends)
Expand Down Expand Up @@ -515,7 +530,7 @@ func (qc *QingCloud) filterNodes(ctx context.Context, svc *v1.Service, nodes []*
}
}
} else {
if lbconfog.BackendLabel != "" {
if lbconfog.BackendLabel != "" { // filter by node label
klog.Infof("filter nodes for service %s/%s by backend label: %s", svc.Namespace, svc.Name, lbconfog.BackendLabel)

// filter by label
Expand Down Expand Up @@ -543,6 +558,28 @@ func (qc *QingCloud) filterNodes(ctx context.Context, svc *v1.Service, nodes []*
klog.Infof("there are no available nodes for service %s/%s, use all nodes!", svc.Namespace, svc.Name)
newNodes = nodes
}
// clear lb backend count config
lbconfog.BackendCountConfig = ""
} else if lbconfog.BackendCountConfig != "" { //filter by backend count config
var backendCountResult int

backendCountConfig, _ := strconv.Atoi(lbconfog.BackendCountConfig)
if backendCountConfig > 0 && backendCountConfig <= len(nodes) {
backendCountResult = backendCountConfig
} else {
//invalid count config, use default value (1/3 of all nodes)
if len(nodes) <= 3 {
backendCountResult = len(nodes)
} else {
backendCountResult = len(nodes) / 3
if backendCountResult < 3 {
backendCountResult = DefaultBackendCount
}
}
}

lbconfog.BackendCountResult = backendCountResult
newNodes = nodes
} else {
// no need to filter
newNodes = nodes
Expand Down
39 changes: 39 additions & 0 deletions pkg/qingcloud/qingcloud_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"fmt"

"github.com/davecgh/go-spew/spew"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

"github.com/yunify/qingcloud-cloud-controller-manager/pkg/apis"
"github.com/yunify/qingcloud-cloud-controller-manager/pkg/util"
)

func (qc *QingCloud) prepareEip(eipSource *string) (eip *apis.EIP, err error) {
Expand Down Expand Up @@ -143,3 +145,40 @@ func (qc *QingCloud) updateLBEip(config *LoadBalancerConfig, lb *apis.LoadBalanc

return nil
}

// diffBackend computes, for one listener, which backends must be removed
// (toDelete, by backend ID) and which nodes must be registered (toAdd) so
// that the listener's backend set matches the desired node set — optionally
// capped at conf.BackendCountResult when the service carries the
// backend-count annotation in Cluster traffic-policy mode.
func (qc *QingCloud) diffBackend(listener *apis.LoadBalancerListener, nodes []*v1.Node, conf *LoadBalancerConfig, svc *v1.Service) (toDelete []*string, toAdd []*v1.Node) {
	// Partition the listener's current backends: backends whose node has
	// disappeared from the cluster are deleted; the rest survive and are
	// candidates for random trimming below.
	var survivorIDs []*string
	for _, backend := range listener.Status.LoadBalancerBackends {
		if nodesHasBackend(*backend.Spec.LoadBalancerBackendName, nodes) {
			survivorIDs = append(survivorIDs, backend.Status.LoadBalancerBackendID)
		} else {
			toDelete = append(toDelete, backend.Status.LoadBalancerBackendID)
		}
	}

	countLimited := svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeCluster && conf.BackendCountConfig != ""
	if !countLimited {
		// No backend-count cap: every node not yet registered on this
		// listener becomes a backend.
		for _, node := range nodes {
			if !backendsHasNode(node, listener.Status.LoadBalancerBackends) {
				toAdd = append(toAdd, node)
			}
		}
		return
	}

	// Backend-count cap in effect: trim or top up the surviving backends
	// so the total equals conf.BackendCountResult.
	surviving := len(listener.Status.LoadBalancerBackends) - len(toDelete)
	if surviving > conf.BackendCountResult {
		// Too many left — randomly delete the surplus.
		surplus := surviving - conf.BackendCountResult
		toDelete = append(toDelete, util.GetRandomItems(survivorIDs, surplus)...)
	} else {
		// Too few left — randomly add from nodes not yet registered.
		var candidates []*v1.Node
		for _, node := range nodes {
			if !backendsHasNode(node, listener.Status.LoadBalancerBackends) {
				candidates = append(candidates, node)
			}
		}
		toAdd = append(toAdd, getRandomNodes(candidates, conf.BackendCountResult-surviving)...)
	}

	return
}
Loading

0 comments on commit c41b6ab

Please sign in to comment.