Skip to content

Commit 4c97f3f

Browse files
committed
kafka-go publisher and subsriber
1 parent 302b4f8 commit 4c97f3f

File tree

4 files changed

+137
-3
lines changed

4 files changed

+137
-3
lines changed

cmd/kafka-cli/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ func main() {
3030
}
3131

3232
ctx := context.Background()
33-
publisher, err := kafka.NewPublisher(args.Brokers...)
33+
publisher, err := kafka.NewKafkaGoPublisher(args.Topic, args.Brokers...)
3434
if err != nil {
3535
fmt.Println("error : ", err)
3636

3737
os.Exit(1)
3838
}
3939

40-
subsriber, err := kafka.NewSubscriber(args.Brokers...)
40+
subsriber, err := kafka.NewKafkaGoSubscriber(args.Topic, args.Brokers...)
4141
if err != nil {
4242
fmt.Println("error : ", err)
4343

kafka.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func (r *Runner) Run(ctx context.Context) error {
2222

2323
switch command {
2424
case PublishCommand:
25-
return r.Publisher.Publish(r.Argument.Topic, r.Argument.Message)
25+
return r.Publisher.Publish(ctx, r.Argument.Topic, r.Argument.Message)
2626
case SubscribeCommand:
2727
return r.Subscriber.Subscribe(ctx, r.Argument.Topic)
2828
}

kafka_go_publisher.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
ka "github.com/segmentio/kafka-go"
8+
"github.com/segmentio/kafka-go/snappy"
9+
)
10+
11+
//KafkaGoPublisherImpl struct
12+
type KafkaGoPublisherImpl struct {
13+
writer *ka.Writer
14+
}
15+
16+
//NewKafkaGoPublisherImpl constructor of KafkaGoPublisherImpl
17+
func NewKafkaGoPublisher(topic string, addresses ...string) (*KafkaGoPublisherImpl, error) {
18+
config := ka.WriterConfig{
19+
Brokers: addresses,
20+
Topic: topic,
21+
Balancer: &ka.LeastBytes{},
22+
CompressionCodec: snappy.NewCompressionCodec(),
23+
BatchTimeout: 5 * time.Millisecond,
24+
BatchBytes: 1000000,
25+
}
26+
27+
writer := ka.NewWriter(config)
28+
return &KafkaGoPublisherImpl{writer}, nil
29+
}
30+
31+
//Publish function
32+
func (publisher *KafkaGoPublisherImpl) Publish(ctx context.Context, topic string, message []byte) error {
33+
defer func() { publisher.writer.Close() }()
34+
35+
msg := ka.Message{
36+
Topic: topic,
37+
Value: message,
38+
}
39+
40+
err := publisher.writer.WriteMessages(ctx, msg)
41+
if err != nil {
42+
return err
43+
}
44+
45+
return nil
46+
}

kafka_go_subscriber.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"os"
8+
"os/signal"
9+
"syscall"
10+
11+
"github.com/segmentio/kafka-go"
12+
ka "github.com/segmentio/kafka-go"
13+
_ "github.com/segmentio/kafka-go/snappy"
14+
)
15+
16+
//KafkaGoSubscriberImpl struct
17+
type KafkaGoSubscriberImpl struct {
18+
reader *ka.Reader
19+
}
20+
21+
//NewSubscriber constructor of KafkaGoSubscriberImpl
22+
func NewKafkaGoSubscriber(topic string, addresses ...string) (*KafkaGoSubscriberImpl, error) {
23+
config := kafka.ReaderConfig{
24+
Brokers: addresses,
25+
Topic: topic,
26+
GroupID: "kafka-cli-group",
27+
MinBytes: 10e3, // 10KB
28+
MaxBytes: 10e6, // 10MB
29+
// CommitInterval: time.Second, // flushes commits to Kafka every second
30+
}
31+
32+
reader := ka.NewReader(config)
33+
return &KafkaGoSubscriberImpl{reader}, nil
34+
}
35+
36+
//Subscribe function
37+
func (s *KafkaGoSubscriberImpl) Subscribe(ctx context.Context, topics ...string) error {
38+
signals := make(chan os.Signal, 1)
39+
done := make(chan bool, 1)
40+
41+
// wait until get signal from OS
42+
signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
43+
44+
go waitOSNotify(signals, done)
45+
46+
go func() {
47+
for {
48+
message, err := s.reader.ReadMessage(ctx)
49+
if err != nil {
50+
if err == io.EOF {
51+
fmt.Println("read message end ")
52+
break
53+
} else {
54+
fmt.Println("read message error ", err)
55+
}
56+
}
57+
58+
fmt.Println()
59+
fmt.Printf("\nMessage = %s\nTopic = %s", string(message.Value), message.Topic)
60+
//s.reader.CommitMessages(ctx, message)
61+
}
62+
}()
63+
64+
<-done
65+
66+
// close subscriber after exit
67+
err := s.close()
68+
if err != nil {
69+
return err
70+
}
71+
72+
return nil
73+
}
74+
75+
func waitOSNotify(kill chan os.Signal, done chan bool) {
76+
select {
77+
case <-kill:
78+
fmt.Println("process interrupted")
79+
done <- true
80+
return
81+
}
82+
83+
}
84+
85+
func (s *KafkaGoSubscriberImpl) close() error {
86+
fmt.Println("closing subscriber")
87+
return s.reader.Close()
88+
}

0 commit comments

Comments
 (0)