Skip to content

Add support for shared subscriptions to test_broker.hpp #672

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed

Add support for shared subscriptions to test_broker.hpp #672

wants to merge 1 commit into from

Conversation

jonesmz
Copy link
Contributor

@jonesmz jonesmz commented Oct 2, 2020

This PR includes #671, Github should automatically figure things out if #671 is merged first.

This PR adds support for shared subscriptions to the mqtt broker.

The standard specifies some tricky behavior, so I'm sure I got some details wrong.

For this implementation, i've chosen a "round-robin" decision strategy for which client in a specific shared subscription group is to receive a message. If the list of clients remains the same, then each client will receive a message in turn. If new clients are added, they are added to the order in whatever way boost-multi-index places them in the index, and if they are removed, then the iterator is adjusted to the next iterator that is not part of the just-removed client.

Please provide implementation feedback, both from an architectural perspective, and an implementation perspective.

@redboltz
Copy link
Owner

redboltz commented Oct 6, 2020

It seems that there are errors on topic_alias_recv test after your modification.

In order to check behavior, I changed severity leve.

https://github.com/redboltz/mqtt_cpp/blob/master/test/global_fixture.hpp#L15

You can do that replace warning with trace and recompile.

I got the following result on the PR.

./topic_alias_recv --log_level=all --run_test="test_topic_alias_recv/resend_publish"

