Skip to content

Commit

Permalink
Created state engine parsing incomming TCP traffic
Browse files Browse the repository at this point in the history
  • Loading branch information
ejvr committed Jun 28, 2017
1 parent 6640e04 commit 06256c1
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 59 deletions.
86 changes: 35 additions & 51 deletions src/mqtt/qmqtt_network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ QMQTT::Network::Network(QObject* parent)
, _host(DEFAULT_HOST)
, _autoReconnect(DEFAULT_AUTORECONNECT)
, _autoReconnectInterval(DEFAULT_AUTORECONNECT_INTERVAL_MS)
, _bytesRemaining(0)
, _socket(new QMQTT::Socket)
, _autoReconnectTimer(new QMQTT::Timer)
, _readState(Header)
{
initialize();
}
Expand All @@ -62,9 +62,9 @@ QMQTT::Network::Network(const QSslConfiguration &config, bool ignoreSelfSigned,
, _hostName(DEFAULT_HOST_NAME)
, _autoReconnect(DEFAULT_AUTORECONNECT)
, _autoReconnectInterval(DEFAULT_AUTORECONNECT_INTERVAL_MS)
, _bytesRemaining(0)
, _socket(new QMQTT::SslSocket(config, ignoreSelfSigned))
, _autoReconnectTimer(new QMQTT::Timer)
, _readState(Header)
{
initialize();
}
Expand All @@ -77,9 +77,9 @@ QMQTT::Network::Network(SocketInterface* socketInterface, TimerInterface* timerI
, _host(DEFAULT_HOST)
, _autoReconnect(DEFAULT_AUTORECONNECT)
, _autoReconnectInterval(DEFAULT_AUTORECONNECT_INTERVAL_MS)
, _bytesRemaining(0)
, _socket(socketInterface)
, _autoReconnectTimer(timerInterface)
, _readState(Header)
{
initialize();
}
Expand Down Expand Up @@ -127,7 +127,7 @@ void QMQTT::Network::connectToHost(const QString& hostName, const quint16 port)

void QMQTT::Network::connectToHost()
{
_bytesRemaining = 0;
_readState = Header;
if (_hostName.isEmpty())
{
_socket->connectToHost(_host, _port);
Expand Down Expand Up @@ -190,59 +190,43 @@ void QMQTT::Network::setAutoReconnectInterval(const int autoReconnectInterval)
void QMQTT::Network::onSocketReadReady()
{
QIODevice *ioDevice = _socket->ioDevice();
while(!ioDevice->atEnd())
{
if(_bytesRemaining == 0)
{
if (!ioDevice->getChar(reinterpret_cast<char *>(&_header)))
{
// malformed packet
emit error(QAbstractSocket::OperationError);
ioDevice->close();
return;
}

_bytesRemaining = readRemainingLength();
if (_bytesRemaining < 0)
{
// malformed remaining length
emit error(QAbstractSocket::OperationError);
ioDevice->close();
return;
// Only read the available (cached) bytes, so the read will never block.
QByteArray data = ioDevice->read(ioDevice->bytesAvailable());
foreach(char byte, data) {
switch (_readState) {
case Header:
_header = static_cast<quint8>(byte);
_readState = Length;
_length = 0;
_shift = 0;
break;
case Length:
_length |= (byte & 0x7F) << _shift;
_shift += 7;
if ((byte & 0x80) != 0)
break;
if (_length == 0) {
_readState = Header;
Frame frame(_header, _data);
emit received(frame);
break;
}
}

QByteArray data = ioDevice->read(_bytesRemaining);
_buffer.append(data);
_bytesRemaining -= data.size();

if(_bytesRemaining == 0)
{
Frame frame(_header, _buffer);
_buffer.clear();
_readState = PayLoad;
_data.clear();
break;
case PayLoad:
_data.append(byte);
--_length;
if (_length > 0)
break;
_readState = Header;
Frame frame(_header, _data);
emit received(frame);
break;
}
}
}

int QMQTT::Network::readRemainingLength()
{
quint8 byte = 0;
int length = 0;
int multiplier = 1;
QIODevice *ioDevice = _socket->ioDevice();
do {
if (!ioDevice->getChar(reinterpret_cast<char *>(&byte)))
return -1;
length += (byte & 127) * multiplier;
multiplier *= 128;
if (multiplier > 128*128*128)
return -1;
} while ((byte & 128) != 0);

return length;
}

void QMQTT::Network::onDisconnected()
{
emit disconnected();
Expand Down
16 changes: 12 additions & 4 deletions src/mqtt/qmqtt_network_p.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,27 @@ protected slots:

protected:
void initialize();
int readRemainingLength();

quint16 _port;
QHostAddress _host;
QString _hostName;
QByteArray _buffer;
bool _autoReconnect;
int _autoReconnectInterval;
int _bytesRemaining;
quint8 _header;
SocketInterface* _socket;
TimerInterface* _autoReconnectTimer;

enum ReadState {
Header,
Length,
PayLoad
};

ReadState _readState;
quint8 _header;
int _length;
int _shift;
QByteArray _data;

protected slots:
void onSocketReadReady();
void onDisconnected();
Expand Down
1 change: 1 addition & 0 deletions tests/gtest/tests/iodevicemock.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class IODeviceMock : public QIODevice
MOCK_METHOD2(readData, qint64(char*, qint64));
MOCK_METHOD2(writeData, qint64(const char *, qint64));
MOCK_CONST_METHOD0(openMode, QIODevice::OpenMode());
MOCK_CONST_METHOD0(bytesAvailable, qint64());

MOCK_METHOD2(write, qint64(const char*, qint64));
MOCK_METHOD1(write, qint64(const char*));
Expand Down
8 changes: 4 additions & 4 deletions tests/gtest/tests/networktest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class NetworkTest : public Test
return actualLength;
}

bool fixtureByteArrayIsEmpty() const
qint64 fixtureBytesAvailable() const
{
return _byteArray.isEmpty();
return _byteArray.size();
}

SocketMock* _socketMock;
Expand Down Expand Up @@ -164,8 +164,8 @@ TEST_F(NetworkTest, networkEmitsReceivedSignalOnceAFrameIsReceived_Test)
buffer.close();
EXPECT_EQ(132, _byteArray.size());

EXPECT_CALL(*_socketMock->mockIoDevice, atEnd())
.WillRepeatedly(Invoke(this, &NetworkTest::fixtureByteArrayIsEmpty));
EXPECT_CALL(*_socketMock->mockIoDevice, bytesAvailable())
.WillRepeatedly(Invoke(this, &NetworkTest::fixtureBytesAvailable));
EXPECT_CALL(*_socketMock->mockIoDevice, readData(_, _))
.WillRepeatedly(Invoke(this, &NetworkTest::readDataFromFixtureByteArray));

Expand Down

0 comments on commit 06256c1

Please sign in to comment.