This is an extremely primitive MQTT to Kafka bridge, consisting of around 100 lines of code. It is not optimized for extensibility, performance, or stability and should not be used in a production environment.
All it does is read messages from an MQTT broker, replace the topic separators (e.g. `home/outside/humidity` -> `home.outside.humidity`), and forward the messages to Kafka.
Environment Variable | Default value | Description |
---|---|---|
`CLIENT_ID` | `Mqtt2KafkaBridge` | Client ID used to connect to the MQTT and Kafka brokers. |
`KAFKA_BROKER_HOST` | `localhost:9092` | Host and port of your Kafka broker. |
`KAFKA_TOPIC_SEPARATOR` | `.` | The topic separator used for Kafka topics; it replaces `MQTT_TOPIC_SEPARATOR`. |
`MQTT_BROKER_HOST` | `localhost:1883` | Host and port of your MQTT broker. |
`MQTT_BROKER_USER` | | Username used to connect to the MQTT broker. |
`MQTT_BROKER_PASSWORD` | | Password used to connect to the MQTT broker. |
`MQTT_AUTOMATIC_RECONNECT` | `true` | Whether or not the MQTT client should reconnect automatically after the connection to the broker is lost. |
`MQTT_TOPIC_FILTER` | `#` | The MQTT topic filter that we subscribe to; by default, all messages. |
`MQTT_TOPIC_SEPARATOR` | `/` | The topic separator used for MQTT topics; it is replaced by `KAFKA_TOPIC_SEPARATOR`. |
./gradlew build  # build the project with the Gradle wrapper
java -jar build/libs/iot-home.mqtt2kafkabridge-1.0.jar  # run the built JAR; no variables set, so the documented defaults apply
export KAFKA_BROKER_HOST=kafka:9092;  # Kafka broker as host:port
export MQTT_BROKER_HOST=mqtt:1883;  # MQTT broker as host:port; 1883 matches the documented default port (9092 is the Kafka port)
java -jar build/libs/iot-home.mqtt2kafkabridge-1.0.jar;  # start the bridge with the overrides exported above
dockerhub: https://hub.docker.com/r/marmaechler/mqtt2kafkabridge
version: "3"
services:
  # Runs only the bridge; the brokers it connects to are expected to exist already.
  mqtt2kafkabridge:
    image: marmaechler/mqtt2kafkabridge:latest
    environment:
      # Kafka broker to forward messages to, as host:port.
      KAFKA_BROKER_HOST: 'example.com:9092'
      # Replaces the MQTT topic separator, so home/outside/humidity becomes
      # home_outside_humidity.
      KAFKA_TOPIC_SEPARATOR: '_'
      # Only subscribe to topics below home/. Quoted so the "#" wildcard can
      # never be mistaken for the start of a YAML comment.
      MQTT_TOPIC_FILTER: 'home/#'
    volumes:
      # Bind-mounts the container path /opt/mqtt2kafkabridge/logs to the host.
      - ./mqtt2kafkabridge/logs:/opt/mqtt2kafkabridge/logs
version: "3"
services:
  # MQTT broker, built from the local "hivemq" build context.
  hivemq:
    build: hivemq
    restart: always
    ports:
      # host:container pairs are quoted so YAML always reads them as strings.
      # 1883 is the port the bridge connects to (see MQTT_BROKER_HOST below).
      - '1883:1883'
      - '8080:8080'
      - '8000:8000'
  zookeeper:
    image: bitnami/zookeeper:latest
    restart: always
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'
    volumes:
      - ./zookeeper/persistence:/bitnami/zookeeper
  kafka:
    image: bitnami/kafka:latest
    depends_on:
      - zookeeper
    restart: always
    ports:
      - '9092:9092'
      - '29092:29092'
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      # see https://rmoff.net/2018/08/02/kafka-listeners-explained/
      # Two listeners are defined: PLAINTEXT is advertised as kafka:9092 (the
      # address the bridge service below uses) and PLAINTEXT_HOST is
      # advertised as localhost:29092.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    volumes:
      - ./kafka/persistence:/bitnami/kafka
  # The bridge itself, pointed at the kafka and hivemq services defined above.
  mqtt2kafkabridge:
    image: marmaechler/mqtt2kafkabridge:latest
    depends_on:
      - kafka
      - hivemq
    restart: always
    environment:
      KAFKA_BROKER_HOST: 'kafka:9092'
      MQTT_BROKER_HOST: 'hivemq:1883'
    volumes:
      - ./mqtt2kafkabridge/logs:/opt/mqtt2kafkabridge/logs