Skip to content

Commit

Permalink
feat: producer broker imp and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdulsametileri committed May 30, 2024
1 parent 3622137 commit a9a58e6
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 6 deletions.
7 changes: 1 addition & 6 deletions internal/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package internal

import (
"context"

"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
segmentio "github.com/segmentio/kafka-go"
)
Expand All @@ -24,12 +23,8 @@ func newProducer(kafkaConfig *kafka.Config) Producer {
kafkaConfig.Producer.Balancer = &segmentio.LeastBytes{}
}

if len(kafkaConfig.Producer.Brokers) == 0 {
kafkaConfig.Producer.Brokers = kafkaConfig.Brokers
}

producer := &segmentio.Writer{
Addr: segmentio.TCP(kafkaConfig.Producer.Brokers...),
Addr: kafkaConfig.GetBrokerAddr(),
Balancer: kafkaConfig.Producer.Balancer,
BatchTimeout: kafkaConfig.Producer.BatchTimeout,
BatchSize: kafkaConfig.Producer.BatchSize,
Expand Down
9 changes: 9 additions & 0 deletions pkg/kafka/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"net"
"strconv"
"time"

Expand Down Expand Up @@ -36,6 +37,14 @@ type Config struct {
MetricPrefix string `yaml:"metricPrefix"`
}

func (c *Config) GetBrokerAddr() net.Addr {
if len(c.Producer.Brokers) == 0 {
c.Producer.Brokers = c.Brokers
}

return segmentio.TCP(c.Producer.Brokers...)
}

type SASLConfig struct {
Enabled bool `yaml:"enabled"`
AuthType string `yaml:"authType"` // plain or scram
Expand Down
35 changes: 35 additions & 0 deletions pkg/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,38 @@ func TestToStringOffset(t *testing.T) {
})
}
}

func TestConfig_GetBrokerAddr(t *testing.T) {
t.Run("Should_Return_Default_Broker_Addr_When_Producer_Broker_Not_Given", func(t *testing.T) {
// Given
kafkaConfig := &Config{
Brokers: []string{"127.0.0.1:9092"},
Producer: ProducerConfig{},
}

// When
result := kafkaConfig.GetBrokerAddr()

// Then
if result.String() != "127.0.0.1:9092" {
t.Errorf("Expected: 127.0.0.1:9092, Actual: %+v", result.String())
}
})
t.Run("Should_Return_Producer_Broker_Addr_When_Its_Given", func(t *testing.T) {
// Given
kafkaConfig := &Config{
Brokers: []string{"127.0.0.1:9092"},
Producer: ProducerConfig{
Brokers: []string{"127.0.0.2:9092"},
},
}

// When
result := kafkaConfig.GetBrokerAddr()

// Then
if result.String() != "127.0.0.2:9092" {
t.Errorf("Expected: 127.0.0.2:9092, Actual: %+v", result.String())
}
})
}

0 comments on commit a9a58e6

Please sign in to comment.