diff --git a/CHANGELOG.md b/CHANGELOG.md index c8a3d1d1b8a..4a6856e059d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - Mask password in postgres scaler auto generated metricName. ([PR #1381](https://github.com/kedacore/keda/pull/1381)) - Bug fix for pending jobs in ScaledJob's accurateScalingStrategy . ([#1323](https://github.com/kedacore/keda/issues/1323)) - Fix memory leak because of unclosed scalers. ([#1413](https://github.com/kedacore/keda/issues/1413)) +- Automatically determine the RabbitMQ protocol when possible, and support setting the protocl via TriggerAuthentication ([#1459](https://github.com/kedacore/keda/pulls/1459)) ### Breaking Changes diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index 54007891a49..ad484ddb28e 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -29,7 +29,8 @@ const ( const ( httpProtocol = "http" amqpProtocol = "amqp" - defaultProtocol = amqpProtocol + autoProtocol = "auto" + defaultProtocol = autoProtocol ) type rabbitMQScaler struct { @@ -87,12 +88,14 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) { // Resolve protocol type meta.protocol = defaultProtocol + if val, ok := config.AuthParams["protocol"]; ok { + meta.protocol = val + } if val, ok := config.TriggerMetadata["protocol"]; ok { - if val == amqpProtocol || val == httpProtocol { - meta.protocol = val - } else { - return nil, fmt.Errorf("the protocol has to be either `%s` or `%s` but is `%s`", amqpProtocol, httpProtocol, val) - } + meta.protocol = val + } + if meta.protocol != amqpProtocol && meta.protocol != httpProtocol && meta.protocol != autoProtocol { + return nil, fmt.Errorf("the protocol has to be either `%s`, `%s`, or `%s` but is `%s`", amqpProtocol, httpProtocol, autoProtocol, meta.protocol) } // Resolve host value @@ -107,6 +110,22 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) { return nil, fmt.Errorf("no host setting given") } + // If the protocol is auto, check the host scheme. + if meta.protocol == autoProtocol { + parsedURL, err := url.Parse(meta.host) + if err != nil { + return nil, fmt.Errorf("can't parse host to find protocol: %s", err) + } + switch parsedURL.Scheme { + case "amqp", "amqps": + meta.protocol = amqpProtocol + case "http", "https": + meta.protocol = httpProtocol + default: + return nil, fmt.Errorf("unknown host URL scheme `%s`", parsedURL.Scheme) + } + } + // Resolve queueName if val, ok := config.TriggerMetadata["queueName"]; ok { meta.queueName = val diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go index dd5f5f86f60..3a37ba05bb1 100644 --- a/pkg/scalers/rabbitmq_scaler_test.go +++ b/pkg/scalers/rabbitmq_scaler_test.go @@ -46,6 +46,14 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ {map[string]string{"queueLength": "10", "queueName": "sample", "host": host, "protocol": "http"}, false, map[string]string{}}, // queue name with slashes {map[string]string{"queueLength": "10", "queueName": "namespace/name", "hostFromEnv": host}, false, map[string]string{}}, + // protocol defined in authParams + {map[string]string{"queueName": "sample", "hostFromEnv": host}, false, map[string]string{"protocol": "http"}}, + // auto protocol and a bad URL + {map[string]string{"queueName": "sample", "host": "something://"}, true, map[string]string{}}, + // auto protocol and an HTTP URL + {map[string]string{"queueName": "sample", "host": "http://"}, false, map[string]string{}}, + // auto protocol and an HTTPS URL + {map[string]string{"queueName": "sample", "host": "https://"}, false, map[string]string{}}, } var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{ @@ -67,9 +75,9 @@ func TestRabbitMQParseMetadata(t *testing.T) { var testDefaultQueueLength = []parseRabbitMQMetadataTestData{ // use default queueLength - {map[string]string{"queueName": "sample", "host": host}, false, map[string]string{}}, + {map[string]string{"queueName": "sample", "hostFromEnv": host}, false, map[string]string{}}, // use default queueLength with includeUnacked - {map[string]string{"queueName": "sample", "host": host, "protocol": "http"}, false, map[string]string{}}, + {map[string]string{"queueName": "sample", "hostFromEnv": host, "protocol": "http"}, false, map[string]string{}}, } func TestParseDefaultQueueLength(t *testing.T) { @@ -171,7 +179,7 @@ func TestGetQueueInfo(t *testing.T) { func TestRabbitMQGetMetricSpecForScaling(t *testing.T) { for _, testData := range rabbitMQMetricIdentifiers { - meta, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: map[string]string{"myHostSecret": "myHostSecret"}, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil}) + meta, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil}) if err != nil { t.Fatal("Could not parse metadata:", err) }