diff --git a/.gitignore b/.gitignore index c21215b..51e46d2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .idea/ unit_coverage.html unit_coverage.out +resource/ \ No newline at end of file diff --git a/README.md b/README.md index 0874bfa..56b5646 100644 --- a/README.md +++ b/README.md @@ -93,13 +93,13 @@ func main() { | config | description | default | example | |------------------------------------|----------------------------------------------------------------------------------------------------|----------|--------------------------| -| `cron` | Cron expression when exception consumer starts to work at | - | */1 * * * * | -| `duration` | Work duration exception consumer actively consuming messages | - | 20s, 15m, 1h | -| `topic` | Exception topic names | - | exception-topic | -| `groupId` | Exception consumer group id | - | exception-consumer-group | -| `logLevel` | Describes log level, valid options are `debug`, `info`, `warn`, and `error` | warn | | -| `maxRetry` | Maximum retry value for attempting to retry a message | 3 | | -| `concurrency` | Number of goroutines used at listeners | 1 | | +| `kafka.logLevel` | Describes log level, valid options are `debug`, `info`, `warn`, and `error` | warn | | +| `kafka.consumer.cron` | Cron expression when exception consumer starts to work at | | */1 * * * * | +| `kafka.consumer.duration` | Work duration exception consumer actively consuming messages | | 20s, 15m, 1h | +| `kafka.consumer.topic` | Exception topic names | | exception-topic | +| `kafka.consumer.groupId` | Exception consumer group id | | exception-consumer-group | +| `kafka.consumer.maxRetry` | Maximum retry value for attempting to retry a message | 3 | | +| `kafka.consumer.concurrency` | Number of goroutines used at listeners | 1 | | | `kafka.consumer.minBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#ReaderConfig.MinBytes) | 10e3 | | | `kafka.consumer.maxBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#ReaderConfig.MaxBytes) | 10e6 | | | `kafka.consumer.maxWait` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#ReaderConfig.MaxWait) | 2s | | @@ -111,6 +111,13 @@ func main() { | `kafka.consumer.retentionTime` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#ReaderConfig.RetentionTime) | 24h | | | `kafka.producer.batchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#Writer.BatchSize) | 100 | | | `kafka.producer.batchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#Writer.BatchTimeout) | 500us | | +| `kafka.sasl.enabled` | It enables sasl authentication mechanism | false | | +| `kafka.sasl.authType` | Currently we only support `SCRAM` | "" | | +| `kafka.sasl.username` | SCRAM username | "" | | +| `kafka.sasl.password` | SCRAM password | "" | | +| `kafka.sasl.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | | +| `kafka.sasl.intermediateCAPath` | | "" | | +| `kafka.sasl.rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#RackAffinityGroupBalancer) | "" | | ## Contribute diff --git a/cronsumer.go b/cronsumer.go index 5b47ef4..bc278ee 100644 --- a/cronsumer.go +++ b/cronsumer.go @@ -16,14 +16,14 @@ type Cronsumer interface { Stop() } -// NewKafkaCronsumer returns the newly created kafka consumer consumer instance. +// NewCronsumer returns the newly created kafka consumer instance. // config.KafkaConfig specifies cron, duration and so many parameters. // ConsumeFn describes how to consume messages from specified topic. func NewCronsumer(cfg *model.KafkaConfig, c ConsumeFn) Cronsumer { return internal.NewCronsumer(cfg, c) } -// NewKafkaCronsumerSchedulerWithLogger returns the newly created kafka consumer consumer instance. +// NewCronsumerWithLogger returns the newly created kafka consumer instance. // config.KafkaConfig specifies cron, duration and so many parameters. // ConsumeFn describes how to consume messages from specified topic. // logger describes log interface for injecting custom log implementation diff --git a/go.mod b/go.mod index ecd1212..869c951 100644 --- a/go.mod +++ b/go.mod @@ -4,19 +4,21 @@ go 1.19 require ( github.com/robfig/cron/v3 v3.0.1 - github.com/segmentio/kafka-go v0.4.32 + github.com/segmentio/kafka-go v0.4.35 go.uber.org/zap v1.23.0 gopkg.in/yaml.v3 v3.0.1 ) require ( - github.com/klauspost/compress v1.14.2 // indirect - github.com/pierrec/lz4/v4 v4.1.14 // indirect + github.com/klauspost/compress v1.15.7 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/xdg/scram v1.0.5 // indirect + github.com/xdg/stringprep v1.0.3 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/goleak v1.1.12 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.1.0 // indirect - golang.org/x/net v0.1.0 // indirect + golang.org/x/text v0.4.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect ) diff --git a/go.sum b/go.sum index f44f63f..7a4d635 100644 --- a/go.sum +++ b/go.sum @@ -2,32 +2,34 @@ github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLj github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw= -github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.7 h1:7cgTQxJCU/vy+oP/E3B9RGbQTgbiVzIJWIKOLoAsPok= +github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE= -github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/segmentio/kafka-go v0.4.32 h1:Ohr+9E+kDv/Ld2UPJN9hnKZRd2qgiqCmI8v2e1qlfLM= -github.com/segmentio/kafka-go v0.4.32/go.mod h1:JAPPIiY3MQIwVHj64CWOP0LsFFfQ7H0w69kuoxnMIS0= +github.com/segmentio/kafka-go v0.4.35 h1:TAsQ7q1SjS39PcFvU0zDJhCuVAxHomy7xOAfbdSuhzs= +github.com/segmentio/kafka-go v0.4.35/go.mod h1:GAjxBQJdQMB5zfNA21AhpaqOB2Mu+w3De4ni3Gbm8y0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= -github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= -github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= -github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw= +github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4= +github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -38,8 +40,8 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -48,19 +50,27 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220706163947-c90051bbdb60/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0= -golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -72,6 +82,5 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/kafka_consumer.go b/internal/kafka_consumer.go index f442bc3..41070d0 100644 --- a/internal/kafka_consumer.go +++ b/internal/kafka_consumer.go @@ -41,6 +41,17 @@ func newConsumer(kafkaConfig *model.KafkaConfig, logger model.Logger) *kafkaCons RetentionTime: kafkaConfig.Consumer.RetentionTime, } + if kafkaConfig.SASL.Enabled { + readerConfig.Dialer = &kafka.Dialer{ + TLS: createTLSConfig(kafkaConfig.SASL), + SASLMechanism: getSaslMechanism(kafkaConfig.SASL), + } + + if kafkaConfig.SASL.Rack != "" { + readerConfig.GroupBalancers = []kafka.GroupBalancer{kafka.RackAffinityGroupBalancer{Rack: kafkaConfig.SASL.Rack}} + } + } + return &kafkaConsumer{ consumer: kafka.NewReader(readerConfig), logger: logger, diff --git a/internal/kafka_producer.go b/internal/kafka_producer.go index 355a2ca..b942e20 100644 --- a/internal/kafka_producer.go +++ b/internal/kafka_producer.go @@ -35,6 +35,13 @@ func newProducer(kafkaConfig *model.KafkaConfig, l model.Logger) Producer { AllowAutoTopicCreation: true, } + if kafkaConfig.SASL.Enabled { + producer.Transport = &kafka.Transport{ + TLS: createTLSConfig(kafkaConfig.SASL), + SASL: getSaslMechanism(kafkaConfig.SASL), + } + } + return &kafkaProducer{ w: producer, logger: l, diff --git a/internal/secure.go b/internal/secure.go new file mode 100644 index 0000000..ed278cb --- /dev/null +++ b/internal/secure.go @@ -0,0 +1,42 @@ +package internal + +import ( + "crypto/tls" + "crypto/x509" + "os" + + "github.com/Trendyol/kafka-cronsumer/model" + "github.com/segmentio/kafka-go/sasl" + "github.com/segmentio/kafka-go/sasl/scram" +) + +func createTLSConfig(sasl model.SASLConfig) *tls.Config { + rootCA, err := os.ReadFile(sasl.RootCAPath) + if err != nil { + panic("Error while reading Root CA file: " + sasl.RootCAPath + " error: " + err.Error()) + } + + interCA, err := os.ReadFile(sasl.IntermediateCAPath) + if err != nil { + panic("Error while reading Intermediate CA file: " + sasl.IntermediateCAPath + " error: " + err.Error()) + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(rootCA) + caCertPool.AppendCertsFromPEM(interCA) + + return &tls.Config{ + RootCAs: caCertPool, + MinVersion: tls.VersionTLS13, + } +} + +// TODO: we can support `plain` authentication type +// link: https://github.com/segmentio/kafka-go#plain +func getSaslMechanism(sasl model.SASLConfig) sasl.Mechanism { + mechanism, err := scram.Mechanism(scram.SHA512, sasl.Username, sasl.Password) + if err != nil { + panic("Error while creating SCRAM configuration, error: " + err.Error()) + } + return mechanism +} diff --git a/model/config.go b/model/config.go index c66d3a1..c207a57 100644 --- a/model/config.go +++ b/model/config.go @@ -21,9 +21,20 @@ type KafkaConfig struct { Brokers []string `yaml:"brokers"` Consumer ConsumerConfig `yaml:"consumer"` Producer ProducerConfig `yaml:"producer"` + SASL SASLConfig `yaml:"sasl"` LogLevel Level `yaml:"logLevel"` } +type SASLConfig struct { + Enabled bool `yaml:"enabled"` + AuthType string `yaml:"authType"` // plain or scram + Username string `yaml:"username"` + Password string `yaml:"password"` + RootCAPath string `yaml:"rootCAPath"` + IntermediateCAPath string `yaml:"intermediateCAPath"` + Rack string `yaml:"rack"` +} + type ConsumerConfig struct { GroupID string `yaml:"groupId"` Topic string `yaml:"topic"`