Skip to content

Commit e106680

Browse files
committed
update kafka go version
1 parent ebb61ba commit e106680

File tree

5 files changed

+44
-26
lines changed

5 files changed

+44
-26
lines changed

cmd/kafka-cli/main.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,34 +30,34 @@ func main() {
3030
}
3131

3232
ctx := context.Background()
33-
// publisher, err := kafka.NewKafkaGoPublisher(args.Topic, args.Brokers...)
34-
// if err != nil {
35-
// fmt.Println("error : ", err)
36-
37-
// os.Exit(1)
38-
// }
39-
40-
// subsriber, err := kafka.NewKafkaGoSubscriber(args.Topic, args.Brokers...)
41-
// if err != nil {
42-
// fmt.Println("error : ", err)
43-
44-
// os.Exit(1)
45-
// }
46-
47-
publisher, err := kafka.NewSaramaPublisher(args.Brokers...)
33+
publisher, err := kafka.NewKafkaGoPublisher(args.Topic, args.Brokers...)
4834
if err != nil {
4935
fmt.Println("error : ", err)
5036

5137
os.Exit(1)
5238
}
5339

54-
subsriber, err := kafka.NewSaramaSubscriber(args.Brokers...)
40+
subsriber, err := kafka.NewKafkaGoSubscriber(args.Topic, args.Brokers...)
5541
if err != nil {
5642
fmt.Println("error : ", err)
5743

5844
os.Exit(1)
5945
}
6046

47+
// publisher, err := kafka.NewSaramaPublisher(args.Brokers...)
48+
// if err != nil {
49+
// fmt.Println("error : ", err)
50+
51+
// os.Exit(1)
52+
// }
53+
54+
// subsriber, err := kafka.NewSaramaSubscriber(args.Brokers...)
55+
// if err != nil {
56+
// fmt.Println("error : ", err)
57+
58+
// os.Exit(1)
59+
// }
60+
6161
runner := kafka.Runner{
6262
Publisher: publisher,
6363
Subscriber: subsriber,

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
module github.com/musobarlab/kafka-cli
22

3+
go 1.15
4+
35
require (
6+
github.com/DataDog/zstd v1.4.0 // indirect
47
github.com/Shopify/sarama v1.22.0
58
github.com/pkg/profile v1.3.0 // indirect
6-
github.com/segmentio/kafka-go v0.3.4
9+
github.com/segmentio/kafka-go v0.4.16
710
github.com/stretchr/objx v0.2.0 // indirect
811
golang.org/x/net v0.0.0-20190424112056-4829fb13d2c6 // indirect
912
golang.org/x/sys v0.0.0-20190428183149-804c0c7841b5 // indirect

go.sum

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,34 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
1414
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
1515
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
1616
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
17+
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
1718
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
1819
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
20+
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
21+
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
22+
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
23+
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
24+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
25+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
1926
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU=
2027
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
2128
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
2229
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
30+
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
31+
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
2332
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
2433
github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
2534
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
2635
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
2736
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
2837
github.com/segmentio/kafka-go v0.3.4 h1:Mv9AcnCgU14/cU6Vd0wuRdG1FBO0HzXQLnjBduDLy70=
2938
github.com/segmentio/kafka-go v0.3.4/go.mod h1:OT5KXBPbaJJTcvokhWR2KFmm0niEx3mnccTwjmLvSi4=
39+
github.com/segmentio/kafka-go v0.4.16 h1:9dt78ehM9qzAkekA60D6A96RlqDzC3hnYYa8y5Szd+U=
40+
github.com/segmentio/kafka-go v0.4.16/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
3041
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
3142
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
3243
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
44+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
3345
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
3446
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
3547
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -50,3 +62,6 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
5062
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
5163
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
5264
golang.org/x/tools v0.0.0-20190428024724-550556f78a90/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
65+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
66+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
67+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

kafka_go_publisher.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ type KafkaGoPublisherImpl struct {
1717
func NewKafkaGoPublisher(topic string, addresses ...string) (*KafkaGoPublisherImpl, error) {
1818
config := ka.WriterConfig{
1919
Brokers: addresses,
20-
Topic: topic,
2120
Balancer: &ka.LeastBytes{},
2221
CompressionCodec: snappy.NewCompressionCodec(),
2322
BatchTimeout: 5 * time.Millisecond,

kafka_go_subscriber.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"os"
88
"os/signal"
99
"syscall"
10+
"time"
1011

1112
"github.com/segmentio/kafka-go"
1213
ka "github.com/segmentio/kafka-go"
@@ -21,12 +22,12 @@ type KafkaGoSubscriberImpl struct {
2122
//NewKafkaGoSubscriber constructor of KafkaGoSubscriberImpl
2223
func NewKafkaGoSubscriber(topic string, addresses ...string) (*KafkaGoSubscriberImpl, error) {
2324
config := kafka.ReaderConfig{
24-
Brokers: addresses,
25-
Topic: topic,
26-
GroupID: "kafka-cli-group",
27-
MinBytes: 10e3, // 10KB
28-
MaxBytes: 10e6, // 10MB
29-
// CommitInterval: time.Second, // flushes commits to Kafka every second
25+
Brokers: addresses,
26+
Topic: topic,
27+
GroupID: "kafka-cli-group",
28+
MinBytes: 10e3, // 10KB
29+
MaxBytes: 10e6, // 10MB
30+
CommitInterval: time.Second, // flushes commits to Kafka every second
3031
}
3132

3233
reader := ka.NewReader(config)
@@ -45,7 +46,7 @@ func (s *KafkaGoSubscriberImpl) Subscribe(ctx context.Context, topics ...string)
4546

4647
go func() {
4748
for {
48-
message, err := s.reader.ReadMessage(ctx)
49+
message, err := s.reader.FetchMessage(ctx)
4950
if err != nil {
5051
if err == io.EOF {
5152
fmt.Println("read message end ")
@@ -57,7 +58,7 @@ func (s *KafkaGoSubscriberImpl) Subscribe(ctx context.Context, topics ...string)
5758

5859
fmt.Println()
5960
fmt.Printf("\nMessage = %s\nTopic = %s", string(message.Value), message.Topic)
60-
//s.reader.CommitMessages(ctx, message)
61+
s.reader.CommitMessages(ctx, message)
6162
}
6263
}()
6364

0 commit comments

Comments
 (0)