@@ -17,15 +17,13 @@ PubSubClient::PubSubClient()
17
17
this ->stream = NULL ;
18
18
setCallback (NULL );
19
19
}
20
-
21
20
// cppcheck-suppress uninitMemberVar
22
21
PubSubClient::PubSubClient (Client& client)
23
22
{
24
23
this ->_state = MQTT_DISCONNECTED;
25
24
setClient (client);
26
25
this ->stream = NULL ;
27
26
}
28
-
29
27
// cppcheck-suppress uninitMemberVar
30
28
PubSubClient::PubSubClient (IPAddress addr, uint16_t port, Client& client)
31
29
{
@@ -63,7 +61,6 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR
63
61
setClient (client);
64
62
setStream (stream);
65
63
}
66
-
67
64
// cppcheck-suppress uninitMemberVar
68
65
PubSubClient::PubSubClient (uint8_t *ip, uint16_t port, Client& client)
69
66
{
@@ -101,7 +98,6 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE,
101
98
setClient (client);
102
99
setStream (stream);
103
100
}
104
-
105
101
// cppcheck-suppress uninitMemberVar
106
102
PubSubClient::PubSubClient (const char * domain, uint16_t port, Client& client)
107
103
{
@@ -143,22 +139,29 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN
143
139
144
140
bool PubSubClient::connect (const char *id)
145
141
{
146
- return connect (id,NULL ,NULL ,0 ,0 ,0 ,0 );
142
+ return connect (id,NULL ,NULL ,0 ,0 ,0 ,0 , 1 );
147
143
}
148
144
149
145
bool PubSubClient::connect (const char *id, const char *user, const char *pass)
150
146
{
151
- return connect (id,user,pass,0 ,0 ,0 ,0 );
147
+ return connect (id,user,pass,0 ,0 ,0 ,0 , 1 );
152
148
}
153
149
154
150
bool PubSubClient::connect (const char *id, const char * willTopic, uint8_t willQos,
155
151
bool willRetain, const char * willMessage)
156
152
{
157
- return connect (id,NULL ,NULL ,willTopic,willQos,willRetain,willMessage);
153
+ return connect (id,NULL ,NULL ,willTopic,willQos,willRetain,willMessage, 1 );
158
154
}
159
155
160
156
bool PubSubClient::connect (const char *id, const char *user, const char *pass,
161
157
const char * willTopic, uint8_t willQos, bool willRetain, const char * willMessage)
158
+ {
159
+ return connect (id,user,pass,willTopic,willQos,willRetain,willMessage,1 );
160
+ }
161
+
162
+ bool PubSubClient::connect (const char *id, const char *user, const char *pass,
163
+ const char * willTopic, uint8_t willQos, bool willRetain, const char * willMessage,
164
+ bool cleanSession)
162
165
{
163
166
if (!connected ()) {
164
167
int result = 0 ;
@@ -171,7 +174,7 @@ bool PubSubClient::connect(const char *id, const char *user, const char *pass,
171
174
if (result == 1 ) {
172
175
nextMsgId = 1 ;
173
176
// Leave room in the buffer for header and variable length field
174
- uint16_t length = 5 ;
177
+ uint16_t length = MQTT_MAX_HEADER_SIZE ;
175
178
unsigned int j;
176
179
177
180
#if MQTT_VERSION == MQTT_VERSION_3_1
@@ -187,9 +190,12 @@ bool PubSubClient::connect(const char *id, const char *user, const char *pass,
187
190
188
191
uint8_t v;
189
192
if (willTopic) {
190
- v = 0x06 |(willQos<<3 )|(willRetain<<5 );
193
+ v = 0x04 |(willQos<<3 )|(willRetain<<5 );
191
194
} else {
192
- v = 0x02 ;
195
+ v = 0x00 ;
196
+ }
197
+ if (cleanSession) {
198
+ v = v|0x02 ;
193
199
}
194
200
195
201
if (user != NULL ) {
@@ -204,20 +210,26 @@ bool PubSubClient::connect(const char *id, const char *user, const char *pass,
204
210
205
211
buffer[length++] = ((MQTT_KEEPALIVE) >> 8 );
206
212
buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF );
213
+
214
+ CHECK_STRING_LENGTH (length,id)
207
215
length = writeString (id,buffer,length);
208
216
if (willTopic) {
217
+ CHECK_STRING_LENGTH (length,willTopic)
209
218
length = writeString (willTopic,buffer,length);
219
+ CHECK_STRING_LENGTH (length,willMessage)
210
220
length = writeString (willMessage,buffer,length);
211
221
}
212
222
213
223
if (user != NULL ) {
224
+ CHECK_STRING_LENGTH (length,user)
214
225
length = writeString (user,buffer,length);
215
226
if (pass != NULL ) {
227
+ CHECK_STRING_LENGTH (length,pass)
216
228
length = writeString (pass,buffer,length);
217
229
}
218
230
}
219
231
220
- write (MQTTCONNECT,buffer,length-5 );
232
+ write (MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE );
221
233
222
234
lastInActivity = lastOutActivity = millis ();
223
235
@@ -256,6 +268,7 @@ bool PubSubClient::readByte(uint8_t * result)
256
268
{
257
269
uint32_t previousMillis = millis ();
258
270
while (!_client->available ()) {
271
+ yield ();
259
272
uint32_t currentMillis = millis ();
260
273
if (currentMillis - previousMillis >= ((int32_t ) MQTT_SOCKET_TIMEOUT * 1000 )) {
261
274
return false ;
@@ -291,6 +304,12 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength)
291
304
uint8_t start = 0 ;
292
305
293
306
do {
307
+ if (len == 5 ) {
308
+ // Invalid remaining length encoding - kill the connection
309
+ _state = MQTT_DISCONNECTED;
310
+ _client->stop ();
311
+ return 0 ;
312
+ }
294
313
if (!readByte (&digit)) {
295
314
return 0 ;
296
315
}
@@ -364,16 +383,15 @@ bool PubSubClient::loop()
364
383
uint8_t type = buffer[0 ]&0xF0 ;
365
384
if (type == MQTTPUBLISH) {
366
385
if (callback) {
367
- uint16_t tl = (buffer[llen+1 ]<<8 )+buffer[llen+2 ];
386
+ uint16_t tl = (buffer[llen+1 ]<<8 )+buffer[llen+2 ]; /* topic length in bytes */
387
+ memmove (buffer+llen+2 ,buffer+llen+3 ,tl); /* move topic inside buffer 1 byte to front */
388
+ buffer[llen+2 +tl] = 0 ; /* end the topic as a 'C' string with \x00 */
389
+ char *topic = (char *) buffer+llen+2 ;
368
390
uint8_t *payload;
369
- char topic[tl+1 ];
370
- for (uint16_t i=0 ; i<tl; i++) {
371
- topic[i] = buffer[llen+3 +i];
372
- }
373
- topic[tl] = 0 ;
374
391
// msgId only present for QOS>0
375
392
if ((buffer[0 ]&0x06 ) == MQTTQOS1) {
376
- uint16_t msgId = (buffer[llen+3 +tl]<<8 )+buffer[llen+3 +tl+1 ];
393
+ uint16_t msgId = 0 ;
394
+ msgId = (buffer[llen+3 +tl]<<8 )+buffer[llen+3 +tl+1 ];
377
395
payload = buffer+llen+3 +tl+2 ;
378
396
callback (topic,payload,len-llen-3 -tl-2 );
379
397
@@ -396,6 +414,9 @@ bool PubSubClient::loop()
396
414
} else if (type == MQTTPINGRESP) {
397
415
pingOutstanding = false ;
398
416
}
417
+ } else if (!connected ()) {
418
+ // readPacket has closed the connection
419
+ return false ;
399
420
}
400
421
}
401
422
return true ;
@@ -422,12 +443,12 @@ bool PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned i
422
443
bool retained)
423
444
{
424
445
if (connected ()) {
425
- if (MQTT_MAX_PACKET_SIZE < 5 + 2 +strlen (topic) + plength) {
446
+ if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2 +strlen (topic) + plength) {
426
447
// Too long
427
448
return false ;
428
449
}
429
450
// Leave room in the buffer for header and variable length field
430
- uint16_t length = 5 ;
451
+ uint16_t length = MQTT_MAX_HEADER_SIZE ;
431
452
length = writeString (topic,buffer,length);
432
453
uint16_t i;
433
454
for (i=0 ; i<plength; i++) {
@@ -437,11 +458,16 @@ bool PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned i
437
458
if (retained) {
438
459
header |= 1 ;
439
460
}
440
- return write (header,buffer,length-5 );
461
+ return write (header,buffer,length-MQTT_MAX_HEADER_SIZE );
441
462
}
442
463
return false ;
443
464
}
444
465
466
+ bool PubSubClient::publish_P (const char * topic, const char * payload, bool retained)
467
+ {
468
+ return publish_P (topic, (const uint8_t *)payload, strlen (payload), retained);
469
+ }
470
+
445
471
bool PubSubClient::publish_P (const char * topic, const uint8_t * payload, unsigned int plength,
446
472
bool retained)
447
473
{
@@ -487,12 +513,46 @@ bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned
487
513
return rc == tlen + 4 + plength;
488
514
}
489
515
490
- bool PubSubClient::write (uint8_t header, uint8_t * buf, uint16_t length)
516
+ bool PubSubClient::beginPublish (const char * topic, unsigned int plength, bool retained)
517
+ {
518
+ if (connected ()) {
519
+ // Send the header and variable length field
520
+ uint16_t length = MQTT_MAX_HEADER_SIZE;
521
+ length = writeString (topic,buffer,length);
522
+ uint8_t header = MQTTPUBLISH;
523
+ if (retained) {
524
+ header |= 1 ;
525
+ }
526
+ size_t hlen = buildHeader (header, buffer, plength+length-MQTT_MAX_HEADER_SIZE);
527
+ uint16_t rc = _client->write (buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
528
+ lastOutActivity = millis ();
529
+ return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
530
+ }
531
+ return false ;
532
+ }
533
+
534
+ int PubSubClient::endPublish ()
535
+ {
536
+ return 1 ;
537
+ }
538
+
539
+ size_t PubSubClient::write (uint8_t data)
540
+ {
541
+ lastOutActivity = millis ();
542
+ return _client->write (data);
543
+ }
544
+
545
+ size_t PubSubClient::write (const uint8_t *buffer, size_t size)
546
+ {
547
+ lastOutActivity = millis ();
548
+ return _client->write (buffer,size);
549
+ }
550
+
551
+ size_t PubSubClient::buildHeader (uint8_t header, uint8_t * buf, uint16_t length)
491
552
{
492
553
uint8_t lenBuf[4 ];
493
554
uint8_t llen = 0 ;
494
555
uint8_t pos = 0 ;
495
- uint16_t rc;
496
556
uint16_t len = length;
497
557
do {
498
558
uint8_t digit;
@@ -507,12 +567,19 @@ bool PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length)
507
567
508
568
buf[4 -llen] = header;
509
569
for (int i=0 ; i<llen; i++) {
510
- buf[5 -llen+i] = lenBuf[i];
570
+ buf[MQTT_MAX_HEADER_SIZE -llen+i] = lenBuf[i];
511
571
}
572
+ return llen+1 ; // Full header size is variable length bit plus the 1-byte fixed header
573
+ }
574
+
575
+ bool PubSubClient::write (uint8_t header, uint8_t * buf, uint16_t length)
576
+ {
577
+ uint16_t rc;
578
+ uint8_t hlen = buildHeader (header, buf, length);
512
579
513
580
#ifdef MQTT_MAX_TRANSFER_SIZE
514
- uint8_t * writeBuf = buf+(4 -llen );
515
- uint16_t bytesRemaining = length+1 +llen ; // Match the length type
581
+ uint8_t * writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen );
582
+ uint16_t bytesRemaining = length+hlen ; // Match the length type
516
583
uint8_t bytesToWrite;
517
584
bool result = true ;
518
585
while ((bytesRemaining > 0 ) && result) {
@@ -524,9 +591,9 @@ bool PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length)
524
591
}
525
592
return result;
526
593
#else
527
- rc = _client->write (buf+(4 -llen ),length+1 +llen );
594
+ rc = _client->write (buf+(MQTT_MAX_HEADER_SIZE-hlen ),length+hlen );
528
595
lastOutActivity = millis ();
529
- return (rc == 1 +llen +length);
596
+ return (rc == hlen +length);
530
597
#endif
531
598
}
532
599
@@ -537,7 +604,6 @@ bool PubSubClient::subscribe(const char* topic)
537
604
538
605
bool PubSubClient::subscribe (const char * topic, uint8_t qos)
539
606
{
540
- // original: if (qos < 0 || qos > 1) { (qos is uint8_t, hence qos < 0 impossible, tekka)
541
607
if (qos > 1 ) {
542
608
return false ;
543
609
}
@@ -547,7 +613,7 @@ bool PubSubClient::subscribe(const char* topic, uint8_t qos)
547
613
}
548
614
if (connected ()) {
549
615
// Leave room in the buffer for header and variable length field
550
- uint16_t length = 5 ;
616
+ uint16_t length = MQTT_MAX_HEADER_SIZE ;
551
617
nextMsgId++;
552
618
if (nextMsgId == 0 ) {
553
619
nextMsgId = 1 ;
@@ -556,7 +622,7 @@ bool PubSubClient::subscribe(const char* topic, uint8_t qos)
556
622
buffer[length++] = (nextMsgId & 0xFF );
557
623
length = writeString ((char *)topic, buffer,length);
558
624
buffer[length++] = qos;
559
- return write (MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5 );
625
+ return write (MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE );
560
626
}
561
627
return false ;
562
628
}
@@ -568,15 +634,15 @@ bool PubSubClient::unsubscribe(const char* topic)
568
634
return false ;
569
635
}
570
636
if (connected ()) {
571
- uint16_t length = 5 ;
637
+ uint16_t length = MQTT_MAX_HEADER_SIZE ;
572
638
nextMsgId++;
573
639
if (nextMsgId == 0 ) {
574
640
nextMsgId = 1 ;
575
641
}
576
642
buffer[length++] = (nextMsgId >> 8 );
577
643
buffer[length++] = (nextMsgId & 0xFF );
578
644
length = writeString (topic, buffer,length);
579
- return write (MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5 );
645
+ return write (MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE );
580
646
}
581
647
return false ;
582
648
}
@@ -587,6 +653,7 @@ void PubSubClient::disconnect()
587
653
buffer[1 ] = 0 ;
588
654
_client->write (buffer,2 );
589
655
_state = MQTT_DISCONNECTED;
656
+ _client->flush ();
590
657
_client->stop ();
591
658
lastInActivity = lastOutActivity = millis ();
592
659
}
@@ -644,7 +711,6 @@ PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port)
644
711
this ->port = port;
645
712
return *this ;
646
713
}
647
-
648
714
// cppcheck-suppress passedByValue
649
715
PubSubClient& PubSubClient::setCallback (MQTT_CALLBACK_SIGNATURE)
650
716
{
0 commit comments