Skip to content

Commit

Permalink
Update to franz-go v1.2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Nov 3, 2021
1 parent d2b9046 commit 21e8a6b
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 38 deletions.
2 changes: 1 addition & 1 deletion e2e/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (
r := kmsg.NewAlterPartitionAssignmentsRequest()
reassignmentTopicReq := kmsg.NewAlterPartitionAssignmentsRequestTopic()
reassignmentTopicReq.Partitions = partitionReassignments
reassignmentTopicReq.Topic = topicMeta.Topic
reassignmentTopicReq.Topic = pointerStrToStr(topicMeta.Topic)
r.Topics = []kmsg.AlterPartitionAssignmentsRequestTopic{reassignmentTopicReq}
reassignmentReq = &r
}
Expand Down
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/cloudhut/kminion/v2
go 1.15

require (
github.com/ReneKroon/ttlcache/v2 v2.8.0 // indirect
github.com/ReneKroon/ttlcache/v2 v2.8.0
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.2.0
github.com/jcmturner/gokrb5/v8 v8.4.2
Expand All @@ -12,14 +12,13 @@ require (
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.4.1
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pelletier/go-toml v1.9.1 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.10.0
github.com/prometheus/common v0.24.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/twmb/franz-go v0.10.2
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210823212011-0d01f7456b4d
github.com/twmb/franz-go v1.2.2
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914042331-106aef61b693
go.uber.org/atomic v1.7.0
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.16.0
Expand Down
22 changes: 9 additions & 13 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down Expand Up @@ -191,8 +188,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4=
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/knadh/koanf v0.16.0 h1:qQqGvE8hs/y5pZTG5kT354vqUqsDKQcXX8IOq2Rg11Y=
github.com/knadh/koanf v0.16.0/go.mod h1:DMZ6jQlhA3PqxnKR63luVaBtDemi/m8v/FpXI7B5Ez8=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -269,8 +266,6 @@ github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc/go.mod h1:L
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/pelletier/go-toml v1.9.1 h1:a6qW1EVNZWH9WGI6CsYdD8WAylkoXBS5yv0XHlh17Tc=
Expand Down Expand Up @@ -351,10 +346,10 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/twmb/franz-go v0.10.2 h1:xg6kFR6CmtSNRJAzUuu+S/l9ThKYbwPn6AEvbqD5hw8=
github.com/twmb/franz-go v0.10.2/go.mod h1:MmlCzU1afJbPlAEKimYwYOv1ICIJcrIu7gaKMPt/xS0=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210823212011-0d01f7456b4d h1:8k2HiP1zjY6N+OvNrBR/621PafQ2h1qdiu+JwT+WAk0=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210823212011-0d01f7456b4d/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go v1.2.2 h1:Nv+Fb6eUklF8pB8VhDZm0RJ9diaRw9nQybcl+livhI8=
github.com/twmb/franz-go v1.2.2/go.mod h1:KerrVhzNpasYrWJLr2Yj6Cui43f1BxH4U9SJEDVOjqQ=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914042331-106aef61b693 h1:5O4u9Lc69/GIOnSIWieuwwpr0hZr7vDOhCp0hXJAqXw=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914042331-106aef61b693/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
Expand All @@ -371,6 +366,7 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
Expand Down Expand Up @@ -426,8 +422,8 @@ golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d h1:LO7XpTYMwTqxjLcGWPijK3vRXg1aWdlNOVOHRq45d7c=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210913180222-943fd674d43e h1:+b/22bPvDYt4NPDcy4xAGCmON713ONAWFeY3Z7I3tR8=
golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
2 changes: 1 addition & 1 deletion kafka/client_config_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {
kgo.MaxVersions(kversion.V2_7_0()),
kgo.ClientID(cfg.ClientID),
kgo.FetchMaxBytes(5 * 1000 * 1000), // 5MB
kgo.AllowedConcurrentFetches(10),
kgo.MaxConcurrentFetches(10),
}

// Create Logger
Expand Down
2 changes: 1 addition & 1 deletion minion/describe_topic_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (s *Service) GetTopicConfigs(ctx context.Context) (*kmsg.DescribeConfigsRes
for _, topic := range metadata.Topics {
resourceReq := kmsg.NewDescribeConfigsRequestResource()
resourceReq.ResourceType = kmsg.ConfigResourceTypeTopic
resourceReq.ResourceName = topic.Topic
resourceReq.ResourceName = *topic.Topic
req.Resources = append(req.Resources, resourceReq)
}

Expand Down
2 changes: 1 addition & 1 deletion minion/list_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListO
topicReqs := make([]kmsg.ListOffsetsRequestTopic, len(metadata.Topics))
for i, topic := range metadata.Topics {
req := kmsg.NewListOffsetsRequestTopic()
req.Topic = topic.Topic
req.Topic = *topic.Topic

partitionReqs := make([]kmsg.ListOffsetsRequestTopicPartition, len(topic.Partitions))
for j, partition := range topic.Partitions {
Expand Down
26 changes: 13 additions & 13 deletions minion/offset_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ func (s *Service) startConsumingOffsets(ctx context.Context) {
func (s *Service) checkIfConsumerLagIsCaughtUp(ctx context.Context) {
for {
time.Sleep(12 * time.Second)
s.logger.Debug("checking if lag in consumer offsets topic is caught up")
s.logger.Debug("checking if lag in consumer offsets metadataReqTopic is caught up")

// 1. Get topic high watermarks for __consumer_offsets topic
req := kmsg.NewMetadataRequest()
topic := kmsg.NewMetadataRequestTopic()
// 1. Get metadataReqTopic high watermarks for __consumer_offsets metadataReqTopic
metadataReq := kmsg.NewMetadataRequest()
metadataReqTopic := kmsg.NewMetadataRequestTopic()
topicName := "__consumer_offsets"
topic.Topic = &topicName
req.Topics = []kmsg.MetadataRequestTopic{topic}
metadataReqTopic.Topic = &topicName
metadataReq.Topics = []kmsg.MetadataRequestTopic{metadataReqTopic}

res, err := req.RequestWith(ctx, s.client)
res, err := metadataReq.RequestWith(ctx, s.client)
if err != nil {
s.logger.Warn("failed to check if consumer lag on offsets topic is caught up because metadata request failed",
s.logger.Warn("failed to check if consumer lag on offsets metadataReqTopic is caught up because metadata request failed",
zap.Error(err))
continue
}
Expand All @@ -76,7 +76,7 @@ func (s *Service) checkIfConsumerLagIsCaughtUp(ctx context.Context) {
topicReqs := make([]kmsg.ListOffsetsRequestTopic, len(res.Topics))
for i, topic := range res.Topics {
req := kmsg.NewListOffsetsRequestTopic()
req.Topic = topic.Topic
req.Topic = *topic.Topic

partitionReqs := make([]kmsg.ListOffsetsRequestTopicPartition, len(topic.Partitions))
for j, partition := range topic.Partitions {
Expand All @@ -92,12 +92,12 @@ func (s *Service) checkIfConsumerLagIsCaughtUp(ctx context.Context) {
offsetReq.Topics = topicReqs
highMarksRes, err := offsetReq.RequestWith(ctx, s.client)
if err != nil {
s.logger.Warn("failed to check if consumer lag on offsets topic is caught up because high watermark request failed",
s.logger.Warn("failed to check if consumer lag on offsets metadataReqTopic is caught up because high watermark request failed",
zap.Error(err))
continue
}
if len(highMarksRes.Topics) != 1 {
s.logger.Error("expected exactly one topic response for high water mark request")
s.logger.Error("expected exactly one metadataReqTopic response for high water mark request")
continue
}

Expand All @@ -118,7 +118,7 @@ func (s *Service) checkIfConsumerLagIsCaughtUp(ctx context.Context) {
for _, partition := range topicRes.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {
s.logger.Warn("failed to check if consumer lag on offsets topic is caught up because high "+
s.logger.Warn("failed to check if consumer lag on offsets metadataReqTopic is caught up because high "+
"watermark request failed, with an inner error",
zap.Error(err))
}
Expand All @@ -137,7 +137,7 @@ func (s *Service) checkIfConsumerLagIsCaughtUp(ctx context.Context) {
Lag: partitionLag,
})
totalLag += partitionLag
s.logger.Debug("consumer_offsets topic lag has not been caught up yet",
s.logger.Debug("consumer_offsets metadataReqTopic lag has not been caught up yet",
zap.Int32("partition_id", partition.Partition),
zap.Int64("high_water_mark", highWaterMark),
zap.Int64("consumed_offset", consumedOffset),
Expand Down
9 changes: 5 additions & 4 deletions prometheus/collect_topic_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me
}

for _, topic := range metadata.Topics {
if !e.minionSvc.IsTopicAllowed(topic.Topic) {
topicName := *topic.Topic
if !e.minionSvc.IsTopicAllowed(topicName) {
continue
}
typedErr := kerr.TypedErrorForCode(topic.ErrorCode)
if typedErr != nil {
isOk = false
e.logger.Warn("failed to get metadata of a specific topic",
zap.String("topic_name", topic.Topic),
zap.String("topic_name", topicName),
zap.Error(typedErr))
continue
}
Expand All @@ -66,11 +67,11 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me
}

var labelsValues []string
labelsValues = append(labelsValues, topic.Topic)
labelsValues = append(labelsValues, topicName)
labelsValues = append(labelsValues, strconv.Itoa(partitionCount))
labelsValues = append(labelsValues, strconv.Itoa(replicationFactor))
for _, key := range e.minionSvc.Cfg.Topics.InfoMetric.ConfigKeys {
labelsValues = append(labelsValues, getOrDefault(configsByTopic[topic.Topic], key, "N/A"))
labelsValues = append(labelsValues, getOrDefault(configsByTopic[topicName], key, "N/A"))
}
ch <- prometheus.MustNewConstMetric(
e.topicInfo,
Expand Down

0 comments on commit 21e8a6b

Please sign in to comment.