Skip to content

Commit 85b8eca

Browse files
committed
Test kafka ssl/sasl
1 parent 5823416 commit 85b8eca

28 files changed

+630
-9
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
FROM golang:1.14-alpine3.11 as builder
22

3-
RUN apk add alpine-sdk
3+
RUN apk add alpine-sdk ca-certificates
44

55
WORKDIR "/code"
66
ADD . "/code"

README.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,19 +52,62 @@ prerequisites
5252
5353
3. publish messages using mosquitto client
5454
55+
* proxy using Kafka PLAINTEXT listener
5556
```
5657
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 0" --repeat 1 -q 0
5758
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 1" --repeat 1 -q 1
5859
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 2" --repeat 1 -q 2
5960
```
6061
62+
* proxy using Kafka SSL listener
63+
```
64+
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy-ssl:1884/dummy -m "test qos 0" --repeat 1 -q 0
65+
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy-ssl:1884/dummy -m "test qos 1" --repeat 1 -q 1
66+
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy-ssl:1884/dummy -m "test qos 2" --repeat 1 -q 2
67+
```
68+
6169
4. check the prometheus metrics
6270
6371
```
6472
watch -c 'curl -s localhost:9090/metrics | grep mqtt | egrep -v '^#''
6573
```
6674
75+
5. see also [cp-kafka](scripts/cp-kafka/Makefile) with SASL_PLAINTEXT and SASL_SSL configuration
76+
77+
### publish to Amazon MSK
78+
79+
1. provision test MSK and EC2 running in [podman](https://podman.io/) 2 proxy containers
80+
81+
```
82+
cd scripts/msk
83+
make tf-apply
84+
```
85+
86+
2. create Kafka mqtt-test topic
87+
88+
3. publish
89+
90+
* container connects to MSK PLAINTEXT listener
91+
```
92+
mosquitto_pub -m "on" -t "dummy" -k 20 -i mqtt-proxy.clientv --repeat 1 -q 1 -h <ec2-ip> -p 1883
93+
```
94+
95+
* container connects to MSK TLS listener
96+
```
97+
mosquitto_pub -m "on" -t "dummy" -k 20 -i mqtt-proxy.clientv --repeat 1 -q 1 -h <ec2-ip> -p 1884
98+
```
99+
67100
## Configuration
101+
102+
### Kafka publisher
103+
104+
Kafka producer configuration properties used by [librdkafka](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) should be prefixed with `producer.`
105+
106+
```
107+
--mqtt.publisher.kafka.config=producer.sasl.mechanisms=PLAIN,producer.security.protocol=SASL_SSL,producer.sasl.username=myuser,producer.sasl.password=mypasswd
108+
```
109+
110+
68111
### Examples
69112
70113
- Ignore subscribe / unsubscribe requests

pkg/config/config_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package config
2+
3+
import (
4+
"github.com/confluentinc/confluent-kafka-go/kafka"
5+
"github.com/stretchr/testify/assert"
6+
"testing"
7+
)
8+
9+
func TestKafkaConfArgs(t *testing.T) {
10+
tests := []struct {
11+
name string
12+
input string
13+
output KafkaConfigArgs
14+
err error
15+
}{
16+
{
17+
name: "Set parameters",
18+
input: "bootstrap.servers=localhost:9092,producer.sasl.mechanisms=PLAIN,producer.security.protocol=SASL_SSL,producer.sasl.username=myuser,producer.sasl.password=mypasswd,{qos-0}.producer.hello=property for qos 0",
19+
output: KafkaConfigArgs{
20+
conf: map[string]kafka.ConfigValue{
21+
"bootstrap.servers": "localhost:9092",
22+
"producer.sasl.mechanisms": "PLAIN",
23+
"producer.security.protocol": "SASL_SSL",
24+
"producer.sasl.username": "myuser",
25+
"producer.sasl.password": "mypasswd",
26+
"{qos-0}.producer.hello": "property for qos 0",
27+
},
28+
},
29+
},
30+
{
31+
name: "Error expected key=value",
32+
input: "bootstrap.servers",
33+
output: KafkaConfigArgs{
34+
conf: map[string]kafka.ConfigValue{},
35+
},
36+
err: kafka.NewError(-186, "Expected key=value", false),
37+
},
38+
}
39+
for _, tc := range tests {
40+
t.Run(tc.name, func(t *testing.T) {
41+
a := assert.New(t)
42+
43+
s := new(Server)
44+
err := s.MQTT.Publisher.Kafka.ConfArgs.Set(tc.input)
45+
a.Equal(tc.err, err)
46+
a.Equal(tc.output, s.MQTT.Publisher.Kafka.ConfArgs)
47+
})
48+
}
49+
}

pkg/publisher/kafka/kafka.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func producerProperties(qos byte, opts options) *kafka.ConfigMap {
203203
for k, v := range propertiesWithPrefix(opts.configMap, "producer.", true) {
204204
_ = configMap.SetKey(k, v)
205205
}
206-
for k, v := range propertiesWithPrefix(opts.configMap, fmt.Sprintf("{qos=%d}.producer.", qos), true) {
206+
for k, v := range propertiesWithPrefix(opts.configMap, fmt.Sprintf("{qos-%d}.producer.", qos), true) {
207207
_ = configMap.SetKey(k, v)
208208
}
209209
return &configMap

pkg/publisher/kafka/kafka_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package kafka
2+
3+
import (
4+
"github.com/confluentinc/confluent-kafka-go/kafka"
5+
"github.com/stretchr/testify/assert"
6+
"testing"
7+
)
8+
9+
func TestProducerProperties(t *testing.T) {
10+
tests := []struct {
11+
name string
12+
qos byte
13+
input options
14+
output kafka.ConfigMap
15+
}{
16+
{
17+
name: "Set producer parameters",
18+
qos: 0,
19+
input: options{
20+
bootstrapServers: "localhost:9092",
21+
configMap: kafka.ConfigMap{
22+
"producer.sasl.mechanisms": "PLAIN",
23+
"producer.security.protocol": "SASL_SSL",
24+
"producer.sasl.username": "myuser",
25+
"producer.sasl.password": "mypasswd",
26+
"{qos-1}.producer.hello": "property for qos 1",
27+
"{qos-0}.producer.hello": "property for qos 0",
28+
},
29+
},
30+
output: kafka.ConfigMap{
31+
"bootstrap.servers": "localhost:9092",
32+
"sasl.mechanisms": "PLAIN",
33+
"security.protocol": "SASL_SSL",
34+
"sasl.username": "myuser",
35+
"sasl.password": "mypasswd",
36+
"hello": "property for qos 0",
37+
},
38+
},
39+
}
40+
for _, tc := range tests {
41+
t.Run(tc.name, func(t *testing.T) {
42+
a := assert.New(t)
43+
44+
actual := producerProperties(tc.qos, tc.input)
45+
a.Equal(tc.output, *actual)
46+
})
47+
}
48+
}

scripts/cp-kafka/Makefile

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
.DEFAULT_GOAL := build-up
22
.PHONY: build up down build-up
33

4+
ROOT_DIR := $(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))))
5+
MQTT_PROXY := $(ROOT_DIR)/../../mqtt-proxy
6+
47
build-up: build up
58

