From 4dfb1efe0ca1c83b7593e28dd4d0ccf5dca9dd41 Mon Sep 17 00:00:00 2001 From: zhzhuang-zju Date: Wed, 11 Sep 2024 14:14:07 +0800 Subject: [PATCH] karmada-scheduler-estimator add the support for custom DNS suffix Signed-off-by: zhzhuang-zju --- pkg/estimator/client/cache.go | 9 +++++---- pkg/estimator/client/service.go | 20 ++++++++++++-------- pkg/estimator/client/service_test.go | 11 ++++++----- pkg/util/grpcconnection/config.go | 27 ++++++++++++++++++++++----- 4 files changed, 45 insertions(+), 22 deletions(-) diff --git a/pkg/estimator/client/cache.go b/pkg/estimator/client/cache.go index 7bd70eedd880..834915b6ca98 100644 --- a/pkg/estimator/client/cache.go +++ b/pkg/estimator/client/cache.go @@ -18,6 +18,7 @@ package client import ( "fmt" + "strings" "sync" "time" @@ -108,20 +109,20 @@ func EstablishConnection(kubeClient kubernetes.Interface, serviceInfo SchedulerE return nil } - serverAddr, err := resolveCluster(kubeClient, serviceInfo.Namespace, + serverAddrs, err := resolveCluster(kubeClient, serviceInfo.Namespace, names.GenerateEstimatorServiceName(serviceInfo.NamePrefix, serviceInfo.Name), int32(grpcConfig.TargetPort)) if err != nil { return err } - klog.Infof("Start dialing estimator server(%s) of cluster(%s).", serverAddr, serviceInfo.Name) - cc, err := grpcConfig.DialWithTimeOut(serverAddr, 5*time.Second) + klog.Infof("Start dialing estimator server(%s) of cluster(%s).", strings.Join(serverAddrs, ","), serviceInfo.Name) + cc, err := grpcConfig.DialWithTimeOut(serverAddrs, 5*time.Second) if err != nil { klog.Errorf("Failed to dial cluster(%s): %v.", serviceInfo.Name, err) return err } c := estimatorservice.NewEstimatorClient(cc) estimatorCache.AddCluster(serviceInfo.Name, cc, c) - klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, serviceInfo.Name) + klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", cc.Target(), serviceInfo.Name) return nil } diff --git a/pkg/estimator/client/service.go b/pkg/estimator/client/service.go index c1216557763c..274c061c4a43 100644 --- a/pkg/estimator/client/service.go +++ b/pkg/estimator/client/service.go @@ -31,7 +31,7 @@ import ( // ResolveCluster parses Service resource content by itself. // Fixes Issue https://github.com/karmada-io/karmada/issues/2487 // Modified from "k8s.io/apiserver/pkg/util/proxy/proxy.go:92 => func ResolveCluster" -func resolveCluster(kubeClient kubernetes.Interface, namespace, id string, port int32) (string, error) { +func resolveCluster(kubeClient kubernetes.Interface, namespace, id string, port int32) ([]string, error) { svc, err := kubeClient.CoreV1().Services(namespace).Get(context.TODO(), id, metav1.GetOptions{}) if err != nil { if apierrors.IsNotFound(err) { @@ -39,29 +39,33 @@ func resolveCluster(kubeClient kubernetes.Interface, namespace, id string, port * When Deploying Karmada in Host Kubernetes Cluster, the kubeClient will connect kube-apiserver * of Karmada Control Plane, rather than of host cluster. * But the Service resource is defined in Host Kubernetes Cluster. So we cannot get its content here. - * The best thing we can do is just glue host:port together, and try to connect to it. + * The best thing we can do is just assemble hosts and ports according to a specific rule, and try to connect to them. */ - return net.JoinHostPort(fmt.Sprintf("%s.%s.svc.cluster.local", id, namespace), fmt.Sprintf("%d", port)), nil + return []string{ + net.JoinHostPort(fmt.Sprintf("%s.%s.svc.cluster.local", id, namespace), fmt.Sprintf("%d", port)), + // To support the environment with a custom DNS suffix. + net.JoinHostPort(fmt.Sprintf("%s.%s.svc", id, namespace), fmt.Sprintf("%d", port)), + }, nil } - return "", err + return nil, err } if svc.Spec.Type != corev1.ServiceTypeExternalName { // We only support ExternalName type here. // See discussions in PR: https://github.com/karmada-io/karmada/pull/2574#discussion_r979539389 - return "", fmt.Errorf("unsupported service type %q", svc.Spec.Type) + return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type) } svcPort, err := findServicePort(svc, port) if err != nil { - return "", err + return nil, err } if svcPort.TargetPort.Type != intstr.Int { - return "", fmt.Errorf("ExternalName service type should have int target port, "+ + return nil, fmt.Errorf("ExternalName service type should have int target port, "+ "current target port: %v", svcPort.TargetPort) } - return net.JoinHostPort(svc.Spec.ExternalName, fmt.Sprintf("%d", svcPort.TargetPort.IntVal)), nil + return []string{net.JoinHostPort(svc.Spec.ExternalName, fmt.Sprintf("%d", svcPort.TargetPort.IntVal))}, nil } // findServicePort finds the service port by name or numerically. diff --git a/pkg/estimator/client/service_test.go b/pkg/estimator/client/service_test.go index 9d074e7864db..8162e56830c0 100644 --- a/pkg/estimator/client/service_test.go +++ b/pkg/estimator/client/service_test.go @@ -18,6 +18,7 @@ package client import ( "context" + "reflect" "testing" corev1 "k8s.io/api/core/v1" @@ -34,7 +35,7 @@ func TestResolveCluster(t *testing.T) { port int32 service *corev1.Service expectError bool - expected string + expected []string }{ { name: "Service not found", @@ -42,7 +43,7 @@ func TestResolveCluster(t *testing.T) { id: "nonexistent", port: 80, service: nil, - expected: "nonexistent.default.svc.cluster.local:80", + expected: []string{"nonexistent.default.svc.cluster.local:80", "nonexistent.default.svc:80"}, }, { name: "Unsupported service type", @@ -81,7 +82,7 @@ func TestResolveCluster(t *testing.T) { }, }, }, - expected: "example.com:8080", + expected: []string{"example.com:8080"}, }, { name: "ExternalName service with non-int target port", @@ -122,8 +123,8 @@ func TestResolveCluster(t *testing.T) { if (err != nil) != tt.expectError { t.Errorf("expected error: %v, got: %v", tt.expectError, err) } - if result != tt.expected { - t.Errorf("expected: %s, got: %s", tt.expected, result) + if !reflect.DeepEqual(tt.expected, result) { + t.Errorf("expected: %v, got: %v", tt.expected, result) } }) } diff --git a/pkg/util/grpcconnection/config.go b/pkg/util/grpcconnection/config.go index 149d0aeefa4e..b15a850b1a04 100644 --- a/pkg/util/grpcconnection/config.go +++ b/pkg/util/grpcconnection/config.go @@ -27,6 +27,7 @@ import ( "google.golang.org/grpc" grpccredentials "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + utilerrors "k8s.io/apimachinery/pkg/util/errors" ) // ServerConfig the config of GRPC server side. @@ -99,11 +100,8 @@ func (s *ServerConfig) NewServer() (*grpc.Server, error) { return grpc.NewServer(grpc.Creds(grpccredentials.NewTLS(config))), nil } -// DialWithTimeOut creates a client connection to the given target. -func (c *ClientConfig) DialWithTimeOut(path string, timeout time.Duration) (*grpc.ClientConn, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - +// DialWithTimeOut will attempt to create a client connection based on the given targets, one at a time, until a client connection is successfully established. +func (c *ClientConfig) DialWithTimeOut(paths []string, timeout time.Duration) (*grpc.ClientConn, error) { opts := []grpc.DialOption{ grpc.WithBlock(), } @@ -138,6 +136,25 @@ func (c *ClientConfig) DialWithTimeOut(path string, timeout time.Duration) (*grp } opts = append(opts, grpc.WithTransportCredentials(cred)) + + var cc *grpc.ClientConn + var err error + var allErrs []error + for _, path := range paths { + cc, err = createGRPCConnection(path, timeout, opts...) + if err == nil { + return cc, nil + } + allErrs = append(allErrs, err) + } + + return nil, utilerrors.NewAggregate(allErrs) +} + +func createGRPCConnection(path string, timeout time.Duration, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + cc, err := grpc.DialContext(ctx, path, opts...) if err != nil { return nil, fmt.Errorf("dial %s error: %v", path, err)