From 869ba21bae716daa3543d043ba8ac6f8ee3f5bed Mon Sep 17 00:00:00 2001 From: wanggangqiang Date: Thu, 17 Nov 2022 09:33:23 +0800 Subject: [PATCH] support config lb listener timeout; Signed-off-by: gangqiangwang ; --- docs/configure.md | 2 + pkg/apis/types.go | 1 + .../clusternode/cluster_node_controller.go | 2 +- .../endpoint/endpoint_controller.go | 5 ++ pkg/errors/errors.go | 2 +- pkg/executor/backend.go | 2 +- pkg/executor/lb.go | 4 +- pkg/executor/listener.go | 4 +- pkg/executor/sg.go | 2 +- pkg/qingcloud/annotation_valid.go | 50 ++++++++++++++ pkg/qingcloud/annotations.go | 7 ++ pkg/qingcloud/loadbalancer_utils.go | 69 +++++++++++++++---- 12 files changed, 131 insertions(+), 19 deletions(-) create mode 100644 pkg/qingcloud/annotation_valid.go diff --git a/docs/configure.md b/docs/configure.md index 5150b5c7..99eda37c 100644 --- a/docs/configure.md +++ b/docs/configure.md @@ -119,6 +119,7 @@ spec: 3. 支持 roundrobin/leastconn/source 三种负载均衡方式,`service.beta.kubernetes.io/qingcloud-lb-listener-balancemode`,默认是 roundrobin 4. 支持 http/https 协议的配置,`service.beta.kubernetes.io/qingcloud-lb-listener-protocol`,没有此注解则默认使用 Service 所用协议 5. 支持 https 协议证书的配置,`service.beta.kubernetes.io/qingcloud-lb-listener-cert`,如果配置的 https 协议,则必须配置证书 +6. 支持监听器的超时时间,` service.beta.kubernetes.io/qingcloud-lb-listener-timeout`,不配置默认是50,可选范围为(10 ~ 86400),单位为s 因为一个LB会有多个监听器,所以进行service注解设置时,通过如下格式区分不同监听器:`80:xxx,443:xxx`。 @@ -136,6 +137,7 @@ metadata: service.beta.kubernetes.io/qingcloud-lb-listener-balancemode: "8090:source" service.beta.kubernetes.io/qingcloud-lb-listener-protocol: "8090:https" service.beta.kubernetes.io/qingcloud-lb-listener-cert: "8090:sc-77oko7zj" + service.beta.kubernetes.io/qingcloud-lb-listener-timeout: "8080:10" spec: selector: app: mylbapp diff --git a/pkg/apis/types.go b/pkg/apis/types.go index da4719ee..f81dd7c9 100644 --- a/pkg/apis/types.go +++ b/pkg/apis/types.go @@ -90,6 +90,7 @@ type LoadBalancerListenerSpec struct { HealthyCheckOption *string `json:"healthy_check_option" name:"healthy_check_option"` BalanceMode *string `json:"balance_mode" name:"balance_mode"` ServerCertificateID []*string `json:"server_certificate_id" name:"server_certificate_id"` + Timeout *int `json:"timeout" name:"timeout"` } type LoadBalancerListenerStatus struct { diff --git a/pkg/controllers/clusternode/cluster_node_controller.go b/pkg/controllers/clusternode/cluster_node_controller.go index 9238d4fd..ef2668d2 100644 --- a/pkg/controllers/clusternode/cluster_node_controller.go +++ b/pkg/controllers/clusternode/cluster_node_controller.go @@ -99,7 +99,7 @@ func New( return cnc, nil } -//check if node label changed +// check if node label changed func (cnc *ClusterNodeController) needsUpdate(old, new *corev1.Node) bool { if len(old.Labels) != len(new.Labels) { diff --git a/pkg/controllers/endpoint/endpoint_controller.go b/pkg/controllers/endpoint/endpoint_controller.go index 144d824b..2b0f2ad6 100644 --- a/pkg/controllers/endpoint/endpoint_controller.go +++ b/pkg/controllers/endpoint/endpoint_controller.go @@ -7,6 +7,7 @@ import ( "time" corev1 "k8s.io/api/core/v1" + k8sErr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -173,6 +174,10 @@ func (epc *EndpointController) handleEndpointsUpdate(key string) error { // 1. get service of this endpoint to get service type and service externalTraffixPolicy svc, err := epc.serviceLister.Services(namespace).Get(name) if err != nil { + if k8sErr.IsNotFound(err) { + klog.V(4).Infof("endpoints %s/%s has no service, ignore!", namespace, name) + return nil + } return fmt.Errorf("get service %s/%s error: %v", namespace, name, err) } // ignore service which service type != loadbalancer or externalTrafficPolicy != Local diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index a992296c..ba369e5a 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -21,7 +21,7 @@ type Error struct { ResouceName string } -//Error is method of error interface +// Error is method of error interface func (e *Error) Error() string { return fmt.Sprintf("[%s] happened when [%s] type: [%s] name: [%s], msg: [%s]", e.Type, e.Action, e.ResourceType, e.ResouceName, e.Message) } diff --git a/pkg/executor/backend.go b/pkg/executor/backend.go index 6c5b04a1..cb5e6475 100644 --- a/pkg/executor/backend.go +++ b/pkg/executor/backend.go @@ -29,7 +29,7 @@ func (q *QingCloudClient) DeleteBackends(ids []*string) error { return err } -//need update lb +// need update lb func (q *QingCloudClient) CreateBackends(backends []*apis.LoadBalancerBackend) ([]*string, error) { if len(backends) <= 0 { return nil, nil diff --git a/pkg/executor/lb.go b/pkg/executor/lb.go index b5ff16ad..6514d7ff 100644 --- a/pkg/executor/lb.go +++ b/pkg/executor/lb.go @@ -189,7 +189,7 @@ func (q *QingCloudClient) CreateLB(input *apis.LoadBalancer) (*apis.LoadBalancer return convertLoadBalancer(lb), nil } -//need update lb +// need update lb func (q *QingCloudClient) ModifyLB(conf *apis.LoadBalancer) error { input := &qcservice.ModifyLoadBalancerAttributesInput{ LoadBalancer: conf.Status.LoadBalancerID, @@ -231,7 +231,7 @@ func (q *QingCloudClient) UpdateLB(id *string) error { return nil } -//need update before delete +// need update before delete func (q *QingCloudClient) DeleteLB(id *string) error { var ( err error diff --git a/pkg/executor/listener.go b/pkg/executor/listener.go index 769df96e..fd845be4 100644 --- a/pkg/executor/listener.go +++ b/pkg/executor/listener.go @@ -50,6 +50,7 @@ func convertLoadBalancerListener(inputs []*qcservice.LoadBalancerListener) []*ap HealthyCheckOption: input.HealthyCheckOption, BalanceMode: input.BalanceMode, ServerCertificateID: input.ServerCertificateID, + Timeout: input.Timeout, }, Status: apis.LoadBalancerListenerStatus{ LoadBalancerListenerID: input.LoadBalancerListenerID, @@ -74,13 +75,14 @@ func convertFromLoadBalancerListener(inputs []*apis.LoadBalancerListener) []*qcs HealthyCheckOption: input.Spec.HealthyCheckOption, BalanceMode: input.Spec.BalanceMode, ServerCertificateID: input.Spec.ServerCertificateID, + Timeout: input.Spec.Timeout, }) } return result } -//need update lb +// need update lb func (q *QingCloudClient) CreateListener(inputs []*apis.LoadBalancerListener) ([]*apis.LoadBalancerListener, error) { id := inputs[0].Spec.LoadBalancerID output, err := q.LBService.AddLoadBalancerListeners(&qcservice.AddLoadBalancerListenersInput{ diff --git a/pkg/executor/sg.go b/pkg/executor/sg.go index 601fab1d..e6a400d5 100644 --- a/pkg/executor/sg.go +++ b/pkg/executor/sg.go @@ -118,7 +118,7 @@ func (q *QingCloudClient) DeleteSG(sg *string) error { return nil } -//Currently all load balancers that do not specify sg are using the default. +// Currently all load balancers that do not specify sg are using the default. func (q *QingCloudClient) ensureSecurityGroupByName(name string) (*apis.SecurityGroup, error) { sg, err := q.GetSecurityGroupByName(name) if err != nil { diff --git a/pkg/qingcloud/annotation_valid.go b/pkg/qingcloud/annotation_valid.go new file mode 100644 index 00000000..731c9aaa --- /dev/null +++ b/pkg/qingcloud/annotation_valid.go @@ -0,0 +1,50 @@ +package qingcloud + +import ( + "fmt" + "strconv" + "strings" +) + +const ( + ListenerTimeoutMin = 10 + ListenerTimeoutMax = 86400 +) + +func parseAnnotationIntoIntIntMap(annotationConfValue string) (map[int]int, error) { + result := make(map[int]int) + items := strings.Split(annotationConfValue, ",") + if len(items) == 0 { + return nil, nil + } + for _, item := range items { + parts := strings.Split(item, ":") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid format") + } + k, err1 := strconv.Atoi(parts[0]) + v, err2 := strconv.Atoi(parts[1]) + if err1 != nil || err2 != nil { + return nil, fmt.Errorf("invalid value") + } + result[k] = v + + } + return result, nil +} + +func validListenerTimeout(annotationTimeoutConf string) error { + result, err := parseAnnotationIntoIntIntMap(annotationTimeoutConf) + if err != nil { + return fmt.Errorf("invalid timeout conf: %v for nanotation %s ", err, ServiceAnnotationListenerTimeout) + } + + // the value must in range 10 ~ 86400 + for _, value := range result { + if value < ListenerTimeoutMin || value > ListenerTimeoutMax { + return fmt.Errorf("invalid timeout conf: please spec a timeout value in range (%d, %d)", ListenerTimeoutMin, ListenerTimeoutMax) + } + } + + return nil +} diff --git a/pkg/qingcloud/annotations.go b/pkg/qingcloud/annotations.go index 96f671c2..294d44b0 100644 --- a/pkg/qingcloud/annotations.go +++ b/pkg/qingcloud/annotations.go @@ -107,6 +107,7 @@ type LoadBalancerConfig struct { balanceMode *string ServerCertificate *string Protocol *string + Timeout *string //backend BackendLabel string @@ -184,6 +185,12 @@ func (qc *QingCloud) ParseServiceLBConfig(cluster string, service *v1.Service) ( } config.BackendCountConfig = backendCount } + if timeoutConf, ok := annotation[ServiceAnnotationListenerTimeout]; ok { + if err := validListenerTimeout(timeoutConf); err != nil { + return nil, err + } + config.Timeout = &timeoutConf + } networkType := annotation[ServiceAnnotationLoadBalancerNetworkType] if config.VxNetID == nil && qc.Config.DefaultVxNetForLB != "" { diff --git a/pkg/qingcloud/loadbalancer_utils.go b/pkg/qingcloud/loadbalancer_utils.go index 449112f1..82e1de37 100644 --- a/pkg/qingcloud/loadbalancer_utils.go +++ b/pkg/qingcloud/loadbalancer_utils.go @@ -13,8 +13,11 @@ import ( "github.com/yunify/qingcloud-cloud-controller-manager/pkg/apis" ) -const defaultListenerHeathyCheckOption = "10|5|2|5" -const defaultListenerBalanceMode = "roundrobin" +const ( + defaultListenerHeathyCheckOption = "10|5|2|5" + defaultListenerBalanceMode = "roundrobin" + defaultTimeout = 50 +) // Make sure qingcloud instance hostname or override-hostname (if provided) is equal to InstanceId // Recommended to use override-hostname @@ -43,8 +46,8 @@ func convertLoadBalancerStatus(status *apis.LoadBalancerStatus) *v1.LoadBalancer return &v1.LoadBalancerStatus{Ingress: result} } -//The reason for this is that it is compatible with old logic, and future updates to the -//load balancer will be placed in the relevant crd controller. +// The reason for this is that it is compatible with old logic, and future updates to the +// load balancer will be placed in the relevant crd controller. func needUpdateAttr(config *LoadBalancerConfig, status *apis.LoadBalancer) *apis.LoadBalancer { result := &apis.LoadBalancer{ Status: apis.LoadBalancerStatus{ @@ -78,7 +81,7 @@ func needUpdateAttr(config *LoadBalancerConfig, status *apis.LoadBalancer) *apis return nil } -//The load balancer will be shared, filtering out its own listeners. +// The load balancer will be shared, filtering out its own listeners. func filterListeners(listeners []apis.LoadBalancerListener, prefix string) []*string { var result []*string @@ -154,6 +157,17 @@ func getProtocol(annotationConf map[int]string, port int) *string { } } +func getTimeout(timeouts map[int]int, port int) *int { + timeout := defaultTimeout + if timeouts != nil { + if t, ok := timeouts[port]; ok { + return &t + } + } + + return &timeout +} + func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerConfig, ports []v1.ServicePort) (toDelete []*string, toAdd []v1.ServicePort, toKeep []*apis.LoadBalancerListener) { svcNodePort := make(map[string]int) for _, listener := range listeners { @@ -164,6 +178,7 @@ func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerCon hcs, _ := parseHeathyCheck(conf) bms, _ := parseBalanceMode(conf) + timeoutConf, _ := parseTimeout(conf) for _, port := range ports { add := true healthyCheck := getHealthyCheck(hcs, int(port.Port), strings.ToLower(string(port.Protocol))) @@ -173,7 +188,8 @@ func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerCon svcNodePort[*listener.Status.LoadBalancerListenerID] == int(port.NodePort) && *balanceMode == *listener.Spec.BalanceMode && (*healthyCheck.option == *listener.Spec.HealthyCheckOption && *healthyCheck.method == *listener.Spec.HealthyCheckMethod) && - equalProtocol(listener, conf, port) { + equalProtocol(listener, conf, port) && + equalTimeout(listener, timeoutConf, int(port.Port)) { add = false break } @@ -192,7 +208,8 @@ func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerCon svcNodePort[*listener.Status.LoadBalancerListenerID] == int(port.NodePort) && *balanceMode == *listener.Spec.BalanceMode && (*healthyCheck.option == *listener.Spec.HealthyCheckOption && *healthyCheck.method == *listener.Spec.HealthyCheckMethod) && - equalProtocol(listener, conf, port) { + equalProtocol(listener, conf, port) && + equalTimeout(listener, timeoutConf, int(port.Port)) { delete = false break } @@ -256,11 +273,13 @@ type healthyChek struct { // data format like this: port1:conf,port2:conf,port3:conf // Example: -// 1)healthycheckmethod: "80:tcp,443:tcp" -// 2)healthycheckoption: "80:10|5|2|5,443:10|5|2|5" -// 3)balancemode: "80:roundrobin,443:leastconn,8080:source" -// 4)cert: "443:sc-77oko7zj,80:sc-77oko7zj" -// 5)protocol: "443:https,80:http" +// +// 1)healthycheckmethod: "80:tcp,443:tcp" +// 2)healthycheckoption: "80:10|5|2|5,443:10|5|2|5" +// 3)balancemode: "80:roundrobin,443:leastconn,8080:source" +// 4)cert: "443:sc-77oko7zj,80:sc-77oko7zj" +// 5)protocol: "443:https,80:http" +// 6)timeout: "443:10,80:20" func parseLsnAnnotaionData(data string) (map[int]string, error) { methods := strings.Split(data, ",") rst := make(map[int]string, len(methods)) @@ -344,6 +363,13 @@ func parseProtocol(conf *LoadBalancerConfig) (map[int]string, error) { return parseLsnAnnotaionData(*conf.Protocol) } +func parseTimeout(conf *LoadBalancerConfig) (map[int]int, error) { + if conf == nil || conf.Timeout == nil { + return nil, nil + } + return parseAnnotationIntoIntIntMap(*conf.Timeout) +} + func parseAnnotationIntoStringMap(data string) (map[string]string, error) { parts := strings.Split(data, ",") rst := make(map[string]string, len(parts)) @@ -389,6 +415,11 @@ func generateLoadBalancerListeners(conf *LoadBalancerConfig, lb *apis.LoadBalanc return nil, err } + timeouts, err := parseTimeout(conf) + if err != nil { + return nil, err + } + var result []*apis.LoadBalancerListener for _, port := range ports { protocol := "" @@ -406,6 +437,7 @@ func generateLoadBalancerListeners(conf *LoadBalancerConfig, lb *apis.LoadBalanc balanceMode := getBalanceMode(bms, int(port.Port)) listenerProtocol := getProtocol(protocols, int(port.Port)) certID := getCertificate(certs, int(port.Port)) + timeout := getTimeout(timeouts, int(port.Port)) if listenerProtocol == nil { listenerProtocol = &protocol @@ -430,6 +462,7 @@ func generateLoadBalancerListeners(conf *LoadBalancerConfig, lb *apis.LoadBalanc HealthyCheckOption: healthyCheck.option, BalanceMode: balanceMode, ServerCertificateID: serverCertificate, + Timeout: timeout, }, }) } @@ -526,6 +559,18 @@ func equalProtocol(listener *apis.LoadBalancerListener, conf *LoadBalancerConfig return false } +func equalTimeout(listener *apis.LoadBalancerListener, timeoutConf map[int]int, port int) bool { + timeout, ok := timeoutConf[port] + if !ok { + return true + } else { + if timeout == *listener.Spec.Timeout { + return true + } + } + return false +} + func getRandomNodes(nodes []*v1.Node, count int) (result []*v1.Node) { resultMap := make(map[int64]bool) length := int64(len(nodes))