Skip to content

Commit 4b14f5a

Browse files
tekka007henrikekblad
authored andcommitted
Update PubSubClient 2.7 (#1224)
1 parent bde7dad commit 4b14f5a

File tree

4 files changed

+144
-39
lines changed

4 files changed

+144
-39
lines changed

drivers/PubSubClient/CHANGES.txt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
1+
2.7
2+
* Fix remaining-length handling to prevent buffer overrun
3+
* Add large-payload API - beginPublish/write/publish/endPublish
4+
* Add yield call to improve reliability on ESP
5+
* Add Clean Session flag to connect options
6+
* Add ESP32 support for functional callback signature
7+
* Various other fixes
8+
19
2.4
210
* Add MQTT_SOCKET_TIMEOUT to prevent it blocking indefinitely
311
whilst waiting for inbound data
412
* Fixed return code when publishing >256 bytes
5-
13+
614
2.3
715
* Add publish(topic,payload,retained) function
816

drivers/PubSubClient/PubSubClient.cpp

Lines changed: 100 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@ PubSubClient::PubSubClient()
1717
this->stream = NULL;
1818
setCallback(NULL);
1919
}
20-
2120
// cppcheck-suppress uninitMemberVar
2221
PubSubClient::PubSubClient(Client& client)
2322
{
2423
this->_state = MQTT_DISCONNECTED;
2524
setClient(client);
2625
this->stream = NULL;
2726
}
28-
2927
// cppcheck-suppress uninitMemberVar
3028
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client)
3129
{
@@ -63,7 +61,6 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR
6361
setClient(client);
6462
setStream(stream);
6563
}
66-
6764
// cppcheck-suppress uninitMemberVar
6865
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client)
6966
{
@@ -101,7 +98,6 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE,
10198
setClient(client);
10299
setStream(stream);
103100
}
104-
105101
// cppcheck-suppress uninitMemberVar
106102
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client)
107103
{
@@ -143,22 +139,29 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN
143139

144140
bool PubSubClient::connect(const char *id)
145141
{
146-
return connect(id,NULL,NULL,0,0,0,0);
142+
return connect(id,NULL,NULL,0,0,0,0,1);
147143
}
148144

149145
bool PubSubClient::connect(const char *id, const char *user, const char *pass)
150146
{
151-
return connect(id,user,pass,0,0,0,0);
147+
return connect(id,user,pass,0,0,0,0,1);
152148
}
153149

154150
bool PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos,
155151
bool willRetain, const char* willMessage)
156152
{
157-
return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
153+
return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1);
158154
}
159155

