Skip to content

Commit

Permalink
Merge pull request #55 from testerxiaodong/master
Browse files Browse the repository at this point in the history
feat(kq): support custom producer balancer and message key
  • Loading branch information
kevwan authored Jul 24, 2024
2 parents f2ee5c0 + ce1549c commit 319b750
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions kq/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type (
pushOptions struct {
// kafka.Writer options
allowAutoTopicCreation bool
balancer kafka.Balancer

// executors.ChunkExecutor options
chunkSize int
Expand All @@ -50,6 +51,9 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {

// apply kafka.Writer options
producer.AllowAutoTopicCreation = options.allowAutoTopicCreation
if options.balancer != nil {
producer.Balancer = options.balancer
}

pusher := &Pusher{
producer: producer,
Expand Down Expand Up @@ -134,6 +138,26 @@ func (p *Pusher) PushWithKey(ctx context.Context, key, v string) error {
}
}

// SetWriterBalancer set kafka-go custom writer balancer.
func (p *Pusher) SetWriterBalancer(balancer kafka.Balancer) {
if p.producer != nil {
p.producer.Balancer = balancer
}
}

// PushWithKey sends a message to the Kafka topic with custom message key.
func (p *Pusher) PushWithKey(k, v string) error {

Check failure on line 149 in kq/pusher.go

View workflow job for this annotation

GitHub Actions / Build

method Pusher.PushWithKey already declared at kq/pusher.go:123:18
msg := kafka.Message{
Key: []byte(k), // custom message key
Value: []byte(v),
}
if p.executor != nil {
return p.executor.Add(msg, len(v))
} else {
return p.producer.WriteMessages(context.Background(), msg)
}
}

// WithChunkSize customizes the Pusher with the given chunk size.
func WithChunkSize(chunkSize int) PushOption {
return func(options *pushOptions) {
Expand All @@ -155,6 +179,13 @@ func WithAllowAutoTopicCreation() PushOption {
}
}

// WithBalancer customizes the Pusher with the given balancer.
func WithBalancer(balancer kafka.Balancer) PushOption {
return func(options *pushOptions) {
options.balancer = balancer
}
}

// WithSyncPush enables the Pusher to push messages synchronously.
func WithSyncPush() PushOption {
return func(options *pushOptions) {
Expand Down

0 comments on commit 319b750

Please sign in to comment.