@@ -200,7 +200,6 @@ protected function performConnectionHandshake(
200
200
): void
201
201
{
202
202
try {
203
- $ i = 0 ;
204
203
$ buffer = '' ;
205
204
206
205
// protocol header
@@ -213,48 +212,33 @@ protected function performConnectionHandshake(
213
212
$ buffer .= chr (0x64 ); // protocol name: d
214
213
$ buffer .= chr (0x70 ); // protocol name: p
215
214
$ buffer .= chr (0x03 ); // protocol version (3.1)
216
- $ i += 9 ;
217
215
218
216
// connection flags
219
- $ flags = $ this ->buildConnectionFlags ($ username , $ password , $ sendCleanSessionFlag );
220
- $ buffer .= chr ($ flags );
221
- $ i ++;
217
+ $ buffer .= chr ($ this ->buildConnectionFlags ($ username , $ password , $ sendCleanSessionFlag ));
222
218
223
219
// keep alive settings
224
220
$ buffer .= chr ($ this ->settings ->getKeepAlive () >> 8 );
225
221
$ buffer .= chr ($ this ->settings ->getKeepAlive () & 0xff );
226
- $ i += 2 ;
227
222
228
223
// client id (connection identifier)
229
- $ clientIdPart = $ this ->buildLengthPrefixedString ($ this ->clientId );
230
- $ buffer .= $ clientIdPart ;
231
- $ i += strlen ($ clientIdPart );
224
+ $ buffer .= $ this ->buildLengthPrefixedString ($ this ->clientId );
232
225
233
226
// last will topic and message
234
227
if ($ this ->settings ->hasLastWill ()) {
235
- $ topicPart = $ this ->buildLengthPrefixedString ($ this ->settings ->getLastWillTopic ());
236
- $ buffer .= $ topicPart ;
237
- $ i += strlen ($ topicPart );
238
-
239
- $ messagePart = $ this ->buildLengthPrefixedString ($ this ->settings ->getLastWillMessage ());
240
- $ buffer .= $ messagePart ;
241
- $ i += strlen ($ messagePart );
228
+ $ buffer .= $ this ->buildLengthPrefixedString ($ this ->settings ->getLastWillTopic ());
229
+ $ buffer .= $ this ->buildLengthPrefixedString ($ this ->settings ->getLastWillMessage ());
242
230
}
243
231
244
232
// credentials
245
233
if ($ username !== null ) {
246
- $ usernamePart = $ this ->buildLengthPrefixedString ($ username );
247
- $ buffer .= $ usernamePart ;
248
- $ i += strlen ($ usernamePart );
234
+ $ buffer .= $ this ->buildLengthPrefixedString ($ username );
249
235
}
250
236
if ($ password !== null ) {
251
- $ passwordPart = $ this ->buildLengthPrefixedString ($ password );
252
- $ buffer .= $ passwordPart ;
253
- $ i += strlen ($ passwordPart );
237
+ $ buffer .= $ this ->buildLengthPrefixedString ($ password );
254
238
}
255
239
256
240
// message type and message length
257
- $ header = chr (0x10 ) . chr ( $ i );
241
+ $ header = chr (0x10 ) . $ this -> encodeMessageLength ( strlen ( $ buffer ) );
258
242
259
243
// send the connection message
260
244
$ this ->logger ->info ('Sending connection handshake to MQTT broker. ' );
@@ -458,33 +442,28 @@ protected function publishMessage(
458
442
call_user_func ($ handler , $ this , $ topic , $ message , $ messageId , $ qualityOfService , $ retain );
459
443
}
460
444
461
- $ i = 0 ;
462
445
$ buffer = '' ;
463
446
464
- $ topicPart = $ this ->buildLengthPrefixedString ($ topic );
465
- $ buffer .= $ topicPart ;
466
- $ i += strlen ($ topicPart );
447
+ $ buffer .= $ this ->buildLengthPrefixedString ($ topic );
467
448
468
449
if ($ messageId !== null ) {
469
450
$ buffer .= $ this ->encodeMessageId ($ messageId );
470
- $ i += 2 ;
471
451
}
472
452
473
453
$ buffer .= $ message ;
474
- $ i += strlen ($ message );
475
454
476
455
$ command = 0x30 ;
477
456
if ($ retain ) {
478
- $ command + = 1 << 0 ;
457
+ $ command | = 1 << 0 ;
479
458
}
480
459
if ($ qualityOfService > 0 ) {
481
- $ command + = $ qualityOfService << 1 ;
460
+ $ command | = $ qualityOfService << 1 ;
482
461
}
483
462
if ($ isDuplicate ) {
484
- $ command + = 1 << 3 ;
463
+ $ command | = 1 << 3 ;
485
464
}
486
465
487
- $ header = chr ($ command ) . $ this ->encodeMessageLength ($ i );
466
+ $ header = chr ($ command ) . $ this ->encodeMessageLength (strlen ( $ buffer ) );
488
467
489
468
$ this ->writeToSocket ($ header . $ buffer );
490
469
}
@@ -524,21 +503,19 @@ public function subscribe(string $topic, callable $callback, int $qualityOfServi
524
503
'qos ' => $ qualityOfService ,
525
504
]);
526
505
527
- $ i = 0 ;
528
- $ buffer = '' ;
529
506
$ messageId = $ this ->nextMessageId ();
530
- $ buffer .= $ this ->encodeMessageId ($ messageId );
531
- $ i += 2 ;
532
507
533
- $ topicPart = $ this ->buildLengthPrefixedString ($ topic );
534
- $ buffer .= $ topicPart ;
535
- $ i += strlen ($ topicPart );
536
- $ buffer .= chr ($ qualityOfService );
537
- $ i ++;
508
+ $ buffer = '' ;
509
+
510
+ $ buffer .= $ this ->encodeMessageId ($ messageId );
511
+
512
+ $ buffer .= $ this ->buildLengthPrefixedString ($ topic );
513
+
514
+ $ buffer .= chr ($ qualityOfService );
538
515
539
516
$ this ->repository ->addNewTopicSubscription ($ topic , $ callback , $ messageId , $ qualityOfService );
540
517
541
- $ header = chr (0x82 ) . chr ( $ i );
518
+ $ header = chr (0x82 ) . $ this -> encodeMessageLength ( strlen ( $ buffer ) );
542
519
543
520
$ this ->writeToSocket ($ header . $ buffer );
544
521
}
@@ -576,15 +553,15 @@ protected function sendUnsubscribeRequest(int $messageId, string $topic, bool $i
576
553
'is_duplicate ' => $ isDuplicate ,
577
554
]);
578
555
579
- $ buffer = $ this ->encodeMessageId ($ messageId );
580
- $ i = 2 ;
556
+ $ buffer = '' ;
557
+
558
+ $ buffer .= $ this ->encodeMessageId ($ messageId );
581
559
582
- $ topicPart = $ this ->buildLengthPrefixedString ($ topic );
583
- $ buffer .= $ topicPart ;
584
- $ i += strlen ($ topicPart );
560
+ $ buffer .= $ this ->buildLengthPrefixedString ($ topic );
585
561
586
562
$ command = 0xa2 | ($ isDuplicate ? 1 << 3 : 0 );
587
- $ header = chr ($ command ) . chr ($ i );
563
+
564
+ $ header = chr ($ command ) . $ this ->encodeMessageLength (strlen ($ buffer ));
588
565
589
566
$ this ->writeToSocket ($ header . $ buffer );
590
567
}
0 commit comments