69
build:
@@ -22,3 +25,53 @@ test-publish:
2225

2326
test-listen:
2427
docker exec -it broker kafka-console-consumer --bootstrap-server localhost:9092 --topic mqtt-test --property print.key=true --from-beginning
28+
29+
server-security-plaintext:
30+
@$(MQTT_PROXY) server \
31+
--mqtt.listen-address="0.0.0.0:2883" \
32+
--http.listen-address="0.0.0.0:10090" \
33+
--mqtt.publisher.name=kafka \
34+
--mqtt.publisher.kafka.bootstrap-servers=localhost:9092 \
35+
--mqtt.publisher.kafka.default-topic=mqtt-test
36+
37+
server-security-sasl-plaintext:
38+
@$(MQTT_PROXY) server \
39+
--mqtt.listen-address="0.0.0.0:2883" \
40+
--http.listen-address="0.0.0.0:10090" \
41+
--mqtt.publisher.name=kafka \
42+
--mqtt.publisher.kafka.bootstrap-servers=localhost:9093 \
43+
--mqtt.publisher.kafka.config=producer.sasl.mechanisms=PLAIN,producer.security.protocol=SASL_PLAINTEXT,producer.sasl.username=mqtt_proxy,producer.sasl.password=mqtt-proxy-secret \
44+
--mqtt.publisher.kafka.default-topic=mqtt-test
45+
46+
server-security-ssl:
47+
@$(MQTT_PROXY) server \
48+
--mqtt.listen-address="0.0.0.0:2883" \
49+
--http.listen-address="0.0.0.0:10090" \
50+
--mqtt.publisher.name=kafka \
51+
--mqtt.publisher.kafka.bootstrap-servers=localhost:9094 \
52+
--mqtt.publisher.kafka.config=producer.security.protocol=SSL,producer.ssl.ca.location=$(ROOT_DIR)/security/certs/ca-cert.pem \
53+
--mqtt.publisher.kafka.default-topic=mqtt-test
54+
55+
server-security-sasl-ssl:
56+
@$(MQTT_PROXY) server \
57+
--mqtt.listen-address="0.0.0.0:2883" \
58+
--http.listen-address="0.0.0.0:10090" \
59+
--mqtt.publisher.name=kafka \
60+
--mqtt.publisher.kafka.bootstrap-servers=localhost:9095 \
61+
--mqtt.publisher.kafka.config=producer.sasl.mechanisms=PLAIN,producer.security.protocol=SASL_SSL,producer.ssl.ca.location=$(ROOT_DIR)/security/certs/ca-cert.pem,producer.sasl.username=mqtt_proxy,producer.sasl.password=mqtt-proxy-secret \
62+
--mqtt.publisher.kafka.default-topic=mqtt-test
63+
64+
server-tls-security-plaintext:
65+
@$(MQTT_PROXY) server \
66+
--mqtt.listen-address="0.0.0.0:3883" \
67+
--http.listen-address="0.0.0.0:20090" \
68+
--mqtt.server-tls-key=$(ROOT_DIR)/security/certs/proxy-key.pem \
69+
--mqtt.server-tls-cert=$(ROOT_DIR)/security/certs/proxy-signed.pem \
70+
--mqtt.publisher.name=kafka \
71+
--mqtt.publisher.kafka.bootstrap-servers=localhost:9092 \
72+
--mqtt.publisher.kafka.default-topic=mqtt-test
73+
74+
publish-mosquitto:
75+
mosquitto_pub -L mqtt://localhost:2883/dummy -m "test qos 0" --repeat 1 -q 0
76+
mosquitto_pub -L mqtt://localhost:2883/dummy -m "test qos 1" --repeat 1 -q 1
77+
mosquitto_pub -L mqtt://localhost:2883/dummy -m "test qos 2" --repeat 1 -q 2

