Skip to content

Commit

Permalink
Merge pull request #130 from qiangzii/master
Browse files Browse the repository at this point in the history
support config lb listener timeout;
  • Loading branch information
liangcd authored Nov 30, 2022
2 parents f7bd1a7 + 869ba21 commit eeec280
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 19 deletions.
2 changes: 2 additions & 0 deletions docs/configure.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ spec:
3. 支持 roundrobin/leastconn/source 三种负载均衡方式,`service.beta.kubernetes.io/qingcloud-lb-listener-balancemode`,默认是 roundrobin
4. 支持 http/https 协议的配置,`service.beta.kubernetes.io/qingcloud-lb-listener-protocol`,没有此注解则默认使用 Service 所用协议
5. 支持 https 协议证书的配置,`service.beta.kubernetes.io/qingcloud-lb-listener-cert`,如果配置的 https 协议,则必须配置证书
6. 支持监听器的超时时间,` service.beta.kubernetes.io/qingcloud-lb-listener-timeout`,不配置默认是50,可选范围为(10 ~ 86400),单位为s

因为一个LB会有多个监听器,所以进行service注解设置时,通过如下格式区分不同监听器:`80:xxx,443:xxx`。

Expand All @@ -136,6 +137,7 @@ metadata:
service.beta.kubernetes.io/qingcloud-lb-listener-balancemode: "8090:source"
service.beta.kubernetes.io/qingcloud-lb-listener-protocol: "8090:https"
service.beta.kubernetes.io/qingcloud-lb-listener-cert: "8090:sc-77oko7zj"
service.beta.kubernetes.io/qingcloud-lb-listener-timeout: "8080:10"
spec:
selector:
app: mylbapp
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type LoadBalancerListenerSpec struct {
HealthyCheckOption *string `json:"healthy_check_option" name:"healthy_check_option"`
BalanceMode *string `json:"balance_mode" name:"balance_mode"`
ServerCertificateID []*string `json:"server_certificate_id" name:"server_certificate_id"`
Timeout *int `json:"timeout" name:"timeout"`
}

type LoadBalancerListenerStatus struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/clusternode/cluster_node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func New(
return cnc, nil
}

