Kafka Cronsumer is mainly used for retry/exception strategy management. It works based on cron expression and consumes messages in a timely manner with the power of auto pause and concurrency configurations.
For details check our blog post
In this library, we implement an iteration-based process with a back-off strategy. As you already know back-off strategy is helpful to
- limit the impact of the additional load on dependencies
- increase upstream resilience and keep healthy
- resolve transient network errors
- allows doing hotfixes if there is a temporary bug in the code
If the order of messages is unnecessary, it is very appropriate for these scenarios.
go get -u github.com/Trendyol/kafka-cronsumer@latest
You can find a number of ready-to-run examples at this directory.
func main() {
// ...
var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
fmt.Printf("consumer > Message received: %s\n", string(message.Value))
return nil
}
cronsumer := kcronsumer.NewCronsumer(kafkaConfig, consumeFn)
cronsumer.Run()
}
func main() {
// ...
var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
fmt.Printf("consumer > Message received: %s\n", string(message.Value))
return errors.New("error occurred")
}
cronsumer := kcronsumer.NewCronsumer(kafkaConfig, consumeFn)
cronsumer.Run()
}
func main() {
// ...
var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
fmt.Printf("First consumer > Message received: %s\n", string(message.Value))
return nil
}
firstHandler := kcronsumer.NewCronsumer(firstCfg, firstConsumerFn)
firstHandler.Start()
var secondConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
fmt.Printf("Second consumer > Message received: %s\n", string(message.Value))
return nil
}
secondHandler := kcronsumer.NewCronsumer(secondCfg, secondConsumerFn)
secondHandler.Start()
// ...
}
config | description | default | example |
---|---|---|---|
logLevel |
Describes log level, valid options are debug , info , warn , and error |
warn | |
consumer.cron |
Cron expression when exception consumer starts to work at | */1 * * * * | |
consumer.duration |
Work duration exception consumer actively consuming messages | 20s, 15m, 1h | |
consumer.topic |
Exception topic names | exception-topic | |
consumer.groupId |
Exception consumer group id | exception-consumer-group | |
consumer.maxRetry |
Maximum retry value for attempting to retry a message | 3 | |
consumer.concurrency |
Number of goroutines used at listeners | 1 | |
consumer.minBytes |
see doc | 10e3 | |
consumer.maxBytes |
see doc | 10e6 | |
consumer.maxWait |
see doc | 2s | |
consumer.commitInterval |
see doc | 1s | |
consumer.heartbeatInterval |
see doc | 3s | |
consumer.sessionTimeout |
see doc | 30s | |
consumer.rebalanceTimeout |
see doc | 30s | |
consumer.startOffset |
see doc | earliest | |
consumer.retentionTime |
see doc | 24h | |
producer.batchSize |
see doc | 100 | |
producer.batchTimeout |
see doc | 500us | |
sasl.enabled |
It enables sasl authentication mechanism | false | |
sasl.authType |
Currently we only support SCRAM |
"" | |
sasl.username |
SCRAM username | "" | |
sasl.password |
SCRAM password | "" | |
sasl.rootCAPath |
see doc | "" | |
sasl.intermediateCAPath |
"" | ||
sasl.rack |
see doc | "" |
Use issues for everything
- For a small change, just send a PR.
- For bigger changes open an issue for discussion before sending a PR.
- PR should have:
- Test case
- Documentation
- Example (If it makes sense)
- You can also contribute by:
- Reporting issues
- Suggesting new features or enhancements
- Improve/fix documentation
Please adhere to this project's code of conduct
.
- @Abdulsametileri (Author)
- @emreodabas (Author)
- @anilmisirlioglu (Contributor)
Contributor Code of Conduct. By participating in this project you agree to abide by its terms.