Skip to content

Commit ec0517f

Browse files
authored
Merge pull request #2 from zeromicro/master
merge
2 parents 7647654 + 4a99c09 commit ec0517f

File tree

3 files changed

+66
-26
lines changed

3 files changed

+66
-26
lines changed

kq/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type KqConf struct {
1212
Brokers []string
1313
Group string
1414
Topic string
15+
CaFile string `json:",optional"`
1516
Offset string `json:",options=first|last,default=last"`
1617
Conns int `json:",default=1"`
1718
Consumers int `json:",default=8"`

kq/pusher.go

+45-26
Original file line numberDiff line numberDiff line change
@@ -11,92 +11,111 @@ import (
1111
)
1212

1313
type (
14-
PushOption func(options *chunkOptions)
14+
PushOption func(options *pushOptions)
1515

1616
Pusher struct {
17-
produer *kafka.Writer
17+
producer *kafka.Writer
1818
topic string
1919
executor *executors.ChunkExecutor
2020
}
2121

22-
chunkOptions struct {
22+
pushOptions struct {
23+
// kafka.Writer options
24+
allowAutoTopicCreation bool
25+
26+
// executors.ChunkExecutor options
2327
chunkSize int
2428
flushInterval time.Duration
2529
}
2630
)
2731

32+
// NewPusher returns a Pusher with the given Kafka addresses and topic.
2833
func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
2934
producer := &kafka.Writer{
3035
Addr: kafka.TCP(addrs...),
3136
Topic: topic,
3237
Balancer: &kafka.LeastBytes{},
3338
Compression: kafka.Snappy,
3439
}
40+
41+
var options pushOptions
42+
for _, opt := range opts {
43+
opt(&options)
44+
}
45+
46+
// apply kafka.Writer options
47+
producer.AllowAutoTopicCreation = options.allowAutoTopicCreation
48+
49+
// apply ChunkExecutor options
50+
var chunkOpts []executors.ChunkOption
51+
if options.chunkSize > 0 {
52+
chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
53+
}
54+
if options.flushInterval > 0 {
55+
chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
56+
}
57+
3558
pusher := &Pusher{
36-
produer: producer,
37-
topic: topic,
59+
producer: producer,
60+
topic: topic,
3861
}
3962
pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
4063
chunk := make([]kafka.Message, len(tasks))
4164
for i := range tasks {
4265
chunk[i] = tasks[i].(kafka.Message)
4366
}
44-
if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
67+
if err := pusher.producer.WriteMessages(context.Background(), chunk...); err != nil {
4568
logx.Error(err)
4669
}
47-
}, newOptions(opts)...)
70+
}, chunkOpts...)
4871

4972
return pusher
5073
}
5174

75+
// Close closes the Pusher and releases any resources used by it.
5276
func (p *Pusher) Close() error {
5377
if p.executor != nil {
5478
p.executor.Flush()
5579
}
56-
57-
return p.produer.Close()
80+
81+
return p.producer.Close()
5882
}
5983

84+
// Name returns the name of the Kafka topic that the Pusher is sending messages to.
6085
func (p *Pusher) Name() string {
6186
return p.topic
6287
}
6388

89+
// Push sends a message to the Kafka topic.
6490
func (p *Pusher) Push(v string) error {
6591
msg := kafka.Message{
66-
Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
92+
Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), // current timestamp
6793
Value: []byte(v),
6894
}
6995
if p.executor != nil {
7096
return p.executor.Add(msg, len(v))
7197
} else {
72-
return p.produer.WriteMessages(context.Background(), msg)
98+
return p.producer.WriteMessages(context.Background(), msg)
7399
}
74100
}
75101

102+
// WithChunkSize customizes the Pusher with the given chunk size.
76103
func WithChunkSize(chunkSize int) PushOption {
77-
return func(options *chunkOptions) {
104+
return func(options *pushOptions) {
78105
options.chunkSize = chunkSize
79106
}
80107
}
81108

109+
// WithFlushInterval customizes the Pusher with the given flush interval.
82110
func WithFlushInterval(interval time.Duration) PushOption {
83-
return func(options *chunkOptions) {
111+
return func(options *pushOptions) {
84112
options.flushInterval = interval
85113
}
86114
}
87115

88-
func newOptions(opts []PushOption) []executors.ChunkOption {
89-
var options chunkOptions
90-
for _, opt := range opts {
91-
opt(&options)
92-
}
93-
94-
var chunkOpts []executors.ChunkOption
95-
if options.chunkSize > 0 {
96-
chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
97-
}
98-
if options.flushInterval > 0 {
99-
chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
116+
// WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist.
117+
func WithAllowAutoTopicCreation() PushOption {
118+
return func(options *pushOptions) {
119+
options.allowAutoTopicCreation = true
100120
}
101-
return chunkOpts
102121
}

kq/queue.go

+20
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package kq
22

33
import (
44
"context"
5+
"crypto/tls"
6+
"crypto/x509"
57
"io"
68
"log"
9+
"os"
710
"time"
811

912
"github.com/segmentio/kafka-go"
@@ -121,6 +124,23 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue
121124
},
122125
}
123126
}
127+
if len(c.CaFile) > 0 {
128+
caCert, err := os.ReadFile(c.CaFile)
129+
if err != nil {
130+
log.Fatal(err)
131+
}
132+
133+
caCertPool := x509.NewCertPool()
134+
ok := caCertPool.AppendCertsFromPEM(caCert)
135+
if !ok {
136+
log.Fatal(err)
137+
}
138+
139+
readerConfig.Dialer.TLS = &tls.Config{
140+
RootCAs: caCertPool,
141+
InsecureSkipVerify: true,
142+
}
143+
}
124144
consumer := kafka.NewReader(readerConfig)
125145

126146
return &kafkaQueue{

0 commit comments

Comments
 (0)