Skip to content

Commit

Permalink
General: use client-side round-robin load balancing for grpc (kedacor…
Browse files Browse the repository at this point in the history
…e#5225)

Signed-off-by: Bojan Zelic <bnzelic@gmail.com>
  • Loading branch information
BojanZelic authored Dec 8, 2023
1 parent 3a7d496 commit 19f14a8
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Here is an overview of all new **experimental** features:

- **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962))
- **General**: Support TriggerAuthentication properties from ConfigMap ([#4830](https://github.com/kedacore/keda/issues/4830))
- **General**: Use client-side round-robin load balancing for grpc calls ([#5224](https://github.com/kedacore/keda/issues/5224))
- **GCP pubsub scaler**: Support distribution-valued metrics and metrics from topics ([#5070](https://github.com/kedacore/keda/issues/5070))
- **Hashicorp Vault**: Add support to get secret that needs write operation (e.g. pki) ([#5067](https://github.com/kedacore/keda/issues/5067))
- **Hashicorp Vault**: Fix operator panic when spec.hashiCorpVault.credential.serviceAccount is not set ([#4964](https://github.com/kedacore/keda/issues/4964))
Expand Down
4 changes: 2 additions & 2 deletions pkg/metricsservice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type GrpcClient struct {
}

func NewGrpcClient(url, certDir string) (*GrpcClient, error) {
retryPolicy := `{
defaultConfig := `{
"methodConfig": [{
"timeout": "3s",
"waitForReady": true,
Expand All @@ -55,7 +55,7 @@ func NewGrpcClient(url, certDir string) (*GrpcClient, error) {
}
opts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
grpc.WithDefaultServiceConfig(retryPolicy),
grpc.WithDefaultServiceConfig(defaultConfig),
}
conn, err := grpc.Dial(url, opts...)
if err != nil {
Expand Down
14 changes: 11 additions & 3 deletions pkg/scalers/external_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type connectionGroup struct {
// a pool of connectionGroup per metadata hash
var connectionPool sync.Map

const grpcConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}`

// NewExternalScaler creates a new external scaler - calls the GRPC interface
// to create a new scaler
func NewExternalScaler(config *ScalerConfig) (Scaler, error) {
Expand Down Expand Up @@ -311,7 +313,9 @@ func getClientForConnectionPool(metadata externalScalerMetadata, logger logr.Log
if err != nil {
return nil, err
}
return grpc.Dial(metadata.scalerAddress, grpc.WithTransportCredentials(creds))
return grpc.Dial(metadata.scalerAddress,
grpc.WithDefaultServiceConfig(grpcConfig),
grpc.WithTransportCredentials(creds))
}

tlsConfig, err := util.NewTLSConfig(metadata.tlsClientCert, metadata.tlsClientKey, metadata.caCert, metadata.unsafeSsl)
Expand All @@ -321,10 +325,14 @@ func getClientForConnectionPool(metadata externalScalerMetadata, logger logr.Log

if len(tlsConfig.Certificates) > 0 || metadata.caCert != "" {
// nosemgrep: go.grpc.ssrf.grpc-tainted-url-host.grpc-tainted-url-host
return grpc.Dial(metadata.scalerAddress, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
return grpc.Dial(metadata.scalerAddress,
grpc.WithDefaultServiceConfig(grpcConfig),
grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
}

return grpc.Dial(metadata.scalerAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
return grpc.Dial(metadata.scalerAddress,
grpc.WithDefaultServiceConfig(grpcConfig),
grpc.WithTransportCredentials(insecure.NewCredentials()))
}

// create a unique key per-metadata. If scaledObjects share the same connection properties
Expand Down
4 changes: 3 additions & 1 deletion pkg/scalers/external_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,9 @@ func TestWaitForState(t *testing.T) {
}()

// build client connect to server
grpcClient, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
grpcClient, err := grpc.Dial(address,
grpc.WithDefaultServiceConfig(grpcConfig),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Errorf("connect grpc server %s failed:%s", address, err)
return
Expand Down
4 changes: 3 additions & 1 deletion pkg/scalers/liiklus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func NewLiiklusScaler(config *ScalerConfig) (Scaler, error) {
return nil, err
}

conn, err := grpc.Dial(lm.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(lm.address,
grpc.WithDefaultServiceConfig(grpcConfig),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 19f14a8

Please sign in to comment.