Skip to content

Commit 9473375

Browse files
committed
Fixed qos2_publish_handled_ logic.
qos2_publish_handled_ was inserted and erased but not referenced. When PUBLISH QoS2 packet is received, if qos2_publish_handled_ has the packet_id then on_publish isn't called. (Exactly once delivery) See #49.
1 parent f205716 commit 9473375

File tree

3 files changed

+126
-124
lines changed

3 files changed

+126
-124
lines changed

include/mqtt/async_client.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ class async_client : public client<Socket, PacketIdBytes> {
267267
) {
268268
set_auto_pub_response();
269269
base::set_async_pingreq(true);
270-
base::set_async_error_notify(true);
270+
base::set_async_notify(true);
271271
}
272272
};
273273

include/mqtt/endpoint.hpp

+101-66
Original file line numberDiff line numberDiff line change
@@ -858,14 +858,16 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
858858
}
859859

860860
/**
861-
* @brief Set async error notify flag
862-
* @param async send connack/disconnect if error happens
861+
* @brief Set async notify flag
862+
* @param async send packet
863863
*
864864
* MQTT protocol requests sending connack/disconnect packet with error reason code if some error happens.<BR>
865865
* This function choose sync/async connack/disconnect.<BR>
866+
* MQTT protocol requests sending pubrec even if the corresponding publish has already been handled.<BR>
867+
* This function choose sync/async pubrec.<BR>
866868
*/
867-
void set_async_error_notify(bool async = true) {
868-
async_error_notify_ = async;
869+
void set_async_notify(bool async = true) {
870+
async_notify_ = async;
869871
}
870872

871873
/**
@@ -5023,7 +5025,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
50235025
}
50245026

50255027
void send_error_disconnect(v5::disconnect_reason_code rc) {
5026-
if (async_error_notify_) {
5028+
if (async_notify_) {
50275029
async_disconnect(rc);
50285030
}
50295031
else {
@@ -5032,7 +5034,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
50325034
}
50335035

50345036
void send_error_connack(v5::connect_reason_code rc) {
5035-
if (async_error_notify_) {
5037+
if (async_notify_) {
50365038
async_connack(false, rc);
50375039
}
50385040
else {
@@ -7342,23 +7344,12 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
73427344
[&] {
73437345
switch (ep_.version_) {
73447346
case protocol_version::v3_1_1:
7345-
if (ep_.on_publish(
7346-
packet_id_,
7347-
publish_options(ep_.fixed_header_),
7348-
force_move(topic_name_),
7349-
force_move(force_move(variant_get<buffer>(var))))) {
7350-
ep_.on_mqtt_message_processed(
7351-
force_move(
7352-
std::get<0>(
7353-
any_cast<
7354-
std::tuple<any, process_type_sp>
7355-
>(session_life_keeper)
7356-
)
7357-
)
7358-
);
7359-
return true;
7360-
}
7361-
break;
7347+
return ep_.on_publish(
7348+
packet_id_,
7349+
publish_options(ep_.fixed_header_),
7350+
force_move(topic_name_),
7351+
force_move(variant_get<buffer>(var))
7352+
);
73627353
case protocol_version::v5:
73637354
if (topic_name_.empty()) {
73647355
if (auto topic_alias = get_topic_alias_from_props(props_)) {
@@ -7390,37 +7381,43 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
73907381
}
73917382
}
73927383
}
7393-
if (ep_.on_v5_publish(
7394-
packet_id_,
7395-
publish_options(ep_.fixed_header_),
7396-
force_move(topic_name_),
7397-
force_move(variant_get<buffer>(var)),
7398-
force_move(props_)
7399-
)
7400-
) {
7401-
ep_.on_mqtt_message_processed(
7402-
force_move(
7403-
std::get<0>(
7404-
any_cast<
7405-
std::tuple<any, process_type_sp>
7406-
>(session_life_keeper)
7407-
)
7408-
)
7409-
);
7410-
return true;
7411-
}
7412-
break;
7384+
return ep_.on_v5_publish(
7385+
packet_id_,
7386+
publish_options(ep_.fixed_header_),
7387+
force_move(topic_name_),
7388+
force_move(variant_get<buffer>(var)),
7389+
force_move(props_)
7390+
);
74137391
default:
74147392
BOOST_ASSERT(false);
74157393
}
74167394
return false;
74177395
};
74187396
switch (qos_value_) {
74197397
case qos::at_most_once:
7420-
handler_call();
7398+
if (handler_call()) {
7399+
ep_.on_mqtt_message_processed(
7400+
force_move(
7401+
std::get<0>(
7402+
any_cast<
7403+
std::tuple<any, process_type_sp>
7404+
>(session_life_keeper)
7405+
)
7406+
)
7407+
);
7408+
}
74217409
break;
74227410
case qos::at_least_once:
74237411
if (handler_call()) {
7412+
ep_.on_mqtt_message_processed(
7413+
force_move(
7414+
std::get<0>(
7415+
any_cast<
7416+
std::tuple<any, process_type_sp>
7417+
>(session_life_keeper)
7418+
)
7419+
)
7420+
);
74247421
ep_.auto_pub_response(
74257422
[this] {
74267423
if (ep_.connected_) {
@@ -7445,29 +7442,67 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
74457442
}
74467443
break;
74477444
case qos::exactly_once:
7448-
if (handler_call()) {
7449-
ep_.qos2_publish_handled_.emplace(*packet_id_);
7450-
ep_.auto_pub_response(
7451-
[this] {
7452-
if (ep_.connected_) {
7453-
ep_.send_pubrec(
7454-
*packet_id_,
7455-
v5::pubrec_reason_code::success,
7456-
v5::properties{}
7457-
);
7458-
}
7459-
},
7460-
[this] {
7461-
if (ep_.connected_) {
7462-
ep_.async_send_pubrec(
7463-
*packet_id_,
7464-
v5::pubrec_reason_code::success,
7465-
v5::properties{},
7466-
[](auto){}
7467-
);
7445+
if (ep_.qos2_publish_handled_.find(*packet_id_) == ep_.qos2_publish_handled_.end()) {
7446+
if (handler_call()) {
7447+
ep_.on_mqtt_message_processed(
7448+
force_move(
7449+
std::get<0>(
7450+
any_cast<
7451+
std::tuple<any, process_type_sp>
7452+
>(session_life_keeper)
7453+
)
7454+
)
7455+
);
7456+
ep_.qos2_publish_handled_.emplace(*packet_id_);
7457+
ep_.auto_pub_response(
7458+
[this] {
7459+
if (ep_.connected_) {
7460+
ep_.send_pubrec(
7461+
*packet_id_,
7462+
v5::pubrec_reason_code::success,
7463+
v5::properties{}
7464+
);
7465+
}
7466+
},
7467+
[this] {
7468+
if (ep_.connected_) {
7469+
ep_.async_send_pubrec(
7470+
*packet_id_,
7471+
v5::pubrec_reason_code::success,
7472+
v5::properties{},
7473+
[](auto){}
7474+
);
7475+
}
74687476
}
7469-
}
7477+
);
7478+
}
7479+
}
7480+
else {
7481+
// publish has already been handled
7482+
ep_.on_mqtt_message_processed(
7483+
force_move(
7484+
std::get<0>(
7485+
any_cast<
7486+
std::tuple<any, process_type_sp>
7487+
>(session_life_keeper)
7488+
)
7489+
)
74707490
);
7491+
if (ep_.async_notify_) {
7492+
ep_.async_send_pubrec(
7493+
*packet_id_,
7494+
v5::pubrec_reason_code::success,
7495+
v5::properties{},
7496+
[](auto){}
7497+
);
7498+
}
7499+
else {
7500+
ep_.send_pubrec(
7501+
*packet_id_,
7502+
v5::pubrec_reason_code::success,
7503+
v5::properties{}
7504+
);
7505+
}
74717506
}
74727507
break;
74737508
}
@@ -10458,7 +10493,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
1045810493
std::set<packet_id_t> sub_unsub_inflight_;
1045910494
bool auto_pub_response_{true};
1046010495
bool auto_pub_response_async_{false};
10461-
bool async_error_notify_{false};
10496+
bool async_notify_{false};
1046210497
bool async_send_store_ { false };
1046310498
bool async_read_on_message_processed_ { true };
1046410499
bool disconnect_requested_{false};

test/system/st_resend.cpp

+24-57
Original file line numberDiff line numberDiff line change
@@ -1080,7 +1080,6 @@ BOOST_AUTO_TEST_CASE( publish_qos2_from_broker ) {
10801080
deps("h_error", "h_pubcomp", "h_publish1"),
10811081
// connect
10821082
cont("h_connack3"),
1083-
cont("h_publish2"),
10841083
// disconnect
10851084
cont("h_close2"),
10861085
};
@@ -1105,6 +1104,7 @@ BOOST_AUTO_TEST_CASE( publish_qos2_from_broker ) {
11051104
[&] {
11061105
MQTT_CHK("h_connack3");
11071106
BOOST_TEST(sp == true);
1107+
c->disconnect();
11081108
}
11091109
);
11101110
BOOST_TEST(ret);
@@ -1127,34 +1127,17 @@ BOOST_AUTO_TEST_CASE( publish_qos2_from_broker ) {
11271127
MQTT_NS::publish_options pubopts,
11281128
MQTT_NS::buffer topic,
11291129
MQTT_NS::buffer contents) {
1130-
auto ret = MQTT_ORDERED(
1131-
[&] {
1132-
MQTT_CHK("h_publish1");
1133-
BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no);
1134-
BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::exactly_once);
1135-
BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no);
1136-
BOOST_CHECK(packet_id);
1137-
pid_pub = packet_id.value();
1138-
BOOST_TEST(topic == "topic1");
1139-
BOOST_TEST(contents == "topic1_contents");
1140-
if (chk.passed("h_pubcomp")) {
1141-
c->force_disconnect();
1142-
}
1143-
},
1144-
[&] {
1145-
MQTT_CHK("h_publish2");
1146-
BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::yes);
1147-
BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::exactly_once);
1148-
BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no);
1149-
BOOST_CHECK(packet_id);
1150-
BOOST_TEST(packet_id.value() == pid_pub);
1151-
BOOST_TEST(topic == "topic1");
1152-
BOOST_TEST(contents == "topic1_contents");
1153-
c->pubrec(packet_id.value());
1154-
c->disconnect();
1155-
}
1156-
);
1157-
BOOST_TEST(ret);
1130+
MQTT_CHK("h_publish1");
1131+
BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no);
1132+
BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::exactly_once);
1133+
BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no);
1134+
BOOST_CHECK(packet_id);
1135+
pid_pub = packet_id.value();
1136+
BOOST_TEST(topic == "topic1");
1137+
BOOST_TEST(contents == "topic1_contents");
1138+
if (chk.passed("h_pubcomp")) {
1139+
c->force_disconnect();
1140+
}
11581141
return true;
11591142
}
11601143
);
@@ -1198,6 +1181,7 @@ BOOST_AUTO_TEST_CASE( publish_qos2_from_broker ) {
11981181
[&] {
11991182
MQTT_CHK("h_connack3");
12001183
BOOST_TEST(sp == true);
1184+
c->disconnect();
12011185
}
12021186
);
12031187
BOOST_TEST(ret);
@@ -1221,34 +1205,17 @@ BOOST_AUTO_TEST_CASE( publish_qos2_from_broker ) {
12211205
MQTT_NS::buffer topic,
12221206
MQTT_NS::buffer contents,
12231207
MQTT_NS::v5::properties /*props*/) {
1224-
auto ret = MQTT_ORDERED(
1225-
[&] {
1226-
MQTT_CHK("h_publish1");
1227-
BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no);
1228-
BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::exactly_once);
1229-
BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no);
1230-
BOOST_CHECK(packet_id);
1231-
pid_pub = packet_id.value();
1232-
BOOST_TEST(topic == "topic1");
1233-
BOOST_TEST(contents == "topic1_contents");
1234-
if (chk.passed("h_pubcomp")) {
1235-
c->force_disconnect();
1236-
}
1237-
},
1238-
[&] {
1239-
MQTT_CHK("h_publish2");
1240-
BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::yes);
1241-
BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::exactly_once);
1242-
BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no);
1243-
BOOST_CHECK(packet_id);
1244-
BOOST_TEST(packet_id.value() == pid_pub);
1245-
BOOST_TEST(topic == "topic1");
1246-
BOOST_TEST(contents == "topic1_contents");
1247-
c->pubrec(packet_id.value());
1248-
c->disconnect();
1249-
}
1250-
);
1251-
BOOST_TEST(ret);
1208+
MQTT_CHK("h_publish1");
1209+
BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no);
1210+
BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::exactly_once);
1211+
BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no);
1212+
BOOST_CHECK(packet_id);
1213+
pid_pub = packet_id.value();
1214+
BOOST_TEST(topic == "topic1");
1215+
BOOST_TEST(contents == "topic1_contents");
1216+
if (chk.passed("h_pubcomp")) {
1217+
c->force_disconnect();
1218+
}
12521219
return true;
12531220
}
12541221
);

0 commit comments

Comments
 (0)