-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAction.py
202 lines (177 loc) · 7.23 KB
/
Action.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# -*- coding: utf-8 -*-
"""
Action类
每个Action自带一个Queue
Created on 03/30/2016
@author: Wen Gu
@contact: emptyset110@gmail.com
"""
import multiprocessing
import time
import threading
import logging
import ThreadManager as ThreadManager
import util
from abc import ABCMeta
from Functions import *
class Action(threading.Thread):
__metaclass__ = ABCMeta
def __init__( self
, name = None
, producer_list = list()
, num_start = 1
, num_min = 1
, num_max = 10
, need_new_thread = None
, cancel_thread = None
, on_finished = None
, set_daemon = True
, lower_threshold = 0
, upper_threshold = 3000 # 当消息队列数量超过upper_threshold时候,会动态添加
, log_level = "INFO" # "DEBUG","INFO","WARNING"
, **kwargs
):
threading.Thread.__init__(self)
self.logger = self.get_logger( level = log_level )
self._name = name
self._kwargs = kwargs
self._queue = multiprocessing.Queue()
self._producers = set()
self._lock = multiprocessing.Lock()
self._producer_list = producer_list
self.lower_threshold = lower_threshold
self.upper_threshold = upper_threshold
self._manager = ThreadManager.Manager( target = self.thread_target
, set_daemon = set_daemon # 将线程设置为守护线程
, num_start = num_start # 初始化action时候一次性创建的线程数
, num_min = num_min # 允许最少用于执行handler的线程数
, num_max = num_max # 允许用于执行handler最多线程数
, need_new_thread = self.need_new_thread # 用于判断是否需要增加线程的函数,默认
, cancel_thread = self.cancel_thread # 用于需要减少线程的判断函数,默认为
, on_finished = self.handler_callback # handler的回调函数
)
# _running决定action是否开启“定时处理”
# _active决定action是否会执行数据处理过程
# -----------------------------------
# 二者既不充分也不必要:
# _running==False & _active==True : "事件触发"状态
# _running==True & _active==True : "定时触发"状态
# _running==True & _active==False : 定时线程开启但是暂停处理数据
# _running==False & _active==False : Action完全停止
self._active = False # _active表示:是否处理数据
self._running = False # _running表示:( Deprecated ) Action是否开启,这个flag无用
self.mutex = threading.Lock()
def _init(self):
self._auto_load_producers()
# 开启定时打印消息队列堆积数据量的线程
t = threading.Thread(target=self.log_queue)
t.setDaemon(True)
t.start()
def log_queue(self):
while True:
self.logger.debug( u"消息队列中消息数量: {}".format( self._queue.qsize() ) )
time.sleep(10)
def get_logger(self, level):
logger = logging.getLogger(self.__class__.__name__)
if level is "DEBUG":
logger.setLevel(10)
elif level is "INFO":
logger.setLevel(20)
elif level is "WARNING":
logger.setLevel(30)
elif level is "ERROR":
logger.setLevel(40)
elif level is "CRITICAL":
logger.setLevel(50)
else:
logger.setLevel(20)
return logger
def handler_callback( self, result ):
return None
# print( result )
def need_new_thread( self ):
"""
需要动态新增线程的判断函数
--------
return: True/False
"""
if (self._queue.qsize() > self.upper_threshold):
self.logger.info( u"消息队列中消息数量: {}, 需要增加线程".format( self._queue.qsize() ) )
return True
else:
return False
# 需要减少线程的判断函数
def cancel_thread( self ):
"""
需要动态减少线程的判断函数
-------
return: True/False
"""
if ( self._queue.qsize() < self.lower_threshold ):
self.logger.debug( u"消息队列中消息数量: {}, 需要减少线程".format( self._queue.qsize() ) )
return True
else:
return False
# action订阅producer
def _subscribe(self, instance):
instance._add_subscriber(queue = self._queue)
# action取消订阅producer
def _unsubscribe(self, instance):
instance._remove_subscriber(queue = self._queue)
def _activate(self):
self.logger.info(u'[激活Action]\t{}'.format( self._name ) )
self._active = True
def _deactivate(self):
self._active = False
for producer in list(self._producers):
producer._remove_subscriber( self._queue )
self.logger.info( u'[暂停Action]\t{}'.format( self._name ) )
# 运行action即相当于采用“定时处理”的模式
def run(self):
self._auto_start_producers()
self._running = True
self._activate()
self.logger.info(u'[开启Action]\t{}'.format( self._name ) )
self._manager.start()
while self._running is True:
if self._active is True:
time.sleep(3)
self._end()
def is_active(self):
return self._active
def is_running(self):
return self._running
# 通知action进行一次数据处理,这是“事件触发”的模式
def _notify(self):
self._handler()
def _end(self):
for producer in list(self._producers):
producer._remove_subscriber(self._queue)
self.logger.info(u'[结束Action]\t{}'.format( self._name ) )
def _stop(self):
self._running = False
# action根据self._producer_list来自动加载producers
def _auto_load_producers(self):
for kwargs in self._producer_list:
instance = P(**kwargs)
self._producers.add( instance )
self._subscribe(instance)
def _auto_start_producers(self):
for producer in list(self._producers):
if not producer.is_running():
producer.start()
def thread_target(self):
try:
event = self._queue.get(True, timeout = 10)
result =self.handler(event = event)
return result
except Exception as e:
# self.logger.error("{}".format(e) )
return None
# 需要在子类中重写的数据处理方法
def handler(self, event):
print( "event.data: {}".format( event.data ) )
return
if __name__ == "__main__":
logging.basicConfig()
A("PrintSinaL2")