-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpthreadPool.h
152 lines (121 loc) · 3.54 KB
/
pthreadPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#ifndef _PTHREAD_POOL_H_
#define _PTHREAD_POOL_H_
#include "Singleton.h"
#include "locker.h"
#include <pthread.h>
#include <atomic>
#include <exception>
#include <iostream>
#include <list>
/**
 * Pool of POSIX worker threads that consume queued requests.
 *
 * T is the request type; workers call `request->process()` on every
 * non-null T* they take from the queue. The pool stores only the pointers
 * and never deletes the requests.
 *
 * Instances are created through Singleton<pthreadPool<T> >, which is why the
 * constructor and destructor are private and Singleton is a friend.
 */
template <typename T>
class pthreadPool: public Singleton<pthreadPool<T> >
{
    friend class Singleton<pthreadPool>; // Singleton needs to call the private constructor

private:
    /// Starts maxPthreadNum worker threads; maxRequest is the queue capacity.
    pthreadPool(int maxPthreadNum,int maxRequest);
    ~pthreadPool();

    // The pool owns a raw thread array and its workers hold a pointer to
    // it, so copying would be unsafe.
    pthreadPool(const pthreadPool&) = delete;
    pthreadPool& operator=(const pthreadPool&) = delete;

    /// Worker loop executed by every pool thread.
    void run();

private:
    /// Thread entry point passed to pthread_create; args is the pool.
    static void* worker(void *args);

public:
    /// Queues a request for the workers; returns false if it is rejected.
    bool appendRequest(T* request);

private:
    int maxPthreadNum = 0;             // number of threads in the pool
    int maxRequest = 0;                // capacity limit for workQueue
    pthread_t* pthreadArray = nullptr; // array of worker thread handles
    // Written by the owning thread and read by every worker, so it must be
    // atomic to avoid a data race.
    std::atomic<bool> stopPoll{false}; // true once workers should stop
    std::list<T*> workQueue;           // pending requests
    locker myLocker;                   // mutex protecting workQueue
    sem mySem;                         // counts requests waiting to be processed
};
/**
 * Appends a request to the work queue and signals one worker.
 *
 * @param request  Pointer to the request to queue; the pool stores only the
 *                 pointer.
 * @return true if the request was queued, false if the queue already holds
 *         maxRequest entries.
 */
template< typename T >
bool pthreadPool<T>::appendRequest(T * request)
{
    typedef typename std::list<T*>::size_type size_type;

    myLocker.lock();
    // Reject once the queue holds maxRequest entries. The cast keeps the
    // comparison unsigned-vs-unsigned (the constructor rejects negative
    // maxRequest values), and ">=" makes maxRequest a true upper bound
    // instead of letting one extra request in.
    if(workQueue.size() >= static_cast<size_type>(maxRequest))
    {
        myLocker.unlock();
        std::cout<<"append error: size over"<<std::endl;
        return false;
    }
    workQueue.push_back(request);
    myLocker.unlock();

    // Signal after releasing the lock so the woken worker does not
    // immediately block on the mutex we are still holding.
    mySem.post();
    return true;
}
//destructor
/**
 * Asks the workers to stop and releases the thread-handle array.
 *
 * The stop flag is raised before anything is freed, and the semaphore is
 * posted once per worker so that threads blocked in mySem.wait() wake up
 * and re-check the flag.
 *
 * NOTE(review): the constructor detaches the workers, so they cannot be
 * joined here; a worker may still be running while the members are
 * destroyed. Waiting for them would require keeping the threads joinable.
 */
template<typename T>
pthreadPool<T>::~pthreadPool()
{
    stopPoll = true;                    // must be visible before waking anyone
    for(int i = 0; i < maxPthreadNum; ++i)
    {
        mySem.post();                   // wake one blocked worker per post
    }
    delete []pthreadArray;
    pthreadArray = NULL;
}
//run for thread, can access member variable
/**
 * Worker loop: waits on the semaphore, takes one request from the queue and
 * calls process() on it, until stopPoll becomes true.
 *
 * The request is removed from the queue while myLocker is held; only the
 * call to process() happens outside the lock.
 */