scripts/cp-kafka/docker-compose.yml

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,38 @@ services:
1818
depends_on:
1919
- zookeeper
2020
ports:
21-
- "19092:19092"
2221
- "9092:9092"
22+
- "9093:9093"
23+
- "9094:9094"
24+
- "9095:9095"
25+
- "19092:19092"
26+
- "19093:19093"
27+
- "19094:19094"
28+
- "19095:19095"
29+
volumes:
30+
- $PWD/security:/etc/kafka/secrets
2331
environment:
2432
KAFKA_BROKER_ID: 1
2533
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
26-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
27-
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:19092,PLAINTEXT_HOST://localhost:9092
34+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_PLAINTEXT_HOST:SASL_PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL
35+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:19092,PLAINTEXT_HOST://localhost:9092,SASL_PLAINTEXT://broker:19093,SASL_PLAINTEXT_HOST://localhost:9093,SSL://broker:19094,SSL_HOST://localhost:9094,SASL_SSL://broker:19095,SASL_SSL_HOST://localhost:9095
2836
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
2937
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
3038
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
3139
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
40+
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
41+
# KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf -Djavax.net.debug=all
42+
KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf
43+
KAFKA_ZOOKEEPER_SASL_CLIENT: 'FALSE'
44+
ZOOKEEPER_SASL_CLIENT: 'FALSE'
45+
ZOOKEEPER_SASL_ENABLED: 'FALSE'
46+
KAFKA_SSL_KEYSTORE_FILENAME: certs/docker.kafka.server.keystore.jks
47+
KAFKA_SSL_KEYSTORE_CREDENTIALS: certs/docker.kafka.server.keystore.passwd
48+
KAFKA_SSL_KEY_CREDENTIALS: certs/docker.kafka.server.key.passwd
49+
KAFKA_SSL_TRUSTSTORE_FILENAME: certs/docker.kafka.server.truststore.jks
50+
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: certs/docker.kafka.server.truststore.passwd
51+
# none, requested, required
52+
KAFKA_SSL_CLIENT_AUTH: requested
3253
mqtt-proxy:
3354
hostname: mqtt-proxy
3455
container_name: mqtt-proxy
@@ -46,6 +67,28 @@ services:
4667
- 1883:1883/tcp
4768
- 9090:9090/tcp
4869
restart: unless-stopped
70+
mqtt-proxy-ssl:
71+
hostname: mqtt-proxy-ssl
72+
container_name: mqtt-proxy-ssl
73+
build:
74+
context: ../..
75+
dockerfile: Dockerfile
76+
command:
77+
- server
78+
- '--mqtt.publisher.name=kafka'
79+
- '--mqtt.listen-address=0.0.0.0:1884'
80+
- '--http.listen-address=0.0.0.0:9091'
81+
- '--mqtt.publisher.kafka.bootstrap-servers=broker:19094'
82+
- '--mqtt.publisher.kafka.default-topic=mqtt-test'
83+
- '--mqtt.publisher.kafka.config=producer.security.protocol=SSL,producer.ssl.ca.location=/etc/mqtt-proxy/secrets/certs/ca-cert.pem'
84+
depends_on:
85+
- broker
86+
ports:
87+
- 1884:1884/tcp
88+
- 9091:9091/tcp
89+
volumes:
90+
- $PWD/security:/etc/mqtt-proxy/secrets
91+
restart: unless-stopped
4992
mqtt-client:
5093
image: eclipse-mosquitto
5194
hostname: mqtt-client
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
KafkaServer {
2+
org.apache.kafka.common.security.plain.PlainLoginModule required
3+
username="broker"
4+
password="broker-secret"
5+
user_broker="broker-secret"
6+
user_mqtt_proxy="mqtt-proxy-secret"
7+
;
8+
};
9+
10+
KafkaClient {
11+
org.apache.kafka.common.security.plain.PlainLoginModule required
12+
username="broker"
13+
password="broker-secret"
14+
user_broker="broker-secret"
15+
;
16+
};
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-----BEGIN CERTIFICATE-----
2+
MIIDhzCCAm+gAwIBAgIUWS/AVh8AcU8xMSmChyenwlb9liowDQYJKoZIhvcNAQEL
3+
BQAwUzELMAkGA1UEBhMCREUxDTALBgNVBAcMBEJvbm4xEjAQBgNVBAoMCWdyZXBw
4+
bGFiczENMAsGA1UECwwETm9uZTESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTIwMDUy
5+
OTE1NDQzNloXDTQwMDUyNDE1NDQzNlowUzELMAkGA1UEBhMCREUxDTALBgNVBAcM
6+
BEJvbm4xEjAQBgNVBAoMCWdyZXBwbGFiczENMAsGA1UECwwETm9uZTESMBAGA1UE
7+
AwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA0II7
8+
7/IAZpKaluc8q8i9PTWZ/R+I9AcCifrZGSkCSTwL52qWeoVFnv4xEpUhQVgqVqAq
9+
q099J1+y0bZ+4YtdCxs+DHBuT70XVY5n0Dc92viOqCFfCMB9IjT6lc0ZEzj3qcks
10+
Qh+TrebEYrqlLgO+K6H2i1cdQW1qzOg7BD5YOUAU6vBm/McpcsEON0bEl7CkscdS
11+
vJBt19xZaccKfWUx3m93PMjqrmUQRzBPI8T9iqD244lltAmhx5LzdFD+mJaf0iOG
12+
UElfEjKqEFbmtwOOxy60wCEsbaKokrILAGEWXacA+3T+nmAvF0pSkh4WUVbUixnY
13+
Pqda9cpjJM+gmOcZ/QIDAQABo1MwUTAdBgNVHQ4EFgQUu5xOVc73Ro+iwoK1qc9U
14+
NWM/2IYwHwYDVR0jBBgwFoAUu5xOVc73Ro+iwoK1qc9UNWM/2IYwDwYDVR0TAQH/
15+
BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAHnsDTSkngXjnY97MB82r7TPY3L3q
16+
p0dfu8FKt/Ybik9tjoX3CWWAcbv/7SxiXdC8Dw1xbqJO8UHJwzPeHyQZRebcDbrB
17+
vbN8C4iJDiMyfs6fnxZCAWL/zWIhxujqJSurae4kcm2sMNxbwcMITnixHwU38euQ
18+
paXkyKzXzzj0vFxNQCj34QNisFLqM2nrZuDneQ7VbnKHDF5hf5ZiPViDmUEFwHjk
19+
HpAKgUXWULX6UWRdDROJyfGu9mjY8mcQSzhUXL3+qZ5mYlyLZ0aR5bgz+/+BTvoN
20+
0GHeHwlSNG//Cd7LiKdTiUjWxL7pLhksru0OPyWJpIT9FjAKSQo0zrpT4g==
21+
-----END CERTIFICATE-----
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
21068AE6AF5C4D92DD05FA750CF6F70E0D92A0DB

0 commit comments

Comments
 (0)