Skip to content

Commit

Permalink
add token interface
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Aug 27, 2019
1 parent ae51d8c commit d8843c0
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 55 deletions.
10 changes: 5 additions & 5 deletions examples/client/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,18 +382,18 @@ void wait_for_notification(rpc_client & client) {

void test_sub1() {
rpc_client client;
client.enable_auto_reconnect();
client.enable_auto_heartbeat();
bool r = client.connect("127.0.0.1", 9000);
if (!r) {
return;
}

client.enable_auto_heartbeat();

client.subscribe("key", [](string_view data) {
std::cout << data << "\n";
});

client.subscribe("key", "sub_key", [](string_view data) {
client.subscribe("key", "unique_token", [](string_view data) {
std::cout << data << "\n";
});

Expand All @@ -404,13 +404,13 @@ void test_sub1() {
}

client1.publish("key", "hello subscriber");
client1.publish("key", "sub_key", "ok");
client1.publish_by_token("key", "sub_key", "ok");

std::thread thd([&client1] {
while (true) {
try {
client1.publish("key", "hello subscriber");
client1.publish("key", "sub_key", "ok");
client1.publish_by_token("key", "unique_token", "ok");
std::this_thread::sleep_for(std::chrono::seconds(1));
}
catch (const std::exception& ex) {
Expand Down
12 changes: 8 additions & 4 deletions examples/server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,19 @@ int main() {
server.register_handler("echo", echo);
server.register_handler("get_int", get_int);

server.register_handler("publish", [&server](rpc_conn conn, std::string key, std::string sub_key, std::string val) {
server.publish(std::move(key), std::move(sub_key), std::move(val));
server.register_handler("publish_by_token", [&server](rpc_conn conn, std::string key, std::string token, std::string val) {
server.publish_by_token(std::move(key), std::move(token), std::move(val));
});

server.register_handler("publish", [&server](rpc_conn conn, std::string key, std::string token, std::string val) {
server.publish(std::move(key), std::move(val));
});

std::thread thd([&server] {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
server.publish("key", "", "hello subscriber");
server.publish("key", "sub_key", "hello subscriber");
server.publish("key", "hello subscriber");
server.publish_by_token("key", "unique_token", "hello subscriber");
}
});

Expand Down
16 changes: 8 additions & 8 deletions include/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ namespace rest_rpc {
}

template<typename Func>
void subscribe(std::string key, std::string sub_key, Func f) {
auto composite_key = key + sub_key;
void subscribe(std::string key, std::string token, Func f) {
auto composite_key = key + token;
auto it = sub_map_.find(composite_key);
if (it != sub_map_.end()) {
assert("duplicated subscribe");
Expand All @@ -284,18 +284,18 @@ namespace rest_rpc {

sub_map_.emplace(std::move(composite_key), std::move(f));
msgpack_codec codec;
auto ret = codec.pack_args(key, sub_key);
auto ret = codec.pack_args(key, token);
write(0, request_type::sub_pub, std::move(ret));
}

template<typename T>
template<typename T, size_t TIMEOUT = 3>
void publish(std::string key, T&& t) {
call<100000>("publish", std::move(key), "", std::forward<T>(t));
call<TIMEOUT>("publish", std::move(key), "", std::forward<T>(t));
}

template<typename T>
void publish(std::string key, std::string sub_key, T&& t) {
call<100000>("publish", std::move(key), std::move(sub_key), std::forward<T>(t));
template<typename T, size_t TIMEOUT=3>
void publish_by_token(std::string key, std::string token, T&& t) {
call<TIMEOUT>("publish_by_token", std::move(key), std::move(token), std::forward<T>(t));
}

private:
Expand Down
71 changes: 33 additions & 38 deletions include/rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,51 +104,26 @@ namespace rest_rpc {
publish(key, "", std::move(data));
}

void publish(const std::string& key, std::string sub_key, std::string data) {
decltype(sub_map_.equal_range(key)) range;

{
std::unique_lock<std::mutex> lock(sub_mtx_);
if (sub_map_.empty())
return;

range = sub_map_.equal_range(key + sub_key);
}

auto shared_data = std::make_shared<std::string>(std::move(data));
for (auto it = range.first; it != range.second; ++it) {
auto conn = it->second.lock();
if (conn == nullptr || conn->has_closed()) {
if (!sub_key.empty()) {
std::unique_lock<std::mutex> lock(retry_mtx_);
auto it = retry_map_.find(sub_key);
if (it == retry_map_.end())
continue;

auto retry = std::make_shared<retry_data>(io_service_pool_.get_io_service(),
shared_data, 30 * 1000);
retry_map_.emplace(std::move(sub_key), retry);
retry->start_timer();
}

continue;
}
void publish_by_token(std::string token, std::string data) {
publish("", std::move(token), std::move(data));
}

conn->publish(key + sub_key, *shared_data);
}
void publish_by_token(const std::string& key, std::string token, std::string data) {
publish(key, std::move(token), std::move(data));
}

std::unordered_multimap<std::string, std::weak_ptr<connection>> get_subscriber_map() {
std::set<std::string> get_token_list() {
std::unique_lock<std::mutex> lock(sub_mtx_);
return sub_map_;
return token_list_;
}

private:
void do_accept() {
conn_.reset(new connection(io_service_pool_.get_io_service(), timeout_seconds_));
conn_->set_callback([this](std::string key, std::string sub_key, std::weak_ptr<connection> conn) {
conn_->set_callback([this](std::string key, std::string token, std::weak_ptr<connection> conn) {
std::unique_lock<std::mutex> lock(sub_mtx_);
sub_map_.emplace(key + sub_key, conn);
sub_map_.emplace(std::move(key) + token, conn);
token_list_.emplace(std::move(token));
});

acceptor_.async_accept(conn_->socket(), [this](boost::system::error_code ec) {
Expand Down Expand Up @@ -202,6 +177,28 @@ namespace rest_rpc {
}
}

void publish(std::string key, std::string token, std::string data) {
decltype(sub_map_.equal_range(key)) range;

{
std::unique_lock<std::mutex> lock(sub_mtx_);
if (sub_map_.empty())
return;

range = sub_map_.equal_range(key + token);
}

auto shared_data = std::make_shared<std::string>(std::move(data));
for (auto it = range.first; it != range.second; ++it) {
auto conn = it->second.lock();
if (conn == nullptr || conn->has_closed()) {
continue;
}

conn->publish(key + token, *shared_data);
}
}

io_service_pool io_service_pool_;
tcp::acceptor acceptor_;
std::shared_ptr<connection> conn_;
Expand All @@ -217,11 +214,9 @@ namespace rest_rpc {

std::function<void(int64_t)> conn_timeout_callback_;
std::unordered_multimap<std::string, std::weak_ptr<connection>> sub_map_;
std::set<std::string> token_list_;
std::mutex sub_mtx_;

std::map<std::string, std::shared_ptr<retry_data>> retry_map_;
std::mutex retry_mtx_;

std::shared_ptr<std::thread> pub_sub_thread_;
bool stop_check_pub_sub_ = false;
};
Expand Down

0 comments on commit d8843c0

Please sign in to comment.