Skip to content
This repository was archived by the owner on Oct 2, 2020. It is now read-only.

Commit b72889e

Browse files
authored
Enable consuming from retryQ and DLQ only (#70)
1 parent 9927cc9 commit b72889e

File tree

2 files changed

+10
-5
lines changed

2 files changed

+10
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
v0.2.2 (unreleased)
44
-------------------
55

6+
- Handle retryQ and DLQ only consumer topics.
67
- Update KafkaPartitionOwned metric to be a boolean metric for ownership.
78
- Rename number of partitions owned by specific worker to KafkaPartitionOwnedCount.
89
- Add client id option.

consumerBuilder.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,20 @@ func (c *consumerBuilder) Build() (kafka.Consumer, error) {
114114
func (c *consumerBuilder) build() (*consumer.MultiClusterConsumer, error) {
115115
// build TopicList per cluster
116116
for _, consumerTopic := range c.kafkaConfig.TopicList {
117-
// first, add TopicConsumer for original topic.
117+
// first, add TopicConsumer for original topic if topic is well defined.
118118
// disabling offset commit only applies for the original topic.
119-
partitionConsumerFactory := consumer.NewPartitionConsumer
120-
if !c.kafkaConfig.Offsets.Commits.Enabled {
121-
partitionConsumerFactory = consumer.NewPartitionConsumerWithoutCommit
119+
if consumerTopic.Topic.Name != "" && consumerTopic.Topic.Cluster != "" {
120+
partitionConsumerFactory := consumer.NewPartitionConsumer
121+
if !c.kafkaConfig.Offsets.Commits.Enabled {
122+
partitionConsumerFactory = consumer.NewPartitionConsumerWithoutCommit
123+
}
124+
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: consumerTopic, DLQMetadataDecoder: consumer.NoopDLQMetadataDecoder, PartitionConsumerFactory: partitionConsumerFactory}, c.kafkaConfig.Offsets.Initial.Offset)
122125
}
123-
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: consumerTopic, DLQMetadataDecoder: consumer.NoopDLQMetadataDecoder, PartitionConsumerFactory: partitionConsumerFactory}, c.kafkaConfig.Offsets.Initial.Offset)
126+
// Second, add retryQ topic if enabled.
124127
if consumerTopic.RetryQ.Name != "" && consumerTopic.RetryQ.Cluster != "" {
125128
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: topicToRetryTopic(consumerTopic), DLQMetadataDecoder: consumer.ProtobufDLQMetadataDecoder, PartitionConsumerFactory: consumer.NewPartitionConsumer}, sarama.OffsetOldest)
126129
}
130+
// Third, add DLQ topic if enabled.
127131
if consumerTopic.DLQ.Name != "" && consumerTopic.DLQ.Cluster != "" {
128132
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: topicToDLQTopic(consumerTopic), DLQMetadataDecoder: consumer.ProtobufDLQMetadataDecoder, PartitionConsumerFactory: consumer.NewRangePartitionConsumer}, sarama.OffsetOldest)
129133
}

0 commit comments

Comments
 (0)