From 07401b384fd4144314d949ab6364a9493843c82a Mon Sep 17 00:00:00 2001 From: Benjamin Roland Buch Date: Thu, 5 Apr 2018 16:11:08 +0200 Subject: [PATCH] implemented server via webservice library --- src/http_server.cpp | 124 ++++++++++++++++++++++++++------------------ 1 file changed, 74 insertions(+), 50 deletions(-) diff --git a/src/http_server.cpp b/src/http_server.cpp index fd8cdbc..201de5a 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -46,7 +46,7 @@ namespace disposer_module::http_server_component{ : name(module("service_name"_param)) , module_(module) {} - void on_open(std::uintptr_t identifier)override{ + void on_open(webservice::ws_identifier identifier)override{ module_.log( [this, identifier](logsys::stdlogb& os){ os << "live service " << name << " on_open identifier(" @@ -57,7 +57,7 @@ namespace disposer_module::http_server_component{ }); } - void on_close(std::uintptr_t identifier)override{ + void on_close(webservice::ws_identifier identifier)override{ module_.log( [this, identifier](logsys::stdlogb& os){ os << "live service " << name << " on_close identifier(" @@ -69,7 +69,7 @@ namespace disposer_module::http_server_component{ } void on_text( - std::uintptr_t identifier, + webservice::ws_identifier identifier, std::string&& text )override{ module_.log( @@ -88,7 +88,7 @@ namespace disposer_module::http_server_component{ } void on_binary( - std::uintptr_t identifier, + webservice::ws_identifier identifier, std::string&& text )override{ module_.log( @@ -104,13 +104,13 @@ namespace disposer_module::http_server_component{ } void send(std::string data){ - std::set< std::uintptr_t > ready_sessions; + std::set< webservice::ws_identifier > ready_identifiers; module_.log( - [this, &ready_sessions](logsys::stdlogb& os){ + [this, &ready_identifiers](logsys::stdlogb& os){ os << "live service " << name << " send binary to sessions("; bool first = true; - for(auto identifier: ready_sessions){ + for(auto identifier: ready_identifiers){ if(!first){ os << ", "; }else{ @@ -119,22 +119,22 @@ namespace disposer_module::http_server_component{ os << identifier; } os << ")"; - }, [this, &data, &ready_sessions]{ + }, [this, &data, &ready_identifiers]{ { std::shared_lock lock(mutex_); for(auto& [identifier, ready]: ready_){ if(!ready) continue; ready = false; - ready_sessions.insert( - ready_sessions.end(), identifier); + ready_identifiers.insert( + ready_identifiers.end(), identifier); } } - send_binary(ready_sessions, std::move(data)); + send_binary(ready_identifiers, std::move(data)); }); } void on_error( - std::uintptr_t identifier, + webservice::ws_identifier identifier, webservice::ws_handler_location location, boost::system::error_code ec )override{ @@ -147,7 +147,7 @@ namespace disposer_module::http_server_component{ } void on_exception( - std::uintptr_t identifier, + webservice::ws_identifier identifier, std::exception_ptr error )noexcept override{ module_.exception_catching_log( @@ -164,7 +164,7 @@ namespace disposer_module::http_server_component{ private: Module module_; std::shared_mutex mutex_; - std::map< std::uintptr_t, std::atomic< bool > > ready_; + std::map< webservice::ws_identifier, std::atomic< bool > > ready_; }; @@ -180,12 +180,10 @@ namespace disposer_module::http_server_component{ ~running_chains(){ shutdown_ = true; - for(;;){ - std::unique_lock lock(mutex_); - if(chains_.empty()){ - break; + while(running_){ + if(server()->poll_one() == 0){ + std::this_thread::yield(); } - server_.run_one(); } } @@ -211,10 +209,13 @@ namespace disposer_module::http_server_component{ send_text(nlohmann::json::object({{"run-chain", chain}})); - if(chains_.size() == 1){ - async([this]{ - run(); - }); + if(chains_.size() == 1 && !shutdown_){ + running_ = true; + boost::asio::post( + this->server()->get_executor(), + [this]{ + run(); + }); } } @@ -224,6 +225,8 @@ namespace disposer_module::http_server_component{ } void run(){ + running_ = false; + if(shutdown_){ return; } @@ -249,10 +252,13 @@ namespace disposer_module::http_server_component{ }); } - if(!chains_.empty()){ - async([this]{ - run(); - }); + if(!chains_.empty() && !shutdown_){ + running_ = true; + boost::asio::post( + this->server()->get_executor(), + [this]{ + run(); + }); } } @@ -260,6 +266,7 @@ namespace disposer_module::http_server_component{ protected: Component component_; webservice::server& server_; + std::atomic< bool > running_; nlohmann::json running_chains_message(){ return nlohmann::json::object({{"running-chains", @@ -282,6 +289,12 @@ namespace disposer_module::http_server_component{ send_text(nlohmann::json::object({{"stop-chain", chain}})); } + void shutdown()noexcept override{ + if(!shutdown_.exchange(true)){ + webservice::basic_json_ws_service< std::string >::shutdown(); + } + } + struct running_chains_data{ running_chains_data( disposer::enabled_chain&& chain, @@ -314,7 +327,7 @@ namespace disposer_module::http_server_component{ private: - void on_open(std::uintptr_t identifier)override{ + void on_open(webservice::ws_identifier identifier)override{ component_.log( [identifier](logsys::stdlogb& os){ os << "control service on_open identifier(" @@ -324,7 +337,7 @@ namespace disposer_module::http_server_component{ }); } - void on_close(std::uintptr_t identifier)override{ + void on_close(webservice::ws_identifier identifier)override{ component_.log( [identifier](logsys::stdlogb& os){ os << "control service on_close identifier(" @@ -333,7 +346,7 @@ namespace disposer_module::http_server_component{ } void on_text( - std::uintptr_t identifier, + webservice::ws_identifier identifier, nlohmann::json&& data )override{ auto success = component_.exception_catching_log( @@ -389,8 +402,9 @@ namespace disposer_module::http_server_component{ }, [this, identifier]{ this->send_text(identifier, nlohmann::json::object({ - {"error", "see server log file session(" - + std::to_string(identifier) + ")"} + {"error", io_tools::make_string( + "see server log file session(", + identifier, ")")} })); this->send_text(this->running_chains_message()); }); @@ -398,7 +412,7 @@ namespace disposer_module::http_server_component{ } void on_binary( - std::uintptr_t identifier, + webservice::ws_identifier identifier, std::string&& text )override{ component_.log( @@ -412,7 +426,7 @@ namespace disposer_module::http_server_component{ } void on_error( - std::uintptr_t identifier, + webservice::ws_identifier identifier, webservice::ws_handler_location location, boost::system::error_code ec )override{ @@ -425,7 +439,7 @@ namespace disposer_module::http_server_component{ } void on_exception( - std::uintptr_t identifier, + webservice::ws_identifier identifier, std::exception_ptr error )noexcept override{ component_.exception_catching_log( @@ -450,12 +464,11 @@ namespace disposer_module::http_server_component{ private: void on_error( - webservice::ws_server_session* session, + webservice::ws_identifier identifier, std::string const& resource, webservice::ws_handler_location location, boost::system::error_code ec )override{ - auto const identifier = reinterpret_cast< std::intptr_t >(session); component_.log( [identifier, &resource, location, ec](logsys::stdlogb& os){ os << "service handler identifier(" << identifier @@ -466,11 +479,10 @@ namespace disposer_module::http_server_component{ } void on_exception( - webservice::ws_server_session* session, + webservice::ws_identifier identifier, std::string const& resource, std::exception_ptr error )noexcept override{ - auto const identifier = reinterpret_cast< std::intptr_t >(session); component_.exception_catching_log( [identifier, &resource](logsys::stdlogb& os){ os << "service handler identifier(" << identifier @@ -575,6 +587,12 @@ namespace disposer_module::http_server_component{ server(server&&) = default; + void shutdown(){ + server_->shutdown(); + server_.reset(); + } + + template < typename Module > auto add_live_service(Module module){ std::string service_name = module("service_name"_param); @@ -637,27 +655,30 @@ namespace disposer_module::http_server_component{ ) : component_(component) , ws_handler_(*ws_handler.get()) - , server_( - std::make_unique< file_request_handler< Component > >( - component), - std::move(ws_handler), - std::make_unique< error_handler< Component > >(component), - boost::asio::ip::make_address(component("address"_param)), - component("port"_param), - component("thread_count"_param)) + , server_(std::make_unique< webservice::server >( + std::make_unique< file_request_handler< Component > >( + component), + std::move(ws_handler), + std::make_unique< error_handler< Component > >(component), + boost::asio::ip::make_address(component("address"_param)), + component("port"_param), + component("thread_count"_param) + )) { + ws_handler_.set_ping_time( + std::chrono::milliseconds(component("timeout_in_ms"_param))); component_.log( [](logsys::stdlogb& os){ os << "add WebSocket control service on root(/)"; }, [this]{ ws_handler_.add_service("/", std::make_unique< - control_service< Component > >(component_, server_)); + control_service< Component > >(component_, *server_)); }); } Component component_; ws_service_handler< Component >& ws_handler_; - webservice::server server_; + std::unique_ptr< webservice::server > server_; }; @@ -690,7 +711,10 @@ namespace disposer_module::http_server_component{ })), make("min_interval_in_ms"_param, free_type_c< std::uint32_t >, "time between exec calls on live chains", - default_value(50)) + default_value(50)), + make("timeout_in_ms"_param, free_type_c< std::uint32_t >, + "time out for websocket connections", + default_value(15000)) ), component_init_fn([](auto component){ return server< decltype(component) >(component);