From 2c773857e1f2fbd3a77cdf3a3b150941fc367dca Mon Sep 17 00:00:00 2001 From: valentin <36446499+qvalentin@users.noreply.github.com> Date: Thu, 18 May 2023 14:33:52 +0200 Subject: [PATCH] feat: Add OAuth extensions for kafka scaler (#4486) Signed-off-by: qvalentin --- CHANGELOG.md | 1 + pkg/scalers/kafka_scaler.go | 15 ++++++++++++++- pkg/scalers/kafka_scaler_oauth_token_provider.go | 6 ++++-- pkg/scalers/kafka_scaler_test.go | 11 +++++++++++ 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e558853416e..89462694695 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **Azure Data Exporer Scaler**: Use azidentity SDK ([#4489](https://github.com/kedacore/keda/issues/4489)) - **GCP PubSub Scaler**: Make it more flexible for metrics ([#4243](https://github.com/kedacore/keda/issues/4243)) +- **Kafka Scaler:** Add support for OAuth extensions ([#4544](https://github.com/kedacore/keda/issues/4544)) ### Fixes diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 9a3de9c7e65..00de4af29ef 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -54,6 +54,7 @@ type kafkaMetadata struct { // OAUTHBEARER scopes []string oauthTokenEndpointURI string + oauthExtensions map[string]string // TLS enableTLS bool @@ -163,6 +164,18 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error { return errors.New("no oauth token endpoint uri given") } meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"]) + + meta.oauthExtensions = make(map[string]string) + oauthExtensionsRaw := config.AuthParams["oauthExtensions"] + if oauthExtensionsRaw != "" { + for _, extension := range strings.Split(oauthExtensionsRaw, ",") { + splittedExtension := strings.Split(extension, "=") + if len(splittedExtension) != 2 { + return errors.New("invalid OAuthBearer extension, must be of format key=value") + } + meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1] + } + } } } else { return fmt.Errorf("err SASL mode %s given", mode) @@ -382,7 +395,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin if metadata.saslType == KafkaSASLTypeOAuthbearer { config.Net.SASL.Mechanism = sarama.SASLTypeOAuth - config.Net.SASL.TokenProvider = OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes) + config.Net.SASL.TokenProvider = OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes, metadata.oauthExtensions) } client, err := sarama.NewClient(metadata.bootstrapServers, config) diff --git a/pkg/scalers/kafka_scaler_oauth_token_provider.go b/pkg/scalers/kafka_scaler_oauth_token_provider.go index e833852c187..7e6515182c4 100644 --- a/pkg/scalers/kafka_scaler_oauth_token_provider.go +++ b/pkg/scalers/kafka_scaler_oauth_token_provider.go @@ -10,9 +10,10 @@ import ( type TokenProvider struct { tokenSource oauth2.TokenSource + extensions map[string]string } -func OAuthBearerTokenProvider(clientID, clientSecret, tokenURL string, scopes []string) sarama.AccessTokenProvider { +func OAuthBearerTokenProvider(clientID, clientSecret, tokenURL string, scopes []string, extensions map[string]string) sarama.AccessTokenProvider { cfg := clientcredentials.Config{ ClientID: clientID, ClientSecret: clientSecret, @@ -22,6 +23,7 @@ func OAuthBearerTokenProvider(clientID, clientSecret, tokenURL string, scopes [] return &TokenProvider{ tokenSource: cfg.TokenSource(context.Background()), + extensions: extensions, } } @@ -31,5 +33,5 @@ func (t *TokenProvider) Token() (*sarama.AccessToken, error) { return nil, err } - return &sarama.AccessToken{Token: token.AccessToken}, nil + return &sarama.AccessToken{Token: token.AccessToken, Extensions: t.extensions}, nil } diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index d321f5bcfd0..31db0e98af7 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -235,6 +235,12 @@ var parseKafkaOAuthbrearerAuthParamsTestDataset = []parseKafkaAuthParamsTestData {map[string]string{"sasl": "foo", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable"}, true, false}, // failure, SASL OAUTHBEARER + TLS missing oauthTokenEndpointUri {map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "", "tls": "disable"}, true, false}, + // success, SASL OAUTHBEARER + extension + {map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable", "oauthExtensions": "extension_foo=bar"}, false, false}, + // success, SASL OAUTHBEARER + multiple extensions + {map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable", "oauthExtensions": "extension_foo=bar,extension_baz=baz"}, false, false}, + // failure, SASL OAUTHBEARER + bad extension + {map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable", "oauthExtensions": "extension_foo=bar,extension_bazbaz"}, true, false}, } var kafkaMetricIdentifiers = []kafkaMetricIdentifier{ @@ -384,6 +390,11 @@ func TestKafkaOAuthbrearerAuthParams(t *testing.T) { t.Errorf("Expected scopes to be set to %v but got %v\n", strings.Count(testData.authParams["scopes"], ","), len(meta.scopes)) } } + if err == nil && testData.authParams["oauthExtensions"] != "" { + if len(meta.oauthExtensions) != strings.Count(testData.authParams["oauthExtensions"], ",")+1 { + t.Errorf("Expected number of extensions to be set to %v but got %v\n", strings.Count(testData.authParams["oauthExtensions"], ",")+1, len(meta.oauthExtensions)) + } + } } }