Skip to content

Commit

Permalink
support quit with ctrl c
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Jan 27, 2022
1 parent 987a306 commit da711e4
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 26 deletions.
11 changes: 6 additions & 5 deletions examples/server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,11 @@ int main() {
std::cout << "remote client address: " << conn->remote_address()
<< " networking error, reason: " << reason << "\n";
});
std::thread thd([&server] {

bool stop = false;
std::thread thd([&server, &stop] {
person p{1, "tom", 20};
while (true) {
while (!stop) {
server.publish("key", "hello subscriber");
auto list = server.get_token_list();
for (auto &token : list) {
Expand All @@ -159,7 +161,6 @@ int main() {
});

server.run();

std::string str;
std::cin >> str;
stop = true;
thd.join();
}
64 changes: 43 additions & 21 deletions include/rest_rpc/rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,18 @@ class rpc_server : private asio::noncopyable {
size_t check_seconds = 10)
: io_service_pool_(size), acceptor_(io_service_pool_.get_io_service(),
tcp::endpoint(tcp::v4(), port)),
timeout_seconds_(timeout_seconds), check_seconds_(check_seconds) {
timeout_seconds_(timeout_seconds), check_seconds_(check_seconds),
signals_(io_service_pool_.get_io_service()) {
do_accept();
check_thread_ = std::make_shared<std::thread>([this] { clean(); });
pub_sub_thread_ =
std::make_shared<std::thread>([this] { clean_sub_pub(); });
signals_.add(SIGINT);
signals_.add(SIGTERM);
#if defined(SIGQUIT)
signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)
do_await_stop();
}

rpc_server(unsigned short port, size_t size, ssl_configure ssl_conf,
Expand All @@ -37,26 +44,7 @@ class rpc_server : private asio::noncopyable {
#endif
}

~rpc_server() {
{
std::unique_lock<std::mutex> lock(mtx_);
stop_check_ = true;
cv_.notify_all();
}
check_thread_->join();

{
std::unique_lock<std::mutex> lock(sub_mtx_);
stop_check_pub_sub_ = true;
sub_cv_.notify_all();
}
pub_sub_thread_->join();

io_service_pool_.stop();
if (thd_) {
thd_->join();
}
}
~rpc_server() { stop(); }

void async_run() {
thd_ = std::make_shared<std::thread>([this] { io_service_pool_.run(); });
Expand Down Expand Up @@ -99,6 +87,32 @@ class rpc_server : private asio::noncopyable {
return token_list_;
}

void stop() {
if (has_stoped_) {
return;
}

{
std::unique_lock<std::mutex> lock(mtx_);
stop_check_ = true;
cv_.notify_all();
}
check_thread_->join();

{
std::unique_lock<std::mutex> lock(sub_mtx_);
stop_check_pub_sub_ = true;
sub_cv_.notify_all();
}
pub_sub_thread_->join();

io_service_pool_.stop();
if (thd_) {
thd_->join();
}
has_stoped_ = true;
}

private:
void do_accept() {
conn_.reset(new connection(io_service_pool_.get_io_service(),
Expand Down Expand Up @@ -207,6 +221,11 @@ class rpc_server : private asio::noncopyable {
return std::make_shared<std::string>(buf.data(), buf.size());
}

void do_await_stop() {
signals_.async_wait(
[this](std::error_code /*ec*/, int /*signo*/) { stop(); });
}

io_service_pool io_service_pool_;
tcp::acceptor acceptor_;
std::shared_ptr<connection> conn_;
Expand All @@ -221,6 +240,8 @@ class rpc_server : private asio::noncopyable {
bool stop_check_ = false;
std::condition_variable cv_;

asio::signal_set signals_;

std::function<void(int64_t)> conn_timeout_callback_;
std::function<void(std::shared_ptr<connection>, std::string)>
on_net_err_callback_ = nullptr;
Expand All @@ -234,6 +255,7 @@ class rpc_server : private asio::noncopyable {

ssl_configure ssl_conf_;
router router_;
std::atomic_bool has_stoped_ = {false};
};
} // namespace rpc_service
} // namespace rest_rpc
Expand Down

0 comments on commit da711e4

Please sign in to comment.