Add support for shared subscriptions to test_broker.hpp #672
Conversation
It seems that there are errors in the topic_alias_recv test after your modification. To check the behavior, I changed the severity level: https://github.com/redboltz/mqtt_cpp/blob/master/test/global_fixture.hpp#L15 You can do the same by replacing warning with trace and recompiling. I got the following result on the PR:
./topic_alias_recv --log_level=all --run_test="test_topic_alias_recv/resend_publish"
Running 1 test case...
Entering test module "mqtt_client_cpp_test"
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(15): Entering test suite "test_topic_alias_recv"
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(19): Test case "test_topic_alias_recv/pubsub" is skipped because disabled
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(171): Test case "test_topic_alias_recv/overwrite" is skipped because disabled
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(370): Test case "test_topic_alias_recv/no_entry" is skipped because disabled
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(491): Entering test case "resend_publish"
12:26:07.296950 TID[0] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:174 ADDR[0x561ec0e13460] create version:v3_1_1 async_send_store:false
12:26:07.298780 TID[0] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:174 ADDR[0x561ec0e13460] create version:v5 async_send_store:false
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(675): info: check chk("start") has passed
12:26:07.299675 TID[0] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1743 ADDR[0x561ec0e13460] connect client_id:cid1 user_name:none keep_alive:0
12:26:07.299719 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:193 ADDR[0x7f1b74001a70] create version:undetermined async_send_store:false
12:26:07.299851 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:863 ADDR[0x7f1b74001a70] start_session
12:26:07.300178 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1776 ADDR[0x7f1b74001a70] connack session_present:false reason:success
12:26:07.300407 TID[0] SEV[info ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:61 ADDR[0x561ec0e13710] clear_topic_alias
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(534): info: check chk("h_connack1") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(535): info: check sp == false has passed
12:26:07.300594 TID[0] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:973 ADDR[0x561ec0e13460] disconnect reason:normal_disconnection
12:26:07.300910 TID[0] SEV[info ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:61 ADDR[0x561ec0e13710] clear_topic_alias
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(645): info: check chk("h_close1") has passed
12:26:07.301215 TID[0] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1743 ADDR[0x561ec0e13460] connect client_id:cid1 user_name:none keep_alive:0
12:26:07.301259 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:193 ADDR[0x7f1b74001a70] create version:undetermined async_send_store:false
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(654): info: check ret has passed
12:26:07.301360 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:863 ADDR[0x7f1b74001a70] start_session
12:26:07.301596 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1776 ADDR[0x7f1b74001a70] connack session_present:false reason:success
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(540): info: check chk("h_connack2") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(541): info: check sp == false has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(542): info: check connack_return_code == mqtt::v5::connect_reason_code::success has passed
12:26:07.301849 TID[0] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1313 ADDR[0x561ec0e13460] subscribe pid:1 topic:topic1 qos:at_least_once rh:send nl:no rap:dont
12:26:07.302160 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1945 ADDR[0x7f1b74001a70] suback pid:1
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(576): info: check chk("h_suback") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(577): info: check reasons.size() == 1U has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(578): info: check reasons[0] == mqtt::v5::suback_reason_code::granted_qos_1 has passed
12:26:07.302467 TID[0] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1030 ADDR[0x561ec0e13460] publish pid:0 topic:topic1 qos:at_most_once retain:no dup:no
12:26:07.302598 TID[0] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1030 ADDR[0x561ec0e13460] publish pid:2 topic: qos:at_least_once retain:no dup:no
12:26:07.302730 TID[0] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:990 ADDR[0x561ec0e13460] force_disconnect
12:26:07.302764 TID[1] SEV[info ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:30 ADDR[0x7f1b74001d20] register_topic_alias topic:topic1 alias:1
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(659): info: check chk("h_error") has passed
12:26:07.302873 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1262 ADDR[0x7f1b74001a70] publish pid:0 topic:topic1 qos:at_most_once retain:no dup:no
12:26:07.303123 TID[1] SEV[info ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:51 ADDR[0x7f1b74001d20] find_topic_by_alias alias:1 topic:topic1
12:26:07.303205 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1262 ADDR[0x7f1b74001a70] publish pid:1 topic:topic1 qos:at_least_once retain:no dup:no
12:26:07.303306 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1802 ADDR[0x7f1b74001a70] puback pid:2 reason:success
12:26:07.403977 TID[0] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1743 ADDR[0x561ec0e13460] connect client_id:cid1 user_name:none keep_alive:0
12:26:07.404057 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:193 ADDR[0x7f1b74001a70] create version:undetermined async_send_store:false
12:26:07.404528 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:863 ADDR[0x7f1b74001a70] start_session
12:26:07.405576 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1776 ADDR[0x7f1b74001a70] connack session_present:true reason:success
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(547): info: check chk("h_connack3") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(548): info: check sp == true has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(549): info: check connack_return_code == mqtt::v5::connect_reason_code::success has passed
12:26:07.407170 TID[1] SEV[info ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:51 ADDR[0x7f1b74001d20] find_topic_by_alias alias:1 topic:topic1
12:26:07.407615 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1262 ADDR[0x7f1b74001a70] publish pid:1 topic:topic1 qos:at_least_once retain:no dup:no
12:26:07.408155 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1262 ADDR[0x7f1b74001a70] publish pid:2 topic:topic1 qos:at_least_once retain:no dup:no
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(624): info: check chk("h_publish") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(625): info: check pubopts.get_dup() == mqtt::dup::no has passed
12:26:07.408633 TID[1] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1802 ADDR[0x7f1b74001a70] puback pid:2 reason:success
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(626): info: check pubopts.get_qos() == mqtt::qos::at_least_once has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(627): info: check pubopts.get_retain() == mqtt::retain::no has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(628): info: check topic == "topic1" has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(629): info: check contents == "topic1_contents_2" has passed
12:26:07.409231 TID[0] SEV[info ] CHANNEL[mqtt_api] endpoint.hpp:1452 ADDR[0x561ec0e13460] unsubscribe pid:3 topic:topic1
/home/kondo/work/mqtt_cpp/test/checker.hpp(202): error: in "test_topic_alias_recv/resend_publish": h_publish has already been passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(624): error: in "test_topic_alias_recv/resend_publish": check chk("h_publish") has failed
The log indicates that the publish API is called twice, which is unexpected behavior. Here is the master branch result:
The publish API is called once. I think that is a hint for fixing the PR's bug.
BTW, I added a wildcard test, #674. Fixing the existing test errors on this PR would be a great help for understanding the code, because then I wouldn't need to worry about the test errors.
Unfortunately, I no longer work for the company that was paying me to improve mqtt_cpp, so I won't be able to spend much, if any, time on this project. At the moment, the force-push that I just did is my last planned contribution to mqtt_cpp. I'm happy to answer questions, and it's possible I might make new pull requests in the future, but I'm not currently planning to write new code for the project. Hopefully this PR is clear enough that you're able to understand where the problem is.
Ok, I understand. I will take over the debugging when I have time.
test/test_broker.hpp
// This initializes the round-robin iterator storage for this shared subscription, if and only if it is not already initialized.
// Notably, we do not check the return of the emplace call, since if the item already existed, we don't want a new one made.
shared_subs_.emplace(item.share, item.topic, subs_.get<tag_topic>().find(std::make_tuple(item.share, item.topic)));
When I set a breakpoint here, it stopped here on the failing test case. The test case doesn't use shared subscriptions.
(gdb) p item
$10 = (const test_broker::session_subscription &) @0x7ffff0003870: {
client_id = {
<std::basic_string_view<char, std::char_traits<char> >> = "cid1",
members of mqtt::buffer:
lifetime_ = count 2, weak count 1 = {
value = 0 '\000'
}
},
share = {
<std::basic_string_view<char, std::char_traits<char> >> = "",
members of mqtt::buffer:
lifetime_ = uninitialized
},
topic = {
<std::basic_string_view<char, std::char_traits<char> >> = "topic1",
members of mqtt::buffer:
lifetime_ = count 2, weak count 1 = {
value = 0 '\000'
}
},
messages = std::vector of length 0, capacity 0,
qos_value = mqtt::qos::at_least_once,
rap_value = mqtt::rap::dont
}
I will continue debugging.
I think the problem may be that this code block needs to be inside the if (!item.share.empty()) check.
shared_subs_ should have no items with empty share.
Quote (https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250): A Shared Subscription's Topic Filter MUST start with $share/ and MUST contain a ShareName that is at least one character long
Because this entry is being initialized anyway, it causes the code in the do_publish() function (https://github.com/redboltz/mqtt_cpp/pull/672/files/3663a61452f2a3aaad022a23a41f6267a1ef4759#diff-45ca9626e6c7c611be2ceb2ff63031a5R1232) to send duplicate messages.
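A minimal sketch of the suggested guard, reusing the names from the snippet above (item, shared_subs_, subs_, tag_topic); this only illustrates where the check would go, not the final code:

// Only track round-robin state for real shared subscriptions; ordinary
// subscriptions arrive with an empty share name and must be skipped here.
if (!item.share.empty()) {
    shared_subs_.emplace(
        item.share,
        item.topic,
        subs_.get<tag_topic>().find(std::make_tuple(item.share, item.topic)));
}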
I've pushed a change that may fix the issue.
Codecov Report
@@           Coverage Diff            @@
##             master     #672   +/-  ##
=========================================
  Coverage          ?   82.69%
=========================================
  Files             ?       46
  Lines             ?     7077
  Branches          ?        0
=========================================
  Hits              ?     5852
  Misses            ?     1225
  Partials          ?        0
@@ -900,6 +1096,47 @@ class test_broker {
            }
        }
    }

// Re-activate any saved "shared-subscriptions" that match the newly subscribed topics.
I don't understand what "re-activate" means here.
This is inside the subscribe_handler() function. If re-activation means moving shared subscription information from offline to online, then that should be done in the connect_handler() function.
It seems that you have already done that in connect_handler().
So I guess the process here should add a new online shared subscription or overwrite an existing one, and no processing is required for offline shared subscriptions.
If there are shared subscriptions for sessions that are disconnected but not yet removed (session lifetime timer), then when a new shared subscription is made, the messages stored for the saved shared subscription need to be sent to the new subscriber.
Note that this is not for newly connected sessions. It's for existing sessions adding a new shared subscription to a topic that has shared subscriptions for sessions that are disconnected.
Hmm... I don't understand yet.
Could you write an example sequence?
- Client c1 connected to broker with Session Expiry Interval: infinity.
- Client c1 subscribed t1 with share1 (QoS0)
- Client c1 disconnected.
- Client c1 connected to broker with Session Expiry Interval: infinity and Clean Start: false.
- This is where I said the stored (offline) shared subscription information should be moved to online. And if there are stored publish messages corresponding to the shared subscription, they should be sent to c1 here.
- Client c1 subscribed t1 with share1 (QoS1)
- Simply update QoS0 to QoS1 on c1's t1/share1 subscription. If the new QoS is QoS0, do nothing.
That is how I understand the spec.
I guess that you misunderstand the shared subscriptions spec.
If the algorithm is simple round robin, then the following behavior is expected.
Simple round robin means online sessions and offline sessions have the same priority.
An implementer can use an online-prioritized policy, but that is a different topic.
- Client c1 connected to broker with Session Expiry Interval: infinity.
- Client c2 connected to broker with Session Expiry Interval: infinity.
- Client c1 subscribed t1 with share1 (QoS0)
- Client c2 subscribed t1 with share1 (QoS0)
- Client c1 disconnected.
- Client c3 published t1; the message (a) is delivered to c1 (offline session, so it is stored).
- Client c3 published t1; the message (b) is delivered to c2 (online session).
- Client c3 published t1; the message (c) is delivered to c1 (offline session, so it is stored).
- Client c3 published t1; the message (d) is delivered to c2 (online session).
- Client c1 connected to broker with Session Expiry Interval: infinity and Clean Start: false.
- The broker restores c1's shared subscription (similar to non-shared subscriptions) and delivers the messages (a) and (c) to c1.
- Client c1 subscribed t1 with share1 (QoS0); the broker does nothing.
It is very similar to the non-shared subscription behavior, except that each message is delivered to only one of the clients that have the same share name, instead of to all of them.
So the process of restoring shared subscriptions and delivering the stored messages should happen at step 10, not step 12.
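For illustration, here is a minimal, self-contained sketch of that simple round-robin policy; the names share_group, subscriber, and deliver are hypothetical and are not the PR's actual data structures:

#include <cstddef>
#include <string>
#include <vector>

struct subscriber {
    std::string client_id;
    bool online;
    std::vector<std::string> stored; // messages saved while the session is offline
};

struct share_group {
    std::vector<subscriber> members; // all sessions sharing the same share name
    std::size_t next = 0;            // round-robin position
};

// Deliver one message to exactly one member of the group. Online and offline
// sessions have the same priority; an offline session just stores the message
// until it reconnects with Clean Start: false.
void deliver(share_group& g, const std::string& msg) {
    if (g.members.empty()) return;
    subscriber& s = g.members[g.next % g.members.size()];
    g.next = (g.next + 1) % g.members.size();
    if (s.online) {
        // send_to_network(s.client_id, msg); // actual send elided
    } else {
        s.stored.push_back(msg);
    }
}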
// This initializes the round-robin iterator storage for this shared subscription, if and only if it is not already initialized.
// Notably, we do not check the return of the emplace call, since if the item already existed, we don't want a new one made.
shared_subs_.emplace(share, topic, subs_.get<tag_topic>().find(std::make_tuple(share, topic)));
This comment is related to https://github.com/redboltz/mqtt_cpp/pull/672/files#r502181653
This emplace only happens when a matching entry exists in saved_shared_subs_; there is no chance to emplace a newly shared-subscribed one.
subs_.get<tag_topic>().find(std::make_tuple(share....))
will find the subscription for this share group and topic that was added by the for loop earlier in the function, on line 1068.
I don't understand yet.
- Client c1 connected to broker with Session Expiry Interval: infinity.
- Client c1 subscribed t1 with share1 (QoS0)
At this point, isn't it necessary to emplace some information into shared_subs_?
In do_publish(), shared_subs_ is referenced:
https://github.com/jonesmz/mqtt_cpp/blob/shared-subscription/test/test_broker.hpp#L1243
If the first shared subscribe doesn't add an entry to shared_subs_, then the publish message is not delivered.
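To make the point concrete, here is a self-contained sketch using a hypothetical std::map in place of the PR's multi-index container: if nothing is inserted at SUBSCRIBE time, the lookup in the publish path finds no round-robin state and the message is silently dropped.

#include <cstddef>
#include <map>
#include <string>
#include <utility>

// Hypothetical round-robin state keyed by (share, topic).
std::map<std::pair<std::string, std::string>, std::size_t> shared_state;

bool deliver_shared(const std::string& share, const std::string& topic) {
    auto it = shared_state.find({share, topic});
    if (it == shared_state.end()) {
        return false; // nothing was emplaced at subscribe time: message is never delivered
    }
    // ... pick one subscriber using it->second and deliver the message ...
    return true;
}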
It is very difficult for me to continue the discussion on this PR. I added a temporary fix for #672 (comment).
It seems that #672 contains not only (incomplete) shared subscription support but also a wildcard bug fix. This is the wildcard bug fix part.
Closing this PR because it is superseded by other recent work.
This PR includes #671; GitHub should automatically figure things out if #671 is merged first.
This PR adds support for shared subscriptions to the mqtt broker.
The standard specifies some tricky behavior, so I'm sure I got some details wrong.
For this implementation, I've chosen a round-robin strategy for deciding which client in a given shared subscription group receives each message. If the list of clients stays the same, each client receives a message in turn. New clients are added to the rotation wherever boost-multi-index places them in the index, and when a client is removed, the iterator is adjusted to the next entry that does not belong to the just-removed client.
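As a sketch of the removal handling described above, assuming a plain std::list instead of the boost::multi_index container actually used, the idea of advancing the round-robin iterator past a removed client looks roughly like this:

#include <iterator>
#include <list>
#include <string>

std::list<std::string> members; // clients in one shared-subscription group
std::list<std::string>::iterator rr = members.begin(); // round-robin cursor

void remove_member(std::list<std::string>::iterator victim) {
    if (rr == victim) {
        rr = std::next(victim); // step off the element about to be erased
    }
    members.erase(victim);      // erasing a list element leaves other iterators valid
    if (rr == members.end() && !members.empty()) {
        rr = members.begin();   // wrap around to keep the rotation going
    }
}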
Please provide feedback on both the architecture and the implementation.