Skip to content

Commit

Permalink
添加了stage的编程,完成了初步4个模块的编写,接下来需要实现一个网络I/O核心模块,event_core模块
Browse files Browse the repository at this point in the history
  • Loading branch information
shilei authored and shilei committed Jul 11, 2017
1 parent ae88480 commit f155b80
Show file tree
Hide file tree
Showing 29 changed files with 645 additions and 17 deletions.
11 changes: 11 additions & 0 deletions IElement.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#ifndef _IELEMENT_H
#define _IELEMENT_H

class IElement {
public:
void *getElement();
private:
void *element;
};

#endif /* _IELEMENT_H */
14 changes: 14 additions & 0 deletions config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifndef _CONFIG_H
#define _CONFIG_H

class Config {
private:
int listen_port;

public:
int getListenPort() {
return listen_port;
}
};

#endif /* _CONFIG_H */
13 changes: 13 additions & 0 deletions event_core.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#include "event_core.h"



void event_core::run() {
l->loop();
}

bool event_core::init() {
s = new Socket(config.getListenPort());
l = new Loop(s);
return true;
}
21 changes: 21 additions & 0 deletions event_core.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#ifndef _EVENT_CORE_H
#define _EVENT_CORE_H
#include "config.h"
#include "socket.h"
#include "loop.h"

class event_core {
public:
event_core(Config &_config) : config(_config) {
init();
}
void run();
bool init();

private:
Socket *s;
Loop *l;
Config config;
};

#endif /*_EVENT_CORE_H*/
22 changes: 22 additions & 0 deletions function.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#ifndef _FUNCTION_H
#define _FUNCTION_H
#include "struct.h"

class Function {
public:
Function(function fun, void *_arg) : f(fun), arg(_arg) {}
function getFunction() {
return f;
}

void *getArg() {
return arg;
}

private:
function f;
void *arg;

};

#endif /* _FUNCTION_H */
3 changes: 3 additions & 0 deletions job_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ typedef void * (*worker_task)(void *);
struct queue_element {
worker_task _cb;
void *arg;
queue_element() { }
queue_element(worker_task wt, void *_arg) : _cb(wt), arg(_arg) {
}
};

class task_queue {
Expand Down
14 changes: 7 additions & 7 deletions log.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ struct async_log_content {
}
};

class _log {
class LogUtil {
public:
_log();
~_log();
_log(std::string _log_output, int _log_level);
LogUtil();
~LogUtil();
LogUtil(std::string _log_output, int _log_level);
void error(std::string _error);
void debug(std::string _debug);
void info(std::string _info);
void _error(const char *fmt, ...);
void _debug(const char *fmt, ...);
void _info(const char *fmt, ...);
static void error(const char *fmt, ...);
static void debug(const char *fmt, ...);
static void info(const char *fmt, ...);
bool set_async(bool async);
void stop();

Expand Down
63 changes: 63 additions & 0 deletions loop.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#include "loop.h"
#include "log.h"

void Loop::accept_conn() {
LogUtil::debug("acceptor : [accept_conn]\n");
struct sockaddr_in actual_addr;
int len = sizeof(struct sockaddr);
int peer_sock = -1;
while ((peer_sock = accept(s->getListenPort(), (sockaddr *)&actual_addr, (socklen_t *)&len)) > 0) {
fcntl(peer_sock, F_SETFL, SOCK_NONBLOCK);
struct epoll_event event;
memset(&event, 0, sizeof(event));
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_ADD, peer_sock, &event);
}

if (peer_sock == -1) {
if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR)
perror("accept");
}
return NULL;
}

void Loop::read_msg() {
int nread = -1;
char buf[BUFSIZE] = { 0 };
while ((nread = read(conn->fd, _buf, BUFSIZE - 1)) > 0) {
IElement *ie =
}
}

void Loop::loop() {
int epfd = epoll_create1(EPOLL_CLOEXEC);
struct epoll_event event;
memset(&event, 0, sizeof(event));
event.events = EPOLLIN | EPOLLET;
event.data.fd = listenfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event);
struct epoll_event events[MAXEVENTS];
//log->info("acceptor looping\n");
LogUtil::debug("Socket : [loop] running");

for (;;) {
int rd_fds = epoll_wait(epfd, events, MAXEVENTS, 1000);
for (int i = 0; i < rd_fds; ++i) {
if (events[i].data.fd == listenfd) {
//acceptor *ac = this;
accept_conn();
} else {
// TODO.
if (events[i].events & EPOLLIN) {
read_msg();
LogUtil::debug("acceptor : [epoll_loop]. tq->push read_conn");
} else {
if (events[i].events & EPOLLOUT) {
LogUtil::debug("acceptor : [epoll_loop]. tq->push write_conn");
}
}
}
}
}
return true;
}
15 changes: 15 additions & 0 deletions loop.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef _LOOP_H
#define _LOOP_H
#include "socket.h"

class Loop {
public:
Loop(Socket *_s) : s(_s) { }
void accept_conn();
void loop();

private:
Socket *s;
};

#endif /* _LOOP_H */
48 changes: 48 additions & 0 deletions seda.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#include "seda.h"
#include "log.h"

