Skip to content

Commit

Permalink
Kafka Scaler: check lagThreshold is a positive number (#3367)
Browse files Browse the repository at this point in the history
  • Loading branch information
zroubalik authored Jul 14, 2022
1 parent af80d84 commit 2b8f2fd
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **ActiveMQ Scaler:** KEDA doesn't respect restAPITemplate ([#3188](https://github.com/kedacore/keda/issues/3188))
- **Azure Eventhub Scaler:** KEDA operator crashes on nil memory panic if the eventhub connectionstring for Azure Eventhub Scaler contains an invalid character ([#3082](https://github.com/kedacore/keda/issues/3082))
- **Azure Pipelines Scaler:** Fix issue with Azure Pipelines wrong PAT Auth. ([#3159](https://github.com/kedacore/keda/issues/3159))
- **Kafka Scaler:** Check `lagThreshold` is a positive number ([#3366](https://github.com/kedacore/keda/issues/3366))


### Deprecations
Expand Down
9 changes: 6 additions & 3 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
meta.topic = config.TriggerMetadata["topic"]
default:
meta.topic = ""
kafkaLog.V(1).Info(fmt.Sprintf("consumer group %s has no topic specified, "+
kafkaLog.V(1).Info(fmt.Sprintf("consumer group %q has no topic specified, "+
"will use all topics subscribed by the consumer group for scaling", meta.group))
}

Expand All @@ -186,7 +186,7 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
if config.TriggerMetadata["offsetResetPolicy"] != "" {
policy := offsetResetPolicy(config.TriggerMetadata["offsetResetPolicy"])
if policy != earliest && policy != latest {
return meta, fmt.Errorf("err offsetResetPolicy policy %s given", policy)
return meta, fmt.Errorf("err offsetResetPolicy policy %q given", policy)
}
meta.offsetResetPolicy = policy
}
Expand All @@ -196,7 +196,10 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %s: %s", lagThresholdMetricName, err)
return meta, fmt.Errorf("error parsing %q: %s", lagThresholdMetricName, err)
}
if t <= 0 {
return meta, fmt.Errorf("%q must be positive number", lagThresholdMetricName)
}
meta.lagThreshold = t
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false},
// failure, version not supported
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// failure, lagThreshold is negative value
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// failure, lagThreshold is 0
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// success
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// success, more brokers
Expand Down Expand Up @@ -118,8 +122,8 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
}

var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
{&parseKafkaMetadataTestDataset[4], 0, "s0-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[4], 1, "s1-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[6], 0, "s0-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[6], 1, "s1-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"},
}

Expand Down

0 comments on commit 2b8f2fd

Please sign in to comment.