
Commit

refactor: updating project layout (#17)
emreodabas authored Oct 28, 2022
1 parent dba03b9 commit 5893560
Showing 15 changed files with 82 additions and 89 deletions.
14 changes: 7 additions & 7 deletions cronsumer.go
@@ -1,15 +1,15 @@
-// This package implements a topic management strategy which consumes messages with cron based manner.
+// Package cronsumer This package implements a topic management strategy which consumes messages with cron based manner.
 // It mainly created for exception/retry management.
-package kcronsumer
+package cronsumer
 
 import (
-	"github.com/Trendyol/kafka-cronsumer/internal/kafka"
-	. "github.com/Trendyol/kafka-cronsumer/pkg/kafka" //nolint:revive
+	"github.com/Trendyol/kafka-cronsumer/internal"
+	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
 )
 
-// NewCronsumer returns the newly created kafka consumer instance.
+// New returns the newly created kafka consumer instance.
 // config.Config specifies cron, duration and so many parameters.
 // ConsumeFn describes how to consume messages from specified topic.
-func NewCronsumer(cfg *Config, c ConsumeFn) Cronsumer {
-	return kafka.NewCronsumer(cfg, c)
+func New(cfg *kafka.Config, c kafka.ConsumeFn) kafka.Cronsumer {
+	return internal.NewCronsumer(cfg, c)
 }
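
After this commit the public entry point is cronsumer.New rather than kcronsumer.NewCronsumer. A minimal caller would look like the sketch below; this is an illustration, not part of the diff, and the config type and field names (kafka.ConsumerConfig, GroupID, Topic, Cron) as well as the broker address are assumptions based on the library's examples.

package main

import (
	"fmt"

	cronsumer "github.com/Trendyol/kafka-cronsumer"
	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
)

func main() {
	// Illustrative config; field names assumed from the library's examples.
	cfg := &kafka.Config{
		Brokers: []string{"localhost:29092"}, // placeholder broker
		Consumer: kafka.ConsumerConfig{
			GroupID: "sample-group",
			Topic:   "exception",
			Cron:    "*/1 * * * *", // run the retry consumer every minute
		},
	}

	var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
		fmt.Printf("Message received: %s\n", string(message.Value))
		return nil
	}

	c := cronsumer.New(cfg, consumeFn)
	c.Run() // blocks; the examples below use Start() plus select{} as the non-blocking variant
}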
4 changes: 2 additions & 2 deletions examples/multiple-consumers/main.go
@@ -19,14 +19,14 @@ func main() {
 		fmt.Printf("First consumer > Message received: %s\n", string(message.Value))
 		return nil
 	}
-	firstHandler := kcronsumer.NewCronsumer(firstCfg, firstConsumerFn)
+	firstHandler := cronsumer.New(firstCfg, firstConsumerFn)
 	firstHandler.Start()
 
 	var secondConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
 		fmt.Printf("Second consumer > Message received: %s\n", string(message.Value))
 		return nil
 	}
-	secondHandler := kcronsumer.NewCronsumer(secondCfg, secondConsumerFn)
+	secondHandler := cronsumer.New(secondCfg, secondConsumerFn)
 	secondHandler.Start()
 
 	select {} // block main goroutine (we did to show it by on purpose)
8 changes: 4 additions & 4 deletions examples/single-consumer-with-custom-logger/main.go
@@ -2,7 +2,7 @@ package main
 
 import (
 	"fmt"
-	kcronsumer "github.com/Trendyol/kafka-cronsumer"
+	"github.com/Trendyol/kafka-cronsumer"
 	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
 	"os"
 	"path/filepath"
@@ -19,9 +19,9 @@ func main() {
 		return nil
 	}
 
-	cronsumer := kcronsumer.NewCronsumer(kafkaConfig, consumeFn)
-	cronsumer.WithLogger(&myLogger{})
-	cronsumer.Run()
+	c := cronsumer.New(kafkaConfig, consumeFn)
+	c.WithLogger(&myLogger{})
+	c.Run()
 }
 
 func getConfig() *kafka.Config {
4 changes: 2 additions & 2 deletions examples/single-consumer-with-deadletter/main.go
@@ -19,8 +19,8 @@ func main() {
 		return errors.New("error occurred")
 	}
 
-	cronsumer := kcronsumer.NewCronsumer(kafkaConfig, consumeFn)
-	cronsumer.Run()
+	c := cronsumer.New(kafkaConfig, consumeFn)
+	c.Run()
 }
 
 func getConfig() *kafka.Config {
6 changes: 3 additions & 3 deletions examples/single-consumer/main.go
@@ -2,12 +2,12 @@ package main
 
 import (
 	"fmt"
+	cronsumer "github.com/Trendyol/kafka-cronsumer"
 	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
 	"os"
 	"path/filepath"
 	"runtime"
 
-	"github.com/Trendyol/kafka-cronsumer"
 	"gopkg.in/yaml.v3"
 )
 
@@ -19,8 +19,8 @@ func main() {
 		return nil
 	}
 
-	cronsumer := kcronsumer.NewCronsumer(kafkaConfig, consumeFn)
-	cronsumer.Run()
+	c := cronsumer.New(kafkaConfig, consumeFn)
+	c.Run()
 }
 
 func getConfig() *kafka.Config {
38 changes: 18 additions & 20 deletions internal/kafka/consumer.go → internal/consumer.go
@@ -1,4 +1,4 @@
-package kafka
+package internal
 
 import (
 	"context"
@@ -7,10 +7,8 @@ import (
 	"strconv"
 	"time"
 
-	"github.com/Trendyol/kafka-cronsumer/internal/sasl"
-	. "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
-
-	"github.com/segmentio/kafka-go"
+	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
+	segmentio "github.com/segmentio/kafka-go"
 )
 
 type Consumer interface {
@@ -19,15 +17,15 @@ type Consumer interface {
 }
 
 type kafkaConsumer struct {
-	consumer *kafka.Reader
-	cfg      *Config
+	consumer *segmentio.Reader
+	cfg      *kafka.Config
 }
 
-func newConsumer(kafkaConfig *Config) *kafkaConsumer {
+func newConsumer(kafkaConfig *kafka.Config) *kafkaConsumer {
 	setConsumerConfigDefaults(kafkaConfig)
 	checkConsumerRequiredParams(kafkaConfig)
 
-	readerConfig := kafka.ReaderConfig{
+	readerConfig := segmentio.ReaderConfig{
 		Brokers:     kafkaConfig.Brokers,
 		GroupID:     kafkaConfig.Consumer.GroupID,
 		GroupTopics: []string{kafkaConfig.Consumer.Topic},
@@ -43,23 +41,23 @@ func newConsumer(kafkaConfig *Config) *kafkaConsumer {
 	}
 
 	if kafkaConfig.SASL.Enabled {
-		readerConfig.Dialer = &kafka.Dialer{
-			TLS:           sasl.NewTLSConfig(kafkaConfig.SASL),
-			SASLMechanism: sasl.Mechanism(kafkaConfig.SASL),
+		readerConfig.Dialer = &segmentio.Dialer{
+			TLS:           NewTLSConfig(kafkaConfig.SASL),
+			SASLMechanism: Mechanism(kafkaConfig.SASL),
 		}
 
 		if kafkaConfig.SASL.Rack != "" {
-			readerConfig.GroupBalancers = []kafka.GroupBalancer{kafka.RackAffinityGroupBalancer{Rack: kafkaConfig.SASL.Rack}}
+			readerConfig.GroupBalancers = []segmentio.GroupBalancer{segmentio.RackAffinityGroupBalancer{Rack: kafkaConfig.SASL.Rack}}
 		}
 	}
 
 	return &kafkaConsumer{
-		consumer: kafka.NewReader(readerConfig),
+		consumer: segmentio.NewReader(readerConfig),
 		cfg:      kafkaConfig,
 	}
 }
 
-func checkConsumerRequiredParams(kafkaConfig *Config) {
+func checkConsumerRequiredParams(kafkaConfig *kafka.Config) {
 	if kafkaConfig.Consumer.GroupID == "" {
 		panic("you have to set consumer group id")
 	}
@@ -68,7 +66,7 @@ func checkConsumerRequiredParams(kafkaConfig *Config) {
 	}
 }
 
-func setConsumerConfigDefaults(kafkaConfig *Config) {
+func setConsumerConfigDefaults(kafkaConfig *kafka.Config) {
 	if kafkaConfig.Consumer.MinBytes == 0 {
 		kafkaConfig.Consumer.MinBytes = 10e3
 	}
@@ -98,17 +96,17 @@ func setConsumerConfigDefaults(kafkaConfig *Config) {
 func convertStartOffset(offset string) int64 {
 	switch offset {
 	case "earliest":
-		return kafka.FirstOffset
+		return segmentio.FirstOffset
 	case "latest":
-		return kafka.LastOffset
+		return segmentio.LastOffset
 	case "":
-		return kafka.FirstOffset
+		return segmentio.FirstOffset
 	default:
 		offsetValue, err := strconv.ParseInt(offset, 10, 64)
 		if err == nil {
 			return offsetValue
 		}
-		return kafka.FirstOffset
+		return segmentio.FirstOffset
 	}
 }
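
convertStartOffset bridges the string-typed StartOffset setting and kafka-go's integer sentinels; in kafka-go, FirstOffset is -2 and LastOffset is -1. Below is a self-contained sketch of the same mapping rule, a condensed illustration rather than the package's own code (the "earliest" and empty-string cases are merged here):

package main

import (
	"fmt"
	"strconv"

	segmentio "github.com/segmentio/kafka-go"
)

// convertStartOffset mirrors the rule in the diff above: named positions
// map to kafka-go sentinels, numeric strings parse through unchanged, and
// anything unrecognized falls back to the earliest offset.
func convertStartOffset(offset string) int64 {
	switch offset {
	case "earliest", "":
		return segmentio.FirstOffset // -2
	case "latest":
		return segmentio.LastOffset // -1
	default:
		if v, err := strconv.ParseInt(offset, 10, 64); err == nil {
			return v
		}
		return segmentio.FirstOffset
	}
}

func main() {
	for _, s := range []string{"earliest", "latest", "", "42", "bogus"} {
		fmt.Printf("%q -> %d\n", s, convertStartOffset(s))
	}
}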
6 changes: 3 additions & 3 deletions internal/kafka/cron.go → internal/cron.go
@@ -1,4 +1,4 @@
-package kafka
+package internal
 
 import (
 	"time"
@@ -16,11 +16,11 @@ type cronsumer struct {
 	consumer *kafkaCronsumer
 }
 
-func NewCronsumer(cfg *kafka.Config, fn func(message kafka.Message) error) kafka.Cronsumer {
+func NewCronsumer(cfg *kafka.Config, fn kafka.ConsumeFn) kafka.Cronsumer {
 	cfg.Logger = logger.New(cfg.LogLevel)
 	return &cronsumer{
 		cron:     gocron.New(),
-		consumer: NewKafkaCronsumer(cfg, fn),
+		consumer: newKafkaCronsumer(cfg, fn),
 		cfg:      cfg,
 	}
 }
4 changes: 2 additions & 2 deletions internal/kafka/cronsumer.go → internal/cronsumer.go
@@ -1,4 +1,4 @@
-package kafka
+package internal
 
 import (
	"time"
@@ -22,7 +22,7 @@ type kafkaCronsumer struct {
 	cfg *kafka.Config
 }
 
-func NewKafkaCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) *kafkaCronsumer { //nolint: revive
+func newKafkaCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) *kafkaCronsumer {
 	return &kafkaCronsumer{
 		cfg:    cfg,
 		paused: false,
20 changes: 10 additions & 10 deletions internal/kafka/message.go → internal/message.go
@@ -1,26 +1,26 @@
-package kafka
+package internal
 
 import (
 	"strconv"
 	"time"
 	"unsafe"
 
-	. "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
+	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
 
-	"github.com/segmentio/kafka-go"
+	segmentio "github.com/segmentio/kafka-go"
 )
 
 type MessageWrapper struct {
-	Message
+	kafka.Message
 	RetryCount int
 }
 
 const RetryHeaderKey = "x-retry-count"
 
-func newMessage(msg kafka.Message) MessageWrapper {
+func newMessage(msg segmentio.Message) MessageWrapper {
 	return MessageWrapper{
 		RetryCount: getRetryCount(&msg),
-		Message: Message{
+		Message: kafka.Message{
 			Topic:     msg.Topic,
 			Partition: msg.Partition,
 			Offset:    msg.Offset,
@@ -33,11 +33,11 @@ func newMessage(msg kafka.Message) MessageWrapper {
 	}
 }
 
-func (m *MessageWrapper) To(increaseRetry bool) kafka.Message {
+func (m *MessageWrapper) To(increaseRetry bool) segmentio.Message {
 	if increaseRetry {
 		m.IncreaseRetryCount()
 	}
-	return kafka.Message{
+	return segmentio.Message{
 		Topic:   m.Topic,
 		Value:   m.Value,
 		Headers: m.Headers,
@@ -84,7 +84,7 @@ func (m *MessageWrapper) IncreaseRetryCount() {
 	}
 }
 
-func getRetryCount(message *kafka.Message) int {
+func getRetryCount(message *segmentio.Message) int {
 	for i := range message.Headers {
 		if message.Headers[i].Key != RetryHeaderKey {
 			continue
@@ -94,7 +94,7 @@ func getRetryCount(message *kafka.Message) int {
 		return retryCount
 	}
 
-	message.Headers = append(message.Headers, kafka.Header{
+	message.Headers = append(message.Headers, segmentio.Header{
 		Key:   RetryHeaderKey,
 		Value: []byte("0"),
 	})
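
The x-retry-count header is the entire retry-tracking mechanism here: getRetryCount stamps a missing header with "0" on first sight, and To(increaseRetry) writes the incremented value back before the message is re-produced. Below is a self-contained illustration of that header round-trip using only kafka-go types; retryCountOf is a hypothetical stand-in for the unexported getRetryCount, without its header-stamping side effect.

package main

import (
	"fmt"
	"strconv"

	segmentio "github.com/segmentio/kafka-go"
)

const retryHeaderKey = "x-retry-count" // same key as RetryHeaderKey above

// retryCountOf reads the retry counter from a message header;
// a missing header means the message has never been retried.
func retryCountOf(msg segmentio.Message) int {
	for _, h := range msg.Headers {
		if h.Key == retryHeaderKey {
			n, _ := strconv.Atoi(string(h.Value))
			return n
		}
	}
	return 0
}

func main() {
	msg := segmentio.Message{Topic: "exception", Value: []byte("payload")}
	fmt.Println(retryCountOf(msg)) // 0: first delivery, header not set yet

	// Before re-producing, the count is incremented and written back,
	// which is what MessageWrapper.To(true) does in the diff above.
	msg.Headers = append(msg.Headers, segmentio.Header{
		Key:   retryHeaderKey,
		Value: []byte(strconv.Itoa(retryCountOf(msg) + 1)),
	})
	fmt.Println(retryCountOf(msg)) // 1
}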
@@ -1,4 +1,4 @@
-package kafka
+package internal
 
 import (
 	"bytes"
29 changes: 14 additions & 15 deletions internal/kafka/producer.go → internal/producer.go
@@ -1,46 +1,45 @@
-package kafka
+package internal
 
 import (
 	"context"
 	"time"
 
-	"github.com/Trendyol/kafka-cronsumer/internal/sasl"
-	. "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
+	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
 
-	"github.com/segmentio/kafka-go"
+	segmentio "github.com/segmentio/kafka-go"
 )
 
 type Producer interface {
 	Produce(message MessageWrapper, increaseRetry bool) error
 }
 
 type kafkaProducer struct {
-	w   *kafka.Writer
-	cfg *Config
+	w   *segmentio.Writer
+	cfg *kafka.Config
 }
 
 /*
-Allow Auto Topic Creation: The default Config configuration specifies that the broker should
+Allow Auto Topic Creation: The default kafka.Config configuration specifies that the broker should
 automatically create a topic under the following circumstances:
 - When a kafkaProducer starts writing messages to the topic
 - When a kafkaConsumer starts reading messages from the topic
 - When any client requests metadata for the topic
 */
-func newProducer(kafkaConfig *Config) Producer {
+func newProducer(kafkaConfig *kafka.Config) Producer {
 	setProducerConfigDefaults(kafkaConfig)
 
-	producer := &kafka.Writer{
-		Addr:         kafka.TCP(kafkaConfig.Brokers...),
-		Balancer:     &kafka.LeastBytes{},
+	producer := &segmentio.Writer{
+		Addr:         segmentio.TCP(kafkaConfig.Brokers...),
+		Balancer:     &segmentio.LeastBytes{},
 		BatchTimeout: kafkaConfig.Producer.BatchTimeout,
 		BatchSize:    kafkaConfig.Producer.BatchSize,
 		AllowAutoTopicCreation: true,
 	}
 
 	if kafkaConfig.SASL.Enabled {
-		producer.Transport = &kafka.Transport{
-			TLS:  sasl.NewTLSConfig(kafkaConfig.SASL),
-			SASL: sasl.Mechanism(kafkaConfig.SASL),
+		producer.Transport = &segmentio.Transport{
+			TLS:  NewTLSConfig(kafkaConfig.SASL),
+			SASL: Mechanism(kafkaConfig.SASL),
 		}
 	}
 
@@ -50,7 +49,7 @@ func newProducer(kafkaConfig *Config) Producer {
 	}
 }
 
-func setProducerConfigDefaults(kafkaConfig *Config) {
+func setProducerConfigDefaults(kafkaConfig *kafka.Config) {
 	if kafkaConfig.Producer.BatchSize == 0 {
 		kafkaConfig.Producer.BatchSize = 100
 	}
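
The AllowAutoTopicCreation flag referenced in the doc comment above is what lets the retry/exception topic appear on first use instead of requiring manual provisioning. Here is a standalone kafka-go sketch of a comparable writer setup, not taken from this repository; the broker address and topic name are placeholders, and the broker itself must also permit auto-creation (auto.create.topics.enable):

package main

import (
	"context"
	"log"

	segmentio "github.com/segmentio/kafka-go"
)

func main() {
	// With AllowAutoTopicCreation set, the first WriteMessages call to a
	// missing topic asks the broker to create it instead of failing.
	w := &segmentio.Writer{
		Addr:                   segmentio.TCP("localhost:29092"), // placeholder broker
		Balancer:               &segmentio.LeastBytes{},
		AllowAutoTopicCreation: true,
	}
	defer w.Close()

	err := w.WriteMessages(context.Background(), segmentio.Message{
		Topic: "exception", // placeholder topic name
		Value: []byte("message to retry later"),
	})
	if err != nil {
		log.Fatal(err)
	}
}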
2 changes: 1 addition & 1 deletion internal/sasl/secure.go → internal/secure.go
@@ -1,4 +1,4 @@
-package sasl
+package internal
 
 import (
 	"crypto/tls"