Skip to content
This repository has been archived by the owner on May 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2 from vend/retry
Browse files Browse the repository at this point in the history
Add connection retry logic
  • Loading branch information
Mick Staugaard authored Jan 9, 2020
2 parents 37b7726 + 94fcb63 commit e2b2208
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 21 deletions.
13 changes: 3 additions & 10 deletions cmd/createTopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,11 @@ kafkaCLI createTopic --bootstrap-server kafka:9092 --partitions 4 --replication-
`,
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
client, err := kafkaClient()
if err != nil {
panic(err)
}
kafkaAdmin, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
panic(err)
}
admin := kafkaAdmin()

for _, topicName := range args {
fmt.Println("Creating topic " + topicName)
err = kafkaAdmin.CreateTopic(topicName, topicDetail(), false)
err := admin.CreateTopic(topicName, topicDetail(), false)
if err != nil {
switch err.(type) {
case *sarama.TopicError:
Expand All @@ -51,7 +44,7 @@ kafkaCLI createTopic --bootstrap-server kafka:9092 --partitions 4 --replication-
}
}

_ = kafkaAdmin.Close()
_ = admin.Close()
},
}

Expand Down
13 changes: 4 additions & 9 deletions cmd/deleteTopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,11 @@ kafkaCLI deleteTopic --bootstrap-server kafka:9092 topic1 topic2
`,
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
client, err := kafkaClient()
if err != nil {
panic(err)
}
kafkaAdmin, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
panic(err)
}
admin := kafkaAdmin()

for _, topicName := range args {
fmt.Println("Deleting topic " + topicName)
err = kafkaAdmin.DeleteTopic(topicName)
err := admin.DeleteTopic(topicName)

if err != nil {
switch err {
Expand All @@ -40,6 +33,8 @@ kafkaCLI deleteTopic --bootstrap-server kafka:9092 topic1 topic2
}
}
}

_ = admin.Close()
},
}

Expand Down
46 changes: 44 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"fmt"
"os"
"time"

"github.com/Shopify/sarama"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -30,9 +31,50 @@ func init() {
_ = rootCmd.MarkPersistentFlagRequired("bootstrap-server")
}

func kafkaClient() (sarama.Client, error) {
func kafkaClient() sarama.Client {
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = sarama.V1_0_0_0
addresses := []string{bootstrapServer}
return sarama.NewClient(addresses, kafkaConfig)

var client sarama.Client
var err error

for i := 0; i < 30; i++ {
client, err = sarama.NewClient(addresses, kafkaConfig)
if err == nil {
break
}

fmt.Println("failed to connect to " + bootstrapServer + " Retrying in 1s")
time.Sleep(time.Second)
}

if err != nil {
panic(err)
}

return client
}

func kafkaAdmin() sarama.ClusterAdmin {
client := kafkaClient()

var admin sarama.ClusterAdmin
var err error

for i := 0; i < 10; i++ {
admin, err = sarama.NewClusterAdminFromClient(client)
if err == nil {
break
}

fmt.Println("failed to admin cluster at " + bootstrapServer + " Retrying in 1s")
time.Sleep(time.Second)
}

if err != nil {
panic(err)
}

return admin
}

0 comments on commit e2b2208

Please sign in to comment.