Skip to content

Commit 67b8ce0

Browse files
committed
feat(kq): allow auto topic creation
1 parent 961c620 commit 67b8ce0

File tree

1 file changed

+31
-18
lines changed

1 file changed

+31
-18
lines changed

kq/pusher.go

+31-18
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,19 @@ import (
1111
)
1212

1313
type (
14-
PushOption func(options *chunkOptions)
14+
PushOption func(options *pushOptions)
1515

1616
Pusher struct {
1717
producer *kafka.Writer
1818
topic string
1919
executor *executors.ChunkExecutor
2020
}
2121

22-
chunkOptions struct {
22+
pushOptions struct {
23+
// kafka.Writer options
24+
allowAutoTopicCreation bool
25+
26+
// executors.ChunkExecutor options
2327
chunkSize int
2428
flushInterval time.Duration
2529
}
@@ -33,6 +37,24 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
3337
Balancer: &kafka.LeastBytes{},
3438
Compression: kafka.Snappy,
3539
}
40+
41+
var options pushOptions
42+
for _, opt := range opts {
43+
opt(&options)
44+
}
45+
46+
// apply kafka.Writer options
47+
producer.AllowAutoTopicCreation = options.allowAutoTopicCreation
48+
49+
// apply ChunkExecutor options
50+
var chunkOpts []executors.ChunkOption
51+
if options.chunkSize > 0 {
52+
chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
53+
}
54+
if options.flushInterval > 0 {
55+
chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
56+
}
57+
3658
pusher := &Pusher{
3759
producer: producer,
3860
topic: topic,
@@ -45,7 +67,7 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
4567
if err := pusher.producer.WriteMessages(context.Background(), chunk...); err != nil {
4668
logx.Error(err)
4769
}
48-
}, newOptions(opts)...)
70+
}, chunkOpts...)
4971

5072
return pusher
5173
}
@@ -79,30 +101,21 @@ func (p *Pusher) Push(v string) error {
79101

80102
// WithChunkSize customizes the Pusher with the given chunk size.
81103
func WithChunkSize(chunkSize int) PushOption {
82-
return func(options *chunkOptions) {
104+
return func(options *pushOptions) {
83105
options.chunkSize = chunkSize
84106
}
85107
}
86108

87109
// WithFlushInterval customizes the Pusher with the given flush interval.
88110
func WithFlushInterval(interval time.Duration) PushOption {
89-
return func(options *chunkOptions) {
111+
return func(options *pushOptions) {
90112
options.flushInterval = interval
91113
}
92114
}
93115

94-
func newOptions(opts []PushOption) []executors.ChunkOption {
95-
var options chunkOptions
96-
for _, opt := range opts {
97-
opt(&options)
98-
}
99-
100-
var chunkOpts []executors.ChunkOption
101-
if options.chunkSize > 0 {
102-
chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
103-
}
104-
if options.flushInterval > 0 {
105-
chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
116+
// WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist.
117+
func WithAllowAutoTopicCreation() PushOption {
118+
return func(options *pushOptions) {
119+
options.allowAutoTopicCreation = true
106120
}
107-
return chunkOpts
108121
}

0 commit comments

Comments
 (0)