Skip to content

Commit 78fe285

Browse files
committed
mqtt 5
1 parent c63766a commit 78fe285

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2156
-85
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
.PHONY: clean build fmt test
44

5-
TAG ?= v0.1.1
5+
TAG ?= v0.2.0
66

77
BUILD_FLAGS ?=
88
BINARY ?= mqtt-proxy

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ MQTT Proxy allows MQTT clients to send messages to other messaging systems
1414

1515
* MQTT protocol
1616
* [x] [MQTT 3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html)
17-
* [ ] [MQTT 5.0](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html)
17+
* [x] [MQTT 5.0](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html)
1818
* Publisher
1919
* [x] Noop
2020
* [x] [Apache Kafka](https://kafka.apache.org/)
@@ -95,6 +95,9 @@ prerequisites
9595
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 0" --repeat 1 -q 0
9696
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 1" --repeat 1 -q 1
9797
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 2" --repeat 1 -q 2
98+
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 0 / v5" --repeat 1 -q 0 -V mqttv5
99+
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 1 / v5" --repeat 1 -q 1 -V mqttv5
100+
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 2 / v5" --repeat 1 -q 2 -V mqttv5
98101
```
99102
100103
* proxy using Kafka SSL listener

apis/auth.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
const (
1010
AuthAccepted = mqttproto.Accepted
11-
AuthUnauthorized = mqttproto.RefusedNotAuthorized
11+
AuthUnauthorized = mqttproto.RefusedBadUserNameOrPassword
1212
)
1313

1414
type UserPasswordAuthRequest struct {

pkg/mqtt/codec/packets.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
mqttproto "github.com/grepplabs/mqtt-proxy/pkg/mqtt/codec/proto"
99
mqtt311 "github.com/grepplabs/mqtt-proxy/pkg/mqtt/codec/v311"
10+
mqtt5 "github.com/grepplabs/mqtt-proxy/pkg/mqtt/codec/v5"
1011
)
1112

1213
func ReadPacket(r io.Reader, protocolVersion byte) (mqttproto.ControlPacket, error) {
@@ -26,7 +27,7 @@ func ReadPacket(r io.Reader, protocolVersion byte) (mqttproto.ControlPacket, err
2627
case mqttproto.MQTT_3_1_1:
2728
return mqtt311.ReadPacket(r)
2829
case mqttproto.MQTT_5:
29-
return nil, mqtt311.NewConnAckError(mqttproto.RefusedUnacceptableProtocolVersion, "mqtt5 is not supported yet")
30+
return mqtt5.ReadPacket(r)
3031
default:
3132
return nil, mqtt311.NewConnAckError(mqttproto.RefusedUnacceptableProtocolVersion, fmt.Sprintf("unsupported protocol version %v", protocolVersion))
3233
}

pkg/mqtt/codec/proto/constants.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ const (
7878

7979
// MQTT 5 - 3.2.2.2 Connect Reason Code
8080
const (
81-
RefusedUnspecifiedError byte = 0x80 // 128
82-
RefusedUnsupportedProtocolVersion byte = 0x84 // 132
81+
RefusedV5UnspecifiedError byte = 0x80 // 128
82+
RefusedV5UnsupportedProtocolVersion byte = 0x84 // 132
83+
RefusedV5BadUserNameOrPassword byte = 0x86 // 134
8384
)

pkg/mqtt/codec/proto/encoding.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,20 +62,20 @@ func EncodeUint16(v uint16) []byte {
6262
return b
6363
}
6464

65-
func DecodeLength(r io.Reader) (int, error) {
65+
func DecodeUvarint(r io.Reader) (int, error) {
6666
byteReader := newByteReader(r)
6767
length, err := binary.ReadUvarint(byteReader)
6868
if err != nil {
6969
return 0, err
7070
}
7171
if byteReader.bytesRead > 4 {
72-
return 0, fmt.Errorf("the maximum number of bytes in the length is 4, but was %d", byteReader.bytesRead)
72+
return 0, fmt.Errorf("the maximum number of bytes in the variable byte integer is 4, but was %d", byteReader.bytesRead)
7373
}
7474
return int(length), nil
7575
}
7676

77-
// WriteLength is a modified binary.PutUvarint
78-
func WriteLength(buffer *bytes.Buffer, x uint32) int {
77+
// WriteUvarint is a modified binary.PutUvarint
78+
func WriteUvarint(buffer *bytes.Buffer, x uint32) int {
7979
i := 0
8080
for x >= 0x80 {
8181
buffer.WriteByte(byte(x) | 0x80)
@@ -104,3 +104,14 @@ func (r *byteReader) ReadByte() (byte, error) {
104104
r.bytesRead += n
105105
return r.buf[0], nil
106106
}
107+
108+
type CountingReader struct {
109+
Reader io.Reader
110+
BytesRead int
111+
}
112+
113+
func (r *CountingReader) Read(p []byte) (n int, err error) {
114+
n, err = r.Reader.Read(p)
115+
r.BytesRead += n
116+
return n, err
117+
}

pkg/mqtt/codec/proto/fixedheader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,14 @@ func (fh *FixedHeader) Unpack(typeAndFlags byte, r io.Reader) (err error) {
4444
fh.Dup = (typeAndFlags & 0x08) == 0x08
4545
fh.Qos = (typeAndFlags & 0x06) >> 1
4646
fh.Retain = (typeAndFlags & 0x01) != 0
47-
fh.RemainingLength, err = DecodeLength(r)
47+
fh.RemainingLength, err = DecodeUvarint(r)
4848
return err
4949
}
5050

5151
func (fh *FixedHeader) Pack() bytes.Buffer {
5252
var header bytes.Buffer
5353
header.WriteByte(fh.getFixedHeaderByte1())
54-
WriteLength(&header, uint32(fh.RemainingLength))
54+
WriteUvarint(&header, uint32(fh.RemainingLength))
5555
return header
5656
}
5757

pkg/mqtt/codec/proto/fixedheader_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,12 @@ func TestFixedHeaderDecodeError(t *testing.T) {
145145
{
146146
name: "remaining length too long",
147147
input: "008080808001",
148-
expected: "the maximum number of bytes in the length is 4, but was 5",
148+
expected: "the maximum number of bytes in the variable byte integer is 4, but was 5",
149149
},
150150
{
151151
name: "remaining length too long",
152152
input: "00808080808001",
153-
expected: "the maximum number of bytes in the length is 4, but was 6",
153+
expected: "the maximum number of bytes in the variable byte integer is 4, but was 6",
154154
},
155155
}
156156
for _, tc := range tests {

pkg/mqtt/codec/v311/connack_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ func TestNewConnackPacket(t *testing.T) {
1515
packet := NewControlPacket(mqttproto.CONNACK).(*ConnackPacket)
1616
a.Equal(mqttproto.CONNACK, packet.MessageType)
1717
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
18+
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
1819
t.Log(packet)
1920
}
2021

@@ -78,6 +79,7 @@ func TestConnackPacketCodec(t *testing.T) {
7879
}
7980
packet := decoded.(*ConnackPacket)
8081
a.Equal(*tc.packet, *packet)
82+
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
8183

8284
// encode
8385
var output bytes.Buffer

pkg/mqtt/codec/v311/connect_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ func TestNewConnectPacket(t *testing.T) {
1515
packet := NewControlPacket(mqttproto.CONNECT).(*ConnectPacket)
1616
a.Equal(mqttproto.CONNECT, packet.MessageType)
1717
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
18+
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
1819
t.Log(packet)
1920
}
2021

@@ -115,6 +116,7 @@ func TestDecodeConnectPacket(t *testing.T) {
115116
}
116117
packet := decoded.(*ConnectPacket)
117118
a.Equal(*tc.packet, *packet)
119+
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
118120

119121
// encode
120122
var output bytes.Buffer

0 commit comments

Comments
 (0)