Skip to content

Commit

Permalink
feat: add sasl support (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
A.Samet İleri authored Oct 25, 2022
1 parent 580c9a6 commit 6f89fdf
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 26 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea/
unit_coverage.html
unit_coverage.out
resource/
21 changes: 14 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ func main() {

| config | description | default | example |
|------------------------------------|----------------------------------------------------------------------------------------------------|----------|--------------------------|
| `cron` | Cron expression when exception consumer starts to work at | - | */1 * * * * |
| `duration` | Work duration exception consumer actively consuming messages | - | 20s, 15m, 1h |
| `topic` | Exception topic names | - | exception-topic |
| `groupId` | Exception consumer group id | - | exception-consumer-group |
| `logLevel` | Describes log level, valid options are `debug`, `info`, `warn`, and `error` | warn | |
| `maxRetry` | Maximum retry value for attempting to retry a message | 3 | |
| `concurrency` | Number of goroutines used at listeners | 1 | |
| `kafka.logLevel` | Describes log level, valid options are `debug`, `info`, `warn`, and `error` | warn | |
| `kafka.consumer.cron` | Cron expression when exception consumer starts to work at | | */1 * * * * |
| `kafka.consumer.duration` | Work duration exception consumer actively consuming messages | | 20s, 15m, 1h |
| `kafka.consumer.topic` | Exception topic names | | exception-topic |
| `kafka.consumer.groupId` | Exception consumer group id | | exception-consumer-group |
| `kafka.consumer.maxRetry` | Maximum retry value for attempting to retry a message | 3 | |
| `kafka.consumer.concurrency` | Number of goroutines used at listeners | 1 | |
| `kafka.consumer.minBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#ReaderConfig.MinBytes) | 10e3 | |
| `kafka.consumer.maxBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#ReaderConfig.MaxBytes) | 10e6 | |
| `kafka.consumer.maxWait` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#ReaderConfig.MaxWait) | 2s | |
Expand All @@ -111,6 +111,13 @@ func main() {
| `kafka.consumer.retentionTime` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#ReaderConfig.RetentionTime) | 24h | |
| `kafka.producer.batchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#Writer.BatchSize) | 100 | |
| `kafka.producer.batchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#Writer.BatchTimeout) | 500us | |
| `kafka.sasl.enabled` | It enables sasl authentication mechanism | false | |
| `kafka.sasl.authType` | Currently we only support `SCRAM` | "" | |
| `kafka.sasl.username` | SCRAM username | "" | |
| `kafka.sasl.password` | SCRAM password | "" | |
| `kafka.sasl.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | |
| `kafka.sasl.intermediateCAPath` | | "" | |
| `kafka.sasl.rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.32#RackAffinityGroupBalancer) | "" | |

## Contribute

Expand Down
4 changes: 2 additions & 2 deletions cronsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ type Cronsumer interface {
Stop()
}

// NewKafkaCronsumer returns the newly created kafka consumer consumer instance.
// NewCronsumer returns the newly created kafka consumer instance.
// config.KafkaConfig specifies cron, duration and so many parameters.
// ConsumeFn describes how to consume messages from specified topic.
func NewCronsumer(cfg *model.KafkaConfig, c ConsumeFn) Cronsumer {
return internal.NewCronsumer(cfg, c)
}

// NewKafkaCronsumerSchedulerWithLogger returns the newly created kafka consumer consumer instance.
// NewCronsumerWithLogger returns the newly created kafka consumer instance.
// config.KafkaConfig specifies cron, duration and so many parameters.
// ConsumeFn describes how to consume messages from specified topic.
// logger describes log interface for injecting custom log implementation
Expand Down
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@ go 1.19

require (
github.com/robfig/cron/v3 v3.0.1
github.com/segmentio/kafka-go v0.4.32
github.com/segmentio/kafka-go v0.4.35
go.uber.org/zap v1.23.0
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/klauspost/compress v1.14.2 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/klauspost/compress v1.15.7 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/xdg/scram v1.0.5 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/net v0.1.0 // indirect
golang.org/x/text v0.4.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)
35 changes: 22 additions & 13 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,34 @@ github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLj
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.7 h1:7cgTQxJCU/vy+oP/E3B9RGbQTgbiVzIJWIKOLoAsPok=
github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/segmentio/kafka-go v0.4.32 h1:Ohr+9E+kDv/Ld2UPJN9hnKZRd2qgiqCmI8v2e1qlfLM=
github.com/segmentio/kafka-go v0.4.32/go.mod h1:JAPPIiY3MQIwVHj64CWOP0LsFFfQ7H0w69kuoxnMIS0=
github.com/segmentio/kafka-go v0.4.35 h1:TAsQ7q1SjS39PcFvU0zDJhCuVAxHomy7xOAfbdSuhzs=
github.com/segmentio/kafka-go v0.4.35/go.mod h1:GAjxBQJdQMB5zfNA21AhpaqOB2Mu+w3De4ni3Gbm8y0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4=
github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand All @@ -38,8 +40,8 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i
go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand All @@ -48,19 +50,27 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220706163947-c90051bbdb60/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
Expand All @@ -72,6 +82,5 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
11 changes: 11 additions & 0 deletions internal/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ func newConsumer(kafkaConfig *model.KafkaConfig, logger model.Logger) *kafkaCons
RetentionTime: kafkaConfig.Consumer.RetentionTime,
}

if kafkaConfig.SASL.Enabled {
readerConfig.Dialer = &kafka.Dialer{
TLS: createTLSConfig(kafkaConfig.SASL),
SASLMechanism: getSaslMechanism(kafkaConfig.SASL),
}

if kafkaConfig.SASL.Rack != "" {
readerConfig.GroupBalancers = []kafka.GroupBalancer{kafka.RackAffinityGroupBalancer{Rack: kafkaConfig.SASL.Rack}}
}
}

return &kafkaConsumer{
consumer: kafka.NewReader(readerConfig),
logger: logger,
Expand Down
7 changes: 7 additions & 0 deletions internal/kafka_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ func newProducer(kafkaConfig *model.KafkaConfig, l model.Logger) Producer {
AllowAutoTopicCreation: true,
}

if kafkaConfig.SASL.Enabled {
producer.Transport = &kafka.Transport{
TLS: createTLSConfig(kafkaConfig.SASL),
SASL: getSaslMechanism(kafkaConfig.SASL),
}
}

return &kafkaProducer{
w: producer,
logger: l,
Expand Down
42 changes: 42 additions & 0 deletions internal/secure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package internal

import (
"crypto/tls"
"crypto/x509"
"os"

"github.com/Trendyol/kafka-cronsumer/model"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/scram"
)

func createTLSConfig(sasl model.SASLConfig) *tls.Config {
rootCA, err := os.ReadFile(sasl.RootCAPath)
if err != nil {
panic("Error while reading Root CA file: " + sasl.RootCAPath + " error: " + err.Error())
}

interCA, err := os.ReadFile(sasl.IntermediateCAPath)
if err != nil {
panic("Error while reading Intermediate CA file: " + sasl.IntermediateCAPath + " error: " + err.Error())
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(rootCA)
caCertPool.AppendCertsFromPEM(interCA)

return &tls.Config{
RootCAs: caCertPool,
MinVersion: tls.VersionTLS13,
}
}

// TODO: we can support `plain` authentication type
// link: https://github.com/segmentio/kafka-go#plain
func getSaslMechanism(sasl model.SASLConfig) sasl.Mechanism {
mechanism, err := scram.Mechanism(scram.SHA512, sasl.Username, sasl.Password)
if err != nil {
panic("Error while creating SCRAM configuration, error: " + err.Error())
}
return mechanism
}
11 changes: 11 additions & 0 deletions model/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,20 @@ type KafkaConfig struct {
Brokers []string `yaml:"brokers"`
Consumer ConsumerConfig `yaml:"consumer"`
Producer ProducerConfig `yaml:"producer"`
SASL SASLConfig `yaml:"sasl"`
LogLevel Level `yaml:"logLevel"`
}

type SASLConfig struct {
Enabled bool `yaml:"enabled"`
AuthType string `yaml:"authType"` // plain or scram
Username string `yaml:"username"`
Password string `yaml:"password"`
RootCAPath string `yaml:"rootCAPath"`
IntermediateCAPath string `yaml:"intermediateCAPath"`
Rack string `yaml:"rack"`
}

type ConsumerConfig struct {
GroupID string `yaml:"groupId"`
Topic string `yaml:"topic"`
Expand Down

0 comments on commit 6f89fdf

Please sign in to comment.