Skip to content

add API to publish MQTT messages #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .deps
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/eclipse/paho.mqtt.golang
github.com/eclipse/paho.mqtt.golang/packets
github.com/gorilla/websocket
github.com/namsral/flag
Expand Down
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ go:
services:
- docker

script:
script:
- set -e
- make docker
- make docker-test
- if [ "$TRAVIS_BRANCH" == "master" ]; then
make docker-login ;
make docker-publish ;
fi

5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VERSION := 0.0.4
VERSION := 0.0.5
NAME := mqtt-proxy
DATE := $(shell date +'%Y-%M-%d_%H:%M:%S')
BUILD := $(shell git rev-parse HEAD | cut -c1-8)
Expand All @@ -20,7 +20,8 @@ dev:
docker-test:
docker-compose -f docker-compose.test.yml down
docker-compose -f docker-compose.test.yml build
docker-compose -f docker-compose.test.yml run sut || (docker-compose -f docker-compose.test.yml logs -t | sort -k 3 ; docker-compose -f docker-compose.test.yml down ; exit 1)
docker-compose -f docker-compose.test.yml up --abort-on-container-exit || (docker-compose -f docker-compose.test.yml down ; exit 1)
#docker-compose -f docker-compose.test.yml run sut || (docker-compose -f docker-compose.test.yml logs -t | sort -k 3 ; docker-compose -f docker-compose.test.yml down ; exit 1)
docker-compose -f docker-compose.test.yml down

docker-test-logs:
Expand Down
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

