From 346630d2456f2885c4a217cd9b1b3e4526cb4ad6 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Thu, 19 Jul 2018 12:39:38 -0400 Subject: [PATCH] Configure consumer with viper Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/consumer/consumer.go | 20 ++----- cmd/ingester/app/consumer/options.go | 72 +++++++++++++++++++++++ cmd/ingester/app/consumer/options_test.go | 51 ++++++++++++++++ 3 files changed, 129 insertions(+), 14 deletions(-) create mode 100644 cmd/ingester/app/consumer/options.go create mode 100644 cmd/ingester/app/consumer/options_test.go diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index 3d03ec213419..5502878be582 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -26,26 +26,18 @@ import ( "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" ) -// SaramaConsumer is an interface to features of Sarama that we use +// SaramaConsumer is an interface to features of Sarama that are necessary for the consumer type SaramaConsumer interface { Partitions() <-chan sc.PartitionConsumer MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) io.Closer } -// Config stores the configuration for a Consumer -type Config struct { - Topic string `yaml:"topic"` - GroupID string `yaml:"group_id"` - Brokers []string `yaml:"brokers"` - Parallelism int `yaml:"parallelism"` -} - // Params are the parameters of a Consumer type Params struct { - Config Config + Options Options Processor processor.SpanProcessor - Factory metrics.Factory `name:"service_metrics"` + Factory metrics.Factory Logger *zap.Logger } @@ -65,7 +57,7 @@ type Consumer struct { func New(params Params) (Consumer, error) { saramaConfig := sc.NewConfig() saramaConfig.Group.Mode = sc.ConsumerModePartitions - saramaConsumer, err := sc.NewConsumer(params.Config.Brokers, params.Config.GroupID, []string{params.Config.Topic}, saramaConfig) + saramaConsumer, err := sc.NewConsumer(params.Options.Brokers, params.Options.GroupID, []string{params.Options.Topic}, saramaConfig) if err != nil { return Consumer{}, err } @@ -76,12 +68,12 @@ func New(params Params) (Consumer, error) { isClosed: sync.WaitGroup{}, SaramaConsumer: saramaConsumer, processorFactory: processorFactory{ - topic: params.Config.Topic, + topic: params.Options.Topic, consumer: saramaConsumer, metricsFactory: params.Factory, logger: params.Logger, baseProcessor: params.Processor, - parallelism: params.Config.Parallelism, + parallelism: params.Options.Parallelism, }, }, nil } diff --git a/cmd/ingester/app/consumer/options.go b/cmd/ingester/app/consumer/options.go new file mode 100644 index 000000000000..25bc62016259 --- /dev/null +++ b/cmd/ingester/app/consumer/options.go @@ -0,0 +1,72 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "flag" + "strconv" + "strings" + + "github.com/spf13/viper" +) + +const ( + configPrefix = "ingester-consumer" + suffixBrokers = ".brokers" + suffixTopic = ".topic" + suffixGroupID = ".group-id" + suffixParallelism = ".parallelism" + + defaultBroker = "127.0.0.1:9092" + defaultTopic = "jaeger-ingester-spans" + defaultGroupID = "jaeger-ingester" + defaultParallelism = 1000 +) + +// Options stores the configuration options for a Kafka consumer +type Options struct { + Topic string + GroupID string + Brokers []string + Parallelism int +} + +// AddFlags adds flags for Options +func (opt *Options) AddFlags(flagSet *flag.FlagSet) { + flagSet.String( + configPrefix+suffixBrokers, + defaultBroker, + "The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'") + flagSet.String( + configPrefix+suffixTopic, + defaultTopic, + "The name of the kafka topic to consume from") + flagSet.String( + configPrefix+suffixGroupID, + defaultGroupID, + "The Consumer Group that ingester will be consuming on behalf of") + flagSet.String( + configPrefix+suffixParallelism, + strconv.Itoa(defaultParallelism), + "The number of messages to process in parallel") +} + +// InitFromViper initializes Options with properties from viper +func (opt *Options) InitFromViper(v *viper.Viper) { + opt.Brokers = strings.Split(v.GetString(configPrefix+suffixBrokers), ",") + opt.Topic = v.GetString(configPrefix + suffixTopic) + opt.GroupID = v.GetString(configPrefix + suffixGroupID) + opt.Parallelism = v.GetInt(configPrefix + suffixParallelism) +} diff --git a/cmd/ingester/app/consumer/options_test.go b/cmd/ingester/app/consumer/options_test.go new file mode 100644 index 000000000000..61fe62208a8c --- /dev/null +++ b/cmd/ingester/app/consumer/options_test.go @@ -0,0 +1,51 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/pkg/config" +) + +func TestOptionsWithFlags(t *testing.T) { + opts := &Options{} + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags([]string{ + "--ingester-consumer.topic=topic1", + "--ingester-consumer.brokers=127.0.0.1:9092,0.0.0:1234", + "--ingester-consumer.group-id=group1", + "--ingester-consumer.parallelism=5"}) + opts.InitFromViper(v) + + assert.Equal(t, "topic1", opts.Topic) + assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.Brokers) + assert.Equal(t, "group1", opts.GroupID) + assert.Equal(t, 5, opts.Parallelism) +} + +func TestFlagDefaults(t *testing.T) { + opts := &Options{} + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags([]string{}) + opts.InitFromViper(v) + + assert.Equal(t, defaultTopic, opts.Topic) + assert.Equal(t, []string{defaultBroker}, opts.Brokers) + assert.Equal(t, defaultGroupID, opts.GroupID) + assert.Equal(t, defaultParallelism, opts.Parallelism) +}