Skip to content

Commit

Permalink
Fix code 0.1.0, A maybe correctable version.
Browse files Browse the repository at this point in the history
  • Loading branch information
sl950313 committed Aug 2, 2017
1 parent a125b65 commit da55fb9
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 24 deletions.
8 changes: 6 additions & 2 deletions IElement.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
#ifndef _IELEMENT_H
#define _IELEMENT_H
#include <string.h>

class IElement {
public:
IElement(void *ele) : element(ele) {}
IElement(char *ele) {
memset(element, 0, 256);
strncpy(element, ele, strlen(ele));
}
void *getElement() {
return element;
}
private:
void *element;
char element[256];
};

#endif /* _IELEMENT_H */
5 changes: 4 additions & 1 deletion function.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#ifndef _FUNCTION_H
#define _FUNCTION_H
#include "struct.h"
#include <stdlib.h>

class Function {
public:
Function() { }
Function() {
Function(NULL, NULL);
}
Function(function fun, void *_arg) : f(fun), arg(_arg) {}
function getFunction() {
return f;
Expand Down
3 changes: 2 additions & 1 deletion main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
using namespace std;

void *print(void *arg) {
printf("main [print] : %s", (char *)arg);
//printf("main [print] : %s", (char *)arg);
LogUtil::debug("main [print] : %s", (char *)arg);
return NULL;
}

Expand Down
3 changes: 2 additions & 1 deletion marcos.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
#define MAXEVENTS 1024
#define BUFSIZE 1024
#define FORCE_CLOSE 0
#define MAX_QUEUE_SIZE 51200

/*
* For worker pool
*/
#define DEFAULT_WORKER_NUM 4
#define DEFAULT_WORKER_NUM 8

/*
* For mq.
Expand Down
41 changes: 29 additions & 12 deletions receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ using namespace std;
void *receiver::run(void *arg) {
receiver *rc = (receiver *)arg;
void *context = zmq_ctx_new();
assert(context != NULL);
//int r = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);
//assert(ret == 0);
void *subscriber = zmq_socket(context, ZMQ_SUB);
assert(subscriber != NULL);

LogUtil::debug("here ? ");
LogUtil::debug("res : %s", rc->res[0].c_str());
int r = zmq_connect(subscriber, rc->res[0].c_str());
if (r != 0) {
LogUtil::debug("receiver | run | zmq_connect error.");
Expand All @@ -21,39 +23,54 @@ void *receiver::run(void *arg) {
//assert(rc == 0);
LogUtil::debug("receiver : [run], subscriber : %s", rc->res[0].c_str());

r = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, NULL, 0);
r = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
if (r != 0) {
LogUtil::debug("receiver | run | zmq_setsockopt error.");
rc->running = false;
return NULL;
}
//assert(rc == 0);
LogUtil::debug("receiver : [run] zmq finish!");
char buf[256] = {
0
};
while (rc->running) {
memset(buf, 0, 256);
LogUtil::debug("receiver : [run] recving...");
int nrecv = zmq_recv(subscriber, buf, 256, 0);
if (nrecv == -1) {
LogUtil::debug("receiver [run] error");
LogUtil::debug("receiver [run] recving error");
continue;
}
LogUtil::debug("receiver : [run] already recv");
buf[255] = 0;
IElement ie(buf);
pthread_mutex_lock(&rc->lock);
if (rc->elements.size() > MAX_QUEUE_SIZE) {
LogUtil::debug("receiver [run], the rc->queue is less");
}
bool is_signal = false;
if (rc->elements.empty()) {
is_signal = true;
}
rc->elements.push(ie);
pthread_mutex_unlock(&rc->lock);
if (is_signal)
pthread_cond_signal(&rc->empty_queue);
//pthread_cond_broadcast(&rc->empty_queue);
}
return rc;
}

/*
void receiver::setResources(vector<string> &res) {
for (size_t i = 0; i < res.size(); ++i) {
}
}
*/

IElement receiver::fetchOne() {
pthread_mutex_lock(&lock);
while (elements.empty()) {
LogUtil::debug("receiver : [fetchOne] waiting...");
pthread_cond_wait(&empty_queue, &lock);
}
IElement ie = elements.front();
elements.pop();
pthread_mutex_unlock(&lock);
LogUtil::debug("receiver : [fetchOne] fetchOne success. %s", ie.getElement());
return ie;
}
5 changes: 5 additions & 0 deletions receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class receiver {
receiver() {
running = true;
pthread_mutex_init(&lock, NULL);
pthread_mutex_init(&queue_lock, NULL);
pthread_cond_init(&empty_queue, NULL);
}
static void *run(void *);
IElement fetchOne();
Expand All @@ -30,6 +32,9 @@ class receiver {

std::vector<std::string> res;
pthread_mutex_t lock;

pthread_mutex_t queue_lock;
pthread_cond_t empty_queue;
std::queue<IElement > elements;
bool running;
};
Expand Down
4 changes: 2 additions & 2 deletions stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ bool stage::init(Config &config) {
Function func_rc(receiver::run, rc);
Function func_sh(stage_handler::run, sh);
Function fun_sc(stage_control::run, sc);
LogUtil::debug("running : %d", sc->getRunning());
//LogUtil::debug("running : %d", sc->getRunning());

//rc->run();
wp->run(func_rc);
wp->run(func_sh);
wp->run(fun_sc);
wp->run(func_sh);

//ec = new event_core(config);
return true;
Expand Down
8 changes: 7 additions & 1 deletion stage_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@

void *stage_control::run(void *arg) {
stage_control *sc = (stage_control *)arg;
if (sc == NULL) {
LogUtil::debug("stage_control : [run]. arg=NULL");
return NULL;
}
LogUtil::debug("stage_control : %s", (sc->running == true) ? "running" : "stopping");
while (sc->running) {
LogUtil::debug("stage_control running");
//LogUtil::debug("stage_control running");
LogUtil::debug("stage_control : worker_pool [worker_num=%d]", sc->wp->getRunnningWorkerNum());
sleep(3);
}
return NULL;
Expand Down
1 change: 1 addition & 0 deletions stage_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
class stage_control {
public:
stage_control(stage_queue *_sq, worker_pool *_wp) : sq(_sq), wp(_wp){
running = true;
}
static void *run(void *);
bool getRunning() {
Expand Down
11 changes: 11 additions & 0 deletions stage_handler.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#include "stage_handler.h"
#include "log.h"

stage_handler::stage_handler(receiver *_rec, stage_queue *_sq, worker_pool *_wp) {
rec = _rec;
sq = _sq;
wp = _wp;
running = true;
//fun = NULL;
}

bool stage_handler::setHandler(Function f) {
Expand All @@ -18,6 +20,15 @@ void stage_handler::setStageQueue(stage_queue *sq) {

void *stage_handler::run(void *arg) {
stage_handler *sh = (stage_handler *)arg;
if (!sh) {
LogUtil::debug("stage_handler : [run]. error. arg=NULL");
return NULL;
}
if (sh->getf().getFunction() == NULL) {
LogUtil::debug("stage_handler : [run]. the setHandler should be involved first.");
return NULL;
}
LogUtil::debug("stage_handler [run]. running");
while (sh->running) {
//IElement ie = sq->qpop();
IElement ie = sh->rec->fetchOne();
Expand Down
1 change: 1 addition & 0 deletions stage_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class stage_handler {
void setStageQueue(stage_queue *sq);
static void *run(void *);
bool sendMsg(IElement *ie, std::string stage);
Function getf() {return fun;}

private:
Function fun;
Expand Down
24 changes: 21 additions & 3 deletions worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,51 @@ void worker_pool::run(Function func) {
*/
void worker_pool::init() {
jq = new mutex_task_queue();
running_num = 0;
pthread_mutex_init(&statics_lock, NULL);
}

void *worker_pool::per_worker_task(void *arg) {
LogUtil::debug("thread %ld running", (long)pthread_self());
//LogUtil::debug("thread %ld running", (long)pthread_self());
worker_pool *wp = (worker_pool *)arg;
/*
if (wp->worker_init_callback) {
wp->worker_init_callback(NULL);
}
*/
while (wp->running) {
LogUtil::debug("%ld waiting element from queue", (long)pthread_self());
LogUtil::debug("worker_pool : thread %ld waiting element from queue", (long)pthread_self());
queue_element* qe = (queue_element *)wp->jq->pop();
LogUtil::debug("%ld getting element from queue", (long)pthread_self());
LogUtil::debug("worker_pool : thread %ld getting element from queue", (long)pthread_self());

pthread_mutex_lock(&wp->statics_lock);
wp->running_num++;
pthread_mutex_unlock(&wp->statics_lock);

if (wp->worker_init_callback) {
wp->worker_init_callback(NULL);
} else {
//wp->exe_work((void *)qe);
qe->_cb(qe->arg);
delete qe;
}
pthread_mutex_lock(&wp->statics_lock);
wp->running_num--;
pthread_mutex_unlock(&wp->statics_lock);
//TODO
//wp->worker_init_callback(qe);
}
return NULL;
}

int worker_pool::getRunnningWorkerNum() {
/*
* Here we can not do decision based on a current value of running num.
* as the value is changing all the time.
*/
return running_num;
}

void thread_worker_pool::start() {
threads.resize(worker_num);
running = true;
Expand Down
4 changes: 3 additions & 1 deletion worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ class worker_pool {
bool running;
void set_task_queue(task_queue *_jq) {jq = _jq;}
worker_task worker_init_callback;
int getRunnningWorkerNum();

private:
virtual void init();
virtual void stop() {}
virtual void exe_work(void *arg) {}
task_queue *jq;

int running_num;
pthread_mutex_t statics_lock;
};

class thread_worker_pool : public worker_pool {
Expand Down

0 comments on commit da55fb9

Please sign in to comment.