Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix #20 #22

Merged
merged 2 commits into the base branch
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions examples/pprof/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
brokers:
- "localhost:9092"
consumer:
groupId: "sample-consumer"
topic: "exception"
maxRetry: 3
concurrency: 100
cron: "*/1 * * * *"
duration: 30s
logLevel: debug
48 changes: 48 additions & 0 deletions examples/pprof/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package main

import (
"fmt"
cronsumer "github.com/Trendyol/kafka-cronsumer"
"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
"log"
"net/http"
"os"
"path/filepath"
"runtime"

"gopkg.in/yaml.v3"
_ "net/http/pprof"
)

// main wires up the kafka-cronsumer example together with a pprof HTTP
// endpoint so the running consumer can be profiled.
func main() {
	kafkaConfig := getConfig()

	// consumeFn is invoked for every message read from the configured topic.
	var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
		// %s formats the []byte payload directly; an explicit
		// string conversion would be a redundant copy.
		fmt.Printf("consumer > Message received: %s\n", message.Value)
		return nil
	}

	// Serve the net/http/pprof handlers (registered by the blank import)
	// on localhost:6060; log the reason if the server ever stops.
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	c := cronsumer.New(kafkaConfig, consumeFn)
	c.Run() // blocks, running the cron-scheduled consumer
}

// getConfig loads the example's Kafka configuration from the config.yml
// file that sits next to this source file. The path is resolved from the
// caller's source location so the example works regardless of the current
// working directory. It panics on any failure, which is acceptable for an
// example program.
func getConfig() *kafka.Config {
	// runtime.Caller can fail; the original code ignored ok, which would
	// silently produce an empty filename and a wrong config path.
	_, filename, _, ok := runtime.Caller(0)
	if !ok {
		panic("getConfig: unable to determine caller source location")
	}
	dirname := filepath.Dir(filename)

	file, err := os.ReadFile(filepath.Join(dirname, "config.yml"))
	if err != nil {
		panic(err)
	}

	cfg := &kafka.Config{}
	if err = yaml.Unmarshal(file, cfg); err != nil {
		panic(err)
	}

	return cfg
}
22 changes: 10 additions & 12 deletions internal/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package internal
import (
"context"
"errors"
"io"

"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
segmentio "github.com/segmentio/kafka-go"
)

type Consumer interface {
ReadMessage() (MessageWrapper, error)
ReadMessage(ctx context.Context) (*MessageWrapper, error)
Stop()
}

Expand Down Expand Up @@ -52,22 +51,21 @@ func newConsumer(kafkaConfig *kafka.Config) *kafkaConsumer {
}
}

