15
15
from concurrent .futures import ThreadPoolExecutor , ProcessPoolExecutor
16
16
from functools import wraps
17
17
from typing import Generator , Iterable , List , Tuple
18
+ from venv import logger
18
19
19
20
20
21
from loguru import logger as default_logger
@@ -160,20 +161,33 @@ def wrapped(*args, **kwargs):
160
161
161
162
162
163
# 线程池批量跑function
163
- def multi_thread (work_num , return_list = False , use_process = False ):
164
+ def multi_thread (work_num , return_list = False , safe_execute = True ):
165
+ """多线程跑某个function的装饰器
166
+ Args:
167
+ work_num (_type_): 线程数
168
+ return_list (bool, optional): 结果是否返回list. Defaults to False.
169
+ safe_execute (bool, optional): 是不是catch住function中的exception并返回None. Defaults to True.
170
+ """
164
171
def wrapper (func ):
165
172
@wraps (func )
166
173
def wrapped (data : Iterable , * args , ** kwargs ):
167
174
def _func (x ):
168
- return func (x , * args , ** kwargs )
169
- with ThreadPoolExecutor (work_num ) as executors :
170
- rs_iter = executors .map (_func , data )
171
- rs = rs_iter
172
- total = None if not hasattr (data , '__len__' ) else len (data )
173
- rs = tqdm (rs_iter , total = total )
174
- if return_list :
175
- return list (rs )
176
- return rs
175
+ try :
176
+ return func (x , * args , ** kwargs )
177
+ except Exception as e :
178
+ if safe_execute :
179
+ logger .warning (f"function { func .__name__ } failed with exception" )
180
+ return None
181
+ else :
182
+ raise e
183
+
184
+ executors = ThreadPoolExecutor (work_num )
185
+ rs_iter = executors .map (_func , data )
186
+ total = None if not hasattr (data , '__len__' ) else len (data )
187
+ rs_iter = tqdm (rs_iter , total = total )
188
+ rs_iter = (e for e in rs_iter if e is not None )
189
+
190
+ return list (rs_iter ) if return_list else rs_iter
177
191
return wrapped
178
192
return wrapper
179
193
@@ -184,14 +198,13 @@ def multi_process(work_num, return_list=False):
184
198
def wrapper (func ):
185
199
@wraps (func )
186
200
def wrapped (data : Iterable ):
187
- with ProcessPoolExecutor (work_num ) as executors :
188
- rs_iter = executors .map (func , data )
189
- rs = rs_iter
190
- total = None if not hasattr (data , '__len__' ) else len (data )
191
- rs = tqdm (rs_iter , total = total )
192
- if return_list :
193
- return list (rs )
194
- return rs
201
+ executors = ProcessPoolExecutor (work_num )
202
+ rs_iter = executors .map (func , data )
203
+ rs = rs_iter
204
+ total = None if not hasattr (data , '__len__' ) else len (data )
205
+ rs_iter = tqdm (rs_iter , total = total )
206
+ return list (rs_iter ) if return_list else rs
207
+
195
208
return wrapped
196
209
return wrapper
197
210
0 commit comments