Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka kerberos authentication support for collector/ingester #1589

Merged
merged 1 commit into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 98 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ required = [

[[constraint]]
name = "github.com/Shopify/sarama"
version = "1.20.1"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to depend on a version?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully a new release should be coming soon: IBM/sarama#1366 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just temporal, waiting for a new release of sarama.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pavolloffay if we don't get a sarama release soon, are we ok to release with the SHA?

revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"

[[constraint]]
name = "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -171,3 +171,7 @@ required = [
[[constraint]]
name = "github.com/hashicorp/go-hclog"
version = "0.8.0"

[[override]]
name = "github.com/Shopify/sarama"
revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"
9 changes: 5 additions & 4 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
spanProcessor := processor.NewSpanProcessor(spParams)

consumerConfig := kafkaConsumer.Configuration{
Brokers: options.Brokers,
Topic: options.Topic,
GroupID: options.GroupID,
ClientID: options.ClientID,
Brokers: options.Brokers,
Topic: options.Topic,
GroupID: options.GroupID,
ClientID: options.ClientID,
AuthenticationConfig: options.AuthenticationConfig,
}
saramaConsumer, err := consumerConfig.NewConsumer()
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/kafka/auth"
kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)
Expand All @@ -48,7 +49,6 @@ const (
SuffixParallelism = ".parallelism"
// SuffixHTTPPort is a suffix for the HTTP port
SuffixHTTPPort = ".http-port"

// DefaultBroker is the default kafka broker
DefaultBroker = "127.0.0.1:9092"
// DefaultTopic is the default kafka topic
Expand Down Expand Up @@ -103,6 +103,8 @@ func AddFlags(flagSet *flag.FlagSet) {
ConfigPrefix+SuffixDeadlockInterval,
DefaultDeadlockInterval,
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")
// Authentication flags
auth.AddFlags(KafkaConsumerConfigPrefix, flagSet)
}

// InitFromViper initializes Builder with properties from viper
Expand All @@ -115,6 +117,9 @@ func (o *Options) InitFromViper(v *viper.Viper) {

o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval)
authenticationOptions := auth.AuthenticationConfig{}
authenticationOptions.InitFromViper(KafkaConsumerConfigPrefix, v)
o.AuthenticationConfig = authenticationOptions
}

// stripWhiteSpace removes all whitespace characters from a string
Expand Down
1 change: 1 addition & 0 deletions pkg/kafka/auth/.nocover
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
requires connection to Kafka
65 changes: 65 additions & 0 deletions pkg/kafka/auth/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth

import (
"log"
"strings"

"github.com/Shopify/sarama"
"github.com/spf13/viper"
)

const none = "none"
const kerberos = "kerberos"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would define this as array and pass to flag description:

  • to avoid duplication in constants
  • this way we don't forget to update help message if new auth is added


var authTypes = []string{
none,
kerberos,
}

// AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster
type AuthenticationConfig struct {
Authentication string
Kerberos KerberosConfig
}

//SetConfiguration set configure authentication into sarama config structure
func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config) {
authentication := strings.ToLower(config.Authentication)
if strings.Trim(authentication, " ") == "" {
authentication = none
}
switch authentication {
case kerberos:
setKerberosConfiguration(&config.Kerberos, saramaConfig)
case none:
return
default:
log.Fatalf("Unknown/Unsupported authentication method %s to kafka cluster.", config.Authentication)
}
}

// InitFromViper loads authentication configuration from viper flags.
func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.Viper) {
config.Authentication = v.GetString(configPrefix + suffixAuthentication)
config.Kerberos.ServiceName = v.GetString(configPrefix + kerberosPrefix + suffixKerberosServiceName)
config.Kerberos.Realm = v.GetString(configPrefix + kerberosPrefix + suffixKerberosRealm)
config.Kerberos.UseKeyTab = v.GetBool(configPrefix + kerberosPrefix + suffixKerberosUseKeyTab)
config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUserName)
config.Kerberos.Password = v.GetString(configPrefix + kerberosPrefix + suffixKerberosPassword)
config.Kerberos.ConfigPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosConfig)
config.Kerberos.KeyTabPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosKeyTab)
}
46 changes: 46 additions & 0 deletions pkg/kafka/auth/kerberos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth

import (
"github.com/Shopify/sarama"
)

// KerberosConfig describes the configuration properties needed for Kerberos authentication with kafka consumer
type KerberosConfig struct {
ServiceName string
Realm string
UseKeyTab bool
Username string
Password string
ConfigPath string
KeyTabPath string
}

func setKerberosConfiguration(config *KerberosConfig, saramaConfig *sarama.Config) {
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
saramaConfig.Net.SASL.Enable = true
if config.UseKeyTab {
saramaConfig.Net.SASL.GSSAPI.KeyTabPath = config.KeyTabPath
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
} else {
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
saramaConfig.Net.SASL.GSSAPI.Password = config.Password
}
saramaConfig.Net.SASL.GSSAPI.KerberosConfigPath = config.ConfigPath
saramaConfig.Net.SASL.GSSAPI.Username = config.Username
saramaConfig.Net.SASL.GSSAPI.Realm = config.Realm
saramaConfig.Net.SASL.GSSAPI.ServiceName = config.ServiceName
}
Loading