Skip to content

Commit

Permalink
merge pr #12 from shortbin/fix/#11
Browse files Browse the repository at this point in the history
fix [#11]:[kafka] remove topic selection at kafka writer initialization
  • Loading branch information
m4tu4g authored Oct 12, 2024
2 parents f5c319d + d68da00 commit 16517f3
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 10 deletions.
1 change: 0 additions & 1 deletion cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func main() {

kp := kafka.NewKafkaProducer(kafka.Config{
Broker: cfg.Kafka.Broker,
Topic: cfg.Kafka.Topic,
})

cache := redis.New(redis.Config{
Expand Down
7 changes: 6 additions & 1 deletion internal/retrieve/http/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ func produce(h *RetrieveHandler, c *gin.Context, shortID string, shortCreatedBy
"request_host": c.Request.Host,
}

err := h.kafkaProducer.Produce(c, shortID, value)
var err error
if shortCreatedBy == "-1" {
err = h.kafkaProducer.Produce(c, config.GetConfig().Kafka.PublicClicksTopic, shortID, value)
} else {
err = h.kafkaProducer.Produce(c, config.GetConfig().Kafka.ClicksTopic, shortID, value)
}
traceContextFields := apmzap.TraceContext(c.Request.Context())
if err != nil {
logger.Infof("failed to produce message to Kafka: %v", err)
Expand Down
5 changes: 3 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ type ShortIDLimit struct {
}

type Kafka struct {
Broker string `mapstructure:"broker"`
Topic string `mapstructure:"topic"`
Broker string `mapstructure:"broker"`
ClicksTopic string `mapstructure:"clicks_topic"`
PublicClicksTopic string `mapstructure:"public_clicks_topic"`
}

type Redis struct {
Expand Down
11 changes: 5 additions & 6 deletions pkg/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type IKafkaProducer interface {
Produce(ctx context.Context, key string, value map[string]string) error
Produce(ctx context.Context, topic string, key string, value map[string]string) error
}

type Producer struct {
Expand All @@ -17,13 +17,11 @@ type Producer struct {

type Config struct {
Broker string `mapstructure:"broker"`
Topic string `mapstructure:"topic"`
}

func NewKafkaProducer(cfg Config) IKafkaProducer {
w := &kafka.Writer{
Addr: kafka.TCP(cfg.Broker),
Topic: cfg.Topic,
Balancer: &kafka.LeastBytes{},
}

Expand All @@ -36,21 +34,22 @@ type Publisher interface {
Produce(ctx context.Context, payload interface{}) error
}

func (kp *Producer) Produce(ctx context.Context, key string, value map[string]string) error {
message, err := kp.encodeMessage(key, value)
func (kp *Producer) Produce(ctx context.Context, topic string, key string, value map[string]string) error {
message, err := kp.encodeMessage(topic, key, value)
if err != nil {
return err
}
return kp.writer.WriteMessages(ctx, message)
}

func (kp *Producer) encodeMessage(key string, value map[string]string) (kafka.Message, error) {
func (kp *Producer) encodeMessage(topic string, key string, value map[string]string) (kafka.Message, error) {
v, err := json.Marshal(value)
if err != nil {
return kafka.Message{}, err
}

return kafka.Message{
Topic: topic,
Key: []byte(key),
Value: v,
}, nil
Expand Down

0 comments on commit 16517f3

Please sign in to comment.