[internal/kafka] define a common validation function for kafka authentication #27486
Closed as not planned
Description
Component(s)
internal/kafka
Is your feature request related to a problem? Please describe.
Speaking of validation, how about defining a common validation function for `kafka.Authentication` in `internal/kafka` package, which can be reused by `kafkametricsreceiver`, `kafkareceiver` and `kafkaexporter`.
Originally posted by @fatsheep9146 in #27289 (comment)
Currently, the validation logic for Kafka authentication is embedded in the code that configures it; see `configureSASL`.
In addition, the `validateSASLConfig` and `configureSASL` functions contain similar validation logic.
We can define a common validation function for Kafka authentication that can be reused by `kafkametricsreceiver`, `kafkareceiver` and `kafkaexporter`, to make the validation and configuration semantics clearer and to remove redundant validation logic.
Describe the solution you'd like
- Define common validation functions `ValidateAuthentication`, `validateTLS` and `validateSASL` for Kafka authentication:
```go
// ValidateAuthentication validates authentication.
func ValidateAuthentication(config Authentication) error {
	var errs error
	if config.TLS != nil {
		_, err := validateTLS(*config.TLS)
		errs = multierr.Append(errs, err)
	}
	if config.SASL != nil {
		err := validateSASL(*config.SASL)
		errs = multierr.Append(errs, err)
	}
	return errs
}

func validateTLS(c configtls.TLSClientSetting) (*tls.Config, error) {
	tlsConfig, err := c.LoadTLSConfig()
	if err != nil {
		return nil, fmt.Errorf("error loading tls config: %w", err)
	}
	return tlsConfig, nil
}

func validateSASL(c SASLConfig) error {
	if c.Username == "" {
		return fmt.Errorf("auth.sasl.username is required")
	}
	if c.Password == "" {
		return fmt.Errorf("auth.sasl.password is required")
	}
	switch c.Mechanism {
	case "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256", "SCRAM-SHA-512":
		// Do nothing, valid mechanism
	default:
		return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism)
	}
	if c.Version < 0 || c.Version > 1 {
		return fmt.Errorf("auth.sasl.version has to be either 0 or 1. configured value %v", c.Version)
	}
	return nil
}
```
- Refactor the `configureSASL` and `configureTLS` functions as shown below, so that each validates first and then configures:
```go
func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error {
	if err := validateSASL(config); err != nil {
		return err
	}
	// configuration
	......
	return nil
}

func configureTLS(config configtls.TLSClientSetting, saramaConfig *sarama.Config) error {
	tlsConfig, err := validateTLS(config)
	if err != nil {
		return err
	}
	// configuration
	......
	return nil
}
```
- Replace `validateSASLConfig` with `ValidateAuthentication` in `Validate`:
```go
// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
	// kafka exporter config validation
	......
	return ValidateAuthentication(cfg.Authentication)
}
```
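For illustration, here is a minimal, self-contained sketch of the validate-then-configure flow proposed above. It is not the actual `internal/kafka` code and makes several assumptions: `SASLConfig` and `Authentication` are simplified stand-ins, TLS is left out to avoid external dependencies, `clientSettings` is a hypothetical placeholder for `sarama.Config`, and the standard library's `errors.Join` (Go 1.20+) is swapped in for `multierr.Append`.

```go
package main

import (
	"errors"
	"fmt"
)

// SASLConfig is a simplified stand-in for the SASL settings in this proposal.
type SASLConfig struct {
	Username  string
	Password  string
	Mechanism string
	Version   int
}

// Authentication is a simplified stand-in; the TLS field is omitted here.
type Authentication struct {
	SASL *SASLConfig
}

// clientSettings is a hypothetical placeholder for sarama.Config.
type clientSettings struct {
	SASLEnabled bool
	User        string
	Mechanism   string
}

// validateSASL mirrors the checks listed in the proposal.
func validateSASL(c SASLConfig) error {
	if c.Username == "" {
		return errors.New("auth.sasl.username is required")
	}
	if c.Password == "" {
		return errors.New("auth.sasl.password is required")
	}
	switch c.Mechanism {
	case "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256", "SCRAM-SHA-512":
		// valid mechanism
	default:
		return fmt.Errorf("auth.sasl.mechanism %q is not supported", c.Mechanism)
	}
	if c.Version < 0 || c.Version > 1 {
		return fmt.Errorf("auth.sasl.version has to be either 0 or 1. configured value %v", c.Version)
	}
	return nil
}

// ValidateAuthentication collects validation errors for every configured
// section. errors.Join ignores nil errors and returns nil when all are nil.
func ValidateAuthentication(config Authentication) error {
	var errs error
	if config.SASL != nil {
		errs = errors.Join(errs, validateSASL(*config.SASL))
	}
	return errs
}

// configureSASL validates first and only then touches the client settings,
// so an invalid config leaves out unchanged.
func configureSASL(config SASLConfig, out *clientSettings) error {
	if err := validateSASL(config); err != nil {
		return err
	}
	out.SASLEnabled = true
	out.User = config.Username
	out.Mechanism = config.Mechanism
	return nil
}

func main() {
	good := SASLConfig{Username: "user", Password: "secret", Mechanism: "PLAIN"}
	fmt.Println("valid config error:", ValidateAuthentication(Authentication{SASL: &good}))

	bad := SASLConfig{Username: "user", Password: "secret", Mechanism: "GSSAPI"}
	fmt.Println("invalid config error:", ValidateAuthentication(Authentication{SASL: &bad}))

	var settings clientSettings
	if err := configureSASL(good, &settings); err != nil {
		fmt.Println("configure failed:", err)
		return
	}
	fmt.Printf("%+v\n", settings)
}
```

Because the configure function calls the same validator that `Validate` uses, a component gets identical error messages whether the problem is caught at config validation time or at client construction time.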
Describe alternatives you've considered
No response
Additional context
No response