Running 1 test case...
Entering test module "mqtt_client_cpp_test"
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(15): Entering test suite "test_topic_alias_recv"
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(19): Test case "test_topic_alias_recv/pubsub" is skipped because disabled
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(171): Test case "test_topic_alias_recv/overwrite" is skipped because disabled
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(370): Test case "test_topic_alias_recv/no_entry" is skipped because disabled
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(491): Entering test case "resend_publish"
12:26:07.296950 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:174 ADDR[0x561ec0e13460] create version:v3_1_1 async_send_store:false
12:26:07.298780 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:174 ADDR[0x561ec0e13460] create version:v5 async_send_store:false
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(675): info: check chk("start") has passed
12:26:07.299675 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1743 ADDR[0x561ec0e13460] connect client_id:cid1 user_name:none keep_alive:0
12:26:07.299719 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:193 ADDR[0x7f1b74001a70] create version:undetermined async_send_store:false
12:26:07.299851 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:863 ADDR[0x7f1b74001a70] start_session
12:26:07.300178 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1776 ADDR[0x7f1b74001a70] connack session_present:false reason:success
12:26:07.300407 TID[0] SEV[info   ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:61 ADDR[0x561ec0e13710] clear_topic_alias
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(534): info: check chk("h_connack1") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(535): info: check sp == false has passed
12:26:07.300594 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:973 ADDR[0x561ec0e13460] disconnect reason:normal_disconnection
12:26:07.300910 TID[0] SEV[info   ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:61 ADDR[0x561ec0e13710] clear_topic_alias
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(645): info: check chk("h_close1") has passed
12:26:07.301215 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1743 ADDR[0x561ec0e13460] connect client_id:cid1 user_name:none keep_alive:0
12:26:07.301259 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:193 ADDR[0x7f1b74001a70] create version:undetermined async_send_store:false
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(654): info: check ret has passed
12:26:07.301360 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:863 ADDR[0x7f1b74001a70] start_session
12:26:07.301596 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1776 ADDR[0x7f1b74001a70] connack session_present:false reason:success
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(540): info: check chk("h_connack2") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(541): info: check sp == false has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(542): info: check connack_return_code == mqtt::v5::connect_reason_code::success has passed
12:26:07.301849 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1313 ADDR[0x561ec0e13460] subscribe pid:1 topic:topic1 qos:at_least_once rh:send nl:no rap:dont
12:26:07.302160 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1945 ADDR[0x7f1b74001a70] suback pid:1
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(576): info: check chk("h_suback") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(577): info: check reasons.size() == 1U has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(578): info: check reasons[0] == mqtt::v5::suback_reason_code::granted_qos_1 has passed
12:26:07.302467 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1030 ADDR[0x561ec0e13460] publish pid:0 topic:topic1 qos:at_most_once retain:no dup:no
12:26:07.302598 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1030 ADDR[0x561ec0e13460] publish pid:2 topic: qos:at_least_once retain:no dup:no
12:26:07.302730 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:990 ADDR[0x561ec0e13460] force_disconnect
12:26:07.302764 TID[1] SEV[info   ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:30 ADDR[0x7f1b74001d20] register_topic_alias topic:topic1 alias:1
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(659): info: check chk("h_error") has passed
12:26:07.302873 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1262 ADDR[0x7f1b74001a70] publish pid:0 topic:topic1 qos:at_most_once retain:no dup:no
12:26:07.303123 TID[1] SEV[info   ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:51 ADDR[0x7f1b74001d20] find_topic_by_alias alias:1 topic:topic1
12:26:07.303205 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1262 ADDR[0x7f1b74001a70] publish pid:1 topic:topic1 qos:at_least_once retain:no dup:no
12:26:07.303306 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1802 ADDR[0x7f1b74001a70] puback pid:2 reason:success
12:26:07.403977 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1743 ADDR[0x561ec0e13460] connect client_id:cid1 user_name:none keep_alive:0
12:26:07.404057 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:193 ADDR[0x7f1b74001a70] create version:undetermined async_send_store:false
12:26:07.404528 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:863 ADDR[0x7f1b74001a70] start_session
12:26:07.405576 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1776 ADDR[0x7f1b74001a70] connack session_present:true reason:success
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(547): info: check chk("h_connack3") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(548): info: check sp == true has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(549): info: check connack_return_code == mqtt::v5::connect_reason_code::success has passed
12:26:07.407170 TID[1] SEV[info   ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:51 ADDR[0x7f1b74001d20] find_topic_by_alias alias:1 topic:topic1


12:26:07.407615 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1262 ADDR[0x7f1b74001a70] publish pid:1 topic:topic1 qos:at_least_once retain:no dup:no
12:26:07.408155 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1262 ADDR[0x7f1b74001a70] publish pid:2 topic:topic1 qos:at_least_once retain:no dup:no


/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(624): info: check chk("h_publish") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(625): info: check pubopts.get_dup() == mqtt::dup::no has passed
12:26:07.408633 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1802 ADDR[0x7f1b74001a70] puback pid:2 reason:success
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(626): info: check pubopts.get_qos() == mqtt::qos::at_least_once has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(627): info: check pubopts.get_retain() == mqtt::retain::no has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(628): info: check topic == "topic1" has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(629): info: check contents == "topic1_contents_2" has passed
12:26:07.409231 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1452 ADDR[0x561ec0e13460] unsubscribe pid:3 topic:topic1
/home/kondo/work/mqtt_cpp/test/checker.hpp(202): error: in "test_topic_alias_recv/resend_publish": h_publish has already been passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(624): error: in "test_topic_alias_recv/resend_publish": check chk("h_publish") has failed

The log indicates that publish API called twice. It is unexpected behavior.

Here is master branch result:

Running 1 test case...
Entering test module "mqtt_client_cpp_test"
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(15): Entering test suite "test_topic_alias_recv"
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(19): Test case "test_topic_alias_recv/pubsub" is skipped because disabled
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(171): Test case "test_topic_alias_recv/overwrite" is skipped because disabled
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(370): Test case "test_topic_alias_recv/no_entry" is skipped because disabled
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(491): Entering test case "resend_publish"
12:30:40.775988 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:174 ADDR[0x561e3f4ec460] create version:v3_1_1 async_send_store:false
12:30:40.777846 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:174 ADDR[0x561e3f4ec460] create version:v5 async_send_store:false
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(675): info: check chk("start") has passed
12:30:40.778701 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1743 ADDR[0x561e3f4ec460] connect client_id:cid1 user_name:none keep_alive:0
12:30:40.778738 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:193 ADDR[0x7fe544001a70] create version:undetermined async_send_store:false
12:30:40.778895 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:863 ADDR[0x7fe544001a70] start_session
12:30:40.779213 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1776 ADDR[0x7fe544001a70] connack session_present:false reason:success
12:30:40.779436 TID[0] SEV[info   ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:61 ADDR[0x561e3f4ec710] clear_topic_alias
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(534): info: check chk("h_connack1") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(535): info: check sp == false has passed
12:30:40.779614 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:973 ADDR[0x561e3f4ec460] disconnect reason:normal_disconnection
12:30:40.779919 TID[0] SEV[info   ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:61 ADDR[0x561e3f4ec710] clear_topic_alias
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(645): info: check chk("h_close1") has passed
12:30:40.780196 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1743 ADDR[0x561e3f4ec460] connect client_id:cid1 user_name:none keep_alive:0
12:30:40.780238 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:193 ADDR[0x7fe544001a70] create version:undetermined async_send_store:false
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(654): info: check ret has passed
12:30:40.780347 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:863 ADDR[0x7fe544001a70] start_session
12:30:40.780571 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1776 ADDR[0x7fe544001a70] connack session_present:false reason:success
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(540): info: check chk("h_connack2") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(541): info: check sp == false has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(542): info: check connack_return_code == mqtt::v5::connect_reason_code::success has passed
12:30:40.780850 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1313 ADDR[0x561e3f4ec460] subscribe pid:1 topic:topic1 qos:at_least_once rh:send nl:no rap:dont
12:30:40.781154 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1945 ADDR[0x7fe544001a70] suback pid:1
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(576): info: check chk("h_suback") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(577): info: check reasons.size() == 1U has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(578): info: check reasons[0] == mqtt::v5::suback_reason_code::granted_qos_1 has passed
12:30:40.781462 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1030 ADDR[0x561e3f4ec460] publish pid:0 topic:topic1 qos:at_most_once retain:no dup:no
12:30:40.781598 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1030 ADDR[0x561e3f4ec460] publish pid:2 topic: qos:at_least_once retain:no dup:no
12:30:40.781726 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:990 ADDR[0x561e3f4ec460] force_disconnect
12:30:40.781745 TID[1] SEV[info   ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:30 ADDR[0x7fe544001d20] register_topic_alias topic:topic1 alias:1
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(659): info: check chk("h_error") has passed
12:30:40.781848 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1262 ADDR[0x7fe544001a70] publish pid:0 topic:topic1 qos:at_most_once retain:no dup:no
12:30:40.782078 TID[1] SEV[info   ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:51 ADDR[0x7fe544001d20] find_topic_by_alias alias:1 topic:topic1
12:30:40.782155 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1262 ADDR[0x7fe544001a70] publish pid:1 topic:topic1 qos:at_least_once retain:no dup:no
12:30:40.782250 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1802 ADDR[0x7fe544001a70] puback pid:2 reason:success
12:30:40.883022 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1743 ADDR[0x561e3f4ec460] connect client_id:cid1 user_name:none keep_alive:0
12:30:40.883114 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:193 ADDR[0x7fe544001a70] create version:undetermined async_send_store:false
12:30:40.883552 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:863 ADDR[0x7fe544001a70] start_session
12:30:40.884640 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1776 ADDR[0x7fe544001a70] connack session_present:true reason:success
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(547): info: check chk("h_connack3") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(548): info: check sp == true has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(549): info: check connack_return_code == mqtt::v5::connect_reason_code::success has passed
12:30:40.886286 TID[1] SEV[info   ] CHANNEL[mqtt_impl] topic_alias_recv.hpp:51 ADDR[0x7fe544001d20] find_topic_by_alias alias:1 topic:topic1

12:30:40.886738 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1262 ADDR[0x7fe544001a70] publish pid:1 topic:topic1 qos:at_least_once retain:no dup:no

12:30:40.887274 TID[1] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1802 ADDR[0x7fe544001a70] puback pid:2 reason:success
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(624): info: check chk("h_publish") has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(625): info: check pubopts.get_dup() == mqtt::dup::no has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(626): info: check pubopts.get_qos() == mqtt::qos::at_least_once has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(627): info: check pubopts.get_retain() == mqtt::retain::no has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(628): info: check topic == "topic1" has passed
/home/kondo/work/mqtt_cpp/test/topic_alias_recv.cpp(629): info: check contents == "topic1_contents_2" has passed
12:30:40.888321 TID[0] SEV[info   ] CHANNEL[mqtt_api] endpoint.hpp:1452 ADDR[0x561e3f4ec460] unsubscribe pid:3 topic:topic1

Publish API is called once.

I think that it is a hint to fix the PR 's bug.

@redboltz
Copy link
Owner

redboltz commented Oct 6, 2020

BTW, I added wildcard test #674.
Please add test for shared subscriptions after fix the test errors.

Fixing the existing test errors on the PR would be a great help to understand the code because I don't need to care about the test errors.

@jonesmz
Copy link
Contributor Author

jonesmz commented Oct 7, 2020

@redboltz

Unfortunately, I no longer work for the company that was paying me to improve mqtt_cpp, so I won't be able to spend much, if any, time on this project.

At the moment, the force-push that I just did is my last planned contribution to mqtt_cpp.

I'm happy to answer questions, and it's possible I might make new pull requests in the future, but I'm not currently planning to write new code for the project.

Hopefully this PR is clear enough that you're able to understand where the problem is.

@redboltz
Copy link
Owner

redboltz commented Oct 7, 2020

Ok, I understand. I will take over debugging when I have a time.


// This initializes the round-robin iterator storage for this shared subscription, if and only if, it is not already.
// Notably, we do not check the return of the emplace call, since if the item already existed, we don't want a new one made.
shared_subs_.emplace(item.share, item.topic, subs_.get<tag_topic>().find(std::make_tuple(item.share, item.topic)));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I set a breakpoint here, then stopped here on failure test case. The test case doesn't use shared subscriptions.

(gdb) p item
$10 = (const test_broker::session_subscription &) @0x7ffff0003870: {
  client_id = {
    <std::basic_string_view<char, std::char_traits<char> >> = "cid1",
    members of mqtt::buffer:
    lifetime_ = count 2, weak count 1 = {
      value = 0 '\000'
    }
  },
  share = {
    <std::basic_string_view<char, std::char_traits<char> >> = "",
    members of mqtt::buffer:
    lifetime_ = uninitialized
  },
  topic = {
    <std::basic_string_view<char, std::char_traits<char> >> = "topic1",
    members of mqtt::buffer:
    lifetime_ = count 2, weak count 1 = {
      value = 0 '\000'
    }
  },
  messages = std::vector of length 0, capacity 0,
  qos_value = mqtt::qos::at_least_once,
  rap_value = mqtt::rap::dont
}

I will continue debugging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maybe problem is that this code block needs to be inside the if(!item.share.empty()) check.

shared_subs_ should have no items with empty share.

Quote (https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250): A Shared Subscription's Topic Filter MUST start with $share/ and MUST contain a ShareName that is at least one character long

Because this is being initialized, it causes this code, in the do_publish() function (https://github.com/redboltz/mqtt_cpp/pull/672/files/3663a61452f2a3aaad022a23a41f6267a1ef4759#diff-45ca9626e6c7c611be2ceb2ff63031a5R1232) to send duplicate messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've pushed a change that may fix the issue.

@codecov
Copy link

codecov bot commented Oct 7, 2020

Codecov Report

❗ No coverage uploaded for pull request base (master@ba94169). Click here to learn what that means.
The diff coverage is n/a.

@@            Coverage Diff            @@
##             master     #672   +/-   ##
=========================================
  Coverage          ?   82.69%           
=========================================
  Files             ?       46           
  Lines             ?     7077           
  Branches          ?        0           
=========================================
  Hits              ?     5852           
  Misses            ?     1225           
  Partials          ?        0           

@@ -900,6 +1096,47 @@ class test_broker {
}
}
}

// Re-activate any saved "shared-subscriptions" that match the newly subscribed topics.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what re-activate mean here.
Here is inside of the function subscribe_handler(). If re-activation means moving shared subscription information from offline to online, then the process should be done at the function connect_handler().
It seems that you already done it at the connect_handler().
So, I guess that the process here should newly add or overwrite existing online shared subscription, and no process required for offline shared subscriptions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are shared subscriptions for sessions that are disconnected, but not removed (session lifetime timer) then when a new shared subscription is subscribed then the messages for the saved shared subscription need to be sent to the new subscriber.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is not for newly connected sessions. Its for existing sessions adding a new shared subscription to a topic that has shared subscriptions for sessions that are disconnected

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... I don't understand yet.
Could you write example sequence ?

  1. Client c1 connected to broker with Session Expiry Interval: infinity.
  2. Client c1 subscribed t1 with share1 (QoS0)
  3. Client c1 disconnected.
  4. Client c1 connected to broker with Session Expiry Interval: infinity and Clean Start: false.
    • I said that the stored (offline) shared subscription information should be moved to online here. And if stored publish message that is corresponding to the shared subscriptions, send it to c1 here.
  5. Client c1 subscribed t1 with share1 (QoS1)
    • Simply updated QoS0 to QoS1 on c1 t1 share1 subscription. If new QoS is QoS0 then do nothing.

I understand the spec as above.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that you misunderstand shared subscriptions spec.
If the algorithm is simple round robin, then the following behavior is expected.
Simple round robin means online session and offline session have the same priority.
Implementer can use online prioritized policy but it is different topic.

  1. Client c1 connected to broker with Session Expiry Interval: infinity.
  2. Client c2 connected to broker with Session Expiry Interval: infinity.
  3. Client c1 subscribed t1 with share1 (QoS0)
  4. Client c2 subscribed t1 with share1 (QoS0)
  5. Client c1 disconnected.
  6. Client c3 publish t1, then the message (a) is delivered to c1 (online session).
  7. Client c3 publish t1, then the message (b) is delivered to c2 (offline session).
  8. Client c3 publish t1, then the message (c) is delivered to c1 (online session).
  9. Client c3 publish t1, then the message (d) is delivered to c2 (offline session).
  10. Client c1 connected to broker with Session Expiry Interval: infinity and Clean Start: false.
  11. The broker restore c1's shared subscription (it is similar to non shared subscriptions), and the broker deliver the message (a) and (c) to c1.
  12. Client c1 subscribed t1 with share1 (QoS0), the broker do nothing.

It is very similar to non shared subscriptions behavior. But deliver to only one of client that have the same share name instead of all clients.

So, restore shared subscriptions and deliver stored message process should be step 10 not step 12


// This initializes the round-robin iterator storage for this shared subscription, if and only if, it is not already.
// Notably, we do not check the return of the emplace call, since if the item already existed, we don't want a new one made.
shared_subs_.emplace(share, topic, subs_.get<tag_topic>().find(std::make_tuple(share, topic)));
Copy link
Owner

@redboltz redboltz Oct 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a related comment of https://github.com/redboltz/mqtt_cpp/pull/672/files#r502181653
The emplace only works if saved_shared_subs_ exists. There is no chance to emplace newly shared subscribed one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subs_.get<tag_topic>().find(std::make_tuple(share....))

Will find the subscription for this share-group and topic from the for loop earlier in the function on line 1068

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand yet.

  1. Client c1 connected to broker with Session Expiry Interval: infinity.
  2. Client c1 subscribed t1 with share1 (QoS0)

In this timing, emplacing some information to shared_subs_ is not needed ???

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In do_publish(), shared_subs_ is referred.
https://github.com/jonesmz/mqtt_cpp/blob/shared-subscription/test/test_broker.hpp#L1243

If the first shared subscribe dosen't add shared_subs_, then the publish message is not delivered.

@redboltz
Copy link
Owner

It is very difficult for me to continue discussion on this PR.
I created #675 based on #672 . I updated codes, fixed bugs, and added test. It is the first step.
I moved to #675 .

I added temporary fix for #672 (comment)
It works well for delivering publish, so far. I'm still not sure in detail about the code.

@redboltz redboltz mentioned this pull request Oct 12, 2020
redboltz added a commit that referenced this pull request Oct 14, 2020
It seems that #672 contains not only shared subscription
support (incomplete) but also wildcard bug fix.

This is the part of wildcard bug fix.
@redboltz redboltz mentioned this pull request Oct 14, 2020
@jonesmz
Copy link
Contributor Author

jonesmz commented Oct 19, 2020

Closing this PR because it is superseded by other recent work.

@jonesmz jonesmz closed this Oct 19, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants