Skip to content

Commit

Permalink
improve publish interface, support publish struct
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Aug 27, 2019
1 parent 0892629 commit 6437313
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 34 deletions.
59 changes: 33 additions & 26 deletions examples/client/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,12 @@ void test_sub1() {
});

client.subscribe("key", "unique_token", [](string_view data) {
msgpack_codec codec;
person p = codec.unpack<person>(data.data(), data.size());
std::cout << p.name << "\n";
});

client.subscribe("key1", "unique_token", [](string_view data) {
std::cout << data << "\n";
});

Expand All @@ -403,33 +409,34 @@ void test_sub1() {
return;
}

client1.publish("key", "hello subscriber");
client1.publish_by_token("key", "sub_key", "ok");

std::thread thd([&client1] {
while (true) {
try {
client1.publish("key", "hello subscriber");
client1.publish_by_token("key", "unique_token", "ok");
std::this_thread::sleep_for(std::chrono::seconds(1));
}
catch (const std::exception& ex) {
std::cout << ex.what() << "\n";
}
}
});
//person p{10, "jack", 21};
//client1.publish("key", "hello subscriber");
//client1.publish_by_token("key", "sub_key", p);
//
//std::thread thd([&client1, p] {
// while (true) {
// try {
// client1.publish("key", "hello subscriber");
// client1.publish_by_token("key", "unique_token", p);
// std::this_thread::sleep_for(std::chrono::seconds(1));
// }
// catch (const std::exception& ex) {
// std::cout << ex.what() << "\n";
// }
// }
//});

std::thread thd1([&client1] {
while (true) {
try {
int r = client1.call<int>("add", 2, 3);
std::cout << "add result: " << r << "\n";
}
catch (const std::exception& ex) {
std::cout << ex.what() << "\n";
}
}
});
//std::thread thd1([&client1] {
// while (true) {
// try {
// int r = client1.call<int>("add", 2, 3);
// std::cout << "add result: " << r << "\n";
// }
// catch (const std::exception& ex) {
// std::cout << ex.what() << "\n";
// }
// }
//});

std::string str;
std::cin >> str;
Expand Down
4 changes: 3 additions & 1 deletion examples/server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,12 @@ int main() {
});

std::thread thd([&server] {
person p{ 1, "tom", 20 };
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
server.publish("key", "hello subscriber");
server.publish_by_token("key", "unique_token", "hello subscriber");
server.publish_by_token("key", "unique_token", p);
server.publish_by_token("key1", "unique_token", "hello subscriber1");
}
});

Expand Down
2 changes: 1 addition & 1 deletion include/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace rest_rpc {

return socket_.remote_endpoint().address().to_string();
}

void publish(const std::string& key, const std::string& data) {
auto result = msgpack_codec::pack_args_str(result_code::OK, key, data);
response(0, std::move(result), request_type::sub_pub);
Expand Down
8 changes: 6 additions & 2 deletions include/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,16 @@ namespace rest_rpc {

template<typename T, size_t TIMEOUT = 3>
void publish(std::string key, T&& t) {
call<TIMEOUT>("publish", std::move(key), "", std::forward<T>(t));
msgpack_codec codec;
auto buf = codec.pack(std::move(t));
call<TIMEOUT>("publish", std::move(key), "", std::string(buf.data(), buf.size()));
}

template<typename T, size_t TIMEOUT=3>
void publish_by_token(std::string key, std::string token, T&& t) {
call<TIMEOUT>("publish_by_token", std::move(key), std::move(token), std::forward<T>(t));
msgpack_codec codec;
auto buf = codec.pack(std::move(t));
call<TIMEOUT>("publish_by_token", std::move(key), std::move(token), std::string(buf.data(), buf.size()));
}

private:
Expand Down
20 changes: 16 additions & 4 deletions include/rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ namespace rest_rpc {
conn_timeout_callback_ = std::move(callback);
}

void publish(const std::string& key, std::string data) {
template<typename T>
void publish(const std::string& key, T data) {
publish(key, "", std::move(data));
}

void publish_by_token(const std::string& key, std::string token, std::string data) {
template<typename T>
void publish_by_token(const std::string& key, std::string token, T data) {
publish(key, std::move(token), std::move(data));
}

Expand Down Expand Up @@ -140,7 +142,8 @@ namespace rest_rpc {
}
}

void publish(std::string key, std::string token, std::string data) {
template<typename T>
void publish(std::string key, std::string token, T data) {
decltype(sub_map_.equal_range(key)) range;

{
Expand All @@ -151,7 +154,16 @@ namespace rest_rpc {
range = sub_map_.equal_range(key + token);
}

auto shared_data = std::make_shared<std::string>(std::move(data));
std::shared_ptr<std::string> shared_data = nullptr;
if constexpr(std::is_assignable<std::string, T>::value) {
shared_data = std::make_shared<std::string>(std::move(data));
}
else {
msgpack_codec codec;
auto buf = codec.pack(std::move(data));
shared_data = std::make_shared<std::string>(buf.data(), buf.size());
}

for (auto it = range.first; it != range.second; ++it) {
auto conn = it->second.lock();
if (conn == nullptr || conn->has_closed()) {
Expand Down

0 comments on commit 6437313

Please sign in to comment.