Skip to content

Commit

Permalink
External Scaler: Add tls options in TriggerAuth metadata (kedacore#4407)
Browse files Browse the repository at this point in the history
Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
  • Loading branch information
dttung2905 authored May 18, 2023
1 parent 994d5e6 commit dcfcddc
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### Improvements

- **Azure Data Exporer Scaler**: Use azidentity SDK ([#4489](https://github.com/kedacore/keda/issues/4489))
- **External Scaler**: Add tls options in TriggerAuth metadata. ([#3565](https://github.com/kedacore/keda/issues/3565))
- **GCP PubSub Scaler**: Make it more flexible for metrics ([#4243](https://github.com/kedacore/keda/issues/4243))
- **Kafka Scaler:** Add support for OAuth extensions ([#4544](https://github.com/kedacore/keda/issues/4544))

Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/azure_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func parseAzureQueueMetadata(config *ScalerConfig, logger logr.Logger) (*azureQu

// before triggerAuthentication CRD, pod identity was configured using this property
if val, ok := config.TriggerMetadata["useAAdPodIdentity"]; ok && config.PodIdentity.Provider == "" {
if val == "true" {
if val == stringTrue {
config.PodIdentity = kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzure}
}
}
Expand Down
48 changes: 43 additions & 5 deletions pkg/scalers/external_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scalers
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -17,6 +18,7 @@ import (
"k8s.io/metrics/pkg/apis/external_metrics"

pb "github.com/kedacore/keda/v2/pkg/scalers/externalscaler"
"github.com/kedacore/keda/v2/pkg/util"
)

type externalScaler struct {
Expand All @@ -35,6 +37,10 @@ type externalScalerMetadata struct {
tlsCertFile string
originalMetadata map[string]string
scalerIndex int
caCert string
tlsClientCert string
tlsClientKey string
unsafeSsl bool
}

type connectionGroup struct {
Expand Down Expand Up @@ -112,7 +118,26 @@ func parseExternalScalerMetadata(config *ScalerConfig) (externalScalerMetadata,
}

meta.originalMetadata = make(map[string]string)
if val, ok := config.AuthParams["caCert"]; ok {
meta.caCert = val
}

if val, ok := config.AuthParams["tlsClientCert"]; ok {
meta.tlsClientCert = val
}

if val, ok := config.AuthParams["tlsClientKey"]; ok {
meta.tlsClientKey = val
}

meta.unsafeSsl = false
if val, ok := config.TriggerMetadata["unsafeSsl"]; ok && val != "" {
boolVal, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("failed to parse insecureSkipVerify value. Must be either true or false")
}
meta.unsafeSsl = boolVal
}
// Add elements to metadata
for key, value := range config.TriggerMetadata {
// Check if key is in resolved environment and resolve
Expand All @@ -136,7 +161,7 @@ func (s *externalScaler) Close(context.Context) error {
func (s *externalScaler) GetMetricSpecForScaling(ctx context.Context) []v2.MetricSpec {
var result []v2.MetricSpec

grpcClient, err := getClientForConnectionPool(s.metadata)
grpcClient, err := getClientForConnectionPool(s.metadata, s.logger)
if err != nil {
s.logger.Error(err, "error building grpc connection")
return result
Expand Down Expand Up @@ -171,7 +196,7 @@ func (s *externalScaler) GetMetricSpecForScaling(ctx context.Context) []v2.Metri
// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *externalScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
var metrics []external_metrics.ExternalMetricValue
grpcClient, err := getClientForConnectionPool(s.metadata)
grpcClient, err := getClientForConnectionPool(s.metadata, s.logger)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, err
}
Expand Down Expand Up @@ -212,7 +237,7 @@ func (s *externalPushScaler) Run(ctx context.Context, active chan<- bool) {
defer close(active)
// It's possible for the connection to get terminated anytime, we need to run this in a retry loop
runWithLog := func() {
grpcClient, err := getClientForConnectionPool(s.metadata)
grpcClient, err := getClientForConnectionPool(s.metadata, s.logger)
if err != nil {
s.logger.Error(err, "error running internalRun")
return
Expand All @@ -223,7 +248,7 @@ func (s *externalPushScaler) Run(ctx context.Context, active chan<- bool) {
}
}

// retry on error from runWithLog() starting by 2 sec backing off * 2 with a max of 1 minute
// retry on error from runWithLog() starting by 2 sec backing off * 2 with a max of 2 minute
retryDuration := time.Second * 2
// the caller of this function needs to ensure that they call Stop() on the resulting
// timer, to release background resources.
Expand Down Expand Up @@ -273,19 +298,32 @@ var connectionPoolMutex sync.Mutex

// getClientForConnectionPool returns a grpcClient and a done() Func. The done() function must be called once the client is no longer
// in use to clean up the shared grpc.ClientConn
func getClientForConnectionPool(metadata externalScalerMetadata) (pb.ExternalScalerClient, error) {
func getClientForConnectionPool(metadata externalScalerMetadata, logger logr.Logger) (pb.ExternalScalerClient, error) {
connectionPoolMutex.Lock()
defer connectionPoolMutex.Unlock()

buildGRPCConnection := func(metadata externalScalerMetadata) (*grpc.ClientConn, error) {
// FIXME: DEPRECATED to be removed in v2.13 https://github.com/kedacore/keda/issues/4549
if metadata.tlsCertFile != "" {
logger.V(1).Info("tlsCertFile in ScaleObject metadata will be deprecated in v2.12. Please use" +
"tlsClientCert, tlsClientKey and caCert in TriggerAuthentication instead.")
creds, err := credentials.NewClientTLSFromFile(metadata.tlsCertFile, "")
if err != nil {
return nil, err
}
return grpc.Dial(metadata.scalerAddress, grpc.WithTransportCredentials(creds))
}

tlsConfig, err := util.NewTLSConfig(metadata.tlsClientCert, metadata.tlsClientKey, metadata.caCert, metadata.unsafeSsl)
if err != nil {
return nil, err
}

if len(tlsConfig.Certificates) > 0 || metadata.caCert != "" {
// nosemgrep: go.grpc.ssrf.grpc-tainted-url-host.grpc-tainted-url-host
return grpc.Dial(metadata.scalerAddress, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
}

return grpc.Dial(metadata.scalerAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

Expand Down
61 changes: 53 additions & 8 deletions pkg/scalers/external_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,71 @@ import (
pb "github.com/kedacore/keda/v2/pkg/scalers/externalscaler"
)

var serverRootCA = `-----BEGIN CERTIFICATE-----
MIIDPTCCAiWgAwIBAgIUTPXiztn8CG3+NQdeEIlpA1F9Ec4wDQYJKoZIhvcNAQEL
BQAwLTEOMAwGA1UEAwwFa2VkYTExCzAJBgNVBAYTAlVTMQ4wDAYDVQQHDAVFYXJ0
aDAgFw0yMzAzMjYxMzU1MDJaGA8yMDUxMDgwMjEzNTUwMlowLTEOMAwGA1UEAwwF
a2VkYTExCzAJBgNVBAYTAlVTMQ4wDAYDVQQHDAVFYXJ0aDCCASIwDQYJKoZIhvcN
AQEBBQADggEPADCCAQoCggEBAOaQl2+EEZycNQlO1nEeFgheUZ20gTVAButKjvEK
vIZv+x4AdwNaOcKahro5b09QinoGakTJEOrpks+VUSqQpJ+zVmja5vpIb92gnmGQ
B3rl7nD9rP/bLffa5bNDhmMR7vRd88PYvopn6+hTyX3EGkvbCZD8WNs5f8jslzek
0xj4U4LgC9T1pBykNl5nZs5Fd4CdaO+vi3cywmgjaiPzDOYMbYH4pzflH7aNsEEc
IYs9fQ8SwzsocXpKUS+bTg9OmrDMAwan+mxz6m15BxvJzHQqmp/aE70BSkinBwCg
dgzgQUwg6ko/jnJixP4tkr8p8nURBL7GNvuVIS7Z2EjD240CAwEAAaNTMFEwHQYD
VR0OBBYEFN03E9o2ne0s5GZSZ7rZczisME5RMB8GA1UdIwQYMBaAFN03E9o2ne0s
5GZSZ7rZczisME5RMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEB
ADM6vkgjttrU9bYRviwdgohvGNYegDXfyKt25gwYn5+UwUtqjjztTWMr6aAKLNob
Dqjb0BDR5Ow0kD9KXyO4m0gTBzxrHzDnFeTQxE2h8Gl/VRGueJQ2sfmU8oG2/3GV
4nEWLAu1XMqXcFQWT9X+JS3Wxqc1DLrAeX8u0ZIx5Lkk4kPV3d3BP8KyX+AQGt57
p0kdhXOTNW1kUPCrtnc0uBJNHqlev3KkHebH20G7iAZFCCpul9cyLK1fftCBuVE/
jtq3TnHHw+BroQPje3zF/MZTAA8Z9RejkpALMtoHeE68ar07FPlC8wZXDlfQXzYS
PWO1PoIiMX1UsfdZ35JCOF4=
-----END CERTIFICATE-----
`
var clientCert = `-----BEGIN CERTIFICATE-----
MIIC9jCCAd4CFEGcfEWHP2ckC/kIgUEDUPkAVHHIMA0GCSqGSIb3DQEBCwUAMC0x
DjAMBgNVBAMMBWtlZGEyMQswCQYDVQQGEwJVUzEOMAwGA1UEBwwFRWFydGgwIBcN
MjMwMzI2MTM1NTAzWhgPMjA1MTA2MDcxMzU1MDNaMEAxCzAJBgNVBAYTAlVTMQsw
CQYDVQQIDAJDQTEUMBIGA1UECgwLTXlPcmcsIEluYy4xDjAMBgNVBAMMBWtlZGEy
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3k7zr+QbOHXMqhyUM6Oz
SuGGqQttIGZEs12eLSRlGY9vL+pf/G3CubkGsTtp5b5tmP4CNYcGtU8wSJMn23Bq
BbXECpDXh6cuo+56VVAMJyNqZoIeS4JfKX5mvj4WrfpVJ3e+o6lrebAICK1qqTwq
z6N6ZUkeF552aTh8RAgEiKJlylmKdHc/IF3+oLc2aA7IAl+zxYOTCrjIiUrc54OB
gYCkCSb0P4SPHE8ryB0pN8S3LdtZrgVIzewd24joKbXL7hBZ3ltaj0t2kK2CTCxb
I3te0JdIaPV2TxCnK9dxLRkFXNjz/7V45rbRLXtSPoirKseUR2zbo5kfquY0J2hv
nwIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQCiZI/N/60Gj8711V+Uhj1j9iw3s4oU
qT4r9NozotrhjIMe14rhkJ1k+1x7pyLX8QHgO0WxiAp8tKX0kcUQO/ZQfTAM6FpW
cevGTxrVk5CcilafIBzaZF5Mz6diBxbTnhFS+hZXiwavkImBK4zZY9aUcVIjJfPv
xSaEVvMdofrhaio9M0deYzQ/Bf/uMkR3Fxs4qbhsg3gkbepFm3yJoSANzXCMvDnv
mauSvQwA0SRKECr46F8dSeFE1uIbN4ZNgrisBTVkoPYZuF7pAsSsjqGM0phUKiI8
5thG2dnqJSunC+vZW8QY+x3eq4XzFEpIYcaV9YpiGHbv8N6gzJydL/GB
-----END CERTIFICATE-----
`

type parseExternalScalerMetadataTestData struct {
metadata map[string]string
isError bool
metadata map[string]string
isError bool
authParams map[string]string
}

var testExternalScalerMetadata = []parseExternalScalerMetadataTestData{
{map[string]string{}, true},
{map[string]string{}, true, map[string]string{}},
// all properly formed
{map[string]string{"scalerAddress": "myservice", "test1": "7", "test2": "SAMPLE_CREDS"}, false},
{map[string]string{"scalerAddress": "myservice", "test1": "7", "test2": "SAMPLE_CREDS", "insecureSkipVerify": "true"}, false, map[string]string{"caCert": serverRootCA, "tlsClientCert": clientCert}},
// missing scalerAddress
{map[string]string{"test1": "1", "test2": "SAMPLE_CREDS"}, true},
{map[string]string{"test1": "1", "test2": "SAMPLE_CREDS"}, true, map[string]string{}},
}

func TestExternalScalerParseMetadata(t *testing.T) {
for _, testData := range testExternalScalerMetadata {
_, err := parseExternalScalerMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: map[string]string{}})
metadata, err := parseExternalScalerMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: map[string]string{}, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}

if testData.metadata["unsafeSsl"] == "true" && !metadata.unsafeSsl {
t.Error("Expected unsafeSsl to be true but got", metadata.unsafeSsl)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
Expand Down Expand Up @@ -104,7 +150,6 @@ type testServer struct {

func createGRPCServers(count int, t *testing.T) []testServer {
result := make([]testServer, 0, count)

for i := 0; i < count; i++ {
grpcServer := grpc.NewServer()
address := fmt.Sprintf("127.0.0.1:%d", 5050+i)
Expand Down Expand Up @@ -211,7 +256,7 @@ func TestWaitForState(t *testing.T) {
// server stop will lead to Idle.
<-waitForState(context.TODO(), grpcClient, connectivity.Idle, connectivity.Shutdown)
grpcClient.Close()
// after close the state to Shutdown.
// after close the state to shut down.
t.Log("close state:", grpcClient.GetState().String())
close(graceDone)
}()
Expand Down
39 changes: 24 additions & 15 deletions pkg/util/tls_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func TestNewTLSConfig_WithPassword(t *testing.T) {
password string
issuer string
CACert string
isError bool
}{
{
name: "rsaCert_WithCACert",
Expand All @@ -195,6 +196,7 @@ func TestNewTLSConfig_WithPassword(t *testing.T) {
password: "keypass",
issuer: "O=Internet Widgits Pty Ltd,ST=Some-State,C=AU",
CACert: randomCACert,
isError: false,
},
{
name: "Cert_WithCACert",
Expand All @@ -203,6 +205,7 @@ func TestNewTLSConfig_WithPassword(t *testing.T) {
password: "keypass",
issuer: "O=Internet Widgits Pty Ltd,ST=Some-State,C=AU",
CACert: randomCACert,
isError: false,
},
{
name: "rsaCert_WithoutCACert",
Expand All @@ -211,6 +214,7 @@ func TestNewTLSConfig_WithPassword(t *testing.T) {
password: "keypass",
issuer: "O=Internet Widgits Pty Ltd,ST=Some-State,C=AU",
CACert: "",
isError: false,
},
{
name: "Cert_WithoutCACert",
Expand All @@ -219,28 +223,33 @@ func TestNewTLSConfig_WithPassword(t *testing.T) {
password: "keypass",
issuer: "O=Internet Widgits Pty Ltd,ST=Some-State,C=AU",
CACert: "",
isError: false,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
config, err := NewTLSConfigWithPassword(test.cert, test.key, test.password, test.CACert, false)
if err != nil {
t.Errorf("Should have no error: %s", err)
}
cert, err := x509.ParseCertificate(config.Certificates[0].Certificate[0])
if err != nil {
t.Errorf("Bad certificate")
}
switch {
case err != nil && !test.isError:
t.Errorf("Expected success but got error: %s", err)
case test.isError && err == nil:
t.Errorf("Expect error but got success")
case err == nil:
cert, err := x509.ParseCertificate(config.Certificates[0].Certificate[0])
if err != nil {
t.Errorf("Bad certificate")
}

if test.CACert != "" {
caCertPool := getRootCAs()
caCertPool.AppendCertsFromPEM([]byte(randomCACert))
if !config.RootCAs.Equal(caCertPool) {
t.Errorf("TLS config return different CA cert")
if test.CACert != "" {
caCertPool := getRootCAs()
caCertPool.AppendCertsFromPEM([]byte(randomCACert))
if !config.RootCAs.Equal(caCertPool) {
t.Errorf("TLS config return different CA cert")
}
}
if cert.Issuer.String() != test.issuer {
t.Errorf("Expected Issuer %s but got %s\n", test.issuer, cert.Issuer.String())
}
}
if cert.Issuer.String() != test.issuer {
t.Errorf("Expected Issuer %s but got %s\n", test.issuer, cert.Issuer.String())
}
})
}
Expand Down

0 comments on commit dcfcddc

Please sign in to comment.