From 7a16b8f97867f81080dad1faa60df85f39684ab7 Mon Sep 17 00:00:00 2001 From: qinguoyi <1532979219@qq.com> Date: Mon, 11 May 2020 21:28:41 +0800 Subject: [PATCH] :sparkles: && :bug: add reactor && fix bug --- README.md | 29 +++++++++---- config.cpp | 10 ++++- config.h | 3 ++ http/http_conn.cpp | 24 ++++++----- http/http_conn.h | 13 ++++-- main.cpp | 9 ++-- makefile | 2 +- threadpool/threadpool.h | 63 +++++++++++++++++++++++---- timer/lst_timer.cpp | 1 + timer/lst_timer.h | 21 ++++++++- webserver.cpp | 96 ++++++++++++++++++++++++++++++++--------- webserver.h | 8 +--- 12 files changed, 217 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index b82ac646..fdcf2001 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ TinyWebServer =============== Linux下C++轻量级Web服务器,助力初学者快速实践网络编程,搭建属于自己的服务器. -* 使用**线程池 + epoll(ET和LT均实现) + 模拟Proactor模式**的并发模型 +* 使用**线程池 + epoll(ET和LT均实现) + 反应堆(Reactor和Proactor均实现)**的并发模型 * 使用**状态机**解析HTTP请求报文,支持解析**GET和POST**请求 * 访问服务器数据库实现web端用户**注册、登录**功能,可以请求服务器**图片和视频文件** * 实现**同步/异步日志系统**,记录服务器运行状态 @@ -59,13 +59,21 @@ Demo演示 ------------- 在关闭日志后,使用Webbench对服务器进行压力测试,在ET非阻塞和LT阻塞模式下均可实现上万的并发连接. -> * ET非阻塞,57838 QPS +> * Proactor,LT阻塞,84016 QPS -
+
-> * LT阻塞,64525 QPS +> * Proactor,ET非阻塞,83419 QPS -
+
+ +> * Reactor,LT阻塞,60218 QPS + +
+ +> * Reactor,ET非阻塞,58138 QPS + +
> * 并发连接总数:10500 > * 访问服务器时间:5s @@ -89,6 +97,7 @@ Demo演示 - [x] main函数封装重构 - [x] 新增命令行日志开关,关闭日志后更新压力测试结果 - [x] 改进编译方式,只配置一次SQL信息即可 +- [x] 新增Reactor模式,并完成压力测试 目前有两个版本,版本间的代码结构有较大改动,文档和代码运行方法也不一致. 重构版本更简洁,原始版本(raw_version)更大保留游双代码的原汁原味,从原始版本更容易入手. @@ -159,7 +168,7 @@ Demo演示 ------ ```C++ -./server [-p port] [-v SQLVerify] [-l LOGWrite] [-m TRIGMode] [-o OPT_LINGER] [-s sql_num] [-t thread_num] [-c close_log] +./server [-p port] [-v SQLVerify] [-l LOGWrite] [-m TRIGMode] [-o OPT_LINGER] [-s sql_num] [-t thread_num] [-c close_log] [-a actor_model] ``` 温馨提示:以上参数不是非必须,不用全部使用,根据个人情况搭配选用即可. @@ -186,11 +195,14 @@ Demo演示 * -c,关闭日志,默认打开 * 0,打开日志 * 1,关闭日志 +* -a,选择反应堆模型,默认Proactor + * 0,Proactor模型 + * 1,Reactor模型 测试示例命令与含义 ```C++ -./server -p 9007 -v 1 -l 1 -m 0 -o 1 -s 10 -t 10 -c 1 +./server -p 9007 -v 1 -l 1 -m 0 -o 1 -s 10 -t 10 -c 1 -a 1 ``` - [x] 端口9007 @@ -201,6 +213,7 @@ Demo演示 - [x] 数据库连接池内有10条连接 - [x] 线程池内有10条线程 - [x] 关闭日志 +- [x] Reactor反应堆模型 庖丁解牛 ------------ @@ -222,4 +235,4 @@ Demo演示 ------------ Linux高性能服务器编程,游双著. -感谢以下朋友的PR和帮助: [RownH](https://github.com/RownH),[ZWiley](https://github.com/ZWiley),[zjuHong](https://github.com/zjuHong),[mamil](https://github.com/mamil),[byfate](https://github.com/byfate). +感谢以下朋友的PR和帮助: [RownH](https://github.com/RownH),[ZWiley](https://github.com/ZWiley),[zjuHong](https://github.com/zjuHong),[mamil](https://github.com/mamil),[byfate](https://github.com/byfate),[MaJun827]https://github.com/MaJun827). diff --git a/config.cpp b/config.cpp index 1f5241fa..03a0d13f 100644 --- a/config.cpp +++ b/config.cpp @@ -24,11 +24,14 @@ Config::Config(){ //关闭日志,默认不关闭 close_log = 0; + + //并发模型,默认是proactor + actor_model = 0; } void Config::parse_arg(int argc, char*argv[]){ int opt; - const char *str = "p:v:l:m:o:s:t:c:"; + const char *str = "p:v:l:m:o:s:t:c:a:"; while ((opt = getopt(argc, argv, str)) != -1) { switch (opt) @@ -73,6 +76,11 @@ void Config::parse_arg(int argc, char*argv[]){ close_log = atoi(optarg); break; } + case 'a': + { + actor_model = atoi(optarg); + break; + } default: break; } diff --git a/config.h b/config.h index 92856727..701cb3e3 100644 --- a/config.h +++ b/config.h @@ -36,6 +36,9 @@ class Config //是否关闭日志 int close_log; + + //并发模型选择 + int actor_model; }; #endif \ No newline at end of file diff --git a/http/http_conn.cpp b/http/http_conn.cpp index 0b82ed37..f6f60488 100644 --- a/http/http_conn.cpp +++ b/http/http_conn.cpp @@ -1,5 +1,5 @@ #include "http_conn.h" -#include "../log/log.h" + #include #include @@ -15,8 +15,9 @@ const char *error_500_title = "Internal Error"; const char *error_500_form = "There was an unusual problem serving the request file.\n"; locker m_lock; +map users; -void http_conn::initmysql_result(connection_pool *connPool, map &users) +void http_conn::initmysql_result(connection_pool *connPool) { //先从连接池中取一个连接 MYSQL *mysql = NULL; @@ -46,7 +47,7 @@ void http_conn::initmysql_result(connection_pool *connPool, map } } -void http_conn::initresultFile(connection_pool *connPool, map &users) +void http_conn::initresultFile(connection_pool *connPool) { ofstream out("./CGImysql/id_passwd.txt"); //先从连接池中取一个连接 @@ -135,6 +136,7 @@ void http_conn::close_conn(bool real_close) { if (real_close && (m_sockfd != -1)) { + printf("close %d\n", m_sockfd); removefd(m_epollfd, m_sockfd); m_sockfd = -1; m_user_count--; @@ -142,7 +144,7 @@ void http_conn::close_conn(bool real_close) } //初始化连接,外部调用初始化套接字地址 -void http_conn::init(int sockfd, const sockaddr_in &addr, char *root, map &users, int SQLVerify, int TRIGMode, +void http_conn::init(int sockfd, const sockaddr_in &addr, char *root, int SQLVerify, int TRIGMode, int close_log, string user, string passwd, string sqlname) { m_sockfd = sockfd; @@ -153,7 +155,6 @@ void http_conn::init(int sockfd, const sockaddr_in &addr, char *root, map(name, password)); + users.insert(pair(name, password)); m_lock.unlock(); if (!res) @@ -466,7 +470,7 @@ http_conn::HTTP_CODE http_conn::do_request() //若浏览器端输入的用户名和密码在表中可以查找到,返回1,否则返回0 else if (*(p + 1) == '2') { - if (m_users.find(name) != m_users.end() && m_users[name] == password) + if (users.find(name) != users.end() && users[name] == password) strcpy(m_url, "/welcome.html"); else strcpy(m_url, "/logError.html"); @@ -487,11 +491,11 @@ http_conn::HTTP_CODE http_conn::do_request() strcat(sql_insert, password); strcat(sql_insert, "')"); - if (m_users.find(name) == m_users.end()) + if (users.find(name) == users.end()) { m_lock.lock(); int res = mysql_query(mysql, sql_insert); - m_users.insert(pair(name, password)); + users.insert(pair(name, password)); m_lock.unlock(); if (!res) diff --git a/http/http_conn.h b/http/http_conn.h index 95bbe68a..89af3b64 100644 --- a/http/http_conn.h +++ b/http/http_conn.h @@ -20,8 +20,11 @@ #include #include #include + #include "../lock/locker.h" #include "../CGImysql/sql_connection_pool.h" +#include "../timer/lst_timer.h" +#include "../log/log.h" class http_conn { @@ -70,7 +73,7 @@ class http_conn ~http_conn() {} public: - void init(int sockfd, const sockaddr_in &addr, char *, map &, int, int, int, string user, string passwd, string sqlname); + void init(int sockfd, const sockaddr_in &addr, char *, int, int, int, string user, string passwd, string sqlname); void close_conn(bool real_close = true); void process(); bool read_once(); @@ -79,8 +82,11 @@ class http_conn { return &m_address; } - void initmysql_result(connection_pool *connPool, map &); - void initresultFile(connection_pool *connPool, map &); + void initmysql_result(connection_pool *connPool); + void initresultFile(connection_pool *connPool); + int timer_flag; + int improv; + private: void init(); @@ -106,6 +112,7 @@ class http_conn static int m_epollfd; static int m_user_count; MYSQL *mysql; + int m_state; //读为0, 写为1 private: int m_sockfd; diff --git a/main.cpp b/main.cpp index b9b207aa..566c41ac 100644 --- a/main.cpp +++ b/main.cpp @@ -5,7 +5,7 @@ int main(int argc, char *argv[]) //需要修改的数据库信息,登录名,密码,库名 string user = "root"; string passwd = "root"; - string databasename = "yourdb"; + string databasename = "qgydb"; //命令行解析 Config config; @@ -15,14 +15,15 @@ int main(int argc, char *argv[]) //初始化 server.init(config.PORT, user, passwd, databasename, config.LOGWrite, config.SQLVerify, - config.OPT_LINGER, config.TRIGMode, config.sql_num, config.thread_num, config.close_log); + config.OPT_LINGER, config.TRIGMode, config.sql_num, config.thread_num, + config.close_log, config.actor_model); //日志 server.log_write(); - + //数据库 server.sql_pool(); - + //线程池 server.thread_pool(); diff --git a/makefile b/makefile index a90e5d10..2dbbd366 100644 --- a/makefile +++ b/makefile @@ -1,5 +1,5 @@ server: main.cpp ./timer/lst_timer.h ./timer/lst_timer.cpp ./threadpool/threadpool.h ./http/http_conn.cpp ./http/http_conn.h ./lock/locker.h ./log/log.cpp ./log/log.h ./log/block_queue.h ./CGImysql/sql_connection_pool.cpp ./CGImysql/sql_connection_pool.h webserver.h webserver.cpp config.h config.cpp - g++ -o server main.cpp ./timer/lst_timer.h ./timer/lst_timer.cpp ./threadpool/threadpool.h ./http/http_conn.cpp ./http/http_conn.h ./lock/locker.h ./log/log.cpp ./log/log.h ./CGImysql/sql_connection_pool.cpp ./CGImysql/sql_connection_pool.h webserver.h webserver.cpp config.h config.cpp -lpthread -lmysqlclient + g++ -o server main.cpp ./timer/lst_timer.h ./timer/lst_timer.cpp ./threadpool/threadpool.h ./http/http_conn.cpp ./http/http_conn.h ./lock/locker.h ./log/log.cpp ./log/log.h ./CGImysql/sql_connection_pool.cpp ./CGImysql/sql_connection_pool.h webserver.h webserver.cpp config.h config.cpp -lpthread -lmysqlclient -g CGISQL.cgi:./CGImysql/sign.cpp ./CGImysql/sql_connection_pool.cpp ./CGImysql/sql_connection_pool.h g++ -o ./root/CGISQL.cgi ./CGImysql/sign.cpp ./CGImysql/sql_connection_pool.cpp ./CGImysql/sql_connection_pool.h -lmysqlclient -lpthread diff --git a/threadpool/threadpool.h b/threadpool/threadpool.h index 6394a76b..b2848a61 100644 --- a/threadpool/threadpool.h +++ b/threadpool/threadpool.h @@ -13,9 +13,10 @@ class threadpool { public: /*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/ - threadpool(connection_pool *connPool, int thread_number = 8, int max_request = 10000); + threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000); ~threadpool(); - bool append(T *request); + bool append(T *request, int state); + bool append_p(T *request); private: /*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/ @@ -30,9 +31,10 @@ class threadpool locker m_queuelocker; //保护请求队列的互斥锁 sem m_queuestat; //是否有任务需要处理 connection_pool *m_connPool; //数据库 + int m_actor_model; //模型切换 }; template -threadpool::threadpool( connection_pool *connPool, int thread_number, int max_requests) : m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool) +threadpool::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model),m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool) { if (thread_number <= 0 || max_requests <= 0) throw std::exception(); @@ -59,7 +61,22 @@ threadpool::~threadpool() delete[] m_threads; } template -bool threadpool::append(T *request) +bool threadpool::append(T *request, int state) +{ + m_queuelocker.lock(); + if (m_workqueue.size() >= m_max_requests) + { + m_queuelocker.unlock(); + return false; + } + request->m_state = state; + m_workqueue.push_back(request); + m_queuelocker.unlock(); + m_queuestat.post(); + return true; +} +template +bool threadpool::append_p(T *request) { m_queuelocker.lock(); if (m_workqueue.size() >= m_max_requests) @@ -96,10 +113,40 @@ void threadpool::run() m_queuelocker.unlock(); if (!request) continue; - - connectionRAII mysqlcon(&request->mysql, m_connPool); - - request->process(); + if (1 == m_actor_model) + { + if (0 == request->m_state) + { + if (request->read_once()) + { + request->improv = 1; + connectionRAII mysqlcon(&request->mysql, m_connPool); + request->process(); + } + else + { + request->improv = 1; + request->timer_flag = 1; + } + } + else + { + if (request->write()) + { + request->improv = 1; + } + else + { + request->improv = 1; + request->timer_flag = 1; + } + } + } + else + { + connectionRAII mysqlcon(&request->mysql, m_connPool); + request->process(); + } } } #endif diff --git a/timer/lst_timer.cpp b/timer/lst_timer.cpp index 3bca31e1..9a6be87f 100644 --- a/timer/lst_timer.cpp +++ b/timer/lst_timer.cpp @@ -1,4 +1,5 @@ #include "lst_timer.h" +#include "../http/http_conn.h" sort_timer_lst::sort_timer_lst() { diff --git a/timer/lst_timer.h b/timer/lst_timer.h index 9e0939d5..45b906e3 100644 --- a/timer/lst_timer.h +++ b/timer/lst_timer.h @@ -1,8 +1,27 @@ #ifndef LST_TIMER #define LST_TIMER +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include -#include "../http/http_conn.h" #include "../log/log.h" class util_timer; diff --git a/webserver.cpp b/webserver.cpp index b75add9e..5f518527 100644 --- a/webserver.cpp +++ b/webserver.cpp @@ -29,7 +29,7 @@ WebServer::~WebServer() } void WebServer::init(int port, string user, string passWord, string databaseName, int log_write, int sqlverify, - int opt_linger, int trigmode, int sql_num, int thread_num, int close_log) + int opt_linger, int trigmode, int sql_num, int thread_num, int close_log, int actor_model) { m_port = port; m_user = user; @@ -42,6 +42,7 @@ void WebServer::init(int port, string user, string passWord, string databaseName m_OPT_LINGER = opt_linger; m_TRIGMode = trigmode; m_close_log = close_log; + m_actormodel = actor_model; } void WebServer::log_write() @@ -64,15 +65,15 @@ void WebServer::sql_pool() //初始化数据库读取表 if (0 == m_SQLVerify) - users->initmysql_result(m_connPool, m_users_passwd); + users->initmysql_result(m_connPool); else if (1 == m_SQLVerify) - users->initresultFile(m_connPool, m_users_passwd); + users->initresultFile(m_connPool); } void WebServer::thread_pool() { //线程池 - m_pool = new threadpool(m_connPool, m_thread_num); + m_pool = new threadpool(m_actormodel, m_connPool, m_thread_num); } void WebServer::eventListen() @@ -135,7 +136,7 @@ void WebServer::eventListen() void WebServer::timer(int connfd, struct sockaddr_in client_address) { - users[connfd].init(connfd, client_address, m_root, m_users_passwd, m_SQLVerify, m_TRIGMode, m_close_log, m_user, m_passWord, m_databaseName); + users[connfd].init(connfd, client_address, m_root, m_SQLVerify, m_TRIGMode, m_close_log, m_user, m_passWord, m_databaseName); //初始化client_data数据 //创建定时器,设置回调函数和超时时间,绑定用户数据,将定时器添加到链表中 @@ -257,49 +258,104 @@ bool WebServer::dealwithsignal(bool timeout, bool &stop_server) void WebServer::dealwithread(int sockfd) { util_timer *timer = users_timer[sockfd].timer; - if (users[sockfd].read_once()) - { - LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr)); - Log::get_instance()->flush(); + //reactor + if (1 == m_actormodel) + { + if (timer) + { + adjust_timer(timer); + } //若监测到读事件,将该事件放入请求队列 - m_pool->append(users + sockfd); + m_pool->append(users + sockfd, 0); - if (timer) + while (true) { - adjust_timer(timer); + if (1 == users[sockfd].improv) + { + if (1 == users[sockfd].timer_flag) + { + deal_timer(timer, sockfd); + users[sockfd].timer_flag = 0; + } + users[sockfd].improv = 0; + break; + } } } else { - deal_timer(timer, sockfd); + //proactor + if (users[sockfd].read_once()) + { + LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr)); + Log::get_instance()->flush(); + + //若监测到读事件,将该事件放入请求队列 + m_pool->append_p(users + sockfd); + + if (timer) + { + adjust_timer(timer); + } + } + else + { + deal_timer(timer, sockfd); + } } } void WebServer::dealwithwrite(int sockfd) { util_timer *timer = users_timer[sockfd].timer; - if (users[sockfd].write()) + //reactor + if (1 == m_actormodel) { - - LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr)); - Log::get_instance()->flush(); - if (timer) { adjust_timer(timer); } + + m_pool->append(users + sockfd, 1); + + while (true) + { + if (1 == users[sockfd].improv) + { + if (1 == users[sockfd].timer_flag) + { + deal_timer(timer, sockfd); + users[sockfd].timer_flag = 0; + } + users[sockfd].improv = 0; + break; + } + } } else { - deal_timer(timer, sockfd); + //proactor + if (users[sockfd].write()) + { + LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr)); + Log::get_instance()->flush(); + + if (timer) + { + adjust_timer(timer); + } + } + else + { + deal_timer(timer, sockfd); + } } } void WebServer::eventLoop() { - bool timeout = false; bool stop_server = false; diff --git a/webserver.h b/webserver.h index 44f79bbf..5b26d399 100644 --- a/webserver.h +++ b/webserver.h @@ -12,12 +12,8 @@ #include #include -#include "./lock/locker.h" #include "./threadpool/threadpool.h" -#include "./timer/lst_timer.h" #include "./http/http_conn.h" -#include "./log/log.h" -#include "./CGImysql/sql_connection_pool.h" const int MAX_FD = 65536; //最大文件描述符 const int MAX_EVENT_NUMBER = 10000; //最大事件数 @@ -31,7 +27,7 @@ class WebServer void init(int port , string user, string passWord, string databaseName, int log_write , int sqlverify, int opt_linger, int trigmode, int sql_num, - int thread_num, int close_log); + int thread_num, int close_log, int actor_model); void thread_pool(); void sql_pool(); @@ -53,6 +49,7 @@ class WebServer char *m_root; int m_log_write; int m_close_log; + int m_actormodel; int m_pipefd[2]; int m_epollfd; @@ -65,7 +62,6 @@ class WebServer string m_databaseName; //使用数据库名 int m_sql_num; int m_SQLVerify; - map m_users_passwd; //线程池相关 threadpool *m_pool;