diff --git a/README.md b/README.md index 98cc951d..f74bd5f4 100644 --- a/README.md +++ b/README.md @@ -341,6 +341,7 @@ receivers: enable: true username: "kube-event-producer" password: "kube-event-producer-password" + mechanism: "sha512" layout: #optional kind: "{{ .InvolvedObject.Kind }}" namespace: "{{ .InvolvedObject.Namespace }}" diff --git a/go.mod b/go.mod index 930a664d..28b541b9 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,11 @@ require ( k8s.io/client-go v0.26.7 ) +require ( + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect +) + require ( cloud.google.com/go v0.107.0 // indirect cloud.google.com/go/compute v1.15.1 // indirect @@ -41,7 +46,6 @@ require ( github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/emicklei/go-restful/v3 v3.10.1 // indirect - github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/fatih/color v1.15.0 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect @@ -98,6 +102,7 @@ require ( github.com/sirupsen/logrus v1.9.0 // indirect github.com/spf13/cast v1.3.1 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/xdg-go/scram v1.1.2 go.opencensus.io v0.24.0 // indirect golang.org/x/crypto v0.17.0 // indirect golang.org/x/net v0.17.0 // indirect diff --git a/go.sum b/go.sum index d442407a..79562f97 100644 --- a/go.sum +++ b/go.sum @@ -67,7 +67,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84= -github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0= @@ -299,6 +298,12 @@ github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= @@ -394,6 +399,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= diff --git a/pkg/sinks/kafka.go b/pkg/sinks/kafka.go index cb7907fe..a94b1d1f 100644 --- a/pkg/sinks/kafka.go +++ b/pkg/sinks/kafka.go @@ -2,14 +2,19 @@ package sinks import ( "context" + "crypto/sha256" + "crypto/sha512" "crypto/tls" "crypto/x509" "encoding/json" + "fmt" "os" "github.com/Shopify/sarama" "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "github.com/rs/zerolog/log" + + "github.com/xdg-go/scram" ) // KafkaConfig is the Kafka producer configuration @@ -28,9 +33,10 @@ type KafkaConfig struct { InsecureSkipVerify bool `yaml:"insecureSkipVerify"` } `yaml:"tls"` SASL struct { - Enable bool `yaml:"enable"` - Username string `yaml:"username"` - Password string `yaml:"password"` + Enable bool `yaml:"enable"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Mechanism string `yaml:"mechanism" default:"plain"` } `yaml:"sasl"` KafkaEncode Avro `yaml:"avro"` } @@ -151,7 +157,7 @@ func createSaramaProducer(cfg *KafkaConfig) (sarama.SyncProducer, error) { caCert, err := os.ReadFile(cfg.TLS.CaFile) if err != nil { - return nil, err + return nil, fmt.Errorf("error loading ca file: %w", err) } caCertPool := x509.NewCertPool() @@ -178,6 +184,17 @@ func createSaramaProducer(cfg *KafkaConfig) (sarama.SyncProducer, error) { saramaConfig.Net.SASL.Enable = true saramaConfig.Net.SASL.User = cfg.SASL.Username saramaConfig.Net.SASL.Password = cfg.SASL.Password + if cfg.SASL.Mechanism == "sha512" { + saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + } else if cfg.SASL.Mechanism == "sha256" { + saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + } else if cfg.SASL.Mechanism == "plain" || cfg.SASL.Mechanism == "" { + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext + } else { + return nil, fmt.Errorf("invalid scram sha mechanism: %s: can be one of 'sha256', 'sha512' or 'plain'", cfg.SASL.Mechanism) + } } // TODO: Find a generic way to override all other configs @@ -190,3 +207,32 @@ func createSaramaProducer(cfg *KafkaConfig) (sarama.SyncProducer, error) { return producer, nil } + +var ( + SHA256 scram.HashGeneratorFcn = sha256.New + SHA512 scram.HashGeneratorFcn = sha512.New +) + +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +}