Skip to content

Commit

Permalink
implemented server via webservice library
Browse files Browse the repository at this point in the history
  • Loading branch information
bebuch committed Apr 5, 2018
1 parent cba4a87 commit 07401b3
Showing 1 changed file with 74 additions and 50 deletions.
124 changes: 74 additions & 50 deletions src/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace disposer_module::http_server_component{
: name(module("service_name"_param))
, module_(module) {}

void on_open(std::uintptr_t identifier)override{
void on_open(webservice::ws_identifier identifier)override{
module_.log(
[this, identifier](logsys::stdlogb& os){
os << "live service " << name << " on_open identifier("
Expand All @@ -57,7 +57,7 @@ namespace disposer_module::http_server_component{
});
}

void on_close(std::uintptr_t identifier)override{
void on_close(webservice::ws_identifier identifier)override{
module_.log(
[this, identifier](logsys::stdlogb& os){
os << "live service " << name << " on_close identifier("
Expand All @@ -69,7 +69,7 @@ namespace disposer_module::http_server_component{
}

void on_text(
std::uintptr_t identifier,
webservice::ws_identifier identifier,
std::string&& text
)override{
module_.log(
Expand All @@ -88,7 +88,7 @@ namespace disposer_module::http_server_component{
}

void on_binary(
std::uintptr_t identifier,
webservice::ws_identifier identifier,
std::string&& text
)override{
module_.log(
Expand All @@ -104,13 +104,13 @@ namespace disposer_module::http_server_component{
}

void send(std::string data){
std::set< std::uintptr_t > ready_sessions;
std::set< webservice::ws_identifier > ready_identifiers;
module_.log(
[this, &ready_sessions](logsys::stdlogb& os){
[this, &ready_identifiers](logsys::stdlogb& os){
os << "live service " << name
<< " send binary to sessions(";
bool first = true;
for(auto identifier: ready_sessions){
for(auto identifier: ready_identifiers){
if(!first){
os << ", ";
}else{
Expand All @@ -119,22 +119,22 @@ namespace disposer_module::http_server_component{
os << identifier;
}
os << ")";
}, [this, &data, &ready_sessions]{
}, [this, &data, &ready_identifiers]{
{
std::shared_lock lock(mutex_);
for(auto& [identifier, ready]: ready_){
if(!ready) continue;
ready = false;
ready_sessions.insert(
ready_sessions.end(), identifier);
ready_identifiers.insert(
ready_identifiers.end(), identifier);
}
}
send_binary(ready_sessions, std::move(data));
send_binary(ready_identifiers, std::move(data));
});
}

void on_error(
std::uintptr_t identifier,
webservice::ws_identifier identifier,
webservice::ws_handler_location location,
boost::system::error_code ec
)override{
Expand All @@ -147,7 +147,7 @@ namespace disposer_module::http_server_component{
}

void on_exception(
std::uintptr_t identifier,
webservice::ws_identifier identifier,
std::exception_ptr error
)noexcept override{
module_.exception_catching_log(
Expand All @@ -164,7 +164,7 @@ namespace disposer_module::http_server_component{
private:
Module module_;
std::shared_mutex mutex_;
std::map< std::uintptr_t, std::atomic< bool > > ready_;
std::map< webservice::ws_identifier, std::atomic< bool > > ready_;
};


Expand All @@ -180,12 +180,10 @@ namespace disposer_module::http_server_component{

~running_chains(){
shutdown_ = true;
for(;;){
std::unique_lock lock(mutex_);
if(chains_.empty()){
break;
while(running_){
if(server()->poll_one() == 0){
std::this_thread::yield();
}
server_.run_one();
}
}

Expand All @@ -211,10 +209,13 @@ namespace disposer_module::http_server_component{

send_text(nlohmann::json::object({{"run-chain", chain}}));

if(chains_.size() == 1){
async([this]{
run();
});
if(chains_.size() == 1 && !shutdown_){
running_ = true;
boost::asio::post(
this->server()->get_executor(),
[this]{
run();
});
}
}

Expand All @@ -224,6 +225,8 @@ namespace disposer_module::http_server_component{
}

void run(){
running_ = false;

if(shutdown_){
return;
}
Expand All @@ -249,17 +252,21 @@ namespace disposer_module::http_server_component{
});
}

if(!chains_.empty()){
async([this]{
run();
});
if(!chains_.empty() && !shutdown_){
running_ = true;
boost::asio::post(
this->server()->get_executor(),
[this]{
run();
});
}
}


protected:
Component component_;
webservice::server& server_;
std::atomic< bool > running_;

nlohmann::json running_chains_message(){
return nlohmann::json::object({{"running-chains",
Expand All @@ -282,6 +289,12 @@ namespace disposer_module::http_server_component{
send_text(nlohmann::json::object({{"stop-chain", chain}}));
}

void shutdown()noexcept override{
if(!shutdown_.exchange(true)){
webservice::basic_json_ws_service< std::string >::shutdown();
}
}

struct running_chains_data{
running_chains_data(
disposer::enabled_chain&& chain,
Expand Down Expand Up @@ -314,7 +327,7 @@ namespace disposer_module::http_server_component{


private:
void on_open(std::uintptr_t identifier)override{
void on_open(webservice::ws_identifier identifier)override{
component_.log(
[identifier](logsys::stdlogb& os){
os << "control service on_open identifier("
Expand All @@ -324,7 +337,7 @@ namespace disposer_module::http_server_component{
});
}

void on_close(std::uintptr_t identifier)override{
void on_close(webservice::ws_identifier identifier)override{
component_.log(
[identifier](logsys::stdlogb& os){
os << "control service on_close identifier("
Expand All @@ -333,7 +346,7 @@ namespace disposer_module::http_server_component{
}

void on_text(
std::uintptr_t identifier,
webservice::ws_identifier identifier,
nlohmann::json&& data
)override{
auto success = component_.exception_catching_log(
Expand Down Expand Up @@ -389,16 +402,17 @@ namespace disposer_module::http_server_component{
},
[this, identifier]{
this->send_text(identifier, nlohmann::json::object({
{"error", "see server log file session("
+ std::to_string(identifier) + ")"}
{"error", io_tools::make_string(
"see server log file session(",
identifier, ")")}
}));
this->send_text(this->running_chains_message());
});
}
}

void on_binary(
std::uintptr_t identifier,
webservice::ws_identifier identifier,
std::string&& text
)override{
component_.log(
Expand All @@ -412,7 +426,7 @@ namespace disposer_module::http_server_component{
}

void on_error(
std::uintptr_t identifier,
webservice::ws_identifier identifier,
webservice::ws_handler_location location,
boost::system::error_code ec
)override{
Expand All @@ -425,7 +439,7 @@ namespace disposer_module::http_server_component{
}

void on_exception(
std::uintptr_t identifier,
webservice::ws_identifier identifier,
std::exception_ptr error
)noexcept override{
component_.exception_catching_log(
Expand All @@ -450,12 +464,11 @@ namespace disposer_module::http_server_component{

private:
void on_error(
webservice::ws_server_session* session,
webservice::ws_identifier identifier,
std::string const& resource,
webservice::ws_handler_location location,
boost::system::error_code ec
)override{
auto const identifier = reinterpret_cast< std::intptr_t >(session);
component_.log(
[identifier, &resource, location, ec](logsys::stdlogb& os){
os << "service handler identifier(" << identifier
Expand All @@ -466,11 +479,10 @@ namespace disposer_module::http_server_component{
}

void on_exception(
webservice::ws_server_session* session,
webservice::ws_identifier identifier,
std::string const& resource,
std::exception_ptr error
)noexcept override{
auto const identifier = reinterpret_cast< std::intptr_t >(session);
component_.exception_catching_log(
[identifier, &resource](logsys::stdlogb& os){
os << "service handler identifier(" << identifier
Expand Down Expand Up @@ -575,6 +587,12 @@ namespace disposer_module::http_server_component{
server(server&&) = default;


void shutdown(){
server_->shutdown();
server_.reset();
}


template < typename Module >
auto add_live_service(Module module){
std::string service_name = module("service_name"_param);
Expand Down Expand Up @@ -637,27 +655,30 @@ namespace disposer_module::http_server_component{
)
: component_(component)
, ws_handler_(*ws_handler.get())
, server_(
std::make_unique< file_request_handler< Component > >(
component),
std::move(ws_handler),
std::make_unique< error_handler< Component > >(component),
boost::asio::ip::make_address(component("address"_param)),
component("port"_param),
component("thread_count"_param))
, server_(std::make_unique< webservice::server >(
std::make_unique< file_request_handler< Component > >(
component),
std::move(ws_handler),
std::make_unique< error_handler< Component > >(component),
boost::asio::ip::make_address(component("address"_param)),
component("port"_param),
component("thread_count"_param)
))
{
ws_handler_.set_ping_time(
std::chrono::milliseconds(component("timeout_in_ms"_param)));
component_.log(
[](logsys::stdlogb& os){
os << "add WebSocket control service on root(/)";
}, [this]{
ws_handler_.add_service("/", std::make_unique<
control_service< Component > >(component_, server_));
control_service< Component > >(component_, *server_));
});
}

Component component_;
ws_service_handler< Component >& ws_handler_;
webservice::server server_;
std::unique_ptr< webservice::server > server_;
};


Expand Down Expand Up @@ -690,7 +711,10 @@ namespace disposer_module::http_server_component{
})),
make("min_interval_in_ms"_param, free_type_c< std::uint32_t >,
"time between exec calls on live chains",
default_value(50))
default_value(50)),
make("timeout_in_ms"_param, free_type_c< std::uint32_t >,
"time out for websocket connections",
default_value(15000))
),
component_init_fn([](auto component){
return server< decltype(component) >(component);
Expand Down

0 comments on commit 07401b3

Please sign in to comment.