Skip to content

Commit 9f20c2c

Browse files
committed
新加NewPusherTLS
1 parent a4ef8e5 commit 9f20c2c

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

kq/pusher.go

+56
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,62 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
101101
return pusher
102102
}
103103

104+
// NewPusherTLS returns a Pusher with the given Kafka addresses and topic.
105+
func NewPusherTLS(addrs []string, topic string, tls *tls.Config, opts ...PushOption) *Pusher {
106+
producer := &kafka.Writer{
107+
Addr: kafka.TCP(addrs...),
108+
Topic: topic,
109+
Balancer: &kafka.LeastBytes{},
110+
Compression: kafka.Snappy,
111+
Transport: &kafka.Transport{
112+
TLS: tls,
113+
},
114+
}
115+
116+
var options pushOptions
117+
for _, opt := range opts {
118+
opt(&options)
119+
}
120+
121+
// apply kafka.Writer options
122+
producer.AllowAutoTopicCreation = options.allowAutoTopicCreation
123+
if options.balancer != nil {
124+
producer.Balancer = options.balancer
125+
}
126+
127+
pusher := &Pusher{
128+
producer: producer,
129+
topic: topic,
130+
}
131+
132+
// if syncPush is true, return the pusher directly
133+
if options.syncPush {
134+
producer.BatchSize = 1
135+
return pusher
136+
}
137+
138+
// apply ChunkExecutor options
139+
var chunkOpts []executors.ChunkOption
140+
if options.chunkSize > 0 {
141+
chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
142+
}
143+
if options.flushInterval > 0 {
144+
chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
145+
}
146+
147+
pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
148+
chunk := make([]kafka.Message, len(tasks))
149+
for i := range tasks {
150+
chunk[i] = tasks[i].(kafka.Message)
151+
}
152+
if err := pusher.producer.WriteMessages(context.Background(), chunk...); err != nil {
153+
logx.Error(err)
154+
}
155+
}, chunkOpts...)
156+
157+
return pusher
158+
}
159+
104160
// Close closes the Pusher and releases any resources used by it.
105161
func (p *Pusher) Close() error {
106162
if p.executor != nil {

0 commit comments

Comments
 (0)