func (k kafkaConsumer) ReadMessage() (MessageWrapper, error) {
msg, err := k.consumer.ReadMessage(context.Background())
func (k kafkaConsumer) ReadMessage(ctx context.Context) (*MessageWrapper, error) {
msg, err := k.consumer.ReadMessage(ctx)
if err != nil {
if k.IsReaderHasBeenClosed(err) {
return MessageWrapper{}, err
if isContextCancelled(err) {
k.cfg.Logger.Info("kafka-go context is cancelled")
return nil, nil
}

k.cfg.Logger.Errorf("Message not read %v", err)
return MessageWrapper{}, err
return nil, err
}

return newMessage(msg), err
return newMessage(msg), nil
}

func (k kafkaConsumer) IsReaderHasBeenClosed(err error) bool {
return errors.Is(err, io.EOF)
func isContextCancelled(err error) bool {
return errors.Is(err, context.Canceled)
}

func (k kafkaConsumer) Stop() {
Expand Down
36 changes: 19 additions & 17 deletions internal/cron.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package internal

import (
"context"
"time"

"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
Expand Down Expand Up @@ -30,22 +31,12 @@ func (s *cronsumer) WithLogger(logger logger.Interface) {
}

func (s *cronsumer) Start() {
cfg := s.cfg.Consumer
_, _ = s.cron.AddFunc(cfg.Cron, func() {
s.cfg.Logger.Info("Topic started at time: " + time.Now().String())
s.consumer.Start(setConcurrency(cfg.Concurrency))
time.AfterFunc(cfg.Duration, s.consumer.Pause)
})
s.setup()
s.cron.Start()
}

func (s *cronsumer) Run() {
cfg := s.cfg.Consumer
_, _ = s.cron.AddFunc(cfg.Cron, func() {
s.cfg.Logger.Info("Topic started at time: " + time.Now().String())
s.consumer.Start(setConcurrency(cfg.Concurrency))
time.AfterFunc(cfg.Duration, s.consumer.Pause)
})
s.setup()
s.cron.Run()
}

Expand All @@ -54,9 +45,20 @@ func (s *cronsumer) Stop() {
s.consumer.Stop()
}

func setConcurrency(concurrency int) int {
if concurrency == 0 {
return 1
}
return concurrency
func (s *cronsumer) setup() {
cfg := s.cfg.Consumer

s.consumer.SetupConcurrentWorkers(cfg.Concurrency)

_, _ = s.cron.AddFunc(cfg.Cron, func() {
s.cfg.Logger.Info("Topic started at time: " + time.Now().String())

ctx, cancelFunc := context.WithCancel(context.Background())
go s.consumer.Listen(ctx)

time.AfterFunc(cfg.Duration, func() {
s.cfg.Logger.Info("Process Topic PAUSED")
cancelFunc()
})
})
}
65 changes: 19 additions & 46 deletions internal/cronsumer.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package internal

import (
"context"
"time"

"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
)

type kafkaCronsumer struct {
paused bool
quitChannel chan bool
messageChannel chan MessageWrapper

kafkaConsumer Consumer
Expand All @@ -25,75 +24,49 @@ type kafkaCronsumer struct {
func newKafkaCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) *kafkaCronsumer {
cfg.SetDefaults()
cfg.Validate()

return &kafkaCronsumer{
cfg: cfg,
paused: false,
quitChannel: make(chan bool),
messageChannel: make(chan MessageWrapper),
kafkaConsumer: newConsumer(cfg),
kafkaProducer: newProducer(cfg),
consumeFn: c,
maxRetry: setMaxRetry(cfg.Consumer.MaxRetry),
maxRetry: cfg.Consumer.MaxRetry,
deadLetterTopic: cfg.Consumer.DeadLetterTopic,
}
}

func setMaxRetry(maxRetry int) int {
if maxRetry == 0 {
return 3
}
return maxRetry
}

func (k *kafkaCronsumer) Start(concurrency int) {
k.Resume()
go k.Listen()

func (k *kafkaCronsumer) SetupConcurrentWorkers(concurrency int) {
for i := 0; i < concurrency; i++ {
go k.processMessage()
}
}

func (k *kafkaCronsumer) Resume() {
k.messageChannel = make(chan MessageWrapper)
k.paused = false
k.quitChannel = make(chan bool)
}

func (k *kafkaCronsumer) Listen() {
func (k *kafkaCronsumer) Listen(ctx context.Context) {
startTime := time.Now()

for {
select {
case <-k.quitChannel:
msg, err := k.kafkaConsumer.ReadMessage(ctx)
if err != nil {
k.cfg.Logger.Errorf("Message could not read, error %v", err)
return
default:
msg, err := k.kafkaConsumer.ReadMessage()
if err != nil {
continue
}
}

if msg.Time.Before(startTime) {
k.sendToMessageChannel(msg)
} else {
k.Pause()
if msg == nil {
return
}

if err := k.kafkaProducer.Produce(msg, false); err != nil {
k.cfg.Logger.Errorf("Error sending next iteration KafkaMessage: %v", err)
}
if msg.Time.After(startTime) {
k.cfg.Logger.Info("Next iteration KafkaMessage has been detected, resending exception topic")

return
if err = k.kafkaProducer.Produce(*msg, false); err != nil {
k.cfg.Logger.Errorf("Error sending next iteration KafkaMessage: %v", err)
}

return
}
}
}

func (k *kafkaCronsumer) Pause() {
if !k.paused {
k.cfg.Logger.Info("Process Topic PAUSED")
close(k.messageChannel)
k.paused = true
k.quitChannel <- true
k.sendToMessageChannel(*msg)
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type MessageWrapper struct {

const RetryHeaderKey = "x-retry-count"

func newMessage(msg segmentio.Message) MessageWrapper {
return MessageWrapper{
func newMessage(msg segmentio.Message) *MessageWrapper {
return &MessageWrapper{
RetryCount: getRetryCount(&msg),
Message: kafka.Message{
Topic: msg.Topic,
Expand Down
6 changes: 6 additions & 0 deletions pkg/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ type ProducerConfig struct {
}

func (c *Config) SetDefaults() {
if c.Consumer.MaxRetry == 0 {
c.Consumer.MaxRetry = 3
}
if c.Consumer.Concurrency == 0 {
c.Consumer.Concurrency = 1
}
if c.Consumer.MinBytes == 0 {
c.Consumer.MinBytes = 10e3
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func TestConfig_SetDefaults(t *testing.T) {
name: "should be set to default values when any value is empty",
expected: fields{
Consumer: ConsumerConfig{
MaxRetry: 3,
Concurrency: 1,
MinBytes: 10e3,
MaxBytes: 10e6,
MaxWait: 2 * time.Second,
Expand Down