Skip to content

Commit

Permalink
Remove deprecated kafka flags (#1424)
Browse files Browse the repository at this point in the history
* Remove deprecated kafka flags in favor of kafka.producer and kafka.consumer flags

Signed-off-by: Louis-Etienne Dorval <louis-etienne.dorval@ticketmaster.com>

* run make fmt

Signed-off-by: Yuri Shkuro <ys@uber.com>

* update kafka flags in the integration test

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Fix test

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Fix Kafka integration test

Signed-off-by: Yuri Shkuro <ys@uber.com>

* More test fixes

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Fix changelog

Signed-off-by: Yuri Shkuro <ys@uber.com>
  • Loading branch information
ledor473 authored and yurishkuro committed Mar 17, 2019
1 parent 44eaf08 commit 1157a7d
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 180 deletions.
29 changes: 28 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,33 @@ Changes by Version
#### Backend Changes

##### Breaking Changes
- The `kafka` flags were removed in favor of `kafka.producer` and `kafka.consumer` flags ([#1424](https://github.com/jaegertracing/jaeger/pull/1424), [@ledor473](https://github.com/ledor473))

The following flags have been **removed** in the Collector and the Ingester:
```
--kafka.brokers
--kafka.encoding
--kafka.topic
--ingester.brokers
--ingester.encoding
--ingester.topic
--ingester.group-id
```
In the Collector, they are replaced by:
```
--kafka.producer.brokers
--kafka.producer.encoding
--kafka.producer.topic
```
In the Ingester, they are replaced by:
```
--kafka.consumer.brokers
--kafka.consumer.encoding
--kafka.consumer.topic
--kafka.consumer.group-id
```
##### New Features
Expand All @@ -21,7 +48,7 @@ Changes by Version
#### Backend Changes
##### Breaking Changes
- Introduce `kafka.producer` and `kafka.consumer` flags to replace `kafka` flags ([1360](https://github.com/jaegertracing/jaeger/pull/1360), [@ledor473](https://github.com/ledor473))
- Introduce `kafka.producer` and `kafka.consumer` flags to replace `kafka` flags ([#1360](https://github.com/jaegertracing/jaeger/pull/1360), [@ledor473](https://github.com/ledor473))
The following flags have been deprecated in the Collector and the Ingester:
```
Expand Down
56 changes: 6 additions & 50 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ const (
ConfigPrefix = "ingester"
// KafkaConsumerConfigPrefix is a prefix for the Kafka flags
KafkaConsumerConfigPrefix = "kafka.consumer"
// DeprecatedKafkaConfigPrefix is a prefix for the Kafka flags that is replaced by KafkaConfigPrefix
DeprecatedKafkaConfigPrefix = "kafka"
// SuffixBrokers is a suffix for the brokers flag
SuffixBrokers = ".brokers"
// SuffixTopic is a suffix for the topic flag
Expand Down Expand Up @@ -107,64 +105,22 @@ func AddFlags(flagSet *flag.FlagSet) {
ConfigPrefix+SuffixDeadlockInterval,
DefaultDeadlockInterval,
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")

// TODO: Remove deprecated flags after 1.11
flagSet.String(
DeprecatedKafkaConfigPrefix+SuffixBrokers,
"",
fmt.Sprintf("Deprecated; replaced by %s", KafkaConsumerConfigPrefix+SuffixBrokers))
flagSet.String(
DeprecatedKafkaConfigPrefix+SuffixTopic,
"",
fmt.Sprintf("Deprecated; replaced by %s", KafkaConsumerConfigPrefix+SuffixTopic))
flagSet.String(
DeprecatedKafkaConfigPrefix+SuffixGroupID,
"",
fmt.Sprintf("Deprecated; replaced by %s", KafkaConsumerConfigPrefix+SuffixGroupID))
flagSet.String(
DeprecatedKafkaConfigPrefix+SuffixEncoding,
"",
fmt.Sprintf("Deprecated; replaced by %s", KafkaConsumerConfigPrefix+SuffixEncoding))
}

// InitFromViper initializes Builder with properties from viper
func (o *Options) InitFromViper(v *viper.Viper) {
o.Brokers = strings.Split(v.GetString(KafkaConsumerConfigPrefix+SuffixBrokers), ",")
o.Brokers = strings.Split(stripWhiteSpace(v.GetString(KafkaConsumerConfigPrefix+SuffixBrokers)), ",")
o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic)
o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID)
o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding)

if brokers := v.GetString(DeprecatedKafkaConfigPrefix + SuffixBrokers); brokers != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
DeprecatedKafkaConfigPrefix+SuffixBrokers,
KafkaConsumerConfigPrefix+SuffixBrokers,
)
o.Brokers = strings.Split(brokers, ",")
}
if topic := v.GetString(DeprecatedKafkaConfigPrefix + SuffixTopic); topic != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
DeprecatedKafkaConfigPrefix+SuffixTopic,
KafkaConsumerConfigPrefix+SuffixTopic,
)
o.Topic = topic
}
if groupID := v.GetString(DeprecatedKafkaConfigPrefix + SuffixGroupID); groupID != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
DeprecatedKafkaConfigPrefix+SuffixGroupID,
KafkaConsumerConfigPrefix+SuffixGroupID,
)
o.GroupID = groupID
}
if encoding := v.GetString(DeprecatedKafkaConfigPrefix + SuffixEncoding); encoding != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
DeprecatedKafkaConfigPrefix+SuffixEncoding,
KafkaConsumerConfigPrefix+SuffixEncoding,
)
o.Encoding = encoding
}

o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
o.IngesterHTTPPort = v.GetInt(ConfigPrefix + SuffixHTTPPort)

o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval)
}

// stripWhiteSpace removes all whitespace characters from a string
func stripWhiteSpace(str string) string {
return strings.Replace(str, " ", "", -1)
}
38 changes: 1 addition & 37 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestOptionsWithFlags(t *testing.T) {
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--kafka.consumer.topic=topic1",
"--kafka.consumer.brokers=127.0.0.1:9092,0.0.0:1234",
"--kafka.consumer.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.consumer.group-id=group1",
"--kafka.consumer.encoding=json",
"--ingester.parallelism=5",
Expand Down Expand Up @@ -59,39 +59,3 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, DefaultEncoding, o.Encoding)
assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval)
}

func TestOptionsWithDeprecatedFlags(t *testing.T) {
o := &Options{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--kafka.topic=topic1",
"--kafka.brokers=127.0.0.1:9092,0.0.0:1234",
"--kafka.group-id=group1",
"--kafka.encoding=json"})
o.InitFromViper(v)

assert.Equal(t, "topic1", o.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, kafka.EncodingJSON, o.Encoding)
}

func TestOptionsWithAllFlags(t *testing.T) {
o := &Options{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--kafka.topic=topic1",
"--kafka.brokers=127.0.0.1:9092,0.0.0:1234",
"--kafka.group-id=group1",
"--kafka.encoding=protobuf",
"--kafka.consumer.topic=topic2",
"--kafka.consumer.brokers=10.0.0.1:9092,10.0.0.2:9092",
"--kafka.consumer.group-id=group2",
"--kafka.consumer.encoding=json"})
o.InitFromViper(v)

assert.Equal(t, "topic1", o.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, kafka.EncodingProto, o.Encoding)
}
43 changes: 27 additions & 16 deletions plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,24 @@ type KafkaIntegrationTestSuite struct {

func (s *KafkaIntegrationTestSuite) initialize() error {
s.logger, _ = testutils.NewLogger()
const encoding = "json"
const groupID = "kafka-integration-test"
// A new topic is generated per execution to avoid data overlap
topic := "jaeger-kafka-integration-test-" + strconv.FormatInt(time.Now().UnixNano(), 10)

f := kafka.NewFactory()
v, command := config.Viperize(app.AddFlags)
command.ParseFlags([]string{
"--kafka.topic",
v, command := config.Viperize(f.AddFlags)
err := command.ParseFlags([]string{
"--kafka.producer.topic",
topic,
"--kafka.brokers",
"--kafka.producer.brokers",
defaultLocalKafkaBroker,
"--kafka.encoding",
"json",
"--ingester.brokers",
defaultLocalKafkaBroker,
"--ingester.topic",
topic,
"--ingester.group-id",
"kafka-integration-test",
"--ingester.parallelism",
"1000",
"--ingester.encoding",
"json",
"--kafka.producer.encoding",
encoding,
})
if err != nil {
return err
}
f.InitFromViper(v)
if err := f.Initialize(metrics.NullFactory, s.logger); err != nil {
return err
Expand All @@ -76,6 +71,22 @@ func (s *KafkaIntegrationTestSuite) initialize() error {
return err
}

v, command = config.Viperize(app.AddFlags)
err = command.ParseFlags([]string{
"--kafka.consumer.topic",
topic,
"--kafka.consumer.brokers",
defaultLocalKafkaBroker,
"--kafka.consumer.encoding",
encoding,
"--kafka.consumer.group-id",
groupID,
"--ingester.parallelism",
"1000",
})
if err != nil {
return err
}
options := app.Options{}
options.InitFromViper(v)
traceStore := memory.NewStore()
Expand Down
6 changes: 4 additions & 2 deletions plugin/storage/kafka/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

Expand Down Expand Up @@ -83,7 +84,8 @@ func TestKafkaFactoryEncoding(t *testing.T) {
t.Run(test.encoding, func(t *testing.T) {
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{"--kafka.encoding=" + test.encoding})
err := command.ParseFlags([]string{"--kafka.producer.encoding=" + test.encoding})
require.NoError(t, err)
f.InitFromViper(v)

f.Builder = &mockProducerBuilder{t: t}
Expand All @@ -96,7 +98,7 @@ func TestKafkaFactoryEncoding(t *testing.T) {
func TestKafkaFactoryMarshallerErr(t *testing.T) {
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{"--kafka.encoding=bad-input"})
command.ParseFlags([]string{"--kafka.producer.encoding=bad-input"})
f.InitFromViper(v)

f.Builder = &mockProducerBuilder{t: t}
Expand Down
47 changes: 4 additions & 43 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ const (
// EncodingZipkinThrift is used for spans encoded as Zipkin Thrift.
EncodingZipkinThrift = "zipkin-thrift"

configPrefix = "kafka.producer"
deprecatedPrefix = "kafka"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"
configPrefix = "kafka.producer"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
Expand Down Expand Up @@ -70,20 +69,6 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
defaultEncoding,
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
)

// TODO: Remove deprecated flags after 1.11
flagSet.String(
deprecatedPrefix+suffixBrokers,
"",
fmt.Sprintf("Deprecated; replaced by %s", configPrefix+suffixBrokers))
flagSet.String(
deprecatedPrefix+suffixTopic,
"",
fmt.Sprintf("Deprecated; replaced by %s", configPrefix+suffixTopic))
flagSet.String(
deprecatedPrefix+suffixEncoding,
"",
fmt.Sprintf("Deprecated; replaced by %s", configPrefix+suffixEncoding))
}

// InitFromViper initializes Options with properties from viper
Expand All @@ -93,30 +78,6 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
}
opt.topic = v.GetString(configPrefix + suffixTopic)
opt.encoding = v.GetString(configPrefix + suffixEncoding)

if brokers := v.GetString(deprecatedPrefix + suffixBrokers); brokers != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
deprecatedPrefix+suffixBrokers,
configPrefix+suffixBrokers,
)
opt.config = producer.Configuration{
Brokers: strings.Split(stripWhiteSpace(brokers), ","),
}
}
if topic := v.GetString(deprecatedPrefix + suffixTopic); topic != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
deprecatedPrefix+suffixTopic,
configPrefix+suffixTopic,
)
opt.topic = topic
}
if encoding := v.GetString(deprecatedPrefix + suffixEncoding); encoding != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
deprecatedPrefix+suffixEncoding,
configPrefix+suffixEncoding,
)
opt.encoding = encoding
}
}

// stripWhiteSpace removes all whitespace characters from a string
Expand Down
31 changes: 0 additions & 31 deletions plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,34 +46,3 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, []string{defaultBroker}, opts.config.Brokers)
assert.Equal(t, defaultEncoding, opts.encoding)
}

func TestOptionsWithDeprecatedFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{
"--kafka.topic=topic1",
"--kafka.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.encoding=protobuf"})
opts.InitFromViper(v)

assert.Equal(t, "topic1", opts.topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.config.Brokers)
assert.Equal(t, "protobuf", opts.encoding)
}

func TestOptionsWithAllFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{
"--kafka.topic=topic1",
"--kafka.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.encoding=protobuf",
"--kafka.producer.topic=topic2",
"--kafka.producer.brokers=10.0.0.1:9092, 10.0.0.2:9092",
"--kafka.producer.encoding=json"})
opts.InitFromViper(v)

assert.Equal(t, "topic1", opts.topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.config.Brokers)
assert.Equal(t, "protobuf", opts.encoding)
}

0 comments on commit 1157a7d

Please sign in to comment.