Skip to content

Commit

Permalink
Export number of members and assigned partitions for each topic in a …
Browse files Browse the repository at this point in the history
…consumer group

Added consumer_group_empty_members, consumer_group_topic_members and 
consumer_group_topic_assigned_partitions metrics.

Fixes #104
  • Loading branch information
amuraru committed Sep 20, 2021
1 parent 30f84f5 commit 320627b
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 7 deletions.
12 changes: 12 additions & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ kminion_kafka_topic_high_water_mark_sum{topic_name="__consumer_offsets"} 1.51202
# TYPE kminion_kafka_consumer_group_info gauge
kminion_kafka_consumer_group_info{coordinator_id="0",group_id="bigquery-sink",member_count="2",protocol="range",protocol_type="consumer",state="Stable"} 1
# HELP kminion_kafka_consumer_group_empty_members Consumer Group Empty Members. It will report the number of members in the consumer group with no partition assigned
# TYPE kminion_kafka_consumer_group_empty_members gauge
kminion_kafka_consumer_group_empty_members{group_id="bigquery-sink"} 1
# HELP kminion_kafka_consumer_group_topic_members Consumer Group topic member count metrics. It will report the number of members in the consumer group assigned on a given topic
# TYPE kminion_kafka_consumer_group_topic_members gauge
kminion_kafka_consumer_group_topic_members{group_id="bigquery-sink",topic_name="shop-activity"} 4
# HELP kminion_kafka_consumer_group_topic_assigned_partitions Consumer Group topic partitions count metrics. It will report the number of partitions assigned in the consumer group for a given topic
# TYPE kminion_kafka_consumer_group_topic_assigned_partitions gauge
kminion_kafka_consumer_group_topic_assigned_partitions{group_id="bigquery-sink",topic_name="shop-activity"} 32
# HELP kminion_kafka_consumer_group_topic_offset_sum The sum of all committed group offsets across all partitions in a topic
# TYPE kminion_kafka_consumer_group_topic_offset_sum gauge
kminion_kafka_consumer_group_topic_offset_sum{group_id="bigquery-sink",topic_name="shop-activity"} 4.259513e+06
Expand Down
71 changes: 69 additions & 2 deletions prometheus/collect_consumer_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package prometheus

import (
"context"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"
"strconv"
)

func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- prometheus.Metric) bool {
Expand All @@ -18,7 +20,7 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
return false
}

// The list of groups may be incomplete due to group coordinators that might fail to respond. We do log a error
// The list of groups may be incomplete due to group coordinators that might fail to respond. We do log an error
// message in that case (in the kafka request method) and groups will not be included in this list.
for _, grp := range groups {
coordinator := grp.BrokerMetadata.NodeID
Expand Down Expand Up @@ -49,6 +51,71 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
group.State,
strconv.FormatInt(int64(coordinator), 10),
)

// iterate all members and build two maps:
// - {topic -> number-of-consumers}
// - {topic -> number-of-partitions-assigned}
topicConsumers := make(map[string]int)
topicPartitionsAssigned := make(map[string]int)
membersWithEmptyAssignment := 0
failedAssignmentsDecode := int64(0)
for _, member := range group.Members {
kassignment := kmsg.NewGroupMemberAssignment()
if err := kassignment.ReadFrom(member.MemberAssignment); err != nil {
e.logger.Debug("failed to decode consumer group member assignment, internal kafka error",
zap.Error(err),
zap.String("group_id", group.Group),
zap.String("client_id", member.ClientID),
zap.String("member_id", member.MemberID),
zap.String("client_host", member.ClientHost),
)
failedAssignmentsDecode++
continue
}
if len(kassignment.Topics) == 0 {
membersWithEmptyAssignment++
}
for _, topic := range kassignment.Topics {
topicConsumers[topic.Topic]++
topicPartitionsAssigned[topic.Topic] += len(topic.Partitions)
}
}
if failedAssignmentsDecode > 0 {
e.logger.Error("failed to decode consumer group member assignment, internal kafka error",
zap.Error(err),
zap.String("group_id", group.Group),
zap.Int64("assignment_decode_failures", failedAssignmentsDecode),
)
}
// number of members with no assignment in a stable consumer group
if membersWithEmptyAssignment > 0 {
ch <- prometheus.MustNewConstMetric(
e.consumerGroupMembersEmpty,
prometheus.GaugeValue,
float64(membersWithEmptyAssignment),
group.Group,
)
}
// number of members in consumer groups for each topic
for topicName, consumers := range topicConsumers {
ch <- prometheus.MustNewConstMetric(
e.consumerGroupTopicMembers,
prometheus.GaugeValue,
float64(consumers),
group.Group,
topicName,
)
}
// number of partitions assigned in consumer groups for each topic
for topicName, partitions := range topicPartitionsAssigned {
ch <- prometheus.MustNewConstMetric(
e.consumerGroupAssignedTopicPartitions,
prometheus.GaugeValue,
float64(partitions),
group.Group,
topicName,
)
}
}
}
return true
Expand Down
37 changes: 32 additions & 5 deletions prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ type Exporter struct {
partitionLowWaterMark *prometheus.Desc

// Consumer Groups
consumerGroupInfo *prometheus.Desc
consumerGroupTopicOffsetSum *prometheus.Desc
consumerGroupTopicPartitionLag *prometheus.Desc
consumerGroupTopicLag *prometheus.Desc
offsetCommits *prometheus.Desc
consumerGroupInfo *prometheus.Desc
consumerGroupMembersEmpty *prometheus.Desc
consumerGroupTopicMembers *prometheus.Desc
consumerGroupAssignedTopicPartitions *prometheus.Desc
consumerGroupTopicOffsetSum *prometheus.Desc
consumerGroupTopicPartitionLag *prometheus.Desc
consumerGroupTopicLag *prometheus.Desc
offsetCommits *prometheus.Desc
}

func NewExporter(cfg Config, logger *zap.Logger, minionSvc *minion.Service) (*Exporter, error) {
Expand Down Expand Up @@ -147,6 +150,30 @@ func (e *Exporter) InitializeMetrics() {
[]string{"group_id", "member_count", "protocol", "protocol_type", "state", "coordinator_id"},
nil,
)
// Group Empty Memmbers
e.consumerGroupMembersEmpty = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_empty_members"),
"Consumer Group Empty Members. "+
"It will report the number of members in the consumer group with no partition assigned",
[]string{"group_id"},
nil,
)
// Group Topic Members
e.consumerGroupTopicMembers = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_members"),
"Consumer Group topic member count metrics. "+
"It will report the number of members in the consumer group assigned on a given topic",
[]string{"group_id", "topic_name"},
nil,
)
// Group Topic Assigned Partitions
e.consumerGroupAssignedTopicPartitions = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_assigned_partitions"),
"Consumer Group topic partitions count metrics. "+
"It will report the number of partitions assigned in the consumer group for a given topic",
[]string{"group_id", "topic_name"},
nil,
)
// Topic / Partition Offset Sum (useful for calculating the consumed messages / sec on a topic)
e.consumerGroupTopicOffsetSum = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_offset_sum"),
Expand Down

0 comments on commit 320627b

Please sign in to comment.