Skip to content
This repository has been archived by the owner on Oct 2, 2020. It is now read-only.

Commit

Permalink
Fix OffsetNewest to apply for non retry and dlq topics only (#58)
Browse files Browse the repository at this point in the history
* Fix OffsetNewest to apply for non retry and dlq topics only

* Rename offsetPolicy to initialOffset

* fmt

* Improve comments
  • Loading branch information
georgeteo authored Jul 3, 2018
1 parent 32ee22d commit 0719201
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 37 deletions.
39 changes: 25 additions & 14 deletions consumerBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

type (
consumerBuilder struct {
clusterTopicsMap map[string][]consumer.Topic
clusterTopicsMap map[consumerCluster][]consumer.Topic

clusterSaramaClientMap map[string]sarama.Client
clusterSaramaConsumerMap map[string]consumer.SaramaConsumer
Expand All @@ -51,6 +51,14 @@ type (
saramaConfig *sarama.Config
saramaClusterConfig *cluster.Config
}

// consumerCluster wraps a consumer cluster name and relevant config for consuming from that cluster.
consumerCluster struct {
// name is the name of the cluster.
name string
// initialOffset is the initial offset of the cluster if there is no previously checkpointed offset.
initialOffset int64
}
)

func newConsumerBuilder(
Expand All @@ -62,7 +70,7 @@ func newConsumerBuilder(
consumerOptions := buildOptions(config, opts...)
saramaClusterConfig := buildSaramaConfig(consumerOptions)
return &consumerBuilder{
clusterTopicsMap: make(map[string][]consumer.Topic),
clusterTopicsMap: make(map[consumerCluster][]consumer.Topic),
clusterSaramaClientMap: make(map[string]sarama.Client),
clusterSaramaConsumerMap: make(map[string]consumer.SaramaConsumer),
clusterTopicSaramaProducerMap: make(map[string]map[string]sarama.AsyncProducer),
Expand All @@ -82,13 +90,13 @@ func newConsumerBuilder(
}
}

func (c *consumerBuilder) addTopicToClusterTopicsMap(topic consumer.Topic) {
topicList, ok := c.clusterTopicsMap[topic.Topic.Cluster]
func (c *consumerBuilder) addTopicToClusterTopicsMap(topic consumer.Topic, offsetPolicy int64) {
topicList, ok := c.clusterTopicsMap[consumerCluster{topic.Topic.Cluster, offsetPolicy}]
if !ok {
topicList = make([]consumer.Topic, 0, 10)
}
topicList = append(topicList, topic)
c.clusterTopicsMap[topic.Topic.Cluster] = topicList
c.clusterTopicsMap[consumerCluster{topic.Topic.Cluster, offsetPolicy}] = topicList
}

func (c *consumerBuilder) topicConsumerBuilderToTopicNames(topicList []consumer.Topic) []string {
Expand All @@ -112,25 +120,25 @@ func (c *consumerBuilder) build() (*consumer.MultiClusterConsumer, error) {
if !c.kafkaConfig.Offsets.Commits.Enabled {
partitionConsumerFactory = consumer.NewPartitionConsumerWithoutCommit
}
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: consumerTopic, DLQMetadataDecoder: consumer.NoopDLQMetadataDecoder, PartitionConsumerFactory: partitionConsumerFactory})
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: consumerTopic, DLQMetadataDecoder: consumer.NoopDLQMetadataDecoder, PartitionConsumerFactory: partitionConsumerFactory}, c.kafkaConfig.Offsets.Initial.Offset)
if consumerTopic.RetryQ.Name != "" && consumerTopic.RetryQ.Cluster != "" {
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: topicToRetryTopic(consumerTopic), DLQMetadataDecoder: consumer.ProtobufDLQMetadataDecoder, PartitionConsumerFactory: consumer.NewPartitionConsumer})
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: topicToRetryTopic(consumerTopic), DLQMetadataDecoder: consumer.ProtobufDLQMetadataDecoder, PartitionConsumerFactory: consumer.NewPartitionConsumer}, sarama.OffsetOldest)
}
if consumerTopic.DLQ.Name != "" && consumerTopic.DLQ.Cluster != "" {
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: topicToDLQTopic(consumerTopic), DLQMetadataDecoder: consumer.ProtobufDLQMetadataDecoder, PartitionConsumerFactory: consumer.NewRangePartitionConsumer})
c.addTopicToClusterTopicsMap(consumer.Topic{ConsumerTopic: topicToDLQTopic(consumerTopic), DLQMetadataDecoder: consumer.ProtobufDLQMetadataDecoder, PartitionConsumerFactory: consumer.NewRangePartitionConsumer}, sarama.OffsetOldest)
}
}

