25
25
}
26
26
)
27
27
28
+ // NewPusher returns a Pusher with the given Kafka addresses and topic.
28
29
func NewPusher (addrs []string , topic string , opts ... PushOption ) * Pusher {
29
30
producer := & kafka.Writer {
30
31
Addr : kafka .TCP (addrs ... ),
@@ -49,6 +50,7 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
49
50
return pusher
50
51
}
51
52
53
+ // Close closes the Pusher and releases any resources used by it.
52
54
func (p * Pusher ) Close () error {
53
55
if p .executor != nil {
54
56
p .executor .Flush ()
@@ -57,13 +59,15 @@ func (p *Pusher) Close() error {
57
59
return p .producer .Close ()
58
60
}
59
61
62
+ // Name returns the name of the Kafka topic that the Pusher is sending messages to.
60
63
func (p * Pusher ) Name () string {
61
64
return p .topic
62
65
}
63
66
67
+ // Push sends a message to the Kafka topic.
64
68
func (p * Pusher ) Push (v string ) error {
65
69
msg := kafka.Message {
66
- Key : []byte (strconv .FormatInt (time .Now ().UnixNano (), 10 )),
70
+ Key : []byte (strconv .FormatInt (time .Now ().UnixNano (), 10 )), // current timestamp
67
71
Value : []byte (v ),
68
72
}
69
73
if p .executor != nil {
@@ -73,12 +77,14 @@ func (p *Pusher) Push(v string) error {
73
77
}
74
78
}
75
79
80
+ // WithChunkSize customizes the Pusher with the given chunk size.
76
81
func WithChunkSize (chunkSize int ) PushOption {
77
82
return func (options * chunkOptions ) {
78
83
options .chunkSize = chunkSize
79
84
}
80
85
}
81
86
87
+ // WithFlushInterval customizes the Pusher with the given flush interval.
82
88
func WithFlushInterval (interval time.Duration ) PushOption {
83
89
return func (options * chunkOptions ) {
84
90
options .flushInterval = interval
0 commit comments