//check if node label changed
// check if node label changed
func (cnc *ClusterNodeController) needsUpdate(old, new *corev1.Node) bool {

if len(old.Labels) != len(new.Labels) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/endpoint/endpoint_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
k8sErr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -173,6 +174,10 @@ func (epc *EndpointController) handleEndpointsUpdate(key string) error {
// 1. get service of this endpoint to get service type and service externalTraffixPolicy
svc, err := epc.serviceLister.Services(namespace).Get(name)
if err != nil {
if k8sErr.IsNotFound(err) {
klog.V(4).Infof("endpoints %s/%s has no service, ignore!", namespace, name)
return nil
}
return fmt.Errorf("get service %s/%s error: %v", namespace, name, err)
}
// ignore service which service type != loadbalancer or externalTrafficPolicy != Local
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Error struct {
ResouceName string
}

//Error is method of error interface
// Error is method of error interface
func (e *Error) Error() string {
return fmt.Sprintf("[%s] happened when [%s] type: [%s] name: [%s], msg: [%s]", e.Type, e.Action, e.ResourceType, e.ResouceName, e.Message)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (q *QingCloudClient) DeleteBackends(ids []*string) error {
return err
}

//need update lb
// need update lb
func (q *QingCloudClient) CreateBackends(backends []*apis.LoadBalancerBackend) ([]*string, error) {
if len(backends) <= 0 {
return nil, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (q *QingCloudClient) CreateLB(input *apis.LoadBalancer) (*apis.LoadBalancer
return convertLoadBalancer(lb), nil
}

//need update lb
// need update lb
func (q *QingCloudClient) ModifyLB(conf *apis.LoadBalancer) error {
input := &qcservice.ModifyLoadBalancerAttributesInput{
LoadBalancer: conf.Status.LoadBalancerID,
Expand Down Expand Up @@ -231,7 +231,7 @@ func (q *QingCloudClient) UpdateLB(id *string) error {
return nil
}

//need update before delete
// need update before delete
func (q *QingCloudClient) DeleteLB(id *string) error {
var (
err error
Expand Down
4 changes: 3 additions & 1 deletion pkg/executor/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func convertLoadBalancerListener(inputs []*qcservice.LoadBalancerListener) []*ap
HealthyCheckOption: input.HealthyCheckOption,
BalanceMode: input.BalanceMode,
ServerCertificateID: input.ServerCertificateID,
Timeout: input.Timeout,
},
Status: apis.LoadBalancerListenerStatus{
LoadBalancerListenerID: input.LoadBalancerListenerID,
Expand All @@ -74,13 +75,14 @@ func convertFromLoadBalancerListener(inputs []*apis.LoadBalancerListener) []*qcs
HealthyCheckOption: input.Spec.HealthyCheckOption,
BalanceMode: input.Spec.BalanceMode,
ServerCertificateID: input.Spec.ServerCertificateID,
Timeout: input.Spec.Timeout,
})
}

return result
}

//need update lb
// need update lb
func (q *QingCloudClient) CreateListener(inputs []*apis.LoadBalancerListener) ([]*apis.LoadBalancerListener, error) {
id := inputs[0].Spec.LoadBalancerID
output, err := q.LBService.AddLoadBalancerListeners(&qcservice.AddLoadBalancerListenersInput{
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/sg.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (q *QingCloudClient) DeleteSG(sg *string) error {
return nil
}

//Currently all load balancers that do not specify sg are using the default.
// Currently all load balancers that do not specify sg are using the default.
func (q *QingCloudClient) ensureSecurityGroupByName(name string) (*apis.SecurityGroup, error) {
sg, err := q.GetSecurityGroupByName(name)
if err != nil {
Expand Down
50 changes: 50 additions & 0 deletions pkg/qingcloud/annotation_valid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package qingcloud

import (
"fmt"
"strconv"
"strings"
)

const (
ListenerTimeoutMin = 10
ListenerTimeoutMax = 86400
)

func parseAnnotationIntoIntIntMap(annotationConfValue string) (map[int]int, error) {
result := make(map[int]int)
items := strings.Split(annotationConfValue, ",")
if len(items) == 0 {
return nil, nil
}
for _, item := range items {
parts := strings.Split(item, ":")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid format")
}
k, err1 := strconv.Atoi(parts[0])
v, err2 := strconv.Atoi(parts[1])
if err1 != nil || err2 != nil {
return nil, fmt.Errorf("invalid value")
}
result[k] = v

}
return result, nil
}

func validListenerTimeout(annotationTimeoutConf string) error {
result, err := parseAnnotationIntoIntIntMap(annotationTimeoutConf)
if err != nil {
return fmt.Errorf("invalid timeout conf: %v for nanotation %s ", err, ServiceAnnotationListenerTimeout)
}

// the value must in range 10 ~ 86400
for _, value := range result {
if value < ListenerTimeoutMin || value > ListenerTimeoutMax {
return fmt.Errorf("invalid timeout conf: please spec a timeout value in range (%d, %d)", ListenerTimeoutMin, ListenerTimeoutMax)
}
}

return nil
}
7 changes: 7 additions & 0 deletions pkg/qingcloud/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type LoadBalancerConfig struct {
balanceMode *string
ServerCertificate *string
Protocol *string
Timeout *string

//backend
BackendLabel string
Expand Down Expand Up @@ -184,6 +185,12 @@ func (qc *QingCloud) ParseServiceLBConfig(cluster string, service *v1.Service) (
}
config.BackendCountConfig = backendCount
}
if timeoutConf, ok := annotation[ServiceAnnotationListenerTimeout]; ok {
if err := validListenerTimeout(timeoutConf); err != nil {
return nil, err
}
config.Timeout = &timeoutConf
}

networkType := annotation[ServiceAnnotationLoadBalancerNetworkType]
if config.VxNetID == nil && qc.Config.DefaultVxNetForLB != "" {
Expand Down
69 changes: 57 additions & 12 deletions pkg/qingcloud/loadbalancer_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"github.com/yunify/qingcloud-cloud-controller-manager/pkg/apis"
)

const defaultListenerHeathyCheckOption = "10|5|2|5"
const defaultListenerBalanceMode = "roundrobin"
const (
defaultListenerHeathyCheckOption = "10|5|2|5"
defaultListenerBalanceMode = "roundrobin"
defaultTimeout = 50
)

// Make sure qingcloud instance hostname or override-hostname (if provided) is equal to InstanceId
// Recommended to use override-hostname
Expand Down Expand Up @@ -43,8 +46,8 @@ func convertLoadBalancerStatus(status *apis.LoadBalancerStatus) *v1.LoadBalancer
return &v1.LoadBalancerStatus{Ingress: result}
}

//The reason for this is that it is compatible with old logic, and future updates to the
//load balancer will be placed in the relevant crd controller.
// The reason for this is that it is compatible with old logic, and future updates to the
// load balancer will be placed in the relevant crd controller.
func needUpdateAttr(config *LoadBalancerConfig, status *apis.LoadBalancer) *apis.LoadBalancer {
result := &apis.LoadBalancer{
Status: apis.LoadBalancerStatus{
Expand Down Expand Up @@ -78,7 +81,7 @@ func needUpdateAttr(config *LoadBalancerConfig, status *apis.LoadBalancer) *apis
return nil
}

//The load balancer will be shared, filtering out its own listeners.
// The load balancer will be shared, filtering out its own listeners.
func filterListeners(listeners []apis.LoadBalancerListener, prefix string) []*string {
var result []*string

Expand Down Expand Up @@ -154,6 +157,17 @@ func getProtocol(annotationConf map[int]string, port int) *string {
}
}

func getTimeout(timeouts map[int]int, port int) *int {
timeout := defaultTimeout
if timeouts != nil {
if t, ok := timeouts[port]; ok {
return &t
}
}

return &timeout
}

func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerConfig, ports []v1.ServicePort) (toDelete []*string, toAdd []v1.ServicePort, toKeep []*apis.LoadBalancerListener) {
svcNodePort := make(map[string]int)
for _, listener := range listeners {
Expand All @@ -164,6 +178,7 @@ func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerCon

hcs, _ := parseHeathyCheck(conf)
bms, _ := parseBalanceMode(conf)
timeoutConf, _ := parseTimeout(conf)
for _, port := range ports {
add := true
healthyCheck := getHealthyCheck(hcs, int(port.Port), strings.ToLower(string(port.Protocol)))
Expand All @@ -173,7 +188,8 @@ func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerCon
svcNodePort[*listener.Status.LoadBalancerListenerID] == int(port.NodePort) &&
*balanceMode == *listener.Spec.BalanceMode &&
(*healthyCheck.option == *listener.Spec.HealthyCheckOption && *healthyCheck.method == *listener.Spec.HealthyCheckMethod) &&
equalProtocol(listener, conf, port) {
equalProtocol(listener, conf, port) &&
equalTimeout(listener, timeoutConf, int(port.Port)) {
add = false
break
}
Expand All @@ -192,7 +208,8 @@ func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerCon
svcNodePort[*listener.Status.LoadBalancerListenerID] == int(port.NodePort) &&
*balanceMode == *listener.Spec.BalanceMode &&
(*healthyCheck.option == *listener.Spec.HealthyCheckOption && *healthyCheck.method == *listener.Spec.HealthyCheckMethod) &&
equalProtocol(listener, conf, port) {
equalProtocol(listener, conf, port) &&
equalTimeout(listener, timeoutConf, int(port.Port)) {
delete = false
break
}
Expand Down Expand Up @@ -256,11 +273,13 @@ type healthyChek struct {

// data format like this: port1:conf,port2:conf,port3:conf
// Example:
// 1)healthycheckmethod: "80:tcp,443:tcp"
// 2)healthycheckoption: "80:10|5|2|5,443:10|5|2|5"
// 3)balancemode: "80:roundrobin,443:leastconn,8080:source"
// 4)cert: "443:sc-77oko7zj,80:sc-77oko7zj"
// 5)protocol: "443:https,80:http"
//
// 1)healthycheckmethod: "80:tcp,443:tcp"
// 2)healthycheckoption: "80:10|5|2|5,443:10|5|2|5"
// 3)balancemode: "80:roundrobin,443:leastconn,8080:source"
// 4)cert: "443:sc-77oko7zj,80:sc-77oko7zj"
// 5)protocol: "443:https,80:http"
// 6)timeout: "443:10,80:20"
func parseLsnAnnotaionData(data string) (map[int]string, error) {
methods := strings.Split(data, ",")
rst := make(map[int]string, len(methods))
Expand Down Expand Up @@ -344,6 +363,13 @@ func parseProtocol(conf *LoadBalancerConfig) (map[int]string, error) {
return parseLsnAnnotaionData(*conf.Protocol)
}

func parseTimeout(conf *LoadBalancerConfig) (map[int]int, error) {
if conf == nil || conf.Timeout == nil {
return nil, nil
}
return parseAnnotationIntoIntIntMap(*conf.Timeout)
}

func parseAnnotationIntoStringMap(data string) (map[string]string, error) {
parts := strings.Split(data, ",")
rst := make(map[string]string, len(parts))
Expand Down Expand Up @@ -389,6 +415,11 @@ func generateLoadBalancerListeners(conf *LoadBalancerConfig, lb *apis.LoadBalanc
return nil, err
}

timeouts, err := parseTimeout(conf)
if err != nil {
return nil, err
}

var result []*apis.LoadBalancerListener
for _, port := range ports {
protocol := ""
Expand All @@ -406,6 +437,7 @@ func generateLoadBalancerListeners(conf *LoadBalancerConfig, lb *apis.LoadBalanc
balanceMode := getBalanceMode(bms, int(port.Port))
listenerProtocol := getProtocol(protocols, int(port.Port))
certID := getCertificate(certs, int(port.Port))
timeout := getTimeout(timeouts, int(port.Port))

if listenerProtocol == nil {
listenerProtocol = &protocol
Expand All @@ -430,6 +462,7 @@ func generateLoadBalancerListeners(conf *LoadBalancerConfig, lb *apis.LoadBalanc
HealthyCheckOption: healthyCheck.option,
BalanceMode: balanceMode,
ServerCertificateID: serverCertificate,
Timeout: timeout,
},
})
}
Expand Down Expand Up @@ -526,6 +559,18 @@ func equalProtocol(listener *apis.LoadBalancerListener, conf *LoadBalancerConfig
return false
}

func equalTimeout(listener *apis.LoadBalancerListener, timeoutConf map[int]int, port int) bool {
timeout, ok := timeoutConf[port]
if !ok {
return true
} else {
if timeout == *listener.Spec.Timeout {
return true
}
}
return false
}

func getRandomNodes(nodes []*v1.Node, count int) (result []*v1.Node) {
resultMap := make(map[int64]bool)
length := int64(len(nodes))
Expand Down

0 comments on commit eeec280

Please sign in to comment.