Skip to content
This repository has been archived by the owner on Oct 2, 2020. It is now read-only.

Enable consuming from retryQ and DLQ only #70

Merged
merged 2 commits into from
Oct 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
v0.2.2 (unreleased)
-------------------

- Handle retryQ and DLQ only consumer topics.
- Update KafkaPartitionOwned metric to be a boolean metric for ownership.
- Rename number of partitions owned by specific worker to KafkaPartitionOwnedCount.

Expand Down
14 changes: 9 additions & 5 deletions consumerBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,20 @@ func (c *consumerBuilder) Build() (kafka.Consumer, error) {
func (c *consumerBuilder) build() (*consumer.MultiClusterConsumer, error) {
// build TopicList per cluster
for _, consumerTopic := range c.kafkaConfig.TopicList {
// first, add TopicConsumer for original topic.
// first, add TopicConsumer for original topic if topic is well defined.
// disabling offset commit only applies for the original topic.
partitionConsumerFactory := consumer.NewPartitionConsumer
if !c.kafkaConfig.Offsets.Commits.Enabled {
partitionConsumerFactory = consumer.NewPartitionConsumerWithoutCommit
if consumerTopic.Topic.Name != "" && consumerTopic.Topic.Cluster != "" {
partitionConsumerFactory := consumer.NewPartitionConsumer
if !c.kafkaConfig.Offsets.Commits.Enabled {
partitionConsumerFactory = consumer.NewPartitionConsumerWithoutCommit
}
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: consumerTopic, DLQMetadataDecoder: consumer.NoopDLQMetadataDecoder, PartitionConsumerFactory: partitionConsumerFactory}, c.kafkaConfig.Offsets.Initial.Offset)
}
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: consumerTopic, DLQMetadataDecoder: consumer.NoopDLQMetadataDecoder, PartitionConsumerFactory: partitionConsumerFactory}, c.kafkaConfig.Offsets.Initial.Offset)
// Second, add retryQ topic if enabled.
if consumerTopic.RetryQ.Name != "" && consumerTopic.RetryQ.Cluster != "" {
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: topicToRetryTopic(consumerTopic), DLQMetadataDecoder: consumer.ProtobufDLQMetadataDecoder, PartitionConsumerFactory: consumer.NewPartitionConsumer}, sarama.OffsetOldest)
}
// Third, add DLQ topic if enabled.
if consumerTopic.DLQ.Name != "" && consumerTopic.DLQ.Cluster != "" {
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: topicToDLQTopic(consumerTopic), DLQMetadataDecoder: consumer.ProtobufDLQMetadataDecoder, PartitionConsumerFactory: consumer.NewRangePartitionConsumer}, sarama.OffsetOldest)
}
Expand Down