diff --git a/examples/client/main.cpp b/examples/client/main.cpp index 0ed513f..17c61cc 100644 --- a/examples/client/main.cpp +++ b/examples/client/main.cpp @@ -382,18 +382,18 @@ void wait_for_notification(rpc_client & client) { void test_sub1() { rpc_client client; + client.enable_auto_reconnect(); + client.enable_auto_heartbeat(); bool r = client.connect("127.0.0.1", 9000); if (!r) { return; } - client.enable_auto_heartbeat(); - client.subscribe("key", [](string_view data) { std::cout << data << "\n"; }); - client.subscribe("key", "sub_key", [](string_view data) { + client.subscribe("key", "unique_token", [](string_view data) { std::cout << data << "\n"; }); @@ -404,13 +404,13 @@ void test_sub1() { } client1.publish("key", "hello subscriber"); - client1.publish("key", "sub_key", "ok"); + client1.publish_by_token("key", "sub_key", "ok"); std::thread thd([&client1] { while (true) { try { client1.publish("key", "hello subscriber"); - client1.publish("key", "sub_key", "ok"); + client1.publish_by_token("key", "unique_token", "ok"); std::this_thread::sleep_for(std::chrono::seconds(1)); } catch (const std::exception& ex) { diff --git a/examples/server/main.cpp b/examples/server/main.cpp index 32f6343..3fc008b 100644 --- a/examples/server/main.cpp +++ b/examples/server/main.cpp @@ -105,15 +105,19 @@ int main() { server.register_handler("echo", echo); server.register_handler("get_int", get_int); - server.register_handler("publish", [&server](rpc_conn conn, std::string key, std::string sub_key, std::string val) { - server.publish(std::move(key), std::move(sub_key), std::move(val)); + server.register_handler("publish_by_token", [&server](rpc_conn conn, std::string key, std::string token, std::string val) { + server.publish_by_token(std::move(key), std::move(token), std::move(val)); + }); + + server.register_handler("publish", [&server](rpc_conn conn, std::string key, std::string token, std::string val) { + server.publish(std::move(key), std::move(val)); }); std::thread thd([&server] { while (true) { std::this_thread::sleep_for(std::chrono::seconds(1)); - server.publish("key", "", "hello subscriber"); - server.publish("key", "sub_key", "hello subscriber"); + server.publish("key", "hello subscriber"); + server.publish_by_token("key", "unique_token", "hello subscriber"); } }); diff --git a/include/rpc_client.hpp b/include/rpc_client.hpp index 2b7cf63..c8e6879 100644 --- a/include/rpc_client.hpp +++ b/include/rpc_client.hpp @@ -274,8 +274,8 @@ namespace rest_rpc { } template - void subscribe(std::string key, std::string sub_key, Func f) { - auto composite_key = key + sub_key; + void subscribe(std::string key, std::string token, Func f) { + auto composite_key = key + token; auto it = sub_map_.find(composite_key); if (it != sub_map_.end()) { assert("duplicated subscribe"); @@ -284,18 +284,18 @@ namespace rest_rpc { sub_map_.emplace(std::move(composite_key), std::move(f)); msgpack_codec codec; - auto ret = codec.pack_args(key, sub_key); + auto ret = codec.pack_args(key, token); write(0, request_type::sub_pub, std::move(ret)); } - template + template void publish(std::string key, T&& t) { - call<100000>("publish", std::move(key), "", std::forward(t)); + call("publish", std::move(key), "", std::forward(t)); } - template - void publish(std::string key, std::string sub_key, T&& t) { - call<100000>("publish", std::move(key), std::move(sub_key), std::forward(t)); + template + void publish_by_token(std::string key, std::string token, T&& t) { + call("publish_by_token", std::move(key), std::move(token), std::forward(t)); } private: diff --git a/include/rpc_server.h b/include/rpc_server.h index e2ba069..fa5aa30 100644 --- a/include/rpc_server.h +++ b/include/rpc_server.h @@ -104,51 +104,26 @@ namespace rest_rpc { publish(key, "", std::move(data)); } - void publish(const std::string& key, std::string sub_key, std::string data) { - decltype(sub_map_.equal_range(key)) range; - - { - std::unique_lock lock(sub_mtx_); - if (sub_map_.empty()) - return; - - range = sub_map_.equal_range(key + sub_key); - } - - auto shared_data = std::make_shared(std::move(data)); - for (auto it = range.first; it != range.second; ++it) { - auto conn = it->second.lock(); - if (conn == nullptr || conn->has_closed()) { - if (!sub_key.empty()) { - std::unique_lock lock(retry_mtx_); - auto it = retry_map_.find(sub_key); - if (it == retry_map_.end()) - continue; - - auto retry = std::make_shared(io_service_pool_.get_io_service(), - shared_data, 30 * 1000); - retry_map_.emplace(std::move(sub_key), retry); - retry->start_timer(); - } - - continue; - } + void publish_by_token(std::string token, std::string data) { + publish("", std::move(token), std::move(data)); + } - conn->publish(key + sub_key, *shared_data); - } + void publish_by_token(const std::string& key, std::string token, std::string data) { + publish(key, std::move(token), std::move(data)); } - std::unordered_multimap> get_subscriber_map() { + std::set get_token_list() { std::unique_lock lock(sub_mtx_); - return sub_map_; + return token_list_; } private: void do_accept() { conn_.reset(new connection(io_service_pool_.get_io_service(), timeout_seconds_)); - conn_->set_callback([this](std::string key, std::string sub_key, std::weak_ptr conn) { + conn_->set_callback([this](std::string key, std::string token, std::weak_ptr conn) { std::unique_lock lock(sub_mtx_); - sub_map_.emplace(key + sub_key, conn); + sub_map_.emplace(std::move(key) + token, conn); + token_list_.emplace(std::move(token)); }); acceptor_.async_accept(conn_->socket(), [this](boost::system::error_code ec) { @@ -202,6 +177,28 @@ namespace rest_rpc { } } + void publish(std::string key, std::string token, std::string data) { + decltype(sub_map_.equal_range(key)) range; + + { + std::unique_lock lock(sub_mtx_); + if (sub_map_.empty()) + return; + + range = sub_map_.equal_range(key + token); + } + + auto shared_data = std::make_shared(std::move(data)); + for (auto it = range.first; it != range.second; ++it) { + auto conn = it->second.lock(); + if (conn == nullptr || conn->has_closed()) { + continue; + } + + conn->publish(key + token, *shared_data); + } + } + io_service_pool io_service_pool_; tcp::acceptor acceptor_; std::shared_ptr conn_; @@ -217,11 +214,9 @@ namespace rest_rpc { std::function conn_timeout_callback_; std::unordered_multimap> sub_map_; + std::set token_list_; std::mutex sub_mtx_; - std::map> retry_map_; - std::mutex retry_mtx_; - std::shared_ptr pub_sub_thread_; bool stop_check_pub_sub_ = false; };