Skip to content

Commit 91515c1

Browse files
authored
feat: support sasl/scram authentication (#46)
* Added: support sasl/scram authentication * chore: remove unused variable kafkaSslValidation
1 parent 0ba161e commit 91515c1

File tree

3 files changed

+48
-4
lines changed

3 files changed

+48
-4
lines changed

README.md

+7
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ To connect to Kafka over SSL define the following additonal environment variable
5656
- `KAFKA_SSL_CLIENT_KEY_PASS`: Kafka SSL client certificate key password (optional), defaults to `""`
5757
- `KAFKA_SSL_CA_CERT_FILE`: Kafka SSL broker CA certificate file, defaults to `""`
5858

59+
To connect to Kafka over SASL/SCRAM authentication define the following additonal environment variables:
60+
61+
- `KAFKA_SECURITY_PROTOCOL`: Kafka client used protocol to communicate with brokers, must be set if SASL is going to be used, either plain or with SSL
62+
- `KAFKA_SASL_MECHANISM`: SASL mechanism to use for authentication, defaults to `""`
63+
- `KAFKA_SASL_USERNAME`: SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms, defaults to `""`
64+
- `KAFKA_SASL_PASSWORD`: SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism, defaults to `""`
65+
5966
When deployed in a Kubernetes cluster using Helm and using a Kafka external to the cluster, it might be necessary to define the kafka hostname resolution locally (this fills the /etc/hosts of the container). Use a custom values.yaml file with section `hostAliases` (as mentioned in default values.yaml).
6067

6168
### prometheus

config.go

+20-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ var (
3535
kafkaSslClientKeyFile = ""
3636
kafkaSslClientKeyPass = ""
3737
kafkaSslCACertFile = ""
38-
kafkaSslValidation = false
38+
kafkaSecurityProtocol = ""
39+
kafkaSaslMechanism = ""
40+
kafkaSaslUsername = ""
41+
kafkaSaslPassword = ""
3942
serializer Serializer
4043
)
4144

@@ -88,6 +91,22 @@ func init() {
8891
kafkaSslCACertFile = value
8992
}
9093

94+
if value := os.Getenv("KAFKA_SECURITY_PROTOCOL"); value != "" {
95+
kafkaSecurityProtocol = strings.ToLower(value)
96+
}
97+
98+
if value := os.Getenv("KAFKA_SASL_MECHANISM"); value != "" {
99+
kafkaSaslMechanism = value
100+
}
101+
102+
if value := os.Getenv("KAFKA_SASL_USERNAME"); value != "" {
103+
kafkaSaslUsername = value
104+
}
105+
106+
if value := os.Getenv("KAFKA_SASL_PASSWORD"); value != "" {
107+
kafkaSaslPassword = value
108+
}
109+
91110
var err error
92111
serializer, err = parseSerializationFormat(os.Getenv("SERIALIZATION_FORMAT"))
93112
if err != nil {

main.go

+21-3
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,32 @@ func main() {
3737
}
3838

3939
if kafkaSslClientCertFile != "" && kafkaSslClientKeyFile != "" && kafkaSslCACertFile != "" {
40-
kafkaSslValidation = true
41-
kafkaConfig["security.protocol"] = "ssl"
40+
if kafkaSecurityProtocol == "" {
41+
kafkaSecurityProtocol = "ssl"
42+
}
43+
44+
if kafkaSecurityProtocol != "ssl" && kafkaSecurityProtocol != "sasl_ssl" {
45+
logrus.Fatal("invalid config: kafka security protocol is not ssl based but ssl config is provided")
46+
}
47+
48+
kafkaConfig["security.protocol"] = kafkaSecurityProtocol
4249
kafkaConfig["ssl.ca.location"] = kafkaSslCACertFile // CA certificate file for verifying the broker's certificate.
4350
kafkaConfig["ssl.certificate.location"] = kafkaSslClientCertFile // Client's certificate
4451
kafkaConfig["ssl.key.location"] = kafkaSslClientKeyFile // Client's key
4552
kafkaConfig["ssl.key.password"] = kafkaSslClientKeyPass // Key password, if any.
4653
}
4754

55+
if kafkaSaslMechanism != "" && kafkaSaslUsername != "" && kafkaSaslPassword != "" {
56+
if kafkaSecurityProtocol != "sasl_ssl" && kafkaSecurityProtocol != "sasl_plaintext" {
57+
logrus.Fatal("invalid config: kafka security protocol is not sasl based but sasl config is provided")
58+
}
59+
60+
kafkaConfig["security.protocol"] = kafkaSecurityProtocol
61+
kafkaConfig["sasl.mechanism"] = kafkaSaslMechanism
62+
kafkaConfig["sasl.username"] = kafkaSaslUsername
63+
kafkaConfig["sasl.password"] = kafkaSaslPassword
64+
}
65+
4866
producer, err := kafka.NewProducer(&kafkaConfig)
4967

5068
if err != nil {
@@ -66,5 +84,5 @@ func main() {
6684
r.POST("/receive", receiveHandler(producer, serializer))
6785
}
6886

69-
r.Run()
87+
log.Fatal(r.Run())
7088
}

0 commit comments

Comments
 (0)