// Add additional topics that may have been injected from WithRangeConsumer option.
for _, topic := range c.options.OtherConsumerTopics {
c.addTopicToClusterTopicsMap(topic)
c.addTopicToClusterTopicsMap(topic, sarama.OffsetOldest) // OtherConsumerTopics are retry or dlq so default to offset oldest.
}

// build cluster consumer
clusterConsumerMap := make(map[string]*consumer.ClusterConsumer)
for cluster, topicList := range c.clusterTopicsMap {
uniqueTopicList := c.uniqueTopics(topicList)
saramaConsumer, err := c.getOrAddSaramaConsumer(cluster, uniqueTopicList)
saramaConsumer, err := c.getOrAddSaramaConsumer(cluster.name, uniqueTopicList, cluster.initialOffset)
if err != nil {
c.close()
return nil, err
Expand Down Expand Up @@ -163,8 +171,8 @@ func (c *consumerBuilder) build() (*consumer.MultiClusterConsumer, error) {
)
topicConsumerMap[topic.Name] = topicConsumer
}
clusterConsumerMap[cluster] = consumer.NewClusterConsumer(
cluster,
clusterConsumerMap[cluster.name] = consumer.NewClusterConsumer(
cluster.name,
saramaConsumer,
topicConsumerMap,
c.scope,
Expand Down Expand Up @@ -251,17 +259,20 @@ func (c *consumerBuilder) close() {
}
}

func (c *consumerBuilder) getOrAddSaramaConsumer(cluster string, topicList []consumer.Topic) (consumer.SaramaConsumer, error) {
func (c *consumerBuilder) getOrAddSaramaConsumer(cluster string, topicList []consumer.Topic, offsetPolicy int64) (consumer.SaramaConsumer, error) {
brokerList, err := c.resolver.ResolveIPForCluster(cluster)
if err != nil {
return nil, err
}

saramaConfig := *c.saramaClusterConfig
saramaConfig.Consumer.Offsets.Initial = offsetPolicy

saramaConsumer, err := c.constructors.NewSaramaConsumer(
brokerList,
c.kafkaConfig.GroupName,
c.topicConsumerBuilderToTopicNames(topicList),
c.saramaClusterConfig,
&saramaConfig,
)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion consumerBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (s *ConsumerBuilderTestSuite) TestBuild() {
s.Equal([]string{"cluster", "dlq-cluster"}, func() []string {
output := make([]string, 0, 3)
for cluster := range s.builder.clusterTopicsMap {
output = append(output, cluster)
output = append(output, cluster.name)
}
sort.Strings(output)
return output
Expand Down
22 changes: 0 additions & 22 deletions consumerOptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ type (
apply(*consumer.Options)
}

rangeConsumersOption struct {
topicList kafka.ConsumerTopicList
}

dlqTopicsOptions struct {
topicList kafka.ConsumerTopicList
}
Expand All @@ -44,24 +40,6 @@ type (
}
)

// WithRangeConsumers creates a range consumer for the specified consumer topics.
// DEPRECATED
func WithRangeConsumers(topicList kafka.ConsumerTopicList) ConsumerOption {
return &rangeConsumersOption{
topicList: topicList,
}
}

func (o *rangeConsumersOption) apply(opts *consumer.Options) {
for _, topic := range o.topicList {
opts.OtherConsumerTopics = append(opts.OtherConsumerTopics, consumer.Topic{
ConsumerTopic: topic,
DLQMetadataDecoder: consumer.NoopDLQMetadataDecoder,
PartitionConsumerFactory: consumer.NewRangePartitionConsumer,
})
}
}

// WithDLQTopics creates a range consumer for the specified consumer DLQ topics.
func WithDLQTopics(topicList kafka.ConsumerTopicList) ConsumerOption {
return &dlqTopicsOptions{
Expand Down
2 changes: 2 additions & 0 deletions kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type (
Commits struct {
// Enabled if you want the library to commit offsets on your behalf.
// Defaults to true.
//
// The retry and dlq topic commit will always be committed for you since those topics are abstracted away from you.
Enabled bool
}
}
Expand Down

0 comments on commit 0719201

Please sign in to comment.