Skip to content

Commit a1de4fa

Browse files
committed
Amazon SQS
1 parent ab15b4c commit a1de4fa

File tree

383 files changed

+73096
-22
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

383 files changed

+73096
-22
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,20 @@
11
## Changelog
22

3+
### [v0.3.0](https://github.com/grepplabs/mqtt-proxy/releases/tag/v0.3.0) - 11 April 2023
4+
5+
* Update confluent-kafka-go to v2.0.2
6+
* Build docker `ARM64` image
7+
8+
### [v0.2.0](https://github.com/grepplabs/mqtt-proxy/releases/tag/v0.2.0) - 29 December 2022
9+
10+
* Add MQTT 5.0 support
11+
312
### [v0.1.1](https://github.com/grepplabs/mqtt-proxy/releases/tag/v0.1.1) - 6 November 2022
13+
414
* Update dependencies
515
* Migrate from [kingpin](https://github.com/alecthomas/kingpin) to [kong](https://github.com/alecthomas/kong)
616

717
### [v0.1.0](https://github.com/grepplabs/mqtt-proxy/releases/tag/v0.1.0) - 1 November 2022
18+
819
* Add server certificates rotation
920
* Add client certificate revocation list

README.md

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ MQTT Proxy allows MQTT clients to send messages to other messaging systems
1818
* Publisher
1919
* [x] Noop
2020
* [x] [Apache Kafka](https://kafka.apache.org/)
21+
* [x] [Amazon SQS](https://aws.amazon.com/sqs/)
22+
* [ ] [Amazon SNS](https://aws.amazon.com/sns/)
23+
* [ ] [Amazon Kinesis](https://aws.amazon.com/kinesis/)
2124
* [ ] Others
2225
* Authentication
2326
* [x] Noop
@@ -113,8 +116,6 @@ prerequisites
113116
watch -c 'curl -s localhost:9090/metrics | grep mqtt | egrep -v '^#''
114117
```
115118
116-
5. see also [cp-kafka](scripts/cp-kafka/Makefile) with SASL_PLAINTEXT and SASL_SSL configuration
117-
118119
### publish to Amazon MSK
119120
120121
1. provision test MSK and EC2 running in [podman](https://podman.io/) 2 proxy containers
@@ -138,6 +139,27 @@ prerequisites
138139
mosquitto_pub -m "on" -t "dummy" -k 20 -i mqtt-proxy.clientv --repeat 1 -q 1 -h <ec2-ip> -p 1884
139140
```
140141
142+
### SQS publisher
143+
144+
1. Create AWS SQS `test1` and `test2.fifo` queues
145+
2. Build and start MQTT Proxy
146+
147+
```
148+
make build
149+
./mqtt-proxy server \
150+
--mqtt.publisher.name=sqs \
151+
--mqtt.publisher.message-format=json \
152+
--mqtt.publisher.sqs.queue-mappings=test1='^dummy$' \
153+
--mqtt.publisher.sqs.default-queue=test2.fifo \
154+
--mqtt.publisher.sqs.aws-profile=admin-dev
155+
```
156+
157+
3. publish
158+
159+
```
160+
mosquitto_pub -L mqtt://localhost:1883/dummy -m "test qos 0" --repeat 1 -q 2
161+
```
162+
141163
### plain authenticator
142164
143165
1. start server with `plain` authenticator

apis/publish.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ import "context"
77
type PublishID interface{}
88

99
type PublishRequest struct {
10-
Dup bool
11-
Qos byte
12-
Retain bool
13-
TopicName string
14-
MessageID uint16
15-
Message []byte
10+
Dup bool `json:"dup"`
11+
Qos byte `json:"qos"`
12+
Retain bool `json:"retain"`
13+
TopicName string `json:"topic_name"`
14+
MessageID uint16 `json:"packet_id"`
15+
Message []byte `json:"payload"`
1616
}
1717

1818
type PublishResponse struct {

cmd/server.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
pubinst "github.com/grepplabs/mqtt-proxy/pkg/publisher/instrument"
1717
pubkafka "github.com/grepplabs/mqtt-proxy/pkg/publisher/kafka"
1818
pubnoop "github.com/grepplabs/mqtt-proxy/pkg/publisher/noop"
19+
pubsqs "github.com/grepplabs/mqtt-proxy/pkg/publisher/sqs"
1920
httpserver "github.com/grepplabs/mqtt-proxy/pkg/server/http"
2021
mqttserver "github.com/grepplabs/mqtt-proxy/pkg/server/mqtt"
2122
servertls "github.com/grepplabs/mqtt-proxy/pkg/tls"
@@ -102,10 +103,22 @@ func runServer(
102103
pubkafka.WithConfigMap(cfg.MQTT.Publisher.Kafka.ConfArgs.ConfigMap()),
103104
pubkafka.WithGracePeriod(cfg.MQTT.Publisher.Kafka.GracePeriod),
104105
pubkafka.WithWorkers(cfg.MQTT.Publisher.Kafka.Workers),
106+
pubkafka.WithMessageFormat(cfg.MQTT.Publisher.MessageFormat),
105107
)
106108
if err != nil {
107109
return fmt.Errorf("setup kafka publisher: %w", err)
108110
}
111+
case config.PublisherSQS:
112+
publisher, err = pubsqs.New(logger, registry,
113+
pubsqs.WithAWSProfile(cfg.MQTT.Publisher.SQS.AWSProfile),
114+
pubsqs.WithAWSRegion(cfg.MQTT.Publisher.SQS.AWSRegion),
115+
pubsqs.WithQueueMappings(cfg.MQTT.Publisher.SQS.QueueMappings),
116+
pubsqs.WithDefaultQueue(cfg.MQTT.Publisher.SQS.DefaultQueue),
117+
pubsqs.WithMessageFormat(cfg.MQTT.Publisher.MessageFormat),
118+
)
119+
if err != nil {
120+
return fmt.Errorf("setup sqs publisher: %w", err)
121+
}
109122
default:
110123
return fmt.Errorf("unknown publisher %s", cfg.MQTT.Publisher.Name)
111124
}

go.mod

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ go 1.20
55
require (
66
github.com/alecthomas/kong v0.7.0
77
github.com/alecthomas/kong-yaml v0.1.1
8+
github.com/aws/aws-sdk-go-v2 v1.17.8
9+
github.com/aws/aws-sdk-go-v2/config v1.18.21
10+
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.8
11+
github.com/aws/aws-sdk-go-v2/service/sts v1.18.9
812
github.com/confluentinc/confluent-kafka-go/v2 v2.0.2
913
github.com/go-playground/validator/v10 v10.11.1
1014
github.com/oklog/run v1.1.0
@@ -18,6 +22,15 @@ require (
1822
)
1923

2024
require (
25+
github.com/aws/aws-sdk-go-v2/credentials v1.13.20 // indirect
26+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.2 // indirect
27+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.32 // indirect
28+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.26 // indirect
29+
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.33 // indirect
30+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.26 // indirect
31+
github.com/aws/aws-sdk-go-v2/service/sso v1.12.8 // indirect
32+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.8 // indirect
33+
github.com/aws/smithy-go v1.13.5 // indirect
2134
github.com/beorn7/perks v1.0.1 // indirect
2235
github.com/cespare/xxhash/v2 v2.2.0 // indirect
2336
github.com/davecgh/go-spew v1.1.1 // indirect

go.sum

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,32 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
4949
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
5050
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
5151
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
52+
github.com/aws/aws-sdk-go-v2 v1.17.8 h1:GMupCNNI7FARX27L7GjCJM8NgivWbRgpjNI/hOQjFS8=
53+
github.com/aws/aws-sdk-go-v2 v1.17.8/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
54+
github.com/aws/aws-sdk-go-v2/config v1.18.21 h1:ENTXWKwE8b9YXgQCsruGLhvA9bhg+RqAsL9XEMEsa2c=
55+
github.com/aws/aws-sdk-go-v2/config v1.18.21/go.mod h1:+jPQiVPz1diRnjj6VGqWcLK6EzNmQ42l7J3OqGTLsSY=
56+
github.com/aws/aws-sdk-go-v2/credentials v1.13.20 h1:oZCEFcrMppP/CNiS8myzv9JgOzq2s0d3v3MXYil/mxQ=
57+
github.com/aws/aws-sdk-go-v2/credentials v1.13.20/go.mod h1:xtZnXErtbZ8YGXC3+8WfajpMBn5Ga/3ojZdxHq6iI8o=
58+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.2 h1:jOzQAesnBFDmz93feqKnsTHsXrlwWORNZMFHMV+WLFU=
59+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.2/go.mod h1:cDh1p6XkSGSwSRIArWRc6+UqAQ7x4alQ0QfpVR6f+co=
60+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.32 h1:dpbVNUjczQ8Ae3QKHbpHBpfvaVkRdesxpTOe9pTouhU=
61+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.32/go.mod h1:RudqOgadTWdcS3t/erPQo24pcVEoYyqj/kKW5Vya21I=
62+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.26 h1:QH2kOS3Ht7x+u0gHCh06CXL/h6G8LQJFpZfFBYBNboo=
63+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.26/go.mod h1:vq86l7956VgFr0/FWQ2BWnK07QC3WYsepKzy33qqY5U=
64+
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.33 h1:HbH1VjUgrCdLJ+4lnnuLI4iVNRvBbBELGaJ5f69ClA8=
65+
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.33/go.mod h1:zG2FcwjQarWaqXSCGpgcr3RSjZ6dHGguZSppUL0XR7Q=
66+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.26 h1:uUt4XctZLhl9wBE1L8lobU3bVN8SNUP7T+olb0bWBO4=
67+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.26/go.mod h1:Bd4C/4PkVGubtNe5iMXu5BNnaBi/9t/UsFspPt4ram8=
68+
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.8 h1:SDZBYFUp70hI2T0z9z+KD1iJBz9jGeT7xgU5hPPC9zs=
69+
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.8/go.mod h1:w058QQWcK1MLEnIrD0DmkQtSvC1pLY0EWRQsPXPWppM=
70+
github.com/aws/aws-sdk-go-v2/service/sso v1.12.8 h1:5cb3D6xb006bPTqEfCNaEA6PPEfBXxxy4NNeX/44kGk=
71+
github.com/aws/aws-sdk-go-v2/service/sso v1.12.8/go.mod h1:GNIveDnP+aE3jujyUSH5aZ/rktsTM5EvtKnCqBZawdw=
72+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.8 h1:NZaj0ngZMzsubWZbrEFSB4rgSQRbFq38Sd6KBxHuOIU=
73+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.8/go.mod h1:44qFP1g7pfd+U+sQHLPalAPKnyfTZjJsYR4xIwsJy5o=
74+
github.com/aws/aws-sdk-go-v2/service/sts v1.18.9 h1:Qf1aWwnsNkyAoqDqmdM3nHwN78XQjec27LjM6b9vyfI=
75+
github.com/aws/aws-sdk-go-v2/service/sts v1.18.9/go.mod h1:yyW88BEPXA2fGFyI2KCcZC3dNpiT0CZAHaF+i656/tQ=
76+
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
77+
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
5278
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
5379
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
5480
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -154,6 +180,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
154180
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
155181
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
156182
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
183+
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
157184
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
158185
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
159186
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@@ -185,6 +212,8 @@ github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+
185212
github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
186213
github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E=
187214
github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
215+
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
216+
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
188217
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
189218
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
190219
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=

pkg/config/config.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
const (
1616
PublisherNoop = "noop"
1717
PublisherKafka = "kafka"
18+
PublisherSQS = "sqs"
1819
)
1920

2021
// authenticator names
@@ -23,6 +24,13 @@ const (
2324
AuthPlain = "plain"
2425
)
2526

27+
// message format
28+
const (
29+
MessageFormatPlain = "plain"
30+
MessageFormatBase64 = "base64"
31+
MessageFormatJson = "json"
32+
)
33+
2634
// server certificate source
2735
const (
2836
CertSourceFile = "file"
@@ -72,15 +80,22 @@ type Server struct {
7280
} `embed:"" prefix:"auth."`
7381
} `embed:"" prefix:"handler."`
7482
Publisher struct {
75-
Name string `default:"${PublisherDefault}" enum:"${PublisherEnum}" help:"Publisher name. One of: [${PublisherEnum}]"`
76-
Kafka struct {
83+
Name string `default:"${PublisherDefault}" enum:"${PublisherEnum}" help:"Publisher name. One of: [${PublisherEnum}]"`
84+
MessageFormat string `default:"${MessageFormatDefault}" enum:"${MessageFormatEnum}" help:"Message format. One of: [${MessageFormatEnum}]"`
85+
Kafka struct {
7786
BootstrapServers string `default:"localhost:9092" help:"Kafka bootstrap servers."`
7887
GracePeriod time.Duration `default:"10s" help:"Time to wait after an interrupt received for Kafka publisher." validate:"gte=0"`
7988
ConfArgs KafkaConfigArgs `name:"config" placeholder:"PROP=VAL" help:"Comma separated list of properties."`
8089
DefaultTopic string `default:"" help:"Default Kafka topic for MQTT publish messages."`
8190
TopicMappings TopicMappings `placeholder:"TOPIC=REGEX" help:"Comma separated list of Kafka topic to MQTT topic mappings."`
8291
Workers int `default:"1" help:"Number of kafka publisher workers." validate:"gte=1"`
8392
} `embed:"" prefix:"kafka."`
93+
SQS struct {
94+
AWSProfile string `default:"" help:"AWS Profile."`
95+
AWSRegion string `default:"" help:"AWS Region."`
96+
DefaultQueue string `default:"" help:"Default Kafka topic for MQTT publish messages."`
97+
QueueMappings TopicMappings `placeholder:"QUEUE=REGEX" help:"Comma separated list of SQS queue to MQTT topic mappings."`
98+
} `embed:"" prefix:"sqs."`
8499
} `embed:"" prefix:"publisher."`
85100
} `embed:"" prefix:"mqtt."`
86101
}
@@ -94,7 +109,9 @@ func ServerVars() kong.Vars {
94109
"AuthDefault": AuthNoop,
95110
"AuthEnum": strings.Join([]string{AuthNoop, AuthPlain}, ", "),
96111
"PublisherDefault": PublisherNoop,
97-
"PublisherEnum": strings.Join([]string{PublisherNoop, PublisherKafka}, ", "),
112+
"PublisherEnum": strings.Join([]string{PublisherNoop, PublisherKafka, PublisherSQS}, ", "),
113+
"MessageFormatDefault": MessageFormatPlain,
114+
"MessageFormatEnum": strings.Join([]string{MessageFormatPlain, MessageFormatBase64, MessageFormatJson}, ", "),
98115
}
99116
}
100117

pkg/publisher/kafka/kafka.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"github.com/grepplabs/mqtt-proxy/pkg/util"
78
"reflect"
89
"strconv"
910
"strings"
@@ -25,6 +26,7 @@ const (
2526
mqttDupHeader = "mqtt.dup"
2627
mqttRetainHeader = "mqtt.retain"
2728
mqttMsgIDHeader = "mqtt.packet.id"
29+
mqttMsgFmtHeader = "mqtt.fmt"
2830
)
2931
const (
3032
shutdownPollInterval = 500 * time.Millisecond
@@ -44,15 +46,11 @@ func (k *kafkaProducer) Close() {
4446
}
4547

4648
type Publisher struct {
47-
logger log.Logger
48-
49-
producers map[byte]*kafkaProducer
50-
51-
inShutdown atomic.Bool
52-
49+
logger log.Logger
50+
producers map[byte]*kafkaProducer
51+
inShutdown atomic.Bool
5352
workersDone *runtime.DoneChannel
54-
55-
opts options
53+
opts options
5654
}
5755

5856
func New(logger log.Logger, _ *prometheus.Registry, opts ...Option) (*Publisher, error) {
@@ -236,11 +234,18 @@ func (s *Publisher) newKafkaMessage(req *apis.PublishRequest, opaque interface{}
236234
{Key: mqttDupHeader, Value: []byte(strconv.FormatBool(req.Dup))},
237235
{Key: mqttRetainHeader, Value: []byte(strconv.FormatBool(req.Retain))},
238236
{Key: mqttMsgIDHeader, Value: []byte(strconv.FormatUint(uint64(req.MessageID), 10))},
237+
{Key: mqttMsgFmtHeader, Value: []byte(s.opts.messageFormat)},
239238
}
239+
240+
message, err := util.GetMessageBody(s.opts.messageFormat, req)
241+
if err != nil {
242+
return nil, err
243+
}
244+
240245
return &kafka.Message{
241246
TopicPartition: kafka.TopicPartition{Topic: &kafkaTopic, Partition: kafka.PartitionAny},
242247
Key: []byte(req.TopicName),
243-
Value: req.Message,
248+
Value: message,
244249
Opaque: opaque,
245250
Headers: headers,
246251
}, nil
@@ -255,7 +260,7 @@ func (s *Publisher) getKafkaTopic(mqttTopic string) (string, error) {
255260
if s.opts.defaultTopic != "" {
256261
return s.opts.defaultTopic, nil
257262
}
258-
return "", fmt.Errorf("Kafka topic not found for MQTT topic %s", mqttTopic)
263+
return "", fmt.Errorf("kafka topic not found for MQTT topic %s", mqttTopic)
259264
}
260265

261266
func (s *Publisher) Publish(ctx context.Context, request *apis.PublishRequest) (*apis.PublishResponse, error) {

pkg/publisher/kafka/kafka_it_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kafka
33
import (
44
"context"
55
"github.com/grepplabs/mqtt-proxy/apis"
6+
"github.com/grepplabs/mqtt-proxy/pkg/config"
67
"github.com/oklog/run"
78
"os"
89
"testing"
@@ -30,7 +31,8 @@ func newTestPublisherOrExit() *Publisher {
3031
WithBootstrapServers(bootstrapServers),
3132
WithGracePeriod(60*time.Second),
3233
WithConfigMap(defaultConfig),
33-
WithDefaultTopic("mqtt-test"))
34+
WithDefaultTopic("mqtt-test"),
35+
WithMessageFormat(config.MessageFormatPlain))
3436

3537
if err != nil {
3638
logger.WithError(err).Errorf("kafka publisher creation failed")

pkg/publisher/kafka/option.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type options struct {
1717

1818
defaultTopic string
1919
topicMappings config.TopicMappings
20+
messageFormat string
2021
}
2122

2223
func (o options) validate() error {
@@ -29,6 +30,9 @@ func (o options) validate() error {
2930
if o.defaultTopic == "" && len(o.topicMappings.Mappings) == 0 {
3031
return errors.New("kafka default topic or topic mappings must be provided")
3132
}
33+
if o.messageFormat == "" {
34+
return errors.New("publisher message format must not be empty")
35+
}
3236
return nil
3337
}
3438

@@ -77,3 +81,9 @@ func WithWorkers(v int) Option {
7781
o.workers = v
7882
})
7983
}
84+
85+
func WithMessageFormat(s string) Option {
86+
return optionFunc(func(o *options) {
87+
o.messageFormat = s
88+
})
89+
}

0 commit comments

Comments
 (0)