Skip to content

Trendyol/kafka-cronsumer

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

94 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

πŸ”₯ Kafka C[r]onsumer πŸ”₯

Description πŸ“–

Kafka Cronsumer is mainly used for retry/exception strategy management. It works based on cron expression and consumes messages in a timely manner with the power of auto pause and concurrency configurations.

For details check our blog post

How Kafka Cronsumer Works πŸ’‘

How Kafka Cronsumer Works

πŸ–₯ Use cases

In this library, we implement an iteration-based process with a back-off strategy. As you already know back-off strategy is helpful to

  • limit the impact of the additional load on dependencies
  • increase upstream resilience and keep healthy
  • resolve transient network errors
  • allows doing hotfixes if there is a temporary bug in the code

If the order of messages is unnecessary, it is very appropriate for these scenarios.

Guide

Installation 🧰

go get -u github.com/Trendyol/kafka-cronsumer@latest

Examples πŸ› 

You can find a number of ready-to-run examples at this directory.

Single Consumer

func main() {
    // ...
    var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
      fmt.Printf("consumer > Message received: %s\n", string(message.Value))
      return nil
    }
    
    cronsumer := kcronsumer.NewCronsumer(kafkaConfig, consumeFn)
    cronsumer.Run()
}

Single Consumer With Dead Letter

func main() {
    // ...
    var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
      fmt.Printf("consumer > Message received: %s\n", string(message.Value))
      return errors.New("error occurred")
    }
    
    cronsumer := kcronsumer.NewCronsumer(kafkaConfig, consumeFn)
    cronsumer.Run()
}

Multiple Consumers

func main() {
    // ...
    var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
      fmt.Printf("First consumer > Message received: %s\n", string(message.Value))
      return nil
    }
    firstHandler := kcronsumer.NewCronsumer(firstCfg, firstConsumerFn)
    firstHandler.Start()

    var secondConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
      fmt.Printf("Second consumer > Message received: %s\n", string(message.Value))
      return nil
    }
    secondHandler := kcronsumer.NewCronsumer(secondCfg, secondConsumerFn)
    secondHandler.Start()
    // ...    
}

Configs

config description default example
logLevel Describes log level, valid options are debug, info, warn, and error warn
consumer.cron Cron expression when exception consumer starts to work at */1 * * * *
consumer.duration Work duration exception consumer actively consuming messages 20s, 15m, 1h
consumer.topic Exception topic names exception-topic
consumer.groupId Exception consumer group id exception-consumer-group
consumer.maxRetry Maximum retry value for attempting to retry a message 3
consumer.concurrency Number of goroutines used at listeners 1
consumer.minBytes see doc 10e3
consumer.maxBytes see doc 10e6
consumer.maxWait see doc 2s
consumer.commitInterval see doc 1s
consumer.heartbeatInterval see doc 3s
consumer.sessionTimeout see doc 30s
consumer.rebalanceTimeout see doc 30s
consumer.startOffset see doc earliest
consumer.retentionTime see doc 24h
producer.batchSize see doc 100
producer.batchTimeout see doc 500us
sasl.enabled It enables sasl authentication mechanism false
sasl.authType Currently we only support SCRAM ""
sasl.username SCRAM username ""
sasl.password SCRAM password ""
sasl.rootCAPath see doc ""
sasl.intermediateCAPath ""
sasl.rack see doc ""

Contribute

Use issues for everything

  • For a small change, just send a PR.
  • For bigger changes open an issue for discussion before sending a PR.
  • PR should have:
    • Test case
    • Documentation
    • Example (If it makes sense)
  • You can also contribute by:
    • Reporting issues
    • Suggesting new features or enhancements
    • Improve/fix documentation

Please adhere to this project's code of conduct.

Users

Code of Conduct

Contributor Code of Conduct. By participating in this project you agree to abide by its terms.

Libraries Used For This Project πŸ’ͺ

Additional References 🀘