Skip to content

Commit

Permalink
Export number of members and assigned partitions for each topic in a …
Browse files Browse the repository at this point in the history
…consumer group

Added consumer_group_empty_members, consumer_group_topic_members and 
consumer_group_topic_assigned_partitions metrics.

Fixes redpanda-data#104
  • Loading branch information
amuraru committed Aug 1, 2021
1 parent 45540a5 commit d2e06b7
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 2 deletions.
72 changes: 70 additions & 2 deletions prometheus/collect_consumer_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package prometheus

import (
"context"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"
"strconv"
)

func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- prometheus.Metric) bool {
Expand All @@ -18,7 +20,7 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
return false
}

// The list of groups may be incomplete due to group coordinators that might fail to respond. We do log a error
// The list of groups may be incomplete due to group coordinators that might fail to respond. We do log an error
// message in that case (in the kafka request method) and groups will not be included in this list.
for _, grp := range groups {
coordinator := grp.BrokerMetadata.NodeID
Expand Down Expand Up @@ -49,6 +51,72 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
group.State,
strconv.FormatInt(int64(coordinator), 10),
)

// iterate all members and build two maps:
// - {topic -> number-of-consumers}
// - {topic -> number-of-partitions-assigned}
topicConsumers := make(map[string]int)
topicPartitionsAssigned := make(map[string]int)
membersWithEmptyAssignment := 0
var failedAssignmentsDecode int64 = 0
for _, member := range group.Members {
var kassignment kmsg.GroupMemberAssignment
if member.MemberAssignment == nil || len(member.MemberAssignment) == 0 {
membersWithEmptyAssignment++
} else {
if err := kassignment.ReadFrom(member.MemberAssignment); err != nil {
e.logger.Debug("failed to decode consumer group member assignment, internal kafka error",
zap.Error(err),
zap.String("group_id", group.Group),
zap.String("client_id", member.ClientID),
zap.String("member_id", member.MemberID),
zap.String("client_host", member.ClientHost),
)
failedAssignmentsDecode++
} else {
for _, topic := range kassignment.Topics {
topicConsumers[topic.Topic]++
topicPartitionsAssigned[topic.Topic] += len(topic.Partitions)
}
}
}
}
if failedAssignmentsDecode > 0 {
e.logger.Error("failed to decode consumer group member assignment, internal kafka error",
zap.Error(err),
zap.String("group_id", group.Group),
zap.String("assignment_decode_failures", strconv.FormatInt(failedAssignmentsDecode, 10)),
)
}
// number of members with no assignment
if membersWithEmptyAssignment > 0 {
ch <- prometheus.MustNewConstMetric(
e.consumerGroupMembersEmpty,
prometheus.GaugeValue,
float64(membersWithEmptyAssignment),
group.Group,
)
}
// number of members in consumer groups for each topic
for topicName, consumers := range topicConsumers {
ch <- prometheus.MustNewConstMetric(
e.consumerGroupTopicMembers,
prometheus.GaugeValue,
float64(consumers),
group.Group,
topicName,
)
}
// number of partitions assigned in consumer groups for each topic
for topicName, partitions := range topicPartitionsAssigned {
ch <- prometheus.MustNewConstMetric(
e.consumerGroupTopicPartitions,
prometheus.GaugeValue,
float64(partitions),
group.Group,
topicName,
)
}
}
}
return true
Expand Down
27 changes: 27 additions & 0 deletions prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Exporter struct {

// Consumer Groups
consumerGroupInfo *prometheus.Desc
consumerGroupMembersEmpty *prometheus.Desc
consumerGroupTopicMembers *prometheus.Desc
consumerGroupTopicPartitions *prometheus.Desc
consumerGroupTopicOffsetSum *prometheus.Desc
consumerGroupTopicPartitionLag *prometheus.Desc
consumerGroupTopicLag *prometheus.Desc
Expand Down Expand Up @@ -147,6 +150,30 @@ func (e *Exporter) InitializeMetrics() {
[]string{"group_id", "member_count", "protocol", "protocol_type", "state", "coordinator_id"},
nil,
)
// Group Empty Memmbers
e.consumerGroupMembersEmpty = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_empty_members"),
"Consumer Group Empty Members. "+
"It will report the number of members in the consumer group with no partition assigned",
[]string{"group_id"},
nil,
)
// Group Topic Members
e.consumerGroupTopicMembers = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_members"),
"Consumer Group topic member count metrics. "+
"It will report the number of members in the consumer group assigned on a given topic",
[]string{"group_id", "topic_name"},
nil,
)
// Group Topic Assigned Partitions
e.consumerGroupTopicPartitions = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_assigned_partitions"),
"Consumer Group topic partitions count metrics. "+
"It will report the number of partitions assigned in the consumer group for a given topic",
[]string{"group_id", "topic_name"},
nil,
)
// Topic / Partition Offset Sum (useful for calculating the consumed messages / sec on a topic)
e.consumerGroupTopicOffsetSum = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_offset_sum"),
Expand Down

0 comments on commit d2e06b7

Please sign in to comment.