diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index fab2d5ed2f1..93be0d13f54 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -15,12 +15,16 @@ package app import ( + "fmt" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/kafka/auth" "github.com/jaegertracing/jaeger/plugin/storage/kafka" ) @@ -49,6 +53,46 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, kafka.EncodingJSON, o.Encoding) } +func TestTLSFlags(t *testing.T) { + kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"} + tests := []struct { + flags []string + expected auth.AuthenticationConfig + }{ + { + flags: []string{}, + expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb}, + }, + { + flags: []string{"--kafka.consumer.authentication=foo"}, + expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb}, + }, + { + flags: []string{"--kafka.consumer.authentication=kerberos", "--kafka.consumer.tls.enabled=true"}, + expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + }, + { + flags: []string{"--kafka.consumer.authentication=tls"}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + }, + { + flags: []string{"--kafka.consumer.authentication=tls", "--kafka.consumer.tls.enabled=false"}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + }, + } + + for _, test := range tests { + t.Run(fmt.Sprintf("%s", test.flags), func(t *testing.T) { + o := &Options{} + v, command := config.Viperize(AddFlags) + err := command.ParseFlags(test.flags) + require.NoError(t, err) + o.InitFromViper(v) + assert.Equal(t, test.expected, o.AuthenticationConfig) + }) + } +} + func TestFlagDefaults(t *testing.T) { o := &Options{} v, command := config.Viperize(AddFlags) diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go index 7b94176f220..2221cd9d7a1 100644 --- a/pkg/kafka/auth/config.go +++ b/pkg/kafka/auth/config.go @@ -51,14 +51,20 @@ func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config if strings.Trim(authentication, " ") == "" { authentication = none } + if config.Authentication == tls || config.TLS.Enabled { + err := setTLSConfiguration(&config.TLS, saramaConfig) + if err != nil { + return err + } + } switch authentication { case none: return nil + case tls: + return nil case kerberos: setKerberosConfiguration(&config.Kerberos, saramaConfig) return nil - case tls: - return setTLSConfiguration(&config.TLS, saramaConfig) case plaintext: setPlainTextConfiguration(&config.PlainText, saramaConfig) return nil @@ -85,6 +91,9 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper. } config.TLS = tlsClientConfig.InitFromViper(v) + if config.Authentication == tls { + config.TLS.Enabled = true + } config.PlainText.UserName = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUserName) config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword) diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index e27ee91f55f..90b5632e5c0 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -15,6 +15,7 @@ package kafka import ( + "fmt" "testing" "time" @@ -23,6 +24,8 @@ import ( "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) func TestOptionsWithFlags(t *testing.T) { @@ -164,3 +167,44 @@ func TestRequiredAcksFailures(t *testing.T) { _, err := getRequiredAcks("test") assert.Error(t, err) } + +func TestTLSFlags(t *testing.T) { + kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"} + tests := []struct { + flags []string + expected auth.AuthenticationConfig + }{ + { + flags: []string{}, + expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb}, + }, + { + flags: []string{"--kafka.producer.authentication=foo"}, + expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb}, + }, + { + flags: []string{"--kafka.producer.authentication=kerberos", "--kafka.producer.tls.enabled=true"}, + expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + }, + { + flags: []string{"--kafka.producer.authentication=tls"}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + }, + { + flags: []string{"--kafka.producer.authentication=tls", "--kafka.producer.tls.enabled=false"}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + }, + } + + for _, test := range tests { + t.Run(fmt.Sprintf("%s", test.flags), func(t *testing.T) { + o := &Options{} + v, command := config.Viperize(o.AddFlags) + err := command.ParseFlags(test.flags) + require.NoError(t, err) + o.InitFromViper(v) + assert.Equal(t, test.expected, o.config.AuthenticationConfig) + + }) + } +}