template <typename T>
void pthreadPool<T>::run()
{
    while(!stopPoll)
    {
        mySem.wait();                   // wait until a request is signalled

        myLocker.lock();
        if(workQueue.empty())
        {
            // Woken with nothing queued: release the lock and re-check the
            // stop flag.
            myLocker.unlock();
            continue;
        }
        // front()/pop_front() must happen under the lock: the list is
        // shared with appendRequest() and the other workers.
        T* request = workQueue.front();
        workQueue.pop_front();
        myLocker.unlock();

        if(!request)
        {
            continue;                   // ignore null entries
        }
        request->process();             // process the request outside the lock
    }
}
// Thread entry point. It is static, so it has no `this`; the pool instance
// is passed in through args.
/**
 * Forwards the new thread into the pool's run() loop.
 *
 * @param args  The pthreadPool<T>* supplied as the pthread_create argument.
 * @return Always a null pointer.
 */
template<typename T>
void* pthreadPool<T>::worker(void *args)
{
    pthreadPool<T>* const self = static_cast<pthreadPool<T>*>(args);
    self->run();
    return nullptr;
}
//constructor
/**
 * Validates the arguments and starts the worker threads.
 *
 * All threads are first created joinable. Only when every thread exists are
 * they detached. If creation or detaching fails, the workers that were
 * already started are told to stop, woken, and (where still joinable)
 * joined before the exception is thrown, so that no worker keeps running
 * against a pool whose construction failed.
 *
 * @param maxPthreadNum  Number of worker threads to start; must be > 0.
 * @param maxRequest     Capacity limit for the work queue; must be >= 0.
 * @throws std::exception on invalid arguments or when a thread cannot be
 *         created or detached. `new[]` reports allocation failure by
 *         throwing std::bad_alloc, which derives from std::exception.
 */
template<typename T>
pthreadPool<T>::pthreadPool(int maxPthreadNum,int maxRequest):maxPthreadNum(maxPthreadNum),maxRequest(maxRequest),pthreadArray(NULL),stopPoll(false)
{
    //argument error
    if(maxPthreadNum <= 0 || maxRequest < 0)
    {
        std::cout<<"maxPthreadNum error or maxRequest error"<<std::endl;
        throw std::exception();
    }

    // Plain new[] throws on failure and never returns NULL, so no null
    // check is needed here.
    pthreadArray = new pthread_t[maxPthreadNum];

    // Phase 1: create every worker, still joinable.
    int created = 0;
    for(; created < maxPthreadNum; ++created)
    {
        std::cout<<"start create pthreads in pool..."<<std::endl;
        if(pthread_create(pthreadArray + created, NULL, worker, this) != 0)
        {
            std::cout<<"pthread_create error"<<std::endl;
            break;
        }
    }
    bool startupFailed = (created < maxPthreadNum);

    // Phase 2: detach the workers once all of them exist.
    int firstJoinable = 0;              // workers from this index on are still joinable
    if(!startupFailed)
    {
        for(int i = 0; i < maxPthreadNum; ++i)
        {
            if(pthread_detach(pthreadArray[i]))
            {
                std::cout<<"pthread_detach error"<<std::endl;
                startupFailed = true;
                firstJoinable = i + 1;  // workers after i were not detached yet
                break;
            }
        }
    }

    if(startupFailed)
    {
        // Stop the workers that are already running: raise the flag, then
        // post once per worker so each one blocked in mySem.wait() wakes up.
        stopPoll = true;
        for(int i = 0; i < created; ++i)
        {
            mySem.post();
        }
        // Wait for every worker that is still joinable before the members
        // are destroyed by the failed construction.
        for(int i = firstJoinable; i < created; ++i)
        {
            pthread_join(pthreadArray[i], NULL);
        }
        delete []pthreadArray;
        pthreadArray = NULL;
        throw std::exception();
    }
}
#endif