Skip to content

Commit

Permalink
Export number of members and assigned partitions for each topic in a …
Browse files Browse the repository at this point in the history
…consumer group

Added consumer_group_topic_members and consumer_group_topic_assigned_partitions metrics.

Fixes redpanda-data#104
  • Loading branch information
amuraru committed Aug 1, 2021
1 parent 00ef547 commit 4998ba2
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
41 changes: 41 additions & 0 deletions prometheus/collect_consumer_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -58,6 +59,46 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
group.Group,
)
}

// iterate all members and build two maps:
// - {topic -> number-of-consumers}
// - {topic -> number-of-partitions-assigned}
topic_consumers := make(map[string]int)
topic_partitions_assigned := make(map[string]int)
for _, member := range group.Members {
var kassignment kmsg.GroupMemberAssignment
if err := kassignment.ReadFrom(member.MemberAssignment); err != nil {
e.logger.Warn("failed to decode consumer group member assignment, internal kafka error",
zap.Error(err),
zap.String("group_id", group.Group),
)
} else {
for _, topic := range kassignment.Topics {
topic_consumers[topic.Topic]++
topic_partitions_assigned[topic.Topic] += len(topic.Partitions)
}
}
}
// number of members in consumer groups for each topic
for topic_name, consumers := range topic_consumers {
ch <- prometheus.MustNewConstMetric(
e.consumerGroupTopicMembers,
prometheus.GaugeValue,
float64(consumers),
group.Group,
topic_name,
)
}
// number of partitions assigned in consumer groups for each topic
for topic_name, partitions := range topic_consumers {
ch <- prometheus.MustNewConstMetric(
e.consumerGroupTopicPartitions,
prometheus.GaugeValue,
float64(partitions),
group.Group,
topic_name,
)
}
}
}
return true
Expand Down
18 changes: 18 additions & 0 deletions prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Exporter struct {
// Consumer Groups
consumerGroupInfo *prometheus.Desc
consumerGroupMembers *prometheus.Desc
consumerGroupTopicMembers *prometheus.Desc
consumerGroupTopicPartitions *prometheus.Desc
consumerGroupTopicOffsetSum *prometheus.Desc
consumerGroupTopicPartitionLag *prometheus.Desc
consumerGroupTopicLag *prometheus.Desc
Expand Down Expand Up @@ -155,6 +157,22 @@ func (e *Exporter) InitializeMetrics() {
[]string{"group_id"},
nil,
)
// Group Topic Members
e.consumerGroupTopicMembers = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_members"),
"Consumer Group topic member count metrics. "+
"It will report the number of members in the consumer group assigned on a given topic",
[]string{"group_id", "topic_name"},
nil,
)
// Group Topic Assigned Partitions
e.consumerGroupTopicPartitions = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_assigned_partitions"),
"Consumer Group topic partitions count metrics. "+
"It will report the number of partitions assigned in the consumer group for a given topic",
[]string{"group_id", "topic_name"},
nil,
)
// Topic / Partition Offset Sum (useful for calculating the consumed messages / sec on a topic)
e.consumerGroupTopicOffsetSum = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_offset_sum"),
Expand Down

0 comments on commit 4998ba2

Please sign in to comment.