Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: move sasl and tls flag to ScaledObject instead of specifying in TriggerAuthentication #4322

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features:
- **Azure Pipelines Scaler**: New configuration parameter `requireAllDemands` to scale only if jobs request all demands provided by the scaling definition ([#4138](https://github.com/kedacore/keda/issues/4138))
- **Hashicorp Vault**: Add support to secrets backend version 1 ([#2645](https://github.com/kedacore/keda/issues/2645))
- **Kafka Scaler**: Improve error logging for `GetBlock` method ([#4232](https://github.com/kedacore/keda/issues/4232))
- **Kafka Scaler**: Add support to use `tls` and `sasl` in ScaledObject ([#4232](https://github.com/kedacore/keda/issues/4322))
- **Prometheus Scaler**: Add custom headers and custom auth support ([#4208](https://github.com/kedacore/keda/issues/4208))
- **RabbitMQ Scaler**: Add TLS support ([#967](https://github.com/kedacore/keda/issues/967))
- **Redis Scalers**: Add support to Redis 7 ([#4052](https://github.com/kedacore/keda/issues/4052))
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/azure_blob_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func parseAzureBlobMetadata(config *ScalerConfig, logger logr.Logger) (*azure.Bl
meta.EndpointSuffix = endpointSuffix

// before triggerAuthentication CRD, pod identity was configured using this property
if val, ok := config.TriggerMetadata["useAAdPodIdentity"]; ok && config.PodIdentity.Provider == "" && val == "true" {
if val, ok := config.TriggerMetadata["useAAdPodIdentity"]; ok && config.PodIdentity.Provider == "" && val == stringTrue {
config.PodIdentity = kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzure}
}

Expand Down
84 changes: 62 additions & 22 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type kafkaScaler struct {
previousOffsets map[string]map[int32]int64
}

const (
stringEnable = "enable"
stringDisable = "disable"
)

type kafkaMetadata struct {
bootstrapServers []string
group string
Expand Down Expand Up @@ -121,9 +126,23 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) {

func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
meta.saslType = KafkaSASLTypeNone
var saslAuthType string
switch {
case config.TriggerMetadata["sasl"] != "":
saslAuthType = config.TriggerMetadata["sasl"]
default:
saslAuthType = ""
}

if val, ok := config.AuthParams["sasl"]; ok {
val = strings.TrimSpace(val)
mode := kafkaSaslType(val)
if saslAuthType != "" {
return errors.New("unable to set `sasl` in both ScaledObject and TriggerAuthentication together")
}
saslAuthType = val
}

if saslAuthType != "" {
mode := kafkaSaslType(saslAuthType)

if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer {
if config.AuthParams["username"] == "" {
Expand Down Expand Up @@ -151,30 +170,51 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
}

meta.enableTLS = false
enableTLS := false
if val, ok := config.TriggerMetadata["tls"]; ok {
switch val {
case stringEnable:
enableTLS = true
case stringDisable:
enableTLS = false
default:
return fmt.Errorf("error incorrect TLS value given, got %s", val)
}
}

if val, ok := config.AuthParams["tls"]; ok {
val = strings.TrimSpace(val)
if enableTLS {
return errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together")
}
switch val {
case stringEnable:
enableTLS = true
case stringDisable:
enableTLS = false
default:
return fmt.Errorf("error incorrect TLS value given, got %s", val)
}
}

if val == "enable" {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return errors.New("cert must be provided with key")
}
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
if value, found := config.AuthParams["keyPassword"]; found {
meta.keyPassword = value
} else {
meta.keyPassword = ""
}
meta.enableTLS = true
} else if val != "disable" {
return fmt.Errorf("err incorrect value for TLS given: %s", val)
if enableTLS {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return errors.New("cert must be provided with key")
}
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
if value, found := config.AuthParams["keyPassword"]; found {
meta.keyPassword = value
} else {
meta.keyPassword = ""
}
meta.enableTLS = true
}

return nil
Expand Down
103 changes: 100 additions & 3 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ type parseKafkaAuthParamsTestData struct {
enableTLS bool
}

// Testing the case where `tls` and `sasl` are specified in ScaledObject
type parseAuthParamsTestDataSecondAuthMethod struct {
metadata map[string]string
authParams map[string]string
isError bool
enableTLS bool
}

type kafkaMetricIdentifier struct {
metadataTestData *parseKafkaMetadataTestData
scalerIndex int
Expand Down Expand Up @@ -158,8 +166,65 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
// failure, SASL + TLS, missing key
{map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "tls": "enable", "ca": "caaa", "cert": "ceert"}, true, false},
}
var parseAuthParamsTestDataset = []parseAuthParamsTestDataSecondAuthMethod{
// success, SASL plaintext
{map[string]string{"sasl": "plaintext", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false},
// success, SASL scram_sha256
{map[string]string{"sasl": "scram_sha256", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false},
// success, SASL scram_sha512
{map[string]string{"sasl": "scram_sha512", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false},
// success, TLS only
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa", "cert": "ceert", "key": "keey"}, false, true},
// success, TLS cert/key and assumed public CA
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"cert": "ceert", "key": "keey"}, false, true},
// success, TLS cert/key + key password and assumed public CA
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true},
// success, TLS CA only
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa"}, false, true},
// success, SASL + TLS
{map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true},
// success, SASL + TLS explicitly disabled
{map[string]string{"sasl": "plaintext", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false},
// success, SASL OAUTHBEARER + TLS explicitly disabled
{map[string]string{"sasl": "oauthbearer", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com"}, false, false},
// failure, SASL OAUTHBEARER + TLS explicitly disable + bad SASL type
{map[string]string{"sasl": "foo", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com"}, true, false},
// success, SASL OAUTHBEARER + TLS missing scope
{map[string]string{"sasl": "oauthbearer", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "oauthTokenEndpointUri": "https://website.com"}, false, false},
// failure, SASL OAUTHBEARER + TLS missing oauthTokenEndpointUri
{map[string]string{"sasl": "oauthbearer", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": ""}, true, false},
// failure, SASL incorrect type
{map[string]string{"sasl": "foo", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, true, false},
// failure, SASL missing username
{map[string]string{"sasl": "plaintext", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"password": "admin"}, true, false},
// failure, SASL missing password
{map[string]string{"sasl": "plaintext", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin"}, true, false},
// failure, TLS missing cert
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa", "key": "keey"}, true, false},
// failure, TLS missing key
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa", "cert": "ceert"}, true, false},
// failure, TLS invalid
{map[string]string{"tls": "random", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, incorrect SASL type
{map[string]string{"sasl": "foo", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, incorrect tls
{map[string]string{"sasl": "plaintext", "tls": "foo", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, missing username
{map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, missing password
{map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, missing cert
{map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "key": "keey"}, true, false},
// failure, SASL + TLS, missing key
{map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert"}, true, false},

// failure, setting SASL values in both places
{map[string]string{"sasl": "scram_sha512", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"sasl": "scram_sha512", "username": "admin", "password": "admin"}, true, false},
// failure, setting TLS values in both places
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, true},
}

var parseKafkaOAuthbreakerAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
var parseKafkaOAuthbrearerAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
// success, SASL OAUTHBEARER + TLS
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable"}, false, false},
// success, SASL OAUTHBEARER + TLS multiple scopes
Expand Down Expand Up @@ -243,6 +308,7 @@ func TestGetBrokers(t *testing.T) {
}

func TestKafkaAuthParams(t *testing.T) {
// Testing tls and sasl value in TriggerAuthentication
for _, testData := range parseKafkaAuthParamsTestDataset {
meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: validKafkaMetadata, AuthParams: testData.authParams}, logr.Discard())

Expand Down Expand Up @@ -270,10 +336,41 @@ func TestKafkaAuthParams(t *testing.T) {
}
}
}

// Testing tls and sasl value in scaledObject
for id, testData := range parseAuthParamsTestDataset {
meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}, logr.Discard())

if err != nil && !testData.isError {
t.Errorf("Test case: %v. Expected success but got error %v", id, err)
}
if testData.isError && err == nil {
t.Errorf("Test case: %v. Expected error but got success", id)
}
if !testData.isError {
if testData.metadata["tls"] == "true" && !meta.enableTLS {
t.Errorf("Test case: %v. Expected tls to be set to %v but got %v\n", id, testData.metadata["tls"], meta.enableTLS)
}
if meta.enableTLS {
if meta.ca != testData.authParams["ca"] {
t.Errorf("Test case: %v. Expected ca to be set to %v but got %v\n", id, testData.authParams["ca"], meta.ca)
}
if meta.cert != testData.authParams["cert"] {
t.Errorf("Test case: %v. Expected cert to be set to %v but got %v\n", id, testData.authParams["cert"], meta.cert)
}
if meta.key != testData.authParams["key"] {
t.Errorf("Test case: %v. Expected key to be set to %v but got %v\n", id, testData.authParams["key"], meta.key)
}
if meta.keyPassword != testData.authParams["keyPassword"] {
t.Errorf("Test case: %v. Expected key to be set to %v but got %v\n", id, testData.authParams["keyPassword"], meta.keyPassword)
}
}
}
}
}

func TestKafkaOAuthbreakerAuthParams(t *testing.T) {
for _, testData := range parseKafkaOAuthbreakerAuthParamsTestDataset {
func TestKafkaOAuthbrearerAuthParams(t *testing.T) {
for _, testData := range parseKafkaOAuthbrearerAuthParamsTestDataset {
meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: validKafkaMetadata, AuthParams: testData.authParams}, logr.Discard())

if err != nil && !testData.isError {
Expand Down