From 64373139fac5f41be7b646f45042fbd55f9fa900 Mon Sep 17 00:00:00 2001 From: qicosmos Date: Tue, 27 Aug 2019 14:12:14 +0800 Subject: [PATCH] improve publish interface, support publish struct --- examples/client/main.cpp | 59 ++++++++++++++++++++++------------------ examples/server/main.cpp | 4 ++- include/connection.h | 2 +- include/rpc_client.hpp | 8 ++++-- include/rpc_server.h | 20 +++++++++++--- 5 files changed, 59 insertions(+), 34 deletions(-) diff --git a/examples/client/main.cpp b/examples/client/main.cpp index 17c61cc..b1c3f21 100644 --- a/examples/client/main.cpp +++ b/examples/client/main.cpp @@ -394,6 +394,12 @@ void test_sub1() { }); client.subscribe("key", "unique_token", [](string_view data) { + msgpack_codec codec; + person p = codec.unpack(data.data(), data.size()); + std::cout << p.name << "\n"; + }); + + client.subscribe("key1", "unique_token", [](string_view data) { std::cout << data << "\n"; }); @@ -403,33 +409,34 @@ void test_sub1() { return; } - client1.publish("key", "hello subscriber"); - client1.publish_by_token("key", "sub_key", "ok"); - - std::thread thd([&client1] { - while (true) { - try { - client1.publish("key", "hello subscriber"); - client1.publish_by_token("key", "unique_token", "ok"); - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - catch (const std::exception& ex) { - std::cout << ex.what() << "\n"; - } - } - }); + //person p{10, "jack", 21}; + //client1.publish("key", "hello subscriber"); + //client1.publish_by_token("key", "sub_key", p); + // + //std::thread thd([&client1, p] { + // while (true) { + // try { + // client1.publish("key", "hello subscriber"); + // client1.publish_by_token("key", "unique_token", p); + // std::this_thread::sleep_for(std::chrono::seconds(1)); + // } + // catch (const std::exception& ex) { + // std::cout << ex.what() << "\n"; + // } + // } + //}); - std::thread thd1([&client1] { - while (true) { - try { - int r = client1.call("add", 2, 3); - std::cout << "add result: " << r << "\n"; - } - catch (const std::exception& ex) { - std::cout << ex.what() << "\n"; - } - } - }); + //std::thread thd1([&client1] { + // while (true) { + // try { + // int r = client1.call("add", 2, 3); + // std::cout << "add result: " << r << "\n"; + // } + // catch (const std::exception& ex) { + // std::cout << ex.what() << "\n"; + // } + // } + //}); std::string str; std::cin >> str; diff --git a/examples/server/main.cpp b/examples/server/main.cpp index 3fc008b..9d50b7f 100644 --- a/examples/server/main.cpp +++ b/examples/server/main.cpp @@ -114,10 +114,12 @@ int main() { }); std::thread thd([&server] { + person p{ 1, "tom", 20 }; while (true) { std::this_thread::sleep_for(std::chrono::seconds(1)); server.publish("key", "hello subscriber"); - server.publish_by_token("key", "unique_token", "hello subscriber"); + server.publish_by_token("key", "unique_token", p); + server.publish_by_token("key1", "unique_token", "hello subscriber1"); } }); diff --git a/include/connection.h b/include/connection.h index 106d8f4..8f2ffb0 100644 --- a/include/connection.h +++ b/include/connection.h @@ -77,7 +77,7 @@ namespace rest_rpc { return socket_.remote_endpoint().address().to_string(); } - + void publish(const std::string& key, const std::string& data) { auto result = msgpack_codec::pack_args_str(result_code::OK, key, data); response(0, std::move(result), request_type::sub_pub); diff --git a/include/rpc_client.hpp b/include/rpc_client.hpp index c8e6879..975c0d1 100644 --- a/include/rpc_client.hpp +++ b/include/rpc_client.hpp @@ -290,12 +290,16 @@ namespace rest_rpc { template void publish(std::string key, T&& t) { - call("publish", std::move(key), "", std::forward(t)); + msgpack_codec codec; + auto buf = codec.pack(std::move(t)); + call("publish", std::move(key), "", std::string(buf.data(), buf.size())); } template void publish_by_token(std::string key, std::string token, T&& t) { - call("publish_by_token", std::move(key), std::move(token), std::forward(t)); + msgpack_codec codec; + auto buf = codec.pack(std::move(t)); + call("publish_by_token", std::move(key), std::move(token), std::string(buf.data(), buf.size())); } private: diff --git a/include/rpc_server.h b/include/rpc_server.h index 67a0f4e..9ac19fe 100644 --- a/include/rpc_server.h +++ b/include/rpc_server.h @@ -67,11 +67,13 @@ namespace rest_rpc { conn_timeout_callback_ = std::move(callback); } - void publish(const std::string& key, std::string data) { + template + void publish(const std::string& key, T data) { publish(key, "", std::move(data)); } - void publish_by_token(const std::string& key, std::string token, std::string data) { + template + void publish_by_token(const std::string& key, std::string token, T data) { publish(key, std::move(token), std::move(data)); } @@ -140,7 +142,8 @@ namespace rest_rpc { } } - void publish(std::string key, std::string token, std::string data) { + template + void publish(std::string key, std::string token, T data) { decltype(sub_map_.equal_range(key)) range; { @@ -151,7 +154,16 @@ namespace rest_rpc { range = sub_map_.equal_range(key + token); } - auto shared_data = std::make_shared(std::move(data)); + std::shared_ptr shared_data = nullptr; + if constexpr(std::is_assignable::value) { + shared_data = std::make_shared(std::move(data)); + } + else { + msgpack_codec codec; + auto buf = codec.pack(std::move(data)); + shared_data = std::make_shared(buf.data(), buf.size()); + } + for (auto it = range.first; it != range.second; ++it) { auto conn = it->second.lock(); if (conn == nullptr || conn->has_closed()) {