[![Build Status](https://travis-ci.org/jdavanne/docker-sni-proxy.svg?branch=master)](https://travis-ci.org/jdavanne/docker-sni-proxy)

![alt text](https://img.shields.io/docker/automated/davinci1976/mqtt-proxy.svg)
![alt text](https://img.shields.io/docker/build/davinci1976/mqtt-proxy.svg)

Proxy to apply Axway API Gateway policies (authN, authZ, content manipulation,...) on MQTT protocol for any MQTT broker.


Expand Down Expand Up @@ -72,11 +75,12 @@ make docker-test
```

## Changelog
- 0.0.5
- Add HTTP(s) API for MQTT Publish `/topics/:topic` with basic authentication
- 0.0.4
- Add HTTPS and MQTTS server support `--mqtts-*` `--https-*`
- Add HTTPS client support for authz/authn with added cert verification
- Remove the 2 steps build: use docker 17.05 build capability

- 0.0.3
- Add websocket support `--http-host` `--http-port`
- rename `--mqtt-*` variables to `--mqtt-broker-*`
Expand All @@ -87,8 +91,6 @@ make docker-test
- no configurable routes
- No TLS support for broker
- No additional TLS options supported between the client and mqtt-proxy (algo, ....)
- No HTTP API to publish a MQTT message on a topic `/topics/:topic?qos=:qos`
(like http://docs.aws.amazon.com/iot/latest/developerguide/protocols.html#http)
- No cache for publish/receive/subscribe policy check

## Contributing
Expand Down
5 changes: 2 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ version: "2"

services:
mqtt-proxy:
build: .
#build: .
image: davinci1976/mqtt-proxy
environment:
MQTT_PORT: 1883 # regular mqtt
MQTT_HOST: 0.0.0.0

MQTTS_PORT: 8883 # mqtt over tls
MQTTS_HOST: 0.0.0.0
MQTTS_CERT: "./certs/server.pem"
MQTTS_KEY: "./certs/server.key"

Expand Down
4 changes: 3 additions & 1 deletion src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var mqttBrokerUsername string
var mqttBrokerPassword string
var authURL string
var authCAFile string

var Version string
var Build string
var Date string
Expand Down Expand Up @@ -84,7 +85,7 @@ func main() {
flag.StringVar(&authCAFile, "auth-ca-file", "", "PEM encoded CA's certificate file for the authz/authn service")
flag.Parse()

log.Println("Starting mqtt-proxy @ ", Version, Build, Date)
log.Println("Starting mqtt-proxy - version:", Version, "build:", Build, " date:", Date)
log.Println("mqtt server ", mqttBrokerHost, mqttBrokerPort, mqttBrokerUsername, mqttBrokerPassword)

if authURL != "" {
Expand Down Expand Up @@ -127,6 +128,7 @@ func main() {

if httpEnable || httpsEnable {
wsMqttPrepare()
apiPublishMqttPrepare()
}

if httpEnable {
Expand Down
102 changes: 102 additions & 0 deletions src/mqtt-api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package main

import (
"io/ioutil"
"math/rand"
"net/http"
"strconv"
"strings"

MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/eclipse/paho.mqtt.golang/packets"
log "github.com/sirupsen/logrus"
)

const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-"

func RandStringBytesRmndr(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Int63()%int64(len(letterBytes))]
}
return string(b)
}

func apiPublishMqttPrepare() {
http.HandleFunc("/topic/", func(w http.ResponseWriter, r *http.Request) {
log.Println("api: incoming request", r.Header)
user, pass, ok := r.BasicAuth()
if !ok {
http.Error(w, "Unauthorized.", 401)
return
}
var pConnect packets.ConnectPacket
pConnect.Username = user
pConnect.Password = []byte(pass)
//FIXME: a collision probability exits
pConnect.ClientIdentifier = "api-" + RandStringBytesRmndr(10)
pConnect.CleanSession = true
pConnect.ProtocolName = "MQTT"
pConnect.ProtocolVersion = 5
session := NewSession()
err := session.HandleConnect(">", &pConnect)
if err != nil {
http.Error(w, "Unauthorized.", 401)
return
}

defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Bad Request.", 400)
return
}

var pPublish packets.PublishPacket
pPublish.Qos = 1
pPublish.TopicName = strings.TrimPrefix(r.URL.RequestURI(), "/topic/")
pPublish.Payload = body

err = session.HandlePublish(">", &pPublish)
if err != nil {
http.Error(w, "Bad Request.", 400)
return
}

broker := "tcp://" + mqttBrokerHost + ":" + strconv.Itoa(mqttBrokerPort)

/*if strings.HasPrefix(broker, "mqtt://") {
broker = "tcp://" + strings.TrimPrefix(broker, "mqtt://")
}*/

opts := MQTT.NewClientOptions().AddBroker(broker)
opts.SetClientID(pConnect.ClientIdentifier)
opts.SetUsername(pConnect.Username)
opts.SetPassword(string(pConnect.Password))

//create and start a client using the above ClientOptions
client := MQTT.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Errorln("Connect", broker, "as", pConnect.ClientIdentifier, token.Error())
http.Error(w, "Internal Error.", 500)
log.Errorln(token.Error())
return
}

//subscribe to the topic /go-mqtt/sample and request messages to be delivered
//at a maximum qos of zero, wait for the receipt to confirm the subscription
if token := client.Publish(pPublish.TopicName, 0, false, pPublish.Payload); token.Wait() && token.Error() != nil {
log.Errorln("Publish", pConnect.ClientIdentifier, pPublish.TopicName, token.Error())
client.Disconnect(0)
client = nil
http.Error(w, "Internal Error.", 500)
log.Errorln(token.Error())
return
}

http.Error(w, "OK.", 200)

client.Disconnect(0)

})
}
10 changes: 5 additions & 5 deletions src/mqtt-auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (session *Session) request(way string, uri string, request interface{}, res
return resp.StatusCode, nil
}

func (session *Session) HandleConnect(way string, p *packets.ConnectPacket, r net.Conn, w net.Conn) error {
func (session *Session) HandleConnect(way string, p *packets.ConnectPacket) error {
log.Println("Session", session.id, "- CONNECT")
var resp MQTTConnectResponse
rq := MQTTConnect{session.id, p.Username, string(p.Password), p.ClientIdentifier, p.CleanSession, p.ProtocolName, int(p.ProtocolVersion)}
Expand Down Expand Up @@ -129,7 +129,7 @@ func (session *Session) HandleConnect(way string, p *packets.ConnectPacket, r ne
return nil
}

func (session *Session) HandleSubscribe(way string, p *packets.SubscribePacket, r net.Conn, w net.Conn) error {
func (session *Session) HandleSubscribe(way string, p *packets.SubscribePacket, r net.Conn) error {
log.Println("Session", session.id, way, "- SUBSCRIBE", p.Topics, p.Qos)
var resp MQTTSubscribeResponse
topics := p.Topics
Expand Down Expand Up @@ -163,14 +163,14 @@ func (session *Session) HandleSubscribe(way string, p *packets.SubscribePacket,
return nil
}

func (session *Session) HandlePublish(way string, p *packets.PublishPacket, r net.Conn, w net.Conn) error {
func (session *Session) HandlePublish(way string, p *packets.PublishPacket) error {
action := "PUBLISH"
uri := "/publish"
if w == session.inbound {
if way == "<" {
action = "RECEIVE"
uri = "/receive"
}
log.Println("Session", session.id, way, "- "+action, r.RemoteAddr().String(), w.RemoteAddr().String())
//log.Println("Session", session.id, way, "- "+action, r.RemoteAddr().String(), w.RemoteAddr().String())
log.Println("Session", session.id, way, "- "+action, p.TopicName, p.Qos, string(p.Payload))
rq := MQTTPublish{session.id, session.Username, session.ClientIdentifier, p.TopicName, int(p.Qos), string(p.Payload)}
var resp MQTTPublishResponse
Expand Down
8 changes: 4 additions & 4 deletions src/mqtt-packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package main
import (
"net"

log "github.com/sirupsen/logrus"
"github.com/eclipse/paho.mqtt.golang/packets"
log "github.com/sirupsen/logrus"
)

func (session *Session) ForwardMQTTPacket(way string, r net.Conn, w net.Conn) error {
Expand All @@ -20,11 +20,11 @@ func (session *Session) ForwardMQTTPacket(way string, r net.Conn, w net.Conn) er
if authURL != "" {
switch p := cp.(type) {
case *packets.ConnectPacket: /*Outbound only*/
err = session.HandleConnect(way, p, r, w)
err = session.HandleConnect(way, p)
case *packets.SubscribePacket: /*Outbound only*/
err = session.HandleSubscribe(way, p, r, w)
err = session.HandleSubscribe(way, p, r)
case *packets.PublishPacket: /*Inbound/Outbound only*/
err = session.HandlePublish(way, p, r, w)
err = session.HandlePublish(way, p)
default:
err = nil
}
Expand Down
3 changes: 2 additions & 1 deletion tests/test/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"chai": "^3.5.0",
"mocha": "^3.2.0",
"mqtt": "^2.4.0",
"nodemon": "^1.11.0"
"nodemon": "^1.11.0",
"request": "^2.81.0"
}
}
50 changes: 46 additions & 4 deletions tests/test/test-tls.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const mqtt = require('mqtt')
const request = require('request')
const fs = require('fs')

function expect(a, b) {
if (a != b) {
Expand All @@ -22,7 +24,10 @@ class Once {

process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"

const MQTT_BROKER = "mqtts://mqtt-proxy:8883"
const MQTT_BROKER = 'mqtts://mqtt-proxy:8883'
const HTTP_BROKER = 'https://mqtt-proxy:8081'

const casigningcert = fs.readFileSync('certs/mqtt-proxy.pem')

describe('mqtt-proxy (TLS)', () => {

Expand Down Expand Up @@ -116,7 +121,7 @@ describe('mqtt-proxy (TLS)', () => {
})
client.on('message', function (topic, message) {
if (topic != "renamed_topic_on_subscribe") {
return done(new Error('Unexpected topic :' + topic))
return once.done(new Error('Unexpected topic :' + topic))
}
once.done()
client.close()
Expand Down Expand Up @@ -173,7 +178,7 @@ describe('mqtt-proxy (TLS)', () => {
})
client.on('message', function (topic, message) {
if (message != "altered_message_on_publish") {
return done(new Error('Unexpected message :' + topic))
return once.done(new Error('Unexpected message :' + topic))
}
once.done()
client.close()
Expand All @@ -191,7 +196,7 @@ describe('mqtt-proxy (TLS)', () => {
})
client.on('message', function (topic, message) {
if (message != "altered_message_on_receive") {
return done(new Error('Unexpected message :' + topic))
return once.done(new Error('Unexpected message :' + topic))
}
once.done()
client.close()
Expand All @@ -200,4 +205,41 @@ describe('mqtt-proxy (TLS)', () => {
once.done(new Error("premature close"))
})
})
it.skip('test api', (done) => {
const client = mqtt.connect(MQTT_BROKER, { username: "test-http-api", password: "goodpass" })
const once = new Once(done)
client.on('connect', function () {
client.subscribe('test-http-api')
console.log("API calling...")
request.post(HTTP_BROKER + "/topic/test-http-api", {
auth: {
user: 'test-http-api-http',
pass: 'goodpass'
},
body: "test_http_api_message",
agent: {
ca: casigningcert,
},
}, (err, response, body) => {
console.log("API called", err, response.headers, response.statusCode, body)
if (err) {
return once.done(err)
}
if (response.statusCode != 200) {
return once.done(new Error("bad response status code :" + response.statusCode))
}
once.done()
})
})
client.on('message', function (topic, message) {
if (message != "test_http_api_message") {
return once.done(new Error('Unexpected message :' + topic))
}
//once.done()
client.close()
})
client.on('close', function (err) {
//once.done(new Error("premature close"))
})
})
})
Loading