Skip to content

Commit

Permalink
Implemented new event consumer examples and fixed some bugs with it.
Browse files Browse the repository at this point in the history
  • Loading branch information
fpagliughi committed Jul 7, 2024
1 parent 0b85abd commit 3ef1292
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 28 deletions.
2 changes: 2 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ set(EXECUTABLES
async_subscribe
async_consume
async_consume_v5
async_message_consume
async_message_consume_v5
data_publish
mqttpp_chat
multithr_pub_sub
Expand Down
25 changes: 14 additions & 11 deletions examples/async_consume.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ int main(int argc, char* argv[])
.automatic_reconnect()
.finalize();

// The client will handle automatic reconnects, but we add this
// callbacks to let the user know when we're reconnected.
cli.set_connected_handler([](const std::string&) {
cout << "\n*** Connected ***" << endl;
});

try {
// Start consumer before connecting to make sure to not miss any messages

Expand Down Expand Up @@ -96,13 +90,22 @@ int main(int argc, char* argv[])

cout << "\nWaiting for messages on topic: '" << TOPIC << "'" << endl;

// The client handles automatic reconnects, but we monitor
// the events here to report them to the user.
while (true) {
auto msg = cli.consume_message();

if (msg)
cout << msg->get_topic() << ": " << msg->to_string() << endl;
else
auto evt = cli.consume_event();

if (const auto* p = std::get_if<mqtt::message_arrived_event>(&evt)) {
auto& msg = p->msg;
if (msg)
cout << msg->get_topic() << ": " << msg->to_string() << endl;
}
else if (std::holds_alternative<mqtt::connected_event>(evt))
cout << "\n*** Connected ***" << endl;
else if (std::holds_alternative<mqtt::connection_lost_event>(evt))
cout << "*** Connection Lost ***" << endl;
else
cout << "???" << endl;
}
}
catch (const mqtt::exception& exc) {
Expand Down
34 changes: 19 additions & 15 deletions examples/async_consume_v5.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,6 @@ int main(int argc, char* argv[])
.finalize();

try {
cli.set_connection_lost_handler([](const std::string&) {
cout << "*** Connection Lost ***" << endl;
});

cli.set_disconnected_handler([](const mqtt::properties&, mqtt::ReasonCode reason) {
cout << "*** Disconnected. Reason [0x" << hex << int{reason} << "]: " << reason
<< " ***" << endl;
});

// Start consumer before connecting to make sure to not miss messages

cli.start_consuming();
Expand Down Expand Up @@ -105,10 +96,26 @@ int main(int argc, char* argv[])
cout << "\nWaiting for messages on topic: '" << TOPIC << "'" << endl;

while (true) {
auto msg = cli.consume_message();
if (!msg)

auto evt = cli.consume_event();

if (const auto* p = std::get_if<mqtt::message_arrived_event>(&evt)) {
auto& msg = p->msg;
if (msg)
cout << msg->get_topic() << ": " << msg->to_string() << endl;
}
else if (std::holds_alternative<mqtt::connected_event>(evt)) {
cout << "\n*** Connected ***" << endl;
}
else if (std::holds_alternative<mqtt::connection_lost_event>(evt)) {
cout << "*** Connection Lost ***" << endl;
break;
cout << msg->get_topic() << ": " << msg->to_string() << endl;
}
else if (const auto* p = std::get_if<mqtt::disconnected_event>(&evt)) {
cout << "*** Disconnected. Reason [0x" << hex << int{p->reasonCode}
<< "]: " << p->reasonCode << " ***" << endl;
break;
}
}

// If we're here, the client was almost certainly disconnected.
Expand All @@ -120,9 +127,6 @@ int main(int argc, char* argv[])
cli.disconnect()->wait();
cout << "OK" << endl;
}
else {
cout << "\nClient was disconnected" << endl;
}
}
catch (const mqtt::exception& exc) {
cerr << "\n " << exc << endl;
Expand Down
5 changes: 3 additions & 2 deletions src/async_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -857,8 +857,9 @@ void async_client::start_consuming()
nullptr
);

if (rc != MQTTASYNC_SUCCESS)
throw exception(rc);
check_ret(rc);
check_ret(::MQTTAsync_setConnected(cli_, this, &async_client::on_connected));
check_ret(::MQTTAsync_setDisconnected(cli_, this, &async_client::on_disconnected));
}

void async_client::stop_consuming()
Expand Down

0 comments on commit 3ef1292

Please sign in to comment.