Use virtual functions for notifications from mqtt_cpp::endpoint to other code, instead of multiple std::functions #444


Merged 1 commit into redboltz:master on Sep 24, 2019

Conversation

@jonesmz (Contributor) commented Sep 18, 2019

An optional template wrapper, callable_overlay, is provided. It exposes a collection of std::functions that can
be set to register dynamically changeable callbacks for each of the virtual member function calls that can occur.
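
For readers skimming the diff, here is a minimal sketch of the general shape of the change. The class names and signatures here are simplified placeholders, not the actual mqtt_cpp code; the real classes and parameter lists live in the PR itself.

#include <functional>
#include <string>
#include <utility>

// Simplified notification base: the endpoint reports events by calling
// virtual member functions instead of invoking stored std::functions.
class base_endpoint {
public:
    virtual bool on_publish(std::string const& topic, std::string const& payload) = 0;
protected:
    ~base_endpoint() = default; // protected, non-virtual: see the destructor discussion later in the thread
};

// Optional wrapper: restores dynamically changeable std::function callbacks
// for callers that prefer the previous handler-based style.
template <typename Endpoint>
class callable_overlay : public Endpoint {
public:
    using Endpoint::Endpoint;

    bool on_publish(std::string const& topic, std::string const& payload) override {
        // Default to "keep going" semantics when no handler is registered.
        return publish_handler_ ? publish_handler_(topic, payload) : true;
    }

    void set_publish_handler(std::function<bool(std::string const&, std::string const&)> h) {
        publish_handler_ = std::move(h);
    }

private:
    std::function<bool(std::string const&, std::string const&)> publish_handler_;
};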

@codecov-io commented Sep 18, 2019

Codecov Report

Merging #444 into master will increase coverage by 0.03%.
The diff coverage is 88.63%.

@@            Coverage Diff             @@
##           master     #444      +/-   ##
==========================================
+ Coverage   84.94%   84.98%   +0.03%     
==========================================
  Files          40       41       +1     
  Lines        6342     6392      +50     
==========================================
+ Hits         5387     5432      +45     
- Misses        955      960       +5

@jonesmz (Contributor, Author) commented Sep 18, 2019

@ropieur

Not sure I hold all cards in hand to debate, but a reason also for passing the handler as an argument to async calls is to give the compiler the opportunity to "inject" a lambda directly at the calling site, which would give it the opportunity for more aggressive optimization. But this requires avoiding type erasure (e.g. not pushing the lambda inside a std::function).

Could you elaborate on this please?

I agree with @jonesmz on the fact that a std::function has a heavy cost. OTOH, I am not in favour of forcing a derivation from a "virtual interface" (a Java-like approach).
Maybe you could require a "concept" (a class providing a set of methods meeting the same requirements as the virtual interface, without the virtual aspect) that could be injected at once (so no need for std::function and no hard relation ...).

So static polymorphism, where the "policy" of what to do for the various callbacks is provided by a single object that has a set of functions that conform to the parameters that the current handlers take?

Something like this?

struct policy
{
    bool on_connack(...);
    bool on_publish(...);
    bool on_subscribe(...);
};

template<typename Policy>
struct endpoint
{
    endpoint(Policy policy) : policy_(std::move(policy)) { }

    void handle_incoming_packet(...)
    {
        switch(packet_type)
        {
            case connack: policy_.on_connack(); break;
            case publish: policy_.on_publish(); break;
            case subscribe: policy_.on_subscribe(); break;
        }
    }
};

I'm not opposed to that, since it's still better than std::function, but I can't think of a way to support the current mqtt_cpp API using static polymorphism. The virtual function approach can handle both static functions and std::function, and as long as you teach GCC that the overrides for the virtual functions should be always_inline, you end up with the same performance.

With static polymorphism, the handlers would need to be injected into the policy object, not the endpoint itself.
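
To make the always_inline remark concrete, here is a small hypothetical example using the GCC/Clang attribute spelling. The types are stand-ins, not taken from the PR; inlining only happens where the compiler has already devirtualized the call.

#include <string>

// Hypothetical abstract notification base, standing in for the endpoint.
struct notify_base {
    virtual bool on_publish(std::string const& topic, std::string const& payload) = 0;
protected:
    ~notify_base() = default;
};

// Marking the override always_inline lets GCC/Clang inline the body wherever
// the call has been devirtualized, e.g. when the concrete type is visible to
// the optimizer or the class is marked final.
struct inline_endpoint final : notify_base {
    [[gnu::always_inline]] bool on_publish(std::string const&, std::string const&) override {
        return true; // application-specific handling would go here
    }
};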

@ropieur commented Sep 18, 2019

@ropieur

Not sure I hold all cards in hand to debate, but a reason also for passing the handler as an argument to async calls is to give the compiler the opportunity to "inject" a lambda directly at the calling site, which would give it the opportunity for more aggressive optimization. But this requires avoiding type erasure (e.g. not pushing the lambda inside a std::function).

Could you elaborate on this please?

As a metaphor, that would be a bit like calling a standard algorithm with a lambda, where the lambda type is used for instantiating the template code. But I just discovered that an internal queue was used to keep the handlers, which requires in the end type erasure.

I agree with @jonesmz on the fact that a std::function has a heavy cost. OTOH, I am not in favour of forcing a derivation from a "virtual interface" (a Java-like approach).
Maybe you could require a "concept" (a class providing a set of methods meeting the same requirements as the virtual interface, without the virtual aspect) that could be injected at once (so no need for std::function and no hard relation ...).

So static polymorphism, where the "policy" of what to do for the various callbacks is provided by a single object that has a set of functions that conform to the parameters that the current handlers take?

Something like this?

Yes indeed

struct policy
{
    bool on_connack(...);
    bool on_publish(...);
    bool on_subscribe(...);
};

template<typename Policy>
struct endpoint
{
    endpoint(Policy policy) : policy_(std::move(policy)) { }

    void handle_incoming_packet(...)
    {
        switch(packet_type)
        {
            case connack: policy_.on_connack(); break;
            case publish: policy_.on_publish(); break;
            case subscribe: policy_.on_subscribe(); break;
        }
    }
};

I'm not opposed to that, since it's still better than std::function, but I can't think of a way to support the current mqtt_cpp API using static polymorphism. The virtual function approach can handle both static functions and std::function, and as long as you teach GCC that the overrides for the virtual functions should be always_inline, you end up with the same performance.

Static polymorphism does not prevent injecting a dynamic one. At least with the static approach, devirtualization is not needed. With that approach, you leave any possible performance penalty to the caller.

With static polymorphism, the handlers would need to be injected into the policy object, not the endpoint itself.

I don't see the drawback.

@redboltz (Owner)

Just a question: the class template endpoint doesn't have a virtual destructor even though it has (pure) virtual functions. I think that is good. Is it intentional?

I think the virtual destructor isn't needed because endpoint inherits enable_shared_from_this. That indicates the endpoint is always allocated via shared_ptr, and the shared_ptr holds a pointer to the actual (leaf) type.

I mean as follows:

On stack                          On heap
shared_ptr<endpoint> sp --------> shared_count
       |                                |
       |                                |This can delete actual type *1
       |                                V
       +------------------------> endpoint(or inherited type)

Thanks to *1, we don't need a virtual destructor.
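
A standalone illustration of that point (toy types, not mqtt_cpp code): the deleter stored in the shared_ptr control block knows the concrete type, so the base class needs no virtual destructor.

#include <cstdio>
#include <memory>

struct base {                       // note: no virtual destructor
    virtual void ping() = 0;
};

struct derived : base {
    ~derived() { std::puts("~derived ran"); }
    void ping() override {}
};

int main() {
    // make_shared<derived> records a deleter for `derived` in the shared
    // count (control block), so destruction runs ~derived even though the
    // pointer is later held as shared_ptr<base>.
    std::shared_ptr<base> sp = std::make_shared<derived>();
    sp.reset();                     // prints "~derived ran"
}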

@jonesmz (Contributor, Author) commented Sep 18, 2019

As a metaphor, that would be a bit like calling a standard algorithm with a lambda, where the lambda type is used for instantiating the template code. But I just discovered that an internal queue was used to keep the handlers, which requires in the end type erasure.

Right, the internal queue is necessary to support (among other things) arbitrary numbers of async_publish() calls in a row, before returning control to boost::asio for processing of async operations.

Static polymorphism does not prevent injecting a dynamic one. At least with the static approach, devirtualization is not needed. With that approach, you leave any possible performance penalty to the caller.
I don't see the drawback.

The drawback is API breakage. That's all I meant.
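
To illustrate what that queue makes possible, here is a small sketch of issuing several publishes back to back before handing control to boost::asio. The call shapes mirror the test program later in this thread; the broker address and client id are placeholders.

#include <boost/asio.hpp>
#include <mqtt_client_cpp.hpp>

int main() {
    boost::asio::io_context ioc;
    auto client = mqtt::make_async_client(ioc, "localhost", 1883);
    client->set_client_id("queue_demo");

    client->set_connack_handler([&](bool /*session_present*/, mqtt::connect_return_code /*rc*/) {
        // Several async_publish calls in a row, before control returns to
        // boost::asio: the endpoint queues them internally and sends them
        // in this order.
        for (auto const* topic : {"topic/a", "topic/b", "topic/c"}) {
            client->async_publish(topic, "payload", mqtt::qos::at_least_once, false,
                                  [](boost::system::error_code const&) {});
        }
        return true;
    });

    client->connect();
    ioc.run();
}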

@jonesmz (Contributor, Author) commented Sep 18, 2019

Just a question: the class template endpoint doesn't have a virtual destructor even though it has (pure) virtual functions. I think that is good. Is it intentional?

Actually, I forgot to think about it.

I prefer a "protected, non-virtual, = default" destructor for abstract base types. This ensures that only code that knows the exact type can delete it.

I updated the code to declare ~endpoint() = default; in the protected: section.
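
For anyone unfamiliar with the idiom, here is a toy example of what the protected, non-virtual, defaulted destructor buys. The types are hypothetical, not the endpoint class itself.

#include <memory>

class abstract_base {
public:
    virtual void run() = 0;
protected:
    ~abstract_base() = default;     // non-virtual, and not publicly deletable
};

class concrete final : public abstract_base {
public:
    void run() override {}
};

int main() {
    concrete c;
    abstract_base* b = &c;
    b->run();
    // delete b;                    // would not compile: ~abstract_base() is protected
    std::unique_ptr<concrete> p = std::make_unique<concrete>();
    p->run();                       // deleting through the exact type is fine
}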

@ropieur commented Sep 18, 2019

As a metaphor, that would be a bit like calling a standard algorithm with a lambda, where the lambda type is used for instantiating the template code. But I just discovered that an internal queue was used to keep the handlers, which requires in the end type erasure.

Right, the internal queue is necessary to support (among other things) arbitrary numbers of async_publish() calls in a row, before returning control to boost::asio for processing of async operations.

Do you mean that I don't need to enqueue my "messages" at the application level, and the mqtt client takes charge of keeping all publications in the right order? If so, what is the strategy in case of failure? A communication interruption? A publication refused by the broker?

@jonesmz (Contributor, Author) commented Sep 18, 2019

Do you mean that I don't need to enqueue my "messages" at the application level, and the mqtt client takes charge of keeping all publications in the right order? If so, what is the strategy in case of failure? A communication interruption? A publication refused by the broker?

So long as you're calling async_publish() on the same thread, mqtt_cpp::endpoint will deliver them in the order that you called async_publish().

If there is an unrecoverable local failure, mqtt_cpp::endpoint will call the error handler (If this PR is merged, that's the virtual on_error() function) and processing of MQTT stops. You'll need to re-connect to the broker in that case.

If there is a failure on the broker's end, then you'll either get disconnected, or you'll receive a puback packet with the error. You can save the mqtt_cpp::endpoint::packet_id_t that async_publish() returns, and later compare it with the mqtt_cpp::endpoint::packet_id_t that the puback handler receives as a parameter, to figure out which publication failed on the broker's side.
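
A sketch of that bookkeeping. The container and function names are mine, not mqtt_cpp API; the async_publish call shape mirrors the test code later in the thread.

#include <cstdint>
#include <set>
#include <string>
#include <utility>

#include <mqtt_client_cpp.hpp>

// The packet identifier is 16 bits wide in standard MQTT.
std::set<std::uint16_t> in_flight;

template <typename Client>
void publish_tracked(Client& client, std::string topic, std::string payload) {
    // Remember the packet id that async_publish returns...
    auto pid = client.async_publish(std::move(topic), std::move(payload),
                                    mqtt::qos::at_least_once, false,
                                    [](boost::system::error_code const&) {});
    in_flight.insert(pid);
}

// ...and erase it when the matching puback notification arrives
// (the virtual on_puback() once this PR is merged).
void handle_puback(std::uint16_t packet_id) {
    in_flight.erase(packet_id);
}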

@jonesmz (Contributor, Author) commented Sep 18, 2019

Do note that I would like to support a higher-level API than what even mqtt_cpp::async_client currently offers.

See: #374

The TL;DR is that I want to be able to say:

pClient->async_publish(msgdatahere, { /* when sent over network */ }, { /* when reply received */ });

That will involve an internal std::map<packet_id_t, callback_t> that mqtt_cpp::client uses to figure out which callback belongs to the incoming response packet.

It should be very simple to add that to mqtt::client as an optional feature via a pull request, if you were so inclined.
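
A very rough sketch of that idea follows. Everything here is hypothetical; none of these names are existing mqtt_cpp API.

#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>

using packet_id_t = std::uint16_t;
using callback_t  = std::function<void(packet_id_t)>;

// Hypothetical layered client: keeps a map from packet id to completion
// callback and fires the callback when the matching response arrives.
class layered_client {
public:
    // `send` stands in for whatever actually writes the PUBLISH and returns
    // its packet id. The "when sent over network" callback would normally be
    // wired to the write-completion handler; it is invoked inline here only
    // to keep the sketch short.
    template <typename Send>
    packet_id_t async_publish(std::string msg, Send&& send,
                              callback_t when_sent, callback_t when_reply_received) {
        packet_id_t pid = std::forward<Send>(send)(std::move(msg));
        if (when_sent) when_sent(pid);
        pending_.emplace(pid, std::move(when_reply_received));
        return pid;
    }

    // Called from the notification for the matching response packet (e.g. puback).
    void on_response(packet_id_t pid) {
        auto it = pending_.find(pid);
        if (it != pending_.end()) {
            if (it->second) it->second(pid);
            pending_.erase(it);
        }
    }

private:
    std::map<packet_id_t, callback_t> pending_;
};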

Commit: Use virtual functions for notifications from mqtt_cpp::endpoint to other code, instead of multiple std::functions

An optional template wrapper, callable_overlay, is provided. It exposes a collection of std::functions that can be set to register dynamically changeable callbacks for each of the virtual member function calls that can occur.

@ropieur commented Sep 18, 2019

Do you mean that I don't need to enqueue my "messages" at the application level, and the mqtt client takes charge of keeping all publications in the right order? If so, what is the strategy in case of failure? A communication interruption? A publication refused by the broker?

So long as you're calling async_publish on the same thread, mqtt_cpp::endpoint will deliver them in the order that you called async_publish.

If there is an unrecoverable local failure, mqtt_cpp::endpoint will call the error handler (If this PR is merged, that's the virtual on_error() function) and processing of MQTT stops. You'll need to re-connect to the broker in that case.

What about publications still in the queue? Will they be resent after the connection succeeds? Are they purged?
To me, providing such a level of servicing is a lot of responsibility at once. This design imposes the strategy for queuing (BTW, is there a limit on the queue?) and error management (continue publishing vs. purging, etc.).

@redboltz (Owner) commented Sep 18, 2019

What about publications still in the queue? Will they be resent after the connection succeeds? Are they purged?

Let me explain the current implementation. It applies not only to master but also to #444 (this PR).
The following is achieved without a queue mechanism and is already implemented.

| Session Enable*1 | QoS | Situation | Behavior |
| --- | --- | --- | --- |
| no | all | publish is called but not yet published | purged on close |
| yes | 0 | publish is called but not yet published | purged on close |
| yes | 1 | publish is called but puback not yet received | resend publish on reconnect, after connack is received |
| yes | 2 | publish is called but pubrec not yet received | resend publish on reconnect, after connack is received |
| yes | 2 | pubrel (including the automatic one) is called but pubcomp not yet received | resend pubrel on reconnect, after connack is received |

*1 Session Enable means:

| MQTT version | Enable |
| --- | --- |
| v3.1.1 | Connect with Clean Session is false |
| v5 | Connect with Clean Start is false |
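
For completeness, a client-side sketch of selecting the "Session Enable" rows for v3.1.1. This assumes the set_clean_session setter on the client; consult the client API for the exact v5 clean-start spelling.

#include <boost/asio.hpp>
#include <mqtt_client_cpp.hpp>

int main() {
    boost::asio::io_context ioc;
    auto client = mqtt::make_async_client(ioc, "localhost", 1883);
    client->set_client_id("session_demo");

    // "Session Enable" for MQTT v3.1.1: connect with Clean Session = false
    // (assumed setter name; check the mqtt_cpp client API).
    client->set_clean_session(false);

    client->connect();
    ioc.run();
}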

@ropieur commented Sep 18, 2019

What about publications still in the queue? Will they be resent after the connection succeeds? Are they purged?

Let me explain the current implementation. It applies not only to master but also to #444 (this PR).
The following is achieved without a queue mechanism and is already implemented.

| Session Enable*1 | QoS | Situation | Behavior |
| --- | --- | --- | --- |
| no | all | publish is called but not yet published | purged on close |
| yes | 0 | publish is called but not yet published | purged on close |
| yes | 1 | publish is called but puback not yet received | resend publish on reconnect, after connack is received |
| yes | 2 | publish is called but pubrec not yet received | resend publish on reconnect, after connack is received |
| yes | 2 | pubrel (including the automatic one) is called but pubcomp not yet received | resend pubrel on reconnect, after connack is received |

*1 Session Enable means:

| MQTT version | Enable |
| --- | --- |
| v3.1.1 | Connect with Clean Session is false |
| v5 | Connect with Clean Start is false |

@redboltz,

I made some tests with the library, publishing a sequence of a thousand messages with qos::at_least_once (1).
I use mosquitto as the broker and mosquitto_sub as a subscriber to all messages (#).
To provoke a disconnection between the client and the broker, I deliberately sleep for 500 ms in the success handler of async_publish, which gives me time to interrupt the broker.
The test client detects the disconnection through the error_handler and restarts connection retries every 2 seconds until it succeeds.
During the disconnection, the handlers of all remaining messages are invoked with an error. I would expect a full suspension of the operations here.
I confirm that messages with qos=1 are published once the communication with the broker is re-established, but now without invoking any of the associated handlers.
Can you confirm that this is the expected behaviour? In that case I don't see the point of such a handler, because it is difficult to attach regular post-processing to it.

@redboltz (Owner)

@ropieur, could you elaborate on your question?

Please write a detailed sequence, as follows:

  1. The mqtt_cpp client connects to the broker with Clean Session false, MQTT v3.1.1.
  2. mqtt_cpp receives connack (you can tell because the connack handler is called).
  3. mqtt_cpp publishes QoS1 message *1.
  4. mosquitto_sub receives message *1.
  5. mqtt_cpp receives the puback message (you can tell because the puback handler is called).
  6. mqtt_cpp disconnects via force_disconnect().
  7. Wait 500 ms.
  8. The mqtt_cpp client connects to the broker with Clean Session false, MQTT v3.1.1.
  9. mqtt_cpp receives connack (you can tell because the connack handler is called).
  10. Message *1 is NOT resent.

If your sequence is as above, step 10 is the expected behavior.
The key point is whether mqtt_cpp has received puback or not. Even if you disconnect in the async_publish handler, puback could already have been received.
If you want to make sure you disconnect before puback is received, call the force_disconnect() API just after you call async_publish(), not in the completion handler.

BTW, the disconnect() and async_disconnect() APIs just send a DISCONNECT packet to the broker. When the broker receives the DISCONNECT packet, the broker closes the session from its side. That means there is room to process in-flight messages, including *1.

Here is the resend test: https://github.com/redboltz/mqtt_cpp/blob/master/test/resend.cpp
It is implemented with the sync client, but there is no essential difference.
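
A sketch of that suggestion follows. The broker address and client id are placeholders, the call shapes follow the test code below, and set_clean_session is assumed as in the earlier sketch.

#include <boost/asio.hpp>
#include <mqtt_client_cpp.hpp>

int main() {
    boost::asio::io_context ioc;
    auto client = mqtt::make_async_client(ioc, "localhost", 1883);
    client->set_client_id("resend_demo");
    client->set_clean_session(false);      // the session must survive the disconnect

    client->set_connack_handler([&](bool /*session_present*/, mqtt::connect_return_code /*rc*/) {
        client->async_publish("topic", "payload", mqtt::qos::at_least_once, false,
                              [](boost::system::error_code const&) {});
        // Close before puback can arrive, so the QoS1 PUBLISH stays in flight
        // and is resent after the next connect with Clean Session = false.
        client->force_disconnect();
        return true;
    });

    client->connect();
    ioc.run();
}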

@ropieur commented Sep 19, 2019

@redboltz,

Here is my test code; I am not sure whether I wrote something wrong in it.
I could do unit tests in a controlled fashion, but the intent of this test is to verify the behaviour of my implementation against an external broker and see how it recovers.

#include <iostream>
#include <vector>
#include <thread>
#include <memory>

#include <boost/asio.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/format.hpp>

#include <mqtt_client_cpp.hpp>
const std::string address = "localhost";

int main(int argc, char* argv[])
{
    try {
        boost::ignore_unused(argc, argv);

        boost::asio::io_context ioc;
        boost::asio::steady_timer timer(ioc);

        auto client = mqtt::make_async_client(ioc, "localhost", 1883);

        std::function<void()> publish = [client]{
            std::cout << "Publishing 1000 events" << std::endl;
            for (std::size_t i = 0; i < 1000;  ++i) {
                auto topic = str(boost::format("topic%1%") % i);
                auto payload = str(boost::format("payload%1%") % i);
                client->async_publish(topic, payload, mqtt::qos::at_least_once, false,
                    [client](const auto &error){
                        if (error) {
                            std::cout << "Error in publish handler: message=" << error.message() << std::endl;
                            return;
                        }
                        const std::chrono::milliseconds delay{500};
                        std::cout << "Waiting " << delay.count() << " ms..." << std::endl;
                        std::this_thread::sleep_for(delay);
                    }
                );
            }
        };

        auto reconnect = [&timer, client]{
            const std::chrono::milliseconds delay(2000);
            std::cout << "restart connection in " << delay.count() << "ms ..." << std::endl;
            timer.expires_after(delay);
            timer.async_wait([&](const auto &error){
                if (error) {
                    std::cout << "timer interrupted" << std::endl;
                    return;
                }
                client->connect();
            });
        };

        client->set_client_id("mqtt_cpp_test");
        client->set_error_handler([&](const auto &error){
            if (error) {
                std::cout << "error handler " << error.message() << std::endl;
                if (!client->connected()) {
                    reconnect();
                }
            }
        });

        client->set_connack_handler([&](bool session_present, mqtt::connect_return_code return_code)
        {
            boost::ignore_unused(session_present, return_code);
            std::cout << "connack_handler called!!" << std::endl;
            if (return_code == mqtt::connect_return_code::accepted) {
                std::cout << "fully connected" << std::endl;
                if (publish) {
                    publish();
                    publish = nullptr;
                }
            }
            else {
                if (!client->connected()) {
                    client->async_disconnect([&] (boost::system::error_code const& ) {
                        if (client->connected()) {
                            client->force_disconnect();
                        }
                        reconnect();
                    });
                }
                else {
                    reconnect();
                }
            }
            return true;
        });
        client->connect();
        ioc.run();

        std::cout << "terminated" << std::endl;
        return 0;
    }
    catch (const std::exception &ex) {
        std::cerr << ex.what() << std::endl;
    }
    catch (...) {
        std::cerr << "unexpected exception" << std::endl;
    }
    return 1;
}

@redboltz (Owner) commented Sep 19, 2019

@ropieur, please explain what you want to do in detail, as in the sequence I described above.
Unfortunately, I don't have much time to understand your intention from your posted code.
I'm busy testing #444 on my broker. Or please wait a week; I might have time then.

@redboltz (Owner)

I integrated #444 into my broker and have done several performance tests in our environment. It works nicely. I will merge the PR. Thank you for waiting.

@redboltz merged commit 6ec6e9d into redboltz:master on Sep 24, 2019
@redboltz (Owner)

@ropieur, could you create a new issue for #444 (comment)?

@jonesmz deleted the virtual-functions branch on September 27, 2019, 19:56