From 11c634edb6de219fb79dd0665fad8cdcaa03e275 Mon Sep 17 00:00:00 2001 From: weizili Date: Tue, 11 Apr 2017 21:38:13 +0800 Subject: [PATCH] Fix a thread.join exception coredump because of lamba function use = to capture parameters which will cause the shared_ptrstd::thread copied and then we cannot determine when to destruct the thread. See https://github.com/Qihoo360/evpp/issues/29 --- evpp/event_loop_thread.cc | 2 ++ evpp/http/http_server.cc | 46 ++++++++++++++++++++++++--------------- evpp/http/http_server.h | 4 ++-- test/http_server_test.cc | 4 +++- 4 files changed, 35 insertions(+), 21 deletions(-) diff --git a/evpp/event_loop_thread.cc b/evpp/event_loop_thread.cc index 4b95ac5e6..00e8ca974 100644 --- a/evpp/event_loop_thread.cc +++ b/evpp/event_loop_thread.cc @@ -58,6 +58,8 @@ void EventLoopThread::Run(const Functor& pre, const Functor& post) { assert(event_loop_->IsStopped()); LOG_INFO << "this=" << this << " EventLoopThread stopped"; status_ = kStopped; + usleep(10 * 1000 * 1000); + LOG_INFO << "this=" << this << " EventLoopThread stopped. The end"; } void EventLoopThread::Stop(bool wait_thread_exit) { diff --git a/evpp/http/http_server.cc b/evpp/http/http_server.cc index decbf1c8a..9d8396f68 100644 --- a/evpp/http/http_server.cc +++ b/evpp/http/http_server.cc @@ -24,11 +24,11 @@ bool Server::Init(int listen_port) { lt.thread = std::make_shared(); lt.thread->SetName(std::string("StandaloneHTTPServer-Main-") + std::to_string(listen_port)); - lt.hserver = std::make_shared(lt.thread->loop()); - if (!lt.hserver->Listen(listen_port)) { + lt.hservice = std::make_shared(lt.thread->loop()); + if (!lt.hservice->Listen(listen_port)) { int serrno = errno; LOG_ERROR << "this=" << this << " http server listen at port " << listen_port << " failed. errno=" << serrno << " " << strerror(serrno); - lt.hserver->Stop(); + lt.hservice->Stop(); return false; } listen_threads_.push_back(lt); @@ -71,34 +71,44 @@ bool Server::AfterFork() { bool Server::Start() { std::shared_ptr> exited_listen_thread_count(new std::atomic(0)); bool rc = tpool_->Start(true); + if (!rc) { + LOG_ERROR << "this=" << this << " start thread pool failed."; + return false; + } for (auto& lt : listen_threads_) { - auto http_close_fn = [=]() { - lt.hserver->Stop(); - LOG_INFO << "this=" << this << " http service at 0.0.0.0:" << lt.hserver->port() << " has stopped."; - OnListenThreadExited(exited_listen_thread_count->fetch_add(1) + 1); + auto& hservice = lt.hservice; + auto& lthread = lt.thread; + auto http_close_fn = [hservice, this, exited_listen_thread_count]() { + hservice->Stop(); + LOG_INFO << "this=" << this << " http service at 0.0.0.0:" << hservice->port() << " has stopped."; + this->OnListeningThreadExited(exited_listen_thread_count->fetch_add(1) + 1); }; - rc = rc && lt.thread->Start(true, + rc = lthread->Start(true, EventLoopThread::Functor(), http_close_fn); - assert(lt.thread->IsRunning()); + if (!rc) { + LOG_ERROR << "this=" << this << " start listening thread failed."; + return false; + } + + assert(lthread->IsRunning()); for (auto& c : callbacks_) { auto cb = std::bind(&Server::Dispatch, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, c.second); - lt.hserver->RegisterHandler(c.first, cb); + hservice->RegisterHandler(c.first, cb); } HTTPRequestCallback cb = std::bind(&Server::Dispatch, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, default_callback_); - lt.hserver->RegisterDefaultHandler(cb); - if (!rc) { - return false; - } + hservice->RegisterDefaultHandler(cb); } + + assert(rc); while (!IsRunning()) { usleep(1); } @@ -157,7 +167,7 @@ void Server::Pause() { for (auto& lt : listen_threads_) { EventLoop* loop = lt.thread->loop(); auto f = [<]() { - lt.hserver->Pause(); + lt.hservice->Pause(); }; loop->RunInLoop(f); } @@ -168,7 +178,7 @@ void Server::Continue() { for (auto& lt : listen_threads_) { EventLoop* loop = lt.thread->loop(); auto f = [<]() { - lt.hserver->Continue(); + lt.hservice->Continue(); }; loop->RunInLoop(f); } @@ -271,7 +281,7 @@ EventLoop* Server::GetNextLoop(EventLoop* default_loop, const ContextPtr& ctx) { #endif } -void Server::OnListenThreadExited(int exited_listen_thread_count) { +void Server::OnListeningThreadExited(int exited_listen_thread_count) { LOG_INFO << "this=" << this << " OnListenThreadExited exited_listen_thread_count=" << exited_listen_thread_count << " listen_threads_.size=" << listen_threads_.size(); if (exited_listen_thread_count == int(listen_threads_.size())) { LOG_INFO << "this=" << this << " stop the working thread pool."; @@ -281,7 +291,7 @@ void Server::OnListenThreadExited(int exited_listen_thread_count) { Service* Server::service(int index) const { if (index < int(listen_threads_.size())) { - return listen_threads_[index].hserver.get(); + return listen_threads_[index].hservice.get(); } return nullptr; diff --git a/evpp/http/http_server.h b/evpp/http/http_server.h index 9a3975d9f..08af51ab2 100644 --- a/evpp/http/http_server.h +++ b/evpp/http/http_server.h @@ -59,14 +59,14 @@ class EVPP_EXPORT Server : public ThreadDispatchPolicy { const HTTPRequestCallback& user_callback); EventLoop* GetNextLoop(EventLoop* default_loop, const ContextPtr& ctx); - void OnListenThreadExited(int exited_listen_thread_count); + void OnListeningThreadExited(int exited_listen_thread_count); private: struct ListenThread { // The listening main thread std::shared_ptr thread; // Every listening main thread runs a HTTP Service to listen, receive, dispatch, send response the HTTP request. - std::shared_ptr hserver; + std::shared_ptr hservice; }; std::vector listen_threads_; diff --git a/test/http_server_test.cc b/test/http_server_test.cc index 73fade57f..b971dc93b 100644 --- a/test/http_server_test.cc +++ b/test/http_server_test.cc @@ -159,8 +159,9 @@ static void TestAll() { } -TEST_UNIT(testHTTPServer1) { +TEST_UNIT(testHTTPServer) { for (int i = 0; i < 5; ++i) { + LOG_INFO << "Running testHTTPServer i=" << i; evpp::http::Server ph(i); ph.RegisterDefaultHandler(&DefaultRequestHandler); ph.RegisterHandler("/push/boot", &RequestHandler); @@ -168,6 +169,7 @@ TEST_UNIT(testHTTPServer1) { H_TEST_ASSERT(r); TestAll(); ph.Stop(true); + usleep(1000 * 1000); } }