From 5893560207ead34cd8c806f29d58a148ab23f42d Mon Sep 17 00:00:00 2001 From: Emre Odabas <44439939+emreodabas@users.noreply.github.com> Date: Fri, 28 Oct 2022 10:20:01 +0300 Subject: [PATCH] refactor: updating project layout (#17) --- cronsumer.go | 14 +++---- examples/multiple-consumers/main.go | 4 +- .../main.go | 8 ++-- .../single-consumer-with-deadletter/main.go | 4 +- examples/single-consumer/main.go | 6 +-- internal/{kafka => }/consumer.go | 38 +++++++++---------- internal/{kafka => }/cron.go | 6 +-- internal/{kafka => }/cronsumer.go | 4 +- internal/{kafka => }/message.go | 20 +++++----- internal/{kafka => }/message_test.go | 2 +- internal/{kafka => }/producer.go | 29 +++++++------- internal/{sasl => }/secure.go | 2 +- pkg/kafka/cronsumer.go | 14 +++++++ pkg/kafka/message.go | 18 --------- pkg/logger/zap.go | 2 +- 15 files changed, 82 insertions(+), 89 deletions(-) rename internal/{kafka => }/consumer.go (77%) rename internal/{kafka => }/cron.go (92%) rename internal/{kafka => }/cronsumer.go (95%) rename internal/{kafka => }/message.go (80%) rename internal/{kafka => }/message_test.go (98%) rename internal/{kafka => }/producer.go (62%) rename internal/{sasl => }/secure.go (98%) delete mode 100644 pkg/kafka/message.go diff --git a/cronsumer.go b/cronsumer.go index 4f66648..a236643 100644 --- a/cronsumer.go +++ b/cronsumer.go @@ -1,15 +1,15 @@ -// This package implements a topic management strategy which consumes messages with cron based manner. +// Package cronsumer This package implements a topic management strategy which consumes messages with cron based manner. // It mainly created for exception/retry management. -package kcronsumer +package cronsumer import ( - "github.com/Trendyol/kafka-cronsumer/internal/kafka" - . "github.com/Trendyol/kafka-cronsumer/pkg/kafka" //nolint:revive + "github.com/Trendyol/kafka-cronsumer/internal" + "github.com/Trendyol/kafka-cronsumer/pkg/kafka" ) -// NewCronsumer returns the newly created kafka consumer instance. +// New returns the newly created kafka consumer instance. // config.Config specifies cron, duration and so many parameters. // ConsumeFn describes how to consume messages from specified topic. -func NewCronsumer(cfg *Config, c ConsumeFn) Cronsumer { - return kafka.NewCronsumer(cfg, c) +func New(cfg *kafka.Config, c kafka.ConsumeFn) kafka.Cronsumer { + return internal.NewCronsumer(cfg, c) } diff --git a/examples/multiple-consumers/main.go b/examples/multiple-consumers/main.go index 61b5239..2ec3b6c 100644 --- a/examples/multiple-consumers/main.go +++ b/examples/multiple-consumers/main.go @@ -19,14 +19,14 @@ func main() { fmt.Printf("First consumer > Message received: %s\n", string(message.Value)) return nil } - firstHandler := kcronsumer.NewCronsumer(firstCfg, firstConsumerFn) + firstHandler := cronsumer.New(firstCfg, firstConsumerFn) firstHandler.Start() var secondConsumerFn kafka.ConsumeFn = func(message kafka.Message) error { fmt.Printf("Second consumer > Message received: %s\n", string(message.Value)) return nil } - secondHandler := kcronsumer.NewCronsumer(secondCfg, secondConsumerFn) + secondHandler := cronsumer.New(secondCfg, secondConsumerFn) secondHandler.Start() select {} // block main goroutine (we did to show it by on purpose) diff --git a/examples/single-consumer-with-custom-logger/main.go b/examples/single-consumer-with-custom-logger/main.go index 6c8df1a..24514c4 100644 --- a/examples/single-consumer-with-custom-logger/main.go +++ b/examples/single-consumer-with-custom-logger/main.go @@ -2,7 +2,7 @@ package main import ( "fmt" - kcronsumer "github.com/Trendyol/kafka-cronsumer" + "github.com/Trendyol/kafka-cronsumer" "github.com/Trendyol/kafka-cronsumer/pkg/kafka" "os" "path/filepath" @@ -19,9 +19,9 @@ func main() { return nil } - cronsumer := kcronsumer.NewCronsumer(kafkaConfig, consumeFn) - cronsumer.WithLogger(&myLogger{}) - cronsumer.Run() + c := cronsumer.New(kafkaConfig, consumeFn) + c.WithLogger(&myLogger{}) + c.Run() } func getConfig() *kafka.Config { diff --git a/examples/single-consumer-with-deadletter/main.go b/examples/single-consumer-with-deadletter/main.go index 25f4085..89b2246 100644 --- a/examples/single-consumer-with-deadletter/main.go +++ b/examples/single-consumer-with-deadletter/main.go @@ -19,8 +19,8 @@ func main() { return errors.New("error occurred") } - cronsumer := kcronsumer.NewCronsumer(kafkaConfig, consumeFn) - cronsumer.Run() + c := cronsumer.New(kafkaConfig, consumeFn) + c.Run() } func getConfig() *kafka.Config { diff --git a/examples/single-consumer/main.go b/examples/single-consumer/main.go index 7cc76ed..8c11e69 100644 --- a/examples/single-consumer/main.go +++ b/examples/single-consumer/main.go @@ -2,12 +2,12 @@ package main import ( "fmt" + cronsumer "github.com/Trendyol/kafka-cronsumer" "github.com/Trendyol/kafka-cronsumer/pkg/kafka" "os" "path/filepath" "runtime" - "github.com/Trendyol/kafka-cronsumer" "gopkg.in/yaml.v3" ) @@ -19,8 +19,8 @@ func main() { return nil } - cronsumer := kcronsumer.NewCronsumer(kafkaConfig, consumeFn) - cronsumer.Run() + c := cronsumer.New(kafkaConfig, consumeFn) + c.Run() } func getConfig() *kafka.Config { diff --git a/internal/kafka/consumer.go b/internal/consumer.go similarity index 77% rename from internal/kafka/consumer.go rename to internal/consumer.go index aee2fb4..6093cf2 100644 --- a/internal/kafka/consumer.go +++ b/internal/consumer.go @@ -1,4 +1,4 @@ -package kafka +package internal import ( "context" @@ -7,10 +7,8 @@ import ( "strconv" "time" - "github.com/Trendyol/kafka-cronsumer/internal/sasl" - . "github.com/Trendyol/kafka-cronsumer/pkg/kafka" - - "github.com/segmentio/kafka-go" + "github.com/Trendyol/kafka-cronsumer/pkg/kafka" + segmentio "github.com/segmentio/kafka-go" ) type Consumer interface { @@ -19,15 +17,15 @@ type Consumer interface { } type kafkaConsumer struct { - consumer *kafka.Reader - cfg *Config + consumer *segmentio.Reader + cfg *kafka.Config } -func newConsumer(kafkaConfig *Config) *kafkaConsumer { +func newConsumer(kafkaConfig *kafka.Config) *kafkaConsumer { setConsumerConfigDefaults(kafkaConfig) checkConsumerRequiredParams(kafkaConfig) - readerConfig := kafka.ReaderConfig{ + readerConfig := segmentio.ReaderConfig{ Brokers: kafkaConfig.Brokers, GroupID: kafkaConfig.Consumer.GroupID, GroupTopics: []string{kafkaConfig.Consumer.Topic}, @@ -43,23 +41,23 @@ func newConsumer(kafkaConfig *Config) *kafkaConsumer { } if kafkaConfig.SASL.Enabled { - readerConfig.Dialer = &kafka.Dialer{ - TLS: sasl.NewTLSConfig(kafkaConfig.SASL), - SASLMechanism: sasl.Mechanism(kafkaConfig.SASL), + readerConfig.Dialer = &segmentio.Dialer{ + TLS: NewTLSConfig(kafkaConfig.SASL), + SASLMechanism: Mechanism(kafkaConfig.SASL), } if kafkaConfig.SASL.Rack != "" { - readerConfig.GroupBalancers = []kafka.GroupBalancer{kafka.RackAffinityGroupBalancer{Rack: kafkaConfig.SASL.Rack}} + readerConfig.GroupBalancers = []segmentio.GroupBalancer{segmentio.RackAffinityGroupBalancer{Rack: kafkaConfig.SASL.Rack}} } } return &kafkaConsumer{ - consumer: kafka.NewReader(readerConfig), + consumer: segmentio.NewReader(readerConfig), cfg: kafkaConfig, } } -func checkConsumerRequiredParams(kafkaConfig *Config) { +func checkConsumerRequiredParams(kafkaConfig *kafka.Config) { if kafkaConfig.Consumer.GroupID == "" { panic("you have to set consumer group id") } @@ -68,7 +66,7 @@ func checkConsumerRequiredParams(kafkaConfig *Config) { } } -func setConsumerConfigDefaults(kafkaConfig *Config) { +func setConsumerConfigDefaults(kafkaConfig *kafka.Config) { if kafkaConfig.Consumer.MinBytes == 0 { kafkaConfig.Consumer.MinBytes = 10e3 } @@ -98,17 +96,17 @@ func setConsumerConfigDefaults(kafkaConfig *Config) { func convertStartOffset(offset string) int64 { switch offset { case "earliest": - return kafka.FirstOffset + return segmentio.FirstOffset case "latest": - return kafka.LastOffset + return segmentio.LastOffset case "": - return kafka.FirstOffset + return segmentio.FirstOffset default: offsetValue, err := strconv.ParseInt(offset, 10, 64) if err == nil { return offsetValue } - return kafka.FirstOffset + return segmentio.FirstOffset } } diff --git a/internal/kafka/cron.go b/internal/cron.go similarity index 92% rename from internal/kafka/cron.go rename to internal/cron.go index 3962248..741a495 100644 --- a/internal/kafka/cron.go +++ b/internal/cron.go @@ -1,4 +1,4 @@ -package kafka +package internal import ( "time" @@ -16,11 +16,11 @@ type cronsumer struct { consumer *kafkaCronsumer } -func NewCronsumer(cfg *kafka.Config, fn func(message kafka.Message) error) kafka.Cronsumer { +func NewCronsumer(cfg *kafka.Config, fn kafka.ConsumeFn) kafka.Cronsumer { cfg.Logger = logger.New(cfg.LogLevel) return &cronsumer{ cron: gocron.New(), - consumer: NewKafkaCronsumer(cfg, fn), + consumer: newKafkaCronsumer(cfg, fn), cfg: cfg, } } diff --git a/internal/kafka/cronsumer.go b/internal/cronsumer.go similarity index 95% rename from internal/kafka/cronsumer.go rename to internal/cronsumer.go index 63782ac..e7d3566 100644 --- a/internal/kafka/cronsumer.go +++ b/internal/cronsumer.go @@ -1,4 +1,4 @@ -package kafka +package internal import ( "time" @@ -22,7 +22,7 @@ type kafkaCronsumer struct { cfg *kafka.Config } -func NewKafkaCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) *kafkaCronsumer { //nolint: revive +func newKafkaCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) *kafkaCronsumer { return &kafkaCronsumer{ cfg: cfg, paused: false, diff --git a/internal/kafka/message.go b/internal/message.go similarity index 80% rename from internal/kafka/message.go rename to internal/message.go index 8faa21f..4ce4b85 100644 --- a/internal/kafka/message.go +++ b/internal/message.go @@ -1,26 +1,26 @@ -package kafka +package internal import ( "strconv" "time" "unsafe" - . "github.com/Trendyol/kafka-cronsumer/pkg/kafka" + "github.com/Trendyol/kafka-cronsumer/pkg/kafka" - "github.com/segmentio/kafka-go" + segmentio "github.com/segmentio/kafka-go" ) type MessageWrapper struct { - Message + kafka.Message RetryCount int } const RetryHeaderKey = "x-retry-count" -func newMessage(msg kafka.Message) MessageWrapper { +func newMessage(msg segmentio.Message) MessageWrapper { return MessageWrapper{ RetryCount: getRetryCount(&msg), - Message: Message{ + Message: kafka.Message{ Topic: msg.Topic, Partition: msg.Partition, Offset: msg.Offset, @@ -33,11 +33,11 @@ func newMessage(msg kafka.Message) MessageWrapper { } } -func (m *MessageWrapper) To(increaseRetry bool) kafka.Message { +func (m *MessageWrapper) To(increaseRetry bool) segmentio.Message { if increaseRetry { m.IncreaseRetryCount() } - return kafka.Message{ + return segmentio.Message{ Topic: m.Topic, Value: m.Value, Headers: m.Headers, @@ -84,7 +84,7 @@ func (m *MessageWrapper) IncreaseRetryCount() { } } -func getRetryCount(message *kafka.Message) int { +func getRetryCount(message *segmentio.Message) int { for i := range message.Headers { if message.Headers[i].Key != RetryHeaderKey { continue @@ -94,7 +94,7 @@ func getRetryCount(message *kafka.Message) int { return retryCount } - message.Headers = append(message.Headers, kafka.Header{ + message.Headers = append(message.Headers, segmentio.Header{ Key: RetryHeaderKey, Value: []byte("0"), }) diff --git a/internal/kafka/message_test.go b/internal/message_test.go similarity index 98% rename from internal/kafka/message_test.go rename to internal/message_test.go index 4ad2f4b..bf8f99a 100644 --- a/internal/kafka/message_test.go +++ b/internal/message_test.go @@ -1,4 +1,4 @@ -package kafka +package internal import ( "bytes" diff --git a/internal/kafka/producer.go b/internal/producer.go similarity index 62% rename from internal/kafka/producer.go rename to internal/producer.go index f4418d9..7c67684 100644 --- a/internal/kafka/producer.go +++ b/internal/producer.go @@ -1,13 +1,12 @@ -package kafka +package internal import ( "context" "time" - "github.com/Trendyol/kafka-cronsumer/internal/sasl" - . "github.com/Trendyol/kafka-cronsumer/pkg/kafka" + "github.com/Trendyol/kafka-cronsumer/pkg/kafka" - "github.com/segmentio/kafka-go" + segmentio "github.com/segmentio/kafka-go" ) type Producer interface { @@ -15,32 +14,32 @@ type Producer interface { } type kafkaProducer struct { - w *kafka.Writer - cfg *Config + w *segmentio.Writer + cfg *kafka.Config } /* -Allow Auto Topic Creation: The default Config configuration specifies that the broker should +Allow Auto Topic Creation: The default kafka.Config configuration specifies that the broker should automatically create a topic under the following circumstances: - When a kafkaProducer starts writing messages to the topic - When a kafkaConsumer starts reading messages from the topic - When any client requests metadata for the topic */ -func newProducer(kafkaConfig *Config) Producer { +func newProducer(kafkaConfig *kafka.Config) Producer { setProducerConfigDefaults(kafkaConfig) - producer := &kafka.Writer{ - Addr: kafka.TCP(kafkaConfig.Brokers...), - Balancer: &kafka.LeastBytes{}, + producer := &segmentio.Writer{ + Addr: segmentio.TCP(kafkaConfig.Brokers...), + Balancer: &segmentio.LeastBytes{}, BatchTimeout: kafkaConfig.Producer.BatchTimeout, BatchSize: kafkaConfig.Producer.BatchSize, AllowAutoTopicCreation: true, } if kafkaConfig.SASL.Enabled { - producer.Transport = &kafka.Transport{ - TLS: sasl.NewTLSConfig(kafkaConfig.SASL), - SASL: sasl.Mechanism(kafkaConfig.SASL), + producer.Transport = &segmentio.Transport{ + TLS: NewTLSConfig(kafkaConfig.SASL), + SASL: Mechanism(kafkaConfig.SASL), } } @@ -50,7 +49,7 @@ func newProducer(kafkaConfig *Config) Producer { } } -func setProducerConfigDefaults(kafkaConfig *Config) { +func setProducerConfigDefaults(kafkaConfig *kafka.Config) { if kafkaConfig.Producer.BatchSize == 0 { kafkaConfig.Producer.BatchSize = 100 } diff --git a/internal/sasl/secure.go b/internal/secure.go similarity index 98% rename from internal/sasl/secure.go rename to internal/secure.go index b246824..7ca8ba1 100644 --- a/internal/sasl/secure.go +++ b/internal/secure.go @@ -1,4 +1,4 @@ -package sasl +package internal import ( "crypto/tls" diff --git a/pkg/kafka/cronsumer.go b/pkg/kafka/cronsumer.go index da67621..4e0d11d 100644 --- a/pkg/kafka/cronsumer.go +++ b/pkg/kafka/cronsumer.go @@ -1,7 +1,10 @@ package kafka import ( + "time" + "github.com/Trendyol/kafka-cronsumer/pkg/logger" + "github.com/segmentio/kafka-go/protocol" ) // ConsumeFn This function describes how to consume messages from specified topic @@ -13,3 +16,14 @@ type Cronsumer interface { Stop() WithLogger(logger logger.Interface) } + +type Message struct { + Topic string + Partition int + Offset int64 + HighWaterMark int64 + Key []byte + Value []byte + Headers []protocol.Header + Time time.Time +} diff --git a/pkg/kafka/message.go b/pkg/kafka/message.go deleted file mode 100644 index 66cfea0..0000000 --- a/pkg/kafka/message.go +++ /dev/null @@ -1,18 +0,0 @@ -package kafka - -import ( - "time" - - "github.com/segmentio/kafka-go/protocol" -) - -type Message struct { - Topic string - Partition int - Offset int64 - HighWaterMark int64 - Key []byte - Value []byte - Headers []protocol.Header - Time time.Time -} diff --git a/pkg/logger/zap.go b/pkg/logger/zap.go index 93d15a6..5dd7985 100644 --- a/pkg/logger/zap.go +++ b/pkg/logger/zap.go @@ -11,7 +11,7 @@ type logger struct { func New(level Level) Interface { if level == "" { - level = Warn + level = Info } l, _ := newLogger(level)