stage *SEDA::create_stage(string stage_name) {
stage *sg = new stage(stage_name);
map<string, stage *>::iterator it = stages.find(stage_name);
if (it != stages.end()) {
LogUtil::debug("The stage : %s is already existed", stage_name.c_str());
return NULL;
}
stages[stage_name] = sg;
stages_v.push_back(sg);
return sg;
}

bool SEDA::add_msg_edge(stage *from, stage *to, string msg) {
stage_message *sm = new stage_message(msg);
if (msgExist2Stages(from, to, sm)) {
LogUtil::debug("The msg : %s is already exsited from the stage : %s to stage : %s", msg.c_str(), from->get_stage_name().c_str(), to->get_stage_name().c_str());
return false;
}
addSM(from, to, sm);
return true;
}

bool SEDA::msgExist2Stages(stage *from, stage *to, stage_message *sm) {
map<stage_message *, pair<stage *, stage *> >::iterator it = msg_relat.find(sm);
if (it != msg_relat.end()) {
if ((it->second.first == from) && it->second.second == to) {
return false;
}
}
return true;
}

void SEDA::addSM(stage *from, stage *to, stage_message *sm) {
msg_relat.insert(pair<stage_message *, pair<stage *, stage *> >(sm, pair<stage *, stage *>(from, to)));
}

bool SEDA::run() {
for (size_t i = 0; i < stages_v.size(); ++i) {
if (!stages_v[i]->run()) {
LogUtil::debug("stage : %s run fail", stages_v[i]->get_stage_name().c_str());
return false;
}
}
return true;
}
30 changes: 30 additions & 0 deletions seda.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef _SEDA_H
#define _SEDA_H
#include <vector>
#include <map>
#include "stage.h"
#include "config.h"
#include "stage_message.h"

using namespace std;

class SEDA {
public:
Config config_sys(string config_filename);
void init();
stage *create_stage(string stage_name);
bool add_msg_edge(stage *from, stage *to, string msg);
bool run();

private:
void stop();
vector<stage *> stages_v;
map<string, stage *> stages;
map<string, stage_message *> messages;
map<stage_message *, pair<stage *, stage *> > msg_relat;

bool msgExist2Stages(stage *from, stage *to, stage_message *sm);
void addSM(stage *from, stage *to, stage_message *sm);
};

#endif /* _SEDA_H */
106 changes: 106 additions & 0 deletions socket.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#include "socket.h"
#include "log.h"
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>

/*
* 完成所有的客户端socket初始化动作
*/
Socket::Socket(std::string ip, int port) {
connect(ip, port);
}

/*
* 完成所有的服务端socket初始化动作
*/
Socket::Socket(int listen_port) {
this->listen_port = listen_port;
bool ret = initServer();
if (ret) {
LogUtil::debug("Socket [initServer] error");
return ;
}
}

bool Socket::initServer() {
int ret = 0;
if ((ret = (listenfd = socket(AF_INET, SOCK_STREAM, 0))) == -1) {
// log here.
LogUtil::debug("Socket : [initServer] create socket error");
//fprintf(stderr, "create socket error\n");
return false;
}
struct sockaddr_in actual_addr;
actual_addr.sin_family = AF_INET;
actual_addr.sin_addr.s_addr = htonl(INADDR_ANY);
actual_addr.sin_port = htons(listen_port);

// set the listenfd socket to be nonblock.
fcntl(listenfd, F_SETFL, SOCK_NONBLOCK);

//set listenfd reused.
int option = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option));

struct sockaddr *addr = (struct sockaddr *)&actual_addr;
//addr.sa_family.
if ((ret = bind(listenfd, addr, sizeof(actual_addr))) == -1) {
fprintf(stderr, "bind socket error : %s\n", strerror(errno));
return false;
}
listen(listenfd, 50);
return true;
}

/*
void Socket::loop() {
epfd = epoll_create1(EPOLL_CLOEXEC);
struct epoll_event event;
memset(&event, 0, sizeof(event));
event.events = EPOLLIN | EPOLLET;
event.data.fd = listenfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event);
struct epoll_event events[MAXEVENTS];
//log->info("acceptor looping\n");
LogUtil::debug("Socket : [loop] running");
for (;;) {
int rd_fds = epoll_wait(epfd, events, MAXEVENTS, 1000);
for (int i = 0; i < rd_fds; ++i) {
if (events[i].data.fd == listenfd) {
//acceptor *ac = this;
accept_conn();
} else {
// TODO.
if (events[i].events & EPOLLIN) {
connection *conn = (connection *)events[i].data.ptr;
conn->cb = read_conn;
queue_element *qe = new queue_element();
qe->_cb = conn->cb;
qe->arg = (void *)conn;
tq->push((void *)qe);
log->debug("acceptor : [epoll_loop]. tq->push read_conn\n");
} else {
if (events[i].events & EPOLLOUT) {
connection *conn = (connection *)events[i].data.ptr;
conn->cb = write_conn;
queue_element *qe = new queue_element();
qe->_cb = write_conn;
qe->arg = (void *)conn;
tq->push((void *)qe);
log->debug("acceptor : [epoll_loop]. tq->push write_conn\n");
}
}
}
}
}
return true;
}
*/
Loading

0 comments on commit f155b80

Please sign in to comment.