1
+ #ifndef PROCESSPOOL_H
2
+ #define PROCESSPOOL_H
3
+
4
+ #include < sys/types.h>
5
+ #include < sys/socket.h>
6
+ #include < netinet/in.h>
7
+ #include < arpa/inet.h>
8
+ #include < assert.h>
9
+ #include < stdio.h>
10
+ #include < unistd.h>
11
+ #include < errno.h>
12
+ #include < string.h>
13
+ #include < fcntl.h>
14
+ #include < stdlib.h>
15
+ #include < sys/epoll.h>
16
+ #include < signal.h>
17
+ #include < sys/wait.h>
18
+ #include < sys/stat.h>
19
+
20
+ /* 描述一个子进程的类,m_pid是目标子进程的PID,m_pipefd是父进程和子进程通信用的管道 */
21
+ class process {
22
+ public:
23
+ pid_t m_pid;
24
+ int m_pipefd[2 ];
25
+
26
+ public:
27
+ process () : m_pid(-1 ){}
28
+ };
29
+
30
+ /* 进程池类,将它定义为模板类是为了代码复用。其模板参数是处理逻辑任务的类 */
31
+ template < typename T >
32
+ class processpool {
33
+ private:
34
+ /* 将构造函数定义为私有化的,因此我们只能通过后面的create静态函数来创建processpool实例 */
35
+ processpool ( int listenfd, int process_number = 8 );
36
+ void setup_sig_pipe ();
37
+ void run_parent ();
38
+ void run_child ();
39
+ public:
40
+ /* 单例模式,保证程序最多创建一个processpool实例,这是程序正确处理信号的必要条件 */
41
+ static processpool< T >* create ( int listenfd, int process_number = 8 ){
42
+ if ( !m_instance ){
43
+ m_instance = new processpool< T >( listenfd, process_number );
44
+ }
45
+ return m_instance;
46
+ }
47
+ ~processpool (){
48
+ delete [] m_sub_process;
49
+ }
50
+ /* 启动进程池 */
51
+ void run ();
52
+ private:
53
+ /* 进程池允许的最大进程数量 */
54
+ static const int MAX_PROCESS_NUMBER = 16 ;
55
+ /* 每个子进程最多能处理的客户数量 */
56
+ static const int USER_PER_PROCESS = 65536 ;
57
+ /* epoll最多能处理的事件数 */
58
+ static const int MAX_EVENT_NUMBER = 10000 ;
59
+ /* 进程池中的进程总数 */
60
+ int m_process_number;
61
+ /* 子进程在池中的序号,从0开始 */
62
+ int m_idx;
63
+ /* 每个进程都有一个epoll内核事件表,用m_epollfd标识 */
64
+ int m_epollfd;
65
+ /* 监听socket */
66
+ int m_listenfd;
67
+ /* 子进程通过m_stop来决定是否停止允许 */
68
+ int m_stop;
69
+ /* 保存所有子进程的描述信息 */
70
+ process* m_sub_process;
71
+ /* 进程池静态实例 */
72
+ static processpool< T >* m_instance;
73
+ };
74
+ template < typename T >
75
+ processpool< T >* processpool< T >::m_instance = NULL ;
76
+
77
+ /* 用于处理信号的管道,以实现统一事件源。后面称之为信号管道 */
78
+ static int sig_pipefd[2 ];
79
+
80
+ static int setnonblocking ( int fd ){
81
+ int old_option = fcntl ( fd,F_GETFL );
82
+ int new_option = old_option | O_NONBLOCK;
83
+ fcntl ( fd, F_SETFL, new_option );
84
+ return old_option;
85
+ }
86
+
87
+
88
+ static void addfd ( int epollfd, int fd ){
89
+ epoll_event event;
90
+ event.data .fd = fd;
91
+ event.events = EPOLLIN | EPOLLET;
92
+ epoll_ctl ( epollfd, EPOLL_CTL_ADD, fd, &event );
93
+ setnonblocking (fd);
94
+ }
95
+
96
+ /* 从epollfd标识的epoll内核事件表中删除fd上的所有注册事件 */
97
+ static void removefd ( int epollfd, int fd ){
98
+ epoll_ctl ( epollfd, EPOLL_CTL_DEL, fd, 0 );
99
+ close ( fd );
100
+ }
101
+
102
+
103
+ static void sig_handler ( int sig ){
104
+ int save_errno = errno;
105
+ int msg = sig;
106
+ send ( sig_pipefd[1 ], (char * )&msg, 1 , 0 );
107
+ errno = save_errno;
108
+ }
109
+
110
+
111
+ static void addsig ( int sig, void ( handler )(int ), bool restart = true ){
112
+ struct sigaction sa;
113
+ memset ( &sa, ' \0 ' , sizeof ( sa ) );
114
+ sa.sa_handler = handler;
115
+ if ( restart ){
116
+ sa.sa_flags |= SA_RESTART;
117
+ }
118
+ sigfillset ( &sa.sa_mask );
119
+ assert ( sigaction ( sig, &sa, NULL ) != -1 );
120
+ }
121
+
122
+ /* 进程池构造函数,参数listenfd是监听socket,它必须在创建进程池之前被创建,否则子进程无法直接引用它,参数process_number指定进程池中子进程的数量 */
123
+ template < typename T >
124
+ processpool< T >::processpool( int listenfd, int process_number ) : m_listenfd( listenfd ), m_process_number( process_number ), m_idx( -1 ), m_stop( false ){
125
+ assert ( ( process_number > 0 ) && ( process_number <= MAX_PROCESS_NUMBER ) );
126
+
127
+ m_sub_process = new process[ process_number ];
128
+ assert ( m_sub_process );
129
+
130
+ /* 创建process_number个子进程,并建立它们和父进程之间的管道 */
131
+ for ( int i = 0 ; i < process_number; ++i ){
132
+ int ret = socketpair ( PF_UNIX, SOCK_STREAM, 0 , m_sub_process[i].m_pipefd );
133
+ assert ( ret == 0 );
134
+
135
+ m_sub_process[i].m_pid = fork ();
136
+ assert ( m_sub_process[i].m_pid >= 0 );
137
+ if ( m_sub_process[i].m_pid > 0 ){
138
+ close ( m_sub_process[i].m_pipefd [1 ] );
139
+ continue ;
140
+ }else {
141
+ close ( m_sub_process[i].m_pipefd [0 ] );
142
+ m_idx = i;
143
+ break ;
144
+ }
145
+ }
146
+ }
147
+
148
+ /* 统一事件源 */
149
+ template < typename T >
150
+ void processpool< T >::setup_sig_pipe(){
151
+ /* 创建epoll事件监听表和信号管道 */
152
+ m_epollfd = epoll_create ( 5 );
153
+ assert ( m_epollfd != -1 );
154
+
155
+ int ret = socketpair ( PF_UNIX, SOCK_STREAM, 0 , sig_pipefd );
156
+ assert ( ret != -1 );
157
+
158
+ setnonblocking ( sig_pipefd[1 ] );
159
+ addfd ( m_epollfd, sig_pipefd[0 ] );
160
+
161
+ /* 设置信号处理函数 */
162
+ addsig ( SIGCHLD, sig_handler );
163
+ addsig ( SIGTERM, sig_handler );
164
+ addsig ( SIGINT, sig_handler );
165
+ addsig ( SIGPIPE, SIG_IGN );
166
+ }
167
+
168
+ /* 父进程中m_idx值为-1,子进程中m_idx值大于等于0,我们据此判断接下来要运行的是父进程代码还是子进程代码 */
169
+ template < typename T >
170
+ void processpool< T >::run(){
171
+ if ( m_idx != -1 ){
172
+ run_child ();
173
+ return ;
174
+ }
175
+ run_parent ();
176
+ }
177
+
178
+
179
+ template < typename T >
180
+ void processpool< T >::run_child(){
181
+ setup_sig_pipe ();
182
+
183
+ /* 每个子进程都通过其在进程池中的序号值m_idx找到与父进程通信的管道 */
184
+ int pipefd = m_sub_process[m_idx].m_pipefd [ 1 ];
185
+ /* 子进程需要监听管道文件描述符pipefd,因为父进程将通过它来通知子进程accept新连接 */
186
+ addfd ( m_epollfd, pipefd );
187
+
188
+ epoll_event events[ MAX_EVENT_NUMBER ];
189
+ T* users = new T [ USER_PER_PROCESS ];
190
+ assert ( users );
191
+ int number = 0 ;
192
+ int ret = -1 ;
193
+
194
+ while ( ! m_stop ){
195
+ number = epoll_wait ( m_epollfd, events, MAX_EVENT_NUMBER, -1 );
196
+ if ( ( number < 0 ) && ( errno != EINTR ) ){
197
+ printf ( " epoll failure\n " );
198
+ break ;
199
+ }
200
+
201
+ for ( int i = 0 ; i < number; ++i ){
202
+ int sockfd = events[i].data .fd ;
203
+ if ( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) ){
204
+ int client = 0 ;
205
+ /* 从父,子进程之间的管道读取数据,并将结果保存在变量client中。如果读取成功,则表示有新客户连接到来 */
206
+ ret = recv ( sockfd, ( char * )client, sizeof ( client ), 0 );
207
+ if ( ( ( ( ret < 0 ) && ( errno != EAGAIN ) ) ) || ( ret == 0 ) ){
208
+ continue ;
209
+ }else {
210
+ struct sockaddr_in client_address;
211
+ socklen_t client_addrlength = sizeof ( client_address );
212
+ int connfd = accept ( m_listenfd, ( struct sockaddr * )&client_address, &client_addrlength );
213
+ if ( connfd < 0 ){
214
+ printf ( " errno is: %d\n " ,errno );
215
+ continue ;
216
+ }
217
+ addfd ( m_epollfd, connfd );
218
+ /* 模板类T必须实现init方法,以初始化一个客户连接。我们直接使用connfd来索引逻辑处理对象(T类型的对象),以提高效率 */
219
+ users[ connfd ].init ( m_epollfd, connfd, client_address );
220
+ }
221
+ }
222
+ /* 下面处理子进程接收到的信号 */
223
+ else if ( ( sockfd == sig_pipefd[0 ] ) && ( events[i].events & EPOLLIN ) ){
224
+ int sig;
225
+ char signals[1024 ];
226
+ ret = recv ( sig_pipefd[0 ], signals, sizeof ( signals ), 0 );
227
+ if ( ret <= 0 ){
228
+ continue ;
229
+ }else {
230
+ for ( int i = 0 ; i < ret; ++i ){
231
+ switch ( signals[i] ){
232
+ case SIGCHLD:{
233
+ pid_t pid;
234
+ int stat;
235
+ while ( ( pid==waitpid ( -1 , &stat, WNOHANG ) ) > 0 ){
236
+ continue ;
237
+ }
238
+ break ;
239
+ }
240
+ case SIGTERM:
241
+ case SIGINT:{
242
+ m_stop = true ;
243
+ break ;
244
+ }default :{
245
+ break ;
246
+ }
247
+ }
248
+ }
249
+ }
250
+ }
251
+ /* 如果是其他可读数据,那么必然是客户请求到来。调用逻辑处理对象的process方法处理之 */
252
+ else if ( events[i].events & EPOLLIN ){
253
+ users[sockfd].process ();
254
+ }else {
255
+ continue ;
256
+ }
257
+ }
258
+ }
259
+
260
+ delete [] users;
261
+ users = NULL ;
262
+ close ( pipefd );
263
+ // close( m_listenfd ); /* 应该有m_listenfd的创建者来关闭这个文件描述符,即所谓的“对象(比如一个文件描述符,又或者是一段堆内存)由哪个函数创建,就应该由哪个函数销毁” */
264
+ close ( m_epollfd );
265
+ }
266
+
267
+
268
+ template < typename T >
269
+ void processpool< T >::run_parent(){
270
+ setup_sig_pipe ();
271
+
272
+ /* 父进程监听m_listenfd */
273
+ addfd ( m_epollfd, m_listenfd );
274
+
275
+ epoll_event events[ MAX_EVENT_NUMBER ];
276
+ int sub_process_counter = 0 ;
277
+ int new_conn = 1 ;
278
+ int number = 0 ;
279
+ int ret = -1 ;
280
+
281
+ while ( ! m_stop ){
282
+ number = epoll_wait ( m_epollfd, events, MAX_EVENT_NUMBER, -1 );
283
+ if ( ( number < 0 ) && ( errno != EINTR ) ){
284
+ printf ( " epoll failure\n " );
285
+ break ;
286
+ }
287
+
288
+ for ( int i = 0 ; i < number; ++i ){
289
+ int sockfd = events[i].data .fd ;
290
+ if ( sockfd == m_listenfd ){
291
+ /* 如果有新的连接到来,就采用Round Robin方式将其分配给一个子进程处理 */
292
+ int i = sub_process_counter;
293
+ do {
294
+ if ( m_sub_process[i].m_pid != -1 ){
295
+ break ;
296
+ }
297
+ i = (i+1 )%m_process_number;
298
+ }while ( i != sub_process_counter );
299
+
300
+ if ( m_sub_process[i].m_pid == -1 ){
301
+ m_stop = true ;
302
+ break ;
303
+ }
304
+ sub_process_counter = (i+1 )%m_process_number;
305
+ send ( m_sub_process[i].m_pipefd [0 ], ( char * )&new_conn, sizeof ( new_conn ), 0 );
306
+ printf ( " send request to child %d\n " , i );
307
+ }
308
+ /* 下面处理父进程收到的信号 */
309
+ else if ( ( sockfd == sig_pipefd[0 ] ) && ( events[i].events & EPOLLIN ) ){
310
+ int sig;
311
+ char signals[1024 ];
312
+ ret = recv ( sig_pipefd[0 ], signals, sizeof ( signals ), 0 );
313
+ if ( ret <= 0 ){
314
+ continue ;
315
+ }else {
316
+ for ( int i = 0 ; i < ret; ++i ){
317
+ switch ( signals[i] ){
318
+ case SIGCHLD:{
319
+ pid_t pid;
320
+ int stat;
321
+ while ( ( pid == waitpid ( -1 , &stat, WNOHANG ) ) > 0 ){
322
+ for ( int i = 0 ; i < m_process_number; ++i ){
323
+ /* 如果进程池中第i个子进程退出了,则主进程关闭相应的通信管道,并设置相应的m_pid为-1,以标记该子进程已经退出 */
324
+ if ( m_sub_process[i].m_pid == pid ){
325
+ printf ( " child %d join\n " , i );
326
+ close ( m_sub_process[i].m_pipefd [0 ] );
327
+ m_sub_process[i].m_pid = -1 ;
328
+ }
329
+ }
330
+ }
331
+ /* 如果所有子进程都已经退出了,则父进程也退出 */
332
+ m_stop = true ;
333
+ for ( int i = 0 ; i < m_process_number; ++i ){
334
+ if ( m_sub_process[i].m_pid != -1 ){
335
+ m_stop = false ;
336
+ }
337
+ }
338
+ break ;
339
+ }
340
+ case SIGTERM:
341
+ case SIGINT:{
342
+ /* 如果父进程接收到终止信号,那么久杀死所有子进程,并等待它们全部结束。当然,通知子进程结束更好的方法是向父,子进程之间的通信管道发送特殊数据 */
343
+ printf ( " kill all the child now\n " );
344
+ for ( int i = 0 ; i < m_process_number; ++i ){
345
+ int pid = m_sub_process[i].m_pid ;
346
+ if ( pid != -1 ){
347
+ kill ( pid, SIGTERM );
348
+ }
349
+ }
350
+ break ;
351
+ }default :{
352
+ break ;
353
+ }
354
+ }
355
+ }
356
+ }
357
+ }else {
358
+ continue ;
359
+ }
360
+ }
361
+ }
362
+
363
+ // close( m_listenfd ); /* 由创建者关闭这个文件描述符 */
364
+ close ( m_epollfd );
365
+ }
366
+
367
+ #endif
0 commit comments