160156
bool PubSubClient::connect(const char *id, const char *user, const char *pass,
161157
const char* willTopic, uint8_t willQos, bool willRetain, const char* willMessage)
158+
{
159+
return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1);
160+
}
161+
162+
bool PubSubClient::connect(const char *id, const char *user, const char *pass,
163+
const char* willTopic, uint8_t willQos, bool willRetain, const char* willMessage,
164+
bool cleanSession)
162165
{
163166
if (!connected()) {
164167
int result = 0;
@@ -171,7 +174,7 @@ bool PubSubClient::connect(const char *id, const char *user, const char *pass,
171174
if (result == 1) {
172175
nextMsgId = 1;
173176
// Leave room in the buffer for header and variable length field
174-
uint16_t length = 5;
177+
uint16_t length = MQTT_MAX_HEADER_SIZE;
175178
unsigned int j;
176179

177180
#if MQTT_VERSION == MQTT_VERSION_3_1
@@ -187,9 +190,12 @@ bool PubSubClient::connect(const char *id, const char *user, const char *pass,
187190

188191
uint8_t v;
189192
if (willTopic) {
190-
v = 0x06|(willQos<<3)|(willRetain<<5);
193+
v = 0x04|(willQos<<3)|(willRetain<<5);
191194
} else {
192-
v = 0x02;
195+
v = 0x00;
196+
}
197+
if (cleanSession) {
198+
v = v|0x02;
193199
}
194200

195201
if(user != NULL) {
@@ -204,20 +210,26 @@ bool PubSubClient::connect(const char *id, const char *user, const char *pass,
204210

205211
buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
206212
buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
213+
214+
CHECK_STRING_LENGTH(length,id)
207215
length = writeString(id,buffer,length);
208216
if (willTopic) {
217+
CHECK_STRING_LENGTH(length,willTopic)
209218
length = writeString(willTopic,buffer,length);
219+
CHECK_STRING_LENGTH(length,willMessage)
210220
length = writeString(willMessage,buffer,length);
211221
}
212222

213223
if(user != NULL) {
224+
CHECK_STRING_LENGTH(length,user)
214225
length = writeString(user,buffer,length);
215226
if(pass != NULL) {
227+
CHECK_STRING_LENGTH(length,pass)
216228
length = writeString(pass,buffer,length);
217229
}
218230
}
219231

220-
write(MQTTCONNECT,buffer,length-5);
232+
write(MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE);
221233

222234
lastInActivity = lastOutActivity = millis();
223235

@@ -256,6 +268,7 @@ bool PubSubClient::readByte(uint8_t * result)
256268
{
257269
uint32_t previousMillis = millis();
258270
while(!_client->available()) {
271+
yield();
259272
uint32_t currentMillis = millis();
260273
if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)) {
261274
return false;
@@ -291,6 +304,12 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength)
291304
uint8_t start = 0;
292305

293306
do {
307+
if (len == 5) {
308+
// Invalid remaining length encoding - kill the connection
309+
_state = MQTT_DISCONNECTED;
310+
_client->stop();
311+
return 0;
312+
}
294313
if(!readByte(&digit)) {
295314
return 0;
296315
}
@@ -364,16 +383,15 @@ bool PubSubClient::loop()
364383
uint8_t type = buffer[0]&0xF0;
365384
if (type == MQTTPUBLISH) {
366385
if (callback) {
367-
uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2];
386+
uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */
387+
memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
388+
buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
389+
char *topic = (char*) buffer+llen+2;
368390
uint8_t *payload;
369-
char topic[tl+1];
370-
for (uint16_t i=0; i<tl; i++) {
371-
topic[i] = buffer[llen+3+i];
372-
}
373-
topic[tl] = 0;
374391
// msgId only present for QOS>0
375392
if ((buffer[0]&0x06) == MQTTQOS1) {
376-
uint16_t msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
393+
uint16_t msgId = 0;
394+
msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
377395
payload = buffer+llen+3+tl+2;
378396
callback(topic,payload,len-llen-3-tl-2);
379397

@@ -396,6 +414,9 @@ bool PubSubClient::loop()
396414
} else if (type == MQTTPINGRESP) {
397415
pingOutstanding = false;
398416
}
417+
} else if (!connected()) {
418+
// readPacket has closed the connection
419+
return false;
399420
}
400421
}
401422
return true;
@@ -422,12 +443,12 @@ bool PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned i
422443
bool retained)
423444
{
424445
if (connected()) {
425-
if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) {
446+
if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2+strlen(topic) + plength) {
426447
// Too long
427448
return false;
428449
}
429450
// Leave room in the buffer for header and variable length field
430-
uint16_t length = 5;
451+
uint16_t length = MQTT_MAX_HEADER_SIZE;
431452
length = writeString(topic,buffer,length);
432453
uint16_t i;
433454
for (i=0; i<plength; i++) {
@@ -437,11 +458,16 @@ bool PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned i
437458
if (retained) {
438459
header |= 1;
439460
}
440-
return write(header,buffer,length-5);
461+
return write(header,buffer,length-MQTT_MAX_HEADER_SIZE);
441462
}
442463
return false;
443464
}
444465

466+
bool PubSubClient::publish_P(const char* topic, const char* payload, bool retained)
467+
{
468+
return publish_P(topic, (const uint8_t*)payload, strlen(payload), retained);
469+
}
470+
445471
bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength,
446472
bool retained)
447473
{
@@ -487,12 +513,46 @@ bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned
487513
return rc == tlen + 4 + plength;
488514
}
489515

490-
bool PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length)
516+
bool PubSubClient::beginPublish(const char* topic, unsigned int plength, bool retained)
517+
{
518+
if (connected()) {
519+
// Send the header and variable length field
520+
uint16_t length = MQTT_MAX_HEADER_SIZE;
521+
length = writeString(topic,buffer,length);
522+
uint8_t header = MQTTPUBLISH;
523+
if (retained) {
524+
header |= 1;
525+
}
526+
size_t hlen = buildHeader(header, buffer, plength+length-MQTT_MAX_HEADER_SIZE);
527+
uint16_t rc = _client->write(buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
528+
lastOutActivity = millis();
529+
return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
530+
}
531+
return false;
532+
}
533+
534+
int PubSubClient::endPublish()
535+
{
536+
return 1;
537+
}
538+
539+
size_t PubSubClient::write(uint8_t data)
540+
{
541+
lastOutActivity = millis();
542+
return _client->write(data);
543+
}
544+
545+
size_t PubSubClient::write(const uint8_t *buffer, size_t size)
546+
{
547+
lastOutActivity = millis();
548+
return _client->write(buffer,size);
549+
}
550+
551+
size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length)
491552
{
492553
uint8_t lenBuf[4];
493554
uint8_t llen = 0;
494555
uint8_t pos = 0;
495-
uint16_t rc;
496556
uint16_t len = length;
497557
do {
498558
uint8_t digit;
@@ -507,12 +567,19 @@ bool PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length)
507567

508568
buf[4-llen] = header;
509569
for (int i=0; i<llen; i++) {
510-
buf[5-llen+i] = lenBuf[i];
570+
buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i];
511571
}
572+
return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
573+
}
574+
575+
bool PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length)
576+
{
577+
uint16_t rc;
578+
uint8_t hlen = buildHeader(header, buf, length);
512579

513580
#ifdef MQTT_MAX_TRANSFER_SIZE
514-
uint8_t* writeBuf = buf+(4-llen);
515-
uint16_t bytesRemaining = length+1+llen; //Match the length type
581+
uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
582+
uint16_t bytesRemaining = length+hlen; //Match the length type
516583
uint8_t bytesToWrite;
517584
bool result = true;
518585
while((bytesRemaining > 0) && result) {
@@ -524,9 +591,9 @@ bool PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length)
524591
}
525592
return result;
526593
#else
527-
rc = _client->write(buf+(4-llen),length+1+llen);
594+
rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen);
528595
lastOutActivity = millis();
529-
return (rc == 1+llen+length);
596+
return (rc == hlen+length);
530597
#endif
531598
}
532599

@@ -537,7 +604,6 @@ bool PubSubClient::subscribe(const char* topic)
537604

538605
bool PubSubClient::subscribe(const char* topic, uint8_t qos)
539606
{
540-
// original: if (qos < 0 || qos > 1) { (qos is uint8_t, hence qos < 0 impossible, tekka)
541607
if (qos > 1) {
542608
return false;
543609
}
@@ -547,7 +613,7 @@ bool PubSubClient::subscribe(const char* topic, uint8_t qos)
547613
}
548614
if (connected()) {
549615
// Leave room in the buffer for header and variable length field
550-
uint16_t length = 5;
616+
uint16_t length = MQTT_MAX_HEADER_SIZE;
551617
nextMsgId++;
552618
if (nextMsgId == 0) {
553619
nextMsgId = 1;
@@ -556,7 +622,7 @@ bool PubSubClient::subscribe(const char* topic, uint8_t qos)
556622
buffer[length++] = (nextMsgId & 0xFF);
557623
length = writeString((char*)topic, buffer,length);
558624
buffer[length++] = qos;
559-
return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
625+
return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
560626
}
561627
return false;
562628
}
@@ -568,15 +634,15 @@ bool PubSubClient::unsubscribe(const char* topic)
568634
return false;
569635
}
570636
if (connected()) {
571-
uint16_t length = 5;
637+
uint16_t length = MQTT_MAX_HEADER_SIZE;
572638
nextMsgId++;
573639
if (nextMsgId == 0) {
574640
nextMsgId = 1;
575641
}
576642
buffer[length++] = (nextMsgId >> 8);
577643
buffer[length++] = (nextMsgId & 0xFF);
578644
length = writeString(topic, buffer,length);
579-
return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
645+
return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
580646
}
581647
return false;
582648
}
@@ -587,6 +653,7 @@ void PubSubClient::disconnect()
587653
buffer[1] = 0;
588654
_client->write(buffer,2);
589655
_state = MQTT_DISCONNECTED;
656+
_client->flush();
590657
_client->stop();
591658
lastInActivity = lastOutActivity = millis();
592659
}
@@ -644,7 +711,6 @@ PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port)
644711
this->port = port;
645712
return *this;
646713
}
647-
648714
// cppcheck-suppress passedByValue
649715
PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE)
650716
{

0 commit comments

Comments
 (0)