Skip to content

Commit 6357928

Browse files
committed
Implement message expiry in broker
1 parent e155f05 commit 6357928

File tree

6 files changed

+858
-81
lines changed

6 files changed

+858
-81
lines changed

test/broker_offline_message.cpp

Lines changed: 266 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v3_1_1 ) {
247247

248248
BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) {
249249

250-
//
250+
// Process with no timeout:
251251
// c1 ---- broker ----- c2 (CleanSession: false)
252252
//
253253
// 1. c2 subscribe t1 QoS2
@@ -256,6 +256,8 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) {
256256
// 4. c1 publish t1 QoS1
257257
// 5. c1 publish t1 QoS2
258258
// 6. c2 connect again
259+
// Received published messages
260+
// 7. Disconnect
259261
//
260262

261263
boost::asio::io_context iocb;
@@ -349,7 +351,7 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) {
349351
}
350352
);
351353
c2->set_v5_connack_handler(
352-
[&chk, &c2]
354+
[&chk, &c1, &c2]
353355
(bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) {
354356
auto ret = chk.match(
355357
"c1_h_connack",
@@ -419,13 +421,16 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) {
419421
[&chk, &c2]
420422
(packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) {
421423
MQTT_CHK("c1_h_pubcomp");
422-
c2->connect(
423-
MQTT_NS::v5::properties{
424-
MQTT_NS::v5::property::session_expiry_interval(
425-
MQTT_NS::session_never_expire
426-
)
427-
}
428-
);
424+
425+
// Reconnect immediatly
426+
c2->connect(
427+
MQTT_NS::v5::properties{
428+
MQTT_NS::v5::property::session_expiry_interval(
429+
MQTT_NS::session_never_expire
430+
)
431+
}
432+
);
433+
429434
return true;
430435
}
431436
);
@@ -551,4 +556,256 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) {
551556
th.join();
552557
}
553558

559+
BOOST_AUTO_TEST_CASE( offline_pubsub_v5_timeout ) {
560+
561+
// Process with timeout:
562+
// c1 ---- broker ----- c2 (CleanSession: false)
563+
//
564+
// 1. c2 subscribe t1 QoS2
565+
// 2. c2 disconnect
566+
// 3. c1 publish t1 QoS0
567+
// 4. c1 publish t1 QoS1
568+
// 5. c1 publish t1 QoS2
569+
// * Wait for timeout of messages
570+
// 6. c2 connect again
571+
// Do not receive published messages
572+
// * Wait for possible published messages
573+
// 7. Disconnect
574+
//
575+
576+
boost::asio::io_context iocb;
577+
test_broker b(iocb);
578+
MQTT_NS::optional<test_server_no_tls> s;
579+
std::promise<void> p;
580+
auto f = p.get_future();
581+
std::thread th(
582+
[&] {
583+
s.emplace(iocb, b);
584+
p.set_value();
585+
iocb.run();
586+
}
587+
);
588+
f.wait();
589+
auto finish =
590+
[&] {
591+
as::post(
592+
iocb,
593+
[&] {
594+
s->close();
595+
}
596+
);
597+
};
598+
599+
boost::asio::io_context ioc;
600+
601+
auto c1 = MQTT_NS::make_client(ioc, broker_url, broker_notls_port, MQTT_NS::protocol_version::v5);
602+
c1->set_clean_session(true);
603+
c1->set_client_id("cid1");
604+
605+
auto c2 = MQTT_NS::make_client(ioc, broker_url, broker_notls_port, MQTT_NS::protocol_version::v5);
606+
c2->set_clean_session(false);
607+
c2->set_client_id("cid2");
608+
609+
using packet_id_t = typename std::remove_reference_t<decltype(*c1)>::packet_id_t;
610+
611+
checker chk = {
612+
cont("c1_h_connack"),
613+
cont("c2_h_connack1"),
614+
615+
// c2 subscribe t1 qos2
616+
cont("c2_h_suback"),
617+
cont("c2_h_close1"),
618+
619+
// c1 publish t1 qos0
620+
// c1 publish t1 qos1
621+
// c1 publish t1 qos2
622+
cont("c1_h_puback"),
623+
cont("c1_h_pubrec"),
624+
cont("c1_h_pubcomp"),
625+
626+
// c2 connect again
627+
cont("c2_h_connack2"),
628+
629+
cont("c1_h_close"),
630+
cont("c2_h_close2"),
631+
};
632+
633+
// Following is used to perform timeout of the message
634+
unsigned int message_timeout = 1;
635+
as::steady_timer timeout(ioc);
636+
637+
MQTT_NS::v5::properties ps {
638+
MQTT_NS::v5::property::payload_format_indicator(MQTT_NS::v5::property::payload_format_indicator::string),
639+
MQTT_NS::v5::property::message_expiry_interval(message_timeout),
640+
MQTT_NS::v5::property::content_type("content type"_mb),
641+
MQTT_NS::v5::property::topic_alias(0x1234U),
642+
MQTT_NS::v5::property::response_topic("response topic"_mb),
643+
MQTT_NS::v5::property::correlation_data("correlation data"_mb),
644+
MQTT_NS::v5::property::user_property("key1"_mb, "val1"_mb),
645+
MQTT_NS::v5::property::user_property("key2"_mb, "val2"_mb),
646+
};
647+
648+
auto prop_size = ps.size();
649+
std::size_t user_prop_count = 0;
650+
651+
c1->set_v5_connack_handler(
652+
[&chk, &c2]
653+
(bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) {
654+
MQTT_CHK("c1_h_connack");
655+
BOOST_TEST(sp == false);
656+
BOOST_TEST(connack_reason_code == MQTT_NS::v5::connect_reason_code::success);
657+
c2->connect(
658+
MQTT_NS::v5::properties{
659+
MQTT_NS::v5::property::session_expiry_interval(
660+
MQTT_NS::session_never_expire
661+
)
662+
}
663+
);
664+
return true;
665+
}
666+
);
667+
c2->set_v5_connack_handler(
668+
[&chk, &c1, &c2, &timeout, &message_timeout]
669+
(bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) {
670+
auto ret = chk.match(
671+
"c1_h_connack",
672+
[&] {
673+
MQTT_CHK("c2_h_connack1");
674+
BOOST_TEST(sp == false);
675+
BOOST_TEST(connack_reason_code == MQTT_NS::v5::connect_reason_code::success);
676+
c2->subscribe("topic1", MQTT_NS::qos::exactly_once);
677+
},
678+
"c2_h_connack1",
679+
[&] {
680+
MQTT_CHK("c2_h_connack2");
681+
BOOST_TEST(sp == true);
682+
BOOST_TEST(connack_reason_code == MQTT_NS::v5::connect_reason_code::success);
683+
684+
// Disconnect after timeout
685+
timeout.expires_after(std::chrono::seconds(1 + message_timeout));
686+
timeout.async_wait(
687+
[&](MQTT_NS::error_code ec) {
688+
if (!ec) {
689+
c1->disconnect();
690+
}
691+
}
692+
);
693+
694+
}
695+
);
696+
BOOST_TEST(ret);
697+
return true;
698+
}
699+
);
700+
c2->set_v5_suback_handler(
701+
[&chk, &c2]
702+
(packet_id_t, std::vector<MQTT_NS::v5::suback_reason_code> reasons, MQTT_NS::v5::properties /*props*/) mutable {
703+
MQTT_CHK("c2_h_suback");
704+
BOOST_TEST(reasons.size() == 1U);
705+
BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_2);
706+
c2->disconnect();
707+
return true;
708+
}
709+
);
710+
c2->set_close_handler(
711+
[&chk, &c1, &finish, &ps]
712+
() {
713+
auto ret = chk.match(
714+
"c2_h_suback",
715+
[&] {
716+
MQTT_CHK("c2_h_close1");
717+
c1->publish("topic1", "topic1_contents1", MQTT_NS::qos::at_most_once, ps);
718+
c1->publish("topic1", "topic1_contents2", MQTT_NS::qos::at_least_once, ps);
719+
c1->publish("topic1", "topic1_contents3", MQTT_NS::qos::exactly_once, ps);
720+
},
721+
"c2_h_close1",
722+
[&] {
723+
MQTT_CHK("c2_h_close2");
724+
finish();
725+
}
726+
);
727+
BOOST_TEST(ret);
728+
729+
}
730+
);
731+
c1->set_v5_puback_handler(
732+
[&chk]
733+
(packet_id_t, MQTT_NS::v5::puback_reason_code, MQTT_NS::v5::properties /*props*/) {
734+
MQTT_CHK("c1_h_puback");
735+
return true;
736+
}
737+
);
738+
c1->set_v5_pubrec_handler(
739+
[&chk]
740+
(packet_id_t, MQTT_NS::v5::pubrec_reason_code, MQTT_NS::v5::properties /*props*/) {
741+
MQTT_CHK("c1_h_pubrec");
742+
return true;
743+
}
744+
);
745+
c1->set_v5_pubcomp_handler(
746+
[&chk, &c2, &message_timeout, &timeout]
747+
(packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) {
748+
MQTT_CHK("c1_h_pubcomp");
749+
750+
// Reconnect when messages are timed out
751+
timeout.expires_after(std::chrono::seconds(1 + message_timeout));
752+
timeout.async_wait(
753+
[&c2](MQTT_NS::error_code ec) {
754+
if (!ec) {
755+
c2->connect(
756+
MQTT_NS::v5::properties{
757+
MQTT_NS::v5::property::session_expiry_interval(
758+
MQTT_NS::session_never_expire
759+
)
760+
}
761+
);
762+
}
763+
}
764+
);
765+
766+
return true;
767+
}
768+
);
769+
c2->set_v5_publish_handler(
770+
[&chk, &c1]
771+
(MQTT_NS::optional<packet_id_t> packet_id,
772+
MQTT_NS::publish_options pubopts,
773+
MQTT_NS::buffer topic,
774+
MQTT_NS::buffer contents,
775+
MQTT_NS::v5::properties props) {
776+
777+
// We should not received any published message when offline messages timeout
778+
BOOST_TEST(false);
779+
return true;
780+
}
781+
);
782+
c1->set_close_handler(
783+
[&chk, &c2]
784+
() {
785+
MQTT_CHK("c1_h_close");
786+
c2->disconnect();
787+
}
788+
);
789+
790+
// error cases
791+
c1->set_error_handler(
792+
[]
793+
(MQTT_NS::error_code) {
794+
BOOST_CHECK(false);
795+
}
796+
);
797+
c2->set_error_handler(
798+
[]
799+
(MQTT_NS::error_code) {
800+
BOOST_CHECK(false);
801+
}
802+
);
803+
804+
c1->connect();
805+
806+
ioc.run();
807+
BOOST_TEST(chk.all());
808+
th.join();
809+
}
810+
554811
BOOST_AUTO_TEST_SUITE_END()

test/combi_test.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ inline void do_test(
6767
[&] {
6868
s->close();
6969
b.clear_all_sessions();
70+
b.clear_all_retained_topics();
7071
}
7172
);
7273
},
@@ -118,6 +119,7 @@ inline void do_tls_test(
118119
[&] {
119120
s->close();
120121
b.clear_all_sessions();
122+
b.clear_all_retained_topics();
121123
}
122124
);
123125
},
@@ -167,6 +169,7 @@ inline void do_ws_test(
167169
[&] {
168170
s->close();
169171
b.clear_all_sessions();
172+
b.clear_all_retained_topics();
170173
}
171174
);
172175
},
@@ -218,6 +221,7 @@ inline void do_tls_ws_test(
218221
[&] {
219222
s->close();
220223
b.clear_all_sessions();
224+
b.clear_all_retained_topics();
221225
}
222226
);
223227
},

0 commit comments

Comments
 (0)