diff --git a/CHANGELOG.md b/CHANGELOG.md index 53fb9e379f6..8f63d7f9052 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,7 @@ Here is an overview of all new **experimental** features: - **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962)) - **General**: Support TriggerAuthentication properties from ConfigMap ([#4830](https://github.com/kedacore/keda/issues/4830)) +- **General**: Use client-side round-robin load balancing for grpc calls ([#5224](https://github.com/kedacore/keda/issues/5224)) - **GCP pubsub scaler**: Support distribution-valued metrics and metrics from topics ([#5070](https://github.com/kedacore/keda/issues/5070)) - **Hashicorp Vault**: Add support to get secret that needs write operation (e.g. pki) ([#5067](https://github.com/kedacore/keda/issues/5067)) - **Hashicorp Vault**: Fix operator panic when spec.hashiCorpVault.credential.serviceAccount is not set ([#4964](https://github.com/kedacore/keda/issues/4964)) diff --git a/pkg/metricsservice/client.go b/pkg/metricsservice/client.go index 537b172d200..28b7b49f204 100644 --- a/pkg/metricsservice/client.go +++ b/pkg/metricsservice/client.go @@ -37,7 +37,7 @@ type GrpcClient struct { } func NewGrpcClient(url, certDir string) (*GrpcClient, error) { - retryPolicy := `{ + defaultConfig := `{ "methodConfig": [{ "timeout": "3s", "waitForReady": true, @@ -55,7 +55,7 @@ func NewGrpcClient(url, certDir string) (*GrpcClient, error) { } opts := []grpc.DialOption{ grpc.WithTransportCredentials(creds), - grpc.WithDefaultServiceConfig(retryPolicy), + grpc.WithDefaultServiceConfig(defaultConfig), } conn, err := grpc.Dial(url, opts...) if err != nil { diff --git a/pkg/scalers/external_scaler.go b/pkg/scalers/external_scaler.go index 964b649772c..78bbf5dea8b 100644 --- a/pkg/scalers/external_scaler.go +++ b/pkg/scalers/external_scaler.go @@ -50,6 +50,8 @@ type connectionGroup struct { // a pool of connectionGroup per metadata hash var connectionPool sync.Map +const grpcConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}` + // NewExternalScaler creates a new external scaler - calls the GRPC interface // to create a new scaler func NewExternalScaler(config *ScalerConfig) (Scaler, error) { @@ -311,7 +313,9 @@ func getClientForConnectionPool(metadata externalScalerMetadata, logger logr.Log if err != nil { return nil, err } - return grpc.Dial(metadata.scalerAddress, grpc.WithTransportCredentials(creds)) + return grpc.Dial(metadata.scalerAddress, + grpc.WithDefaultServiceConfig(grpcConfig), + grpc.WithTransportCredentials(creds)) } tlsConfig, err := util.NewTLSConfig(metadata.tlsClientCert, metadata.tlsClientKey, metadata.caCert, metadata.unsafeSsl) @@ -321,10 +325,14 @@ func getClientForConnectionPool(metadata externalScalerMetadata, logger logr.Log if len(tlsConfig.Certificates) > 0 || metadata.caCert != "" { // nosemgrep: go.grpc.ssrf.grpc-tainted-url-host.grpc-tainted-url-host - return grpc.Dial(metadata.scalerAddress, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + return grpc.Dial(metadata.scalerAddress, + grpc.WithDefaultServiceConfig(grpcConfig), + grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) } - return grpc.Dial(metadata.scalerAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + return grpc.Dial(metadata.scalerAddress, + grpc.WithDefaultServiceConfig(grpcConfig), + grpc.WithTransportCredentials(insecure.NewCredentials())) } // create a unique key per-metadata. If scaledObjects share the same connection properties diff --git a/pkg/scalers/external_scaler_test.go b/pkg/scalers/external_scaler_test.go index 5d85fa75840..569368d92a3 100644 --- a/pkg/scalers/external_scaler_test.go +++ b/pkg/scalers/external_scaler_test.go @@ -246,7 +246,9 @@ func TestWaitForState(t *testing.T) { }() // build client connect to server - grpcClient, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials())) + grpcClient, err := grpc.Dial(address, + grpc.WithDefaultServiceConfig(grpcConfig), + grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Errorf("connect grpc server %s failed:%s", address, err) return diff --git a/pkg/scalers/liiklus_scaler.go b/pkg/scalers/liiklus_scaler.go index bbc635b6ffd..90b1901abec 100644 --- a/pkg/scalers/liiklus_scaler.go +++ b/pkg/scalers/liiklus_scaler.go @@ -69,7 +69,9 @@ func NewLiiklusScaler(config *ScalerConfig) (Scaler, error) { return nil, err } - conn, err := grpc.Dial(lm.address, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial(lm.address, + grpc.WithDefaultServiceConfig(grpcConfig), + grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err }