allow kafka scaler to use lag for all topics within a consumer group
Signed-off-by: Jinli Liang <paul.liang@rokt.com>
PaulLiang1 committed Dec 20, 2021
1 parent dfcae51 commit d745707
Showing 4 changed files with 248 additions and 80 deletions.
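In short, this commit makes the `topic` trigger field optional: when it is omitted, the scaler sums the lag of every topic the consumer group subscribes to. As a rough sketch (not part of the commit; values are placeholders mirroring the test data below), trigger metadata without `topic` would look like:

```go
package main

import "fmt"

func main() {
	// Trigger metadata with "topic" omitted; the scaler then discovers the
	// subscribed topics from the consumer group itself.
	// Broker address and group name are placeholders.
	metadata := map[string]string{
		"bootstrapServers": "foobar:9092",
		"consumerGroup":    "my-group",
		// "topic" intentionally left out
	}
	fmt.Println(metadata)
}
```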
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -30,7 +30,7 @@
- Graphite Scaler: use the latest datapoint returned, not the earliest ([#2365](https://github.com/kedacore/keda/pull/2365))
- Kubernetes Workload Scaler: ignore terminated pods ([#2384](https://github.com/kedacore/keda/pull/2384))
- `keda-operator` Cluster Role: add `list` and `watch` access to service accounts ([#2406](https://github.com/kedacore/keda/pull/2406))|([#2410](https://github.com/kedacore/keda/pull/2410))

- Apache Kafka Scaler: allow the `topic` flag to be optional; when it is omitted, the lag of all topics within the consumer group is used for scaling ([#2409](https://github.com/kedacore/keda/pull/2409))
- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))

### Breaking Changes
164 changes: 95 additions & 69 deletions pkg/scalers/kafka_scaler.go
@@ -119,7 +119,9 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
case config.TriggerMetadata["topic"] != "":
meta.topic = config.TriggerMetadata["topic"]
default:
return meta, errors.New("no topic given")
meta.topic = ""
kafkaLog.V(1).Info(fmt.Sprintf("consumer group %s has no topic specified, "+
"will use all topics subscribed by the consumer group for scaling", meta.group))
}

meta.offsetResetPolicy = defaultOffsetResetPolicy
@@ -209,34 +211,35 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {

// IsActive determines if we need to scale from zero
func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) {
partitions, err := s.getPartitions()
topicPartitions, err := s.getTopicPartitions()
if err != nil {
return false, err
}

offsets, err := s.getOffsets(partitions)
offsets, err := s.getOffsets(topicPartitions)
if err != nil {
return false, err
}

topicOffsets, err := s.getTopicOffsets(partitions)
topicPartitionsOffsets, err := s.getTopicPartitionOffsets(topicPartitions)
if err != nil {
return false, err
}

for _, partition := range partitions {
lag, err := s.getLagForPartition(partition, offsets, topicOffsets)
if err != nil && lag == invalidOffset {
return true, nil
}
kafkaLog.V(1).Info(fmt.Sprintf("Group %s has a lag of %d for topic %s and partition %d\n", s.metadata.group, lag, s.metadata.topic, partition))
for topic, partitionsOffsets := range topicPartitionsOffsets {
for partition := range partitionsOffsets {
lag, err := s.getLagForPartition(topic, partition, offsets, partitionsOffsets)
if err != nil && lag == invalidOffset {
return true, nil
}
kafkaLog.V(1).Info(fmt.Sprintf("Group %s has a lag of %d for topic %s and partition %d\n", s.metadata.group, lag, topic, partition))

// Return as soon as a lag was detected for any partition
if lag > 0 {
return true, nil
// Return as soon as a lag was detected for any partition
if lag > 0 {
return true, nil
}
}
}

return false, nil
}

@@ -289,46 +292,58 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
return client, admin, nil
}

func (s *kafkaScaler) getPartitions() ([]int32, error) {
topicsMetadata, err := s.admin.DescribeTopics([]string{s.metadata.topic})
func (s *kafkaScaler) getTopicPartitions() (map[string][]int32, error) {
var topicsToDescribe = make([]string, 0)

// when no topic is specified, query the consumer group to fetch all subscribed topics
if s.metadata.topic == "" {
listCGOffsetResponse, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, nil)
if err != nil {
return nil, fmt.Errorf("error listing cg offset: %s", err)
}
for topicName := range listCGOffsetResponse.Blocks {
topicsToDescribe = append(topicsToDescribe, topicName)
}
} else {
topicsToDescribe = []string{s.metadata.topic}
}

topicsMetadata, err := s.admin.DescribeTopics(topicsToDescribe)
if err != nil {
return nil, fmt.Errorf("error describing topics: %s", err)
}
if len(topicsMetadata) != 1 {
return nil, fmt.Errorf("expected only 1 topic metadata, got %d", len(topicsMetadata))
}

partitionMetadata := topicsMetadata[0].Partitions
partitions := make([]int32, len(partitionMetadata))
for i, p := range partitionMetadata {
partitions[i] = p.ID
topicPartitions := make(map[string][]int32, len(topicsMetadata))
for _, topicMetadata := range topicsMetadata {
partitionMetadata := topicMetadata.Partitions
partitions := make([]int32, len(partitionMetadata))
for i, p := range partitionMetadata {
partitions[i] = p.ID
}
topicPartitions[topicMetadata.Name] = partitions
}

return partitions, nil
return topicPartitions, nil
}
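For orientation, a minimal sketch of the map shape `getTopicPartitions` returns when no topic is configured and the group subscribes to several topics (topic names and partition counts are invented):

```go
package main

import "fmt"

func main() {
	// Hypothetical result of getTopicPartitions: topic -> partition IDs.
	// With a single configured topic this map simply has one entry.
	topicPartitions := map[string][]int32{
		"orders":   {0, 1, 2},
		"payments": {0, 1},
	}
	fmt.Println(topicPartitions)
}
```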

func (s *kafkaScaler) getOffsets(partitions []int32) (*sarama.OffsetFetchResponse, error) {
offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, map[string][]int32{
s.metadata.topic: partitions,
})

func (s *kafkaScaler) getOffsets(topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, topicPartitions)
if err != nil {
return nil, fmt.Errorf("error listing consumer group offsets: %s", err)
}

return offsets, nil
}

func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.OffsetFetchResponse, topicOffsets map[int32]int64) (int64, error) {
block := offsets.GetBlock(s.metadata.topic, partition)
func (s *kafkaScaler) getLagForPartition(topic string, partition int32, offsets *sarama.OffsetFetchResponse, topicOffsets map[int32]int64) (int64, error) {
block := offsets.GetBlock(topic, partition)
if block == nil {
kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", s.metadata.topic, partition), "")
return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", s.metadata.topic, partition)
kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partition), "")
return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partition)
}
consumerOffset := block.Offset
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition))
return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition)
kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partition))
return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partition)
}

latestOffset := topicOffsets[partition]
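As a quick numeric illustration of the per-partition lag computed from these offsets (numbers are invented; the subtraction itself is implied by the lag logging above):

```go
package main

import "fmt"

func main() {
	// Invented values: newest offset on the partition vs. the group's
	// committed offset; the difference is the lag this partition
	// contributes to the total.
	var latestOffset, consumerOffset int64 = 1000, 950
	fmt.Println(latestOffset - consumerOffset) // 50
}
```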
@@ -351,9 +366,17 @@ func (s *kafkaScaler) Close(context.Context) error {

func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(s.metadata.lagThreshold, resource.DecimalSI)

var metricName string
if s.metadata.topic != "" {
metricName = fmt.Sprintf("kafka-%s", s.metadata.topic)
} else {
metricName = fmt.Sprintf("kafka-%s-topics", s.metadata.group)
}

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("kafka-%s", s.metadata.topic))),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(metricName)),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
@@ -366,34 +389,36 @@ func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricS

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
partitions, err := s.getPartitions()
topicPartitions, err := s.getTopicPartitions()
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

offsets, err := s.getOffsets(partitions)
offsets, err := s.getOffsets(topicPartitions)
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

topicOffsets, err := s.getTopicOffsets(partitions)
topicsPartitionsOffsets, err := s.getTopicPartitionOffsets(topicPartitions)
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

totalLag := int64(0)
for _, partition := range partitions {
lag, _ := s.getLagForPartition(partition, offsets, topicOffsets)

totalLag += lag
var totalTopicPartitions int64
for topic, partitionsOffsets := range topicsPartitionsOffsets {
for partition := range partitionsOffsets {
lag, _ := s.getLagForPartition(topic, partition, offsets, partitionsOffsets)
totalLag += lag
}
totalTopicPartitions += (int64)(len(partitionsOffsets))
}

kafkaLog.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, partitions %v, threshold %v", totalLag, len(partitions), s.metadata.lagThreshold))
kafkaLog.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, len(topicPartitions), s.metadata.lagThreshold))

if !s.metadata.allowIdleConsumers {
// don't scale out beyond the number of partitions
if (totalLag / s.metadata.lagThreshold) > int64(len(partitions)) {
totalLag = int64(len(partitions)) * s.metadata.lagThreshold
// don't scale out beyond the number of topicPartitions
if (totalLag / s.metadata.lagThreshold) > totalTopicPartitions {
totalLag = totalTopicPartitions * s.metadata.lagThreshold
}
}

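To make the cap above concrete, a small worked example with invented numbers (threshold, partition count, and lag are illustrative, not taken from the commit):

```go
package main

import "fmt"

func main() {
	// Mirror of the cap in GetMetrics: never report more lag than
	// totalTopicPartitions * lagThreshold, so the HPA does not ask for
	// more replicas than there are partitions to consume from.
	var (
		totalLag             int64 = 500
		lagThreshold         int64 = 100
		totalTopicPartitions int64 = 3
	)
	if (totalLag / lagThreshold) > totalTopicPartitions {
		totalLag = totalTopicPartitions * lagThreshold
	}
	fmt.Println(totalLag) // 300
}
```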
@@ -406,7 +431,7 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, error) {
func (s *kafkaScaler) getTopicPartitionOffsets(topicPartitions map[string][]int32) (map[string]map[int32]int64, error) {
version := int16(0)
if s.client.Config().Version.IsAtLeast(sarama.V0_10_1_0) {
version = 1
@@ -415,41 +440,42 @@ func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, erro
// Step 1: build one OffsetRequest instance per broker.
requests := make(map[*sarama.Broker]*sarama.OffsetRequest)

for _, partitionID := range partitions {
broker, err := s.client.Leader(s.metadata.topic, partitionID)
if err != nil {
return nil, err
}

request, ok := requests[broker]
if !ok {
request = &sarama.OffsetRequest{Version: version}
requests[broker] = request
for topic, partitions := range topicPartitions {
for _, partitionID := range partitions {
broker, err := s.client.Leader(topic, partitionID)
if err != nil {
return nil, err
}
request, ok := requests[broker]
if !ok {
request = &sarama.OffsetRequest{Version: version}
requests[broker] = request
}
request.AddBlock(topic, partitionID, sarama.OffsetNewest, 1)
}

request.AddBlock(s.metadata.topic, partitionID, sarama.OffsetNewest, 1)
}

offsets := make(map[int32]int64)
topicPartitionsOffsets := make(map[string]map[int32]int64)

// Step 2: send requests, one per broker, and collect offsets
// Step 2: send requests, one per broker, and collect topicPartitionsOffsets
for broker, request := range requests {
response, err := broker.GetAvailableOffsets(request)

if err != nil {
return nil, err
}

for _, blocks := range response.Blocks {
for topic, blocks := range response.Blocks {
if _, found := topicPartitionsOffsets[topic]; !found {
topicPartitionsOffsets[topic] = make(map[int32]int64)
}
for partitionID, block := range blocks {
if block.Err != sarama.ErrNoError {
return nil, block.Err
}

offsets[partitionID] = block.Offset
topicPartitionsOffsets[topic][partitionID] = block.Offset
}
}
}

return offsets, nil
return topicPartitionsOffsets, nil
}
6 changes: 4 additions & 2 deletions pkg/scalers/kafka_scaler_test.go
@@ -52,8 +52,8 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
{map[string]string{}, true, 0, nil, "", "", "", false},
// failure, no consumer group
{map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest", false},
// failure, no topic
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false},
// success, no topic
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false},
// failure, version not supported
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// success
@@ -118,6 +118,7 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
{&parseKafkaMetadataTestDataset[4], 0, "s0-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[4], 1, "s1-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"},
}
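The new test entry expects `s1-kafka-my-group-topics`; a compact sketch of how that name would be derived, assuming `GenerateMetricNameWithIndex` prefixes `s<scalerIndex>-` as the existing expected names suggest:

```go
package main

import "fmt"

func main() {
	// Values from the new test case; the "s<index>-" prefix is inferred
	// from the existing expected metric names, not shown in this diff.
	group, scalerIndex := "my-group", 1
	metricName := fmt.Sprintf("kafka-%s-topics", group)
	fmt.Printf("s%d-%s\n", scalerIndex, metricName) // s1-kafka-my-group-topics
}
```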

func TestGetBrokers(t *testing.T) {
@@ -190,6 +191,7 @@ func TestKafkaAuthParams(t *testing.T) {
}
}
}

func TestKafkaGetMetricSpecForScaling(t *testing.T) {
for _, testData := range kafkaMetricIdentifiers {
meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validWithAuthParams, ScalerIndex: testData.scalerIndex})