From f4261e3e60f787a289d43d77ae6f28cf868e1a8b Mon Sep 17 00:00:00 2001 From: Rob Pickerill Date: Wed, 21 Aug 2024 12:39:06 +0100 Subject: [PATCH] Add connection name for the rabbitmq scaler (#6093) * add a static connection name Signed-off-by: robpickerill * Update pkg/scalers/rabbitmq_scaler.go Co-authored-by: Jorge Turrado Ferrero Signed-off-by: Rob Pickerill * add improvement to changelog Signed-off-by: robpickerill * add namepace and so name to conn name Signed-off-by: robpickerill * Update comment Signed-off-by: Jorge Turrado Ferrero --------- Signed-off-by: robpickerill Signed-off-by: Rob Pickerill Signed-off-by: Jorge Turrado Ferrero Co-authored-by: Jorge Turrado Ferrero --- CHANGELOG.md | 2 ++ pkg/scalers/rabbitmq_scaler.go | 36 +++++++++++++++++++---------- pkg/scalers/rabbitmq_scaler_test.go | 13 +++++++++++ 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 928a8128c72..5f86988e080 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,8 @@ Here is an overview of all new **experimental** features: - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) - **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) +- **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958)) +- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) ### Fixes diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index be2ac66995f..f30d92fa13a 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -69,6 +69,7 @@ type rabbitMQScaler struct { type rabbitMQMetadata struct { queueName string + connectionName string // name used for the AMQP connection mode string // QueueLength or MessageRate value float64 // trigger value (queue length or publish/sec. rate) activationValue float64 // activation value @@ -232,7 +233,9 @@ func resolveTLSAuthParams(config *scalersconfig.ScalerConfig, meta *rabbitMQMeta } func parseRabbitMQMetadata(config *scalersconfig.ScalerConfig) (*rabbitMQMetadata, error) { - meta := rabbitMQMetadata{} + meta := rabbitMQMetadata{ + connectionName: connectionName(config), + } // Resolve protocol type if err := resolveProtocol(config, &meta); err != nil { @@ -445,22 +448,25 @@ func parseTrigger(meta *rabbitMQMetadata, config *scalersconfig.ScalerConfig) (* } // getConnectionAndChannel returns an amqp connection. If enableTLS is true tls connection is made using -// -// the given ceClient cert, ceClient key,and CA certificate. If clientKeyPassword is not empty the provided password will be used to -// +// the given ceClient cert, ceClient key,and CA certificate. If clientKeyPassword is not empty the provided password will be used to // decrypt the given key. If enableTLS is disabled then amqp connection will be created without tls. func getConnectionAndChannel(host string, meta *rabbitMQMetadata) (*amqp.Connection, *amqp.Channel, error) { - var conn *amqp.Connection - var err error + amqpConfig := amqp.Config{ + Properties: amqp.Table{ + "connection_name": meta.connectionName, + }, + } + if meta.enableTLS { - tlsConfig, configErr := kedautil.NewTLSConfigWithPassword(meta.cert, meta.key, meta.keyPassword, meta.ca, meta.unsafeSsl) - if configErr != nil { - return nil, nil, configErr + tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.cert, meta.key, meta.keyPassword, meta.ca, meta.unsafeSsl) + if err != nil { + return nil, nil, err } - conn, err = amqp.DialTLS(host, tlsConfig) - } else { - conn, err = amqp.Dial(host) + + amqpConfig.TLSClientConfig = tlsConfig } + + conn, err := amqp.DialConfig(host, amqpConfig) if err != nil { return nil, nil, err } @@ -715,3 +721,9 @@ func (s *rabbitMQScaler) anonymizeRabbitMQError(err error) error { errorMessage := fmt.Sprintf("error inspecting rabbitMQ: %s", err) return fmt.Errorf(rabbitMQAnonymizePattern.ReplaceAllString(errorMessage, "user:password@")) } + +// connectionName is used to provide a deterministic AMQP connection name when +// connecting to RabbitMQ +func connectionName(config *scalersconfig.ScalerConfig) string { + return fmt.Sprintf("keda-%s-%s", config.ScalableObjectNamespace, config.ScalableObjectName) +} diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go index 1d2ba1eacd2..ef705757a3c 100644 --- a/pkg/scalers/rabbitmq_scaler_test.go +++ b/pkg/scalers/rabbitmq_scaler_test.go @@ -746,3 +746,16 @@ func TestRegexQueueMissingError(t *testing.T) { } } } + +func TestConnectionName(t *testing.T) { + c := scalersconfig.ScalerConfig{ + ScalableObjectNamespace: "test-namespace", + ScalableObjectName: "test-name", + } + + connectionName := connectionName(&c) + + if connectionName != "keda-test-namespace-test-name" { + t.Error("Expected connection name to be keda-test-namespace-test-name but got", connectionName) + } +}