Skip to content

Commit

Permalink
Merge pull request emqx#97 from ejvr/emqtt
Browse files Browse the repository at this point in the history
Fixed unit tests & introduced state engine for message parsing
  • Loading branch information
mwallnoefer authored Jun 28, 2017
2 parents fad3a4a + 06256c1 commit 99795b6
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 65 deletions.
86 changes: 35 additions & 51 deletions src/mqtt/qmqtt_network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ QMQTT::Network::Network(QObject* parent)
, _host(DEFAULT_HOST)
, _autoReconnect(DEFAULT_AUTORECONNECT)
, _autoReconnectInterval(DEFAULT_AUTORECONNECT_INTERVAL_MS)
, _bytesRemaining(0)
, _socket(new QMQTT::Socket)
, _autoReconnectTimer(new QMQTT::Timer)
, _readState(Header)
{
initialize();
}
Expand All @@ -62,9 +62,9 @@ QMQTT::Network::Network(const QSslConfiguration &config, bool ignoreSelfSigned,
, _hostName(DEFAULT_HOST_NAME)
, _autoReconnect(DEFAULT_AUTORECONNECT)
, _autoReconnectInterval(DEFAULT_AUTORECONNECT_INTERVAL_MS)
, _bytesRemaining(0)
, _socket(new QMQTT::SslSocket(config, ignoreSelfSigned))
, _autoReconnectTimer(new QMQTT::Timer)
, _readState(Header)
{
initialize();
}
Expand All @@ -77,9 +77,9 @@ QMQTT::Network::Network(SocketInterface* socketInterface, TimerInterface* timerI
, _host(DEFAULT_HOST)
, _autoReconnect(DEFAULT_AUTORECONNECT)
, _autoReconnectInterval(DEFAULT_AUTORECONNECT_INTERVAL_MS)
, _bytesRemaining(0)
, _socket(socketInterface)
, _autoReconnectTimer(timerInterface)
, _readState(Header)
{
initialize();
}
Expand Down Expand Up @@ -127,7 +127,7 @@ void QMQTT::Network::connectToHost(const QString& hostName, const quint16 port)

void QMQTT::Network::connectToHost()
{
_bytesRemaining = 0;
_readState = Header;
if (_hostName.isEmpty())
{
_socket->connectToHost(_host, _port);
Expand Down Expand Up @@ -190,59 +190,43 @@ void QMQTT::Network::setAutoReconnectInterval(const int autoReconnectInterval)
void QMQTT::Network::onSocketReadReady()
{
QIODevice *ioDevice = _socket->ioDevice();
while(!ioDevice->atEnd())
{
if(_bytesRemaining == 0)
{
if (!ioDevice->getChar(reinterpret_cast<char *>(&_header)))
{
// malformed packet
emit error(QAbstractSocket::OperationError);
ioDevice->close();
return;
}

_bytesRemaining = readRemainingLength();
if (_bytesRemaining < 0)
{
// malformed remaining length
emit error(QAbstractSocket::OperationError);
ioDevice->close();
return;
// Only read the available (cached) bytes, so the read will never block.
QByteArray data = ioDevice->read(ioDevice->bytesAvailable());
foreach(char byte, data) {
switch (_readState) {
case Header:
_header = static_cast<quint8>(byte);
_readState = Length;
_length = 0;
_shift = 0;
break;
case Length:
_length |= (byte & 0x7F) << _shift;
_shift += 7;
if ((byte & 0x80) != 0)
break;
if (_length == 0) {
_readState = Header;
Frame frame(_header, _data);
emit received(frame);
break;
}
}

QByteArray data = ioDevice->read(_bytesRemaining);
_buffer.append(data);
_bytesRemaining -= data.size();

if(_bytesRemaining == 0)
{
Frame frame(_header, _buffer);
_buffer.clear();
_readState = PayLoad;
_data.clear();
break;
case PayLoad:
_data.append(byte);
--_length;
if (_length > 0)
break;
_readState = Header;
Frame frame(_header, _data);
emit received(frame);
break;
}
}
}

int QMQTT::Network::readRemainingLength()
{
quint8 byte = 0;
int length = 0;
int multiplier = 1;
QIODevice *ioDevice = _socket->ioDevice();
do {
if (!ioDevice->getChar(reinterpret_cast<char *>(&byte)))
return -1;
length += (byte & 127) * multiplier;
multiplier *= 128;
if (multiplier > 128*128*128)
return -1;
} while ((byte & 128) != 0);

return length;
}

void QMQTT::Network::onDisconnected()
{
emit disconnected();
Expand Down
16 changes: 12 additions & 4 deletions src/mqtt/qmqtt_network_p.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,27 @@ protected slots:

protected:
void initialize();
int readRemainingLength();

quint16 _port;
QHostAddress _host;
QString _hostName;
QByteArray _buffer;
bool _autoReconnect;
int _autoReconnectInterval;
int _bytesRemaining;
quint8 _header;
SocketInterface* _socket;
TimerInterface* _autoReconnectTimer;

enum ReadState {
Header,
Length,
PayLoad
};

ReadState _readState;
quint8 _header;
int _length;
int _shift;
QByteArray _data;

protected slots:
void onSocketReadReady();
void onDisconnected();
Expand Down
23 changes: 17 additions & 6 deletions tests/gtest/tests/clienttest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,9 @@ TEST_F(ClientTest, publishEmitsPublishedSignal_Test)

_client->publish(message);

EXPECT_EQ(1, spy.count());
EXPECT_EQ(message, spy.at(0).at(0).value<QMQTT::Message>());
ASSERT_EQ(1, spy.count());
EXPECT_EQ(message.id(), spy.at(0).at(0).value<quint16>());
EXPECT_EQ(QOS0, spy.at(0).at(1).value<quint8>());
}

// todo: network received sends a puback, test what happens
Expand All @@ -433,8 +434,15 @@ TEST_F(ClientTest, subscribeEmitsSubscribedSignal_Test)

_client->subscribe("topic", QOS2);

EXPECT_EQ(1, spy.count());
EXPECT_EQ("topic", spy.at(0).at(0).toString());
QByteArray payLoad;
payLoad.append((char)0x12); // message ID
payLoad.append((char)0x23); // message ID
payLoad.append((char)QOS2); // QOS
QMQTT::Frame frame(SUBACK_TYPE, payLoad);
emit _networkMock->received(frame);

ASSERT_EQ(1, spy.count());
EXPECT_EQ(QOS2, spy.at(0).at(1).toInt());
}

// todo: network received sends suback triggers a subscribed signal (other things?)
Expand All @@ -447,8 +455,11 @@ TEST_F(ClientTest, unsubscribeEmitsUnsubscribedSignal_Test)

_client->unsubscribe("topic");

QMQTT::Frame frame(UNSUBACK_TYPE, QByteArray());
emit _networkMock->received(frame);

EXPECT_EQ(1, spy.count());
EXPECT_EQ("topic", spy.at(0).at(0).toString());
// EXPECT_EQ("topic", spy.at(0).at(0).toString());
}

// todo: network received sends unsuback then emit unsubscribed signal (only then?)
Expand All @@ -466,7 +477,7 @@ TEST_F(ClientTest, clientEmitsErrorWhenNetworkEmitsError_Test)
{
QSignalSpy spy(_client.data(), &QMQTT::Client::error);
emit _networkMock->error(QAbstractSocket::ConnectionRefusedError);
EXPECT_EQ(1, spy.count());
ASSERT_EQ(1, spy.count());
EXPECT_EQ(QMQTT::SocketConnectionRefusedError,
spy.at(0).at(0).value<QMQTT::ClientError>());
}
1 change: 1 addition & 0 deletions tests/gtest/tests/iodevicemock.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class IODeviceMock : public QIODevice
MOCK_METHOD2(readData, qint64(char*, qint64));
MOCK_METHOD2(writeData, qint64(const char *, qint64));
MOCK_CONST_METHOD0(openMode, QIODevice::OpenMode());
MOCK_CONST_METHOD0(bytesAvailable, qint64());

MOCK_METHOD2(write, qint64(const char*, qint64));
MOCK_METHOD1(write, qint64(const char*));
Expand Down
8 changes: 4 additions & 4 deletions tests/gtest/tests/networktest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class NetworkTest : public Test
return actualLength;
}

bool fixtureByteArrayIsEmpty() const
qint64 fixtureBytesAvailable() const
{
return _byteArray.isEmpty();
return _byteArray.size();
}

SocketMock* _socketMock;
Expand Down Expand Up @@ -164,8 +164,8 @@ TEST_F(NetworkTest, networkEmitsReceivedSignalOnceAFrameIsReceived_Test)
buffer.close();
EXPECT_EQ(132, _byteArray.size());

EXPECT_CALL(*_socketMock->mockIoDevice, atEnd())
.WillRepeatedly(Invoke(this, &NetworkTest::fixtureByteArrayIsEmpty));
EXPECT_CALL(*_socketMock->mockIoDevice, bytesAvailable())
.WillRepeatedly(Invoke(this, &NetworkTest::fixtureBytesAvailable));
EXPECT_CALL(*_socketMock->mockIoDevice, readData(_, _))
.WillRepeatedly(Invoke(this, &NetworkTest::readDataFromFixtureByteArray));

Expand Down

0 comments on commit 99795b6

Please sign in to comment.