-
Notifications
You must be signed in to change notification settings - Fork 0
/
epoll.cpp
195 lines (180 loc) · 5.98 KB
/
epoll.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#include "epoll.h"
#include "threadpool.h"
#include "util.h"
#include <sys/epoll.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <queue>
#include <deque>
int TIMER_TIME_OUT = 500;
epoll_event *Epoll::events;
std::unordered_map<int, std::shared_ptr<RequestData>> Epoll::fd2req;
int Epoll::epoll_fd = 0;
const std::string Epoll::PATH = "/";
TimerManager Epoll::timer_manager;
int Epoll::epoll_init(int maxevents, int listen_num)
{
epoll_fd = epoll_create(listen_num + 1);
if(epoll_fd == -1)
return -1;
//events.reset(new epoll_event[maxevents], [](epoll_event *data){delete [] data;});
events = new epoll_event[maxevents];
return 0;
}
// 注册新描述符
int Epoll::epoll_add(int fd, SP_ReqData request, __uint32_t events)
{
struct epoll_event event;
event.data.fd = fd;
event.events = events;
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) < 0)
{
perror("epoll_add error");
return -1;
}
fd2req[fd] = request;
return 0;
}
// 修改描述符状态
int Epoll::epoll_mod(int fd, SP_ReqData request, __uint32_t events)
{
struct epoll_event event;
event.data.fd = fd;
event.events = events;
if(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0)
{
perror("epoll_mod error");
return -1;
}
fd2req[fd] = request;
return 0;
}
// 从epoll中删除描述符
int Epoll::epoll_del(int fd, __uint32_t events)
{
struct epoll_event event;
event.data.fd = fd;
event.events = events;
if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &event) < 0)
{
perror("epoll_del error");
return -1;
}
auto fd_ite = fd2req.find(fd);
if (fd_ite != fd2req.end())
fd2req.erase(fd_ite);
return 0;
}
// 返回活跃事件数
void Epoll::my_epoll_wait(int listen_fd, int max_events, int timeout)
{
// printf("fd2req.size()==%d\n", fd2req.size());
int event_count = epoll_wait(epoll_fd, events, max_events, timeout);
if (event_count < 0)
perror("epoll wait error");
std::vector<SP_ReqData> req_data = getEventsRequest(listen_fd, event_count, PATH);
if (req_data.size() > 0)
{
for (auto &req: req_data)
{
//cout<<"req_data timer use_count: "<<req->timer.use_count()<<endl;
if (ThreadPool::threadpool_add(req) < 0)
{
// 线程池满了或者关闭了等原因,抛弃本次监听到的请求。
break;
}
}
}
timer_manager.handle_expired_event();
}
#include <iostream>
#include <arpa/inet.h>
using namespace std;
void Epoll::acceptConnection(int listen_fd, int epoll_fd, const std::string path)
{
struct sockaddr_in client_addr;
char str[INET_ADDRSTRLEN];
memset(&client_addr, 0, sizeof(struct sockaddr_in));
//socklen_t client_addr_len = 0;
socklen_t client_addr_len = sizeof(client_addr);
int accept_fd = 0;
while((accept_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len)) > 0)
{
cout<<"Accept Connection from ("<<inet_ntop(AF_INET,&client_addr.sin_addr,str,sizeof(str))<<","<<ntohs(client_addr.sin_port)<<") "<<"at --listenfd:["<<listen_fd<<"]-- --acceptfd:["<<accept_fd<<"]--"<<endl;
/*
// TCP的保活机制默认是关闭的
int optval = 0;
socklen_t len_optval = 4;
getsockopt(accept_fd, SOL_SOCKET, SO_KEEPALIVE, &optval, &len_optval);
cout << "optval ==" << optval << endl;
*/
// 设为非阻塞模式
int ret = setSocketNonBlocking(accept_fd);
if (ret < 0)
{
perror("Set non block failed!");
return;
}
SP_ReqData req_info(new RequestData(epoll_fd, accept_fd, path));
// 文件描述符可以读,边缘触发(Edge Triggered)模式,保证一个socket连接在任一时刻只被一个线程处理
__uint32_t _epo_event = EPOLLIN | EPOLLET | EPOLLONESHOT;
Epoll::epoll_add(accept_fd, req_info, _epo_event);
// 新增时间信息
timer_manager.addTimer(req_info, TIMER_TIME_OUT);
//cout<<"req_info use_count: "<<req_info->timer.use_count()<<endl;
}
//if(accept_fd == -1)
// perror("accept");
}
// 分发处理函数
std::vector<std::shared_ptr<RequestData>> Epoll::getEventsRequest(int listen_fd, int events_num, const std::string path)
{
std::vector<SP_ReqData> req_data;
for(int i = 0; i < events_num; ++i)
{
// 获取有事件产生的描述符
int fd = events[i].data.fd;
// 有事件发生的描述符为监听描述符
if(fd == listen_fd)
{
//cout << "This is listen_fd" << endl;
acceptConnection(listen_fd, epoll_fd, path);
}
else if (fd < 3)
{
break;
}
else
{
// 排除错误事件
if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)
|| (!(events[i].events & EPOLLIN)))
{
//printf("error event\n");
auto fd_ite = fd2req.find(fd);
if (fd_ite != fd2req.end())
fd2req.erase(fd_ite);
continue;
}
// 将请求任务加入到线程池中
// 加入线程池之前将Timer和request分离
SP_ReqData cur_req(fd2req[fd]);
if(cur_req)
{
//printf("cur_req.use_count=%d\n", cur_req.use_count());
cur_req->seperateTimer();
req_data.push_back(cur_req);
auto fd_ite = fd2req.find(fd);
if (fd_ite != fd2req.end())
fd2req.erase(fd_ite);
}
}
}
return req_data;
}
void Epoll::add_timer(shared_ptr<RequestData> request_data, int timeout)
{
timer_manager.addTimer(request_data, timeout);
}