
[internal/kafka] define a common validation function for kafka authentication #27486

Closed as not planned
@sakulali

Description

Component(s)

internal/kafka

Is your feature request related to a problem? Please describe.

Speaking of validation, how about defining a common validation function for `kafka.Authentication` in the `internal/kafka` package, which can be reused by `kafkametricsreceiver`, `kafkareceiver` and `kafkaexporter`?

Originally posted by @fatsheep9146 in #27289 (comment)

Currently, validation logic lives inside the code that configures Kafka authentication (see configureSASL).
In addition, the validateSASLConfig and configureSASL functions contain similar validation logic.
We can define a common validation function for Kafka authentication that kafkametricsreceiver, kafkareceiver and kafkaexporter can all reuse. This would make the split between validation and configuration clearer and remove the redundant checks.

Describe the solution you'd like

  1. Define the common validation functions ValidateAuthentication, validateTLS and validateSASL for Kafka authentication:
// ValidateAuthentication validates the TLS and SASL settings of a Kafka
// authentication config and returns all validation errors combined.
func ValidateAuthentication(config Authentication) error {
	var errs error
	if config.TLS != nil {
		_, err := validateTLS(*config.TLS)
		errs = multierr.Append(errs, err)
	}
	if config.SASL != nil {
		err := validateSASL(*config.SASL)
		errs = multierr.Append(errs, err)
	}
	return errs
}

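// validateTLS loads the TLS client settings and returns the resulting
// *tls.Config so that configureTLS can reuse it.
func validateTLS(c configtls.TLSClientSetting) (*tls.Config, error) {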
	tlsConfig, err := c.LoadTLSConfig()
	if err != nil {
		return nil, fmt.Errorf("error loading tls config: %w", err)
	}

	return tlsConfig, nil
}

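// validateSASL checks that username, password, mechanism and version are set
// to supported values.
func validateSASL(c SASLConfig) error {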
	if c.Username == "" {
		return fmt.Errorf("auth.sasl.username is required")
	}

	if c.Password == "" {
		return fmt.Errorf("auth.sasl.password is required")
	}

	switch c.Mechanism {
	case "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256", "SCRAM-SHA-512":
		// Do nothing, valid mechanism
	default:
		return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism)
	}

	if c.Version < 0 || c.Version > 1 {
		return fmt.Errorf("auth.sasl.version has to be either 0 or 1. configured value %v", c.Version)
	}

	return nil
}
  1. Refactor the configureSASL and configureTLS functions as shown below, so that each one validates first and then configures:
func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error {
	if err := validateSASL(config); err != nil {
		return err
	}

	// configuration
	......
	return nil
}

func configureTLS(config configtls.TLSClientSetting, saramaConfig *sarama.Config) error {
	tlsConfig, err := validateTLS(config)
	if err != nil {
		return err
	}

	// configuration
	......
	return nil
}
  1. Replace the call to validateSASLConfig with ValidateAuthentication in the exporter's Validate method:
// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
	// kafka exporter config validation
	......
	return ValidateAuthentication(cfg.Authentication)
}
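
To make the "validate first, then configure" flow easier to try out, here is a minimal, self-contained sketch of the three steps above. It is not the actual `internal/kafka` code: the `SASLConfig` and `Authentication` structs are reduced stand-ins, TLS is left out, `clientSettings` is a hypothetical placeholder for `sarama.Config`, and the standard library's `errors.Join` is swapped in for `multierr.Append` so that the example has no external dependencies.

```go
package main

import (
	"errors"
	"fmt"
)

// SASLConfig is a reduced stand-in for the struct in internal/kafka
// (assumption: only the fields used by the proposed validation are mirrored).
type SASLConfig struct {
	Username  string
	Password  string
	Mechanism string
	Version   int
}

// Authentication is a reduced stand-in; TLS is omitted in this sketch.
type Authentication struct {
	SASL *SASLConfig
}

// clientSettings is a hypothetical placeholder for sarama.Config.
type clientSettings struct {
	Enabled   bool
	User      string
	Password  string
	Mechanism string
}

// validateSASL performs the same checks as the proposal above.
func validateSASL(c SASLConfig) error {
	if c.Username == "" {
		return errors.New("auth.sasl.username is required")
	}
	if c.Password == "" {
		return errors.New("auth.sasl.password is required")
	}
	switch c.Mechanism {
	case "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256", "SCRAM-SHA-512":
		// Valid mechanism.
	default:
		return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism)
	}
	if c.Version < 0 || c.Version > 1 {
		return fmt.Errorf("auth.sasl.version has to be either 0 or 1. configured value %v", c.Version)
	}
	return nil
}

// ValidateAuthentication collects validation errors; errors.Join stands in
// for multierr.Append and returns nil when every joined error is nil.
func ValidateAuthentication(config Authentication) error {
	var errs error
	if config.SASL != nil {
		errs = errors.Join(errs, validateSASL(*config.SASL))
	}
	return errs
}

// configureSASL validates first and only then touches the target settings,
// so an invalid config never leaves the target half-configured.
func configureSASL(config SASLConfig, out *clientSettings) error {
	if err := validateSASL(config); err != nil {
		return err
	}
	out.Enabled = true
	out.User = config.Username
	out.Password = config.Password
	out.Mechanism = config.Mechanism
	return nil
}

func main() {
	valid := SASLConfig{Username: "user", Password: "secret", Mechanism: "PLAIN"}
	invalid := SASLConfig{Username: "user", Password: "secret", Mechanism: "GSSAPI"}

	fmt.Println("valid config:", ValidateAuthentication(Authentication{SASL: &valid}))
	fmt.Println("invalid config:", ValidateAuthentication(Authentication{SASL: &invalid}))

	var settings clientSettings
	if err := configureSASL(invalid, &settings); err != nil {
		fmt.Println("configure rejected:", err)
	}
}
```

Because configureSASL delegates to validateSASL, a component's Validate method and its configuration path share one source of truth for the checks, which is the duplication this issue aims to remove.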

Describe alternatives you've considered

No response

Additional context

No response
