forked from rancher/python-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconcurrency.py
69 lines (52 loc) · 1.71 KB
/
concurrency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import logging
from cattle import Config
# Module-level logger used by every backend branch below.
log = logging.getLogger('concurrency')

# Names re-exported by this module regardless of which backend is selected.
__all__ = ['Queue', 'Empty', 'Full', 'Worker', 'run', 'spawn', 'blocking']

# The concurrency backend is chosen once, at import time, from Config.
# Every branch binds the same public names (Queue, Empty, Full, Worker) so
# callers never need to know which backend is active.
if Config.is_eventlet():
    import eventlet
    eventlet.monkey_patch()
    from eventlet.queue import Queue, Empty, Full
    from eventlet import tpool

    # Shared green-thread pool used by Worker.start() and run().
    pool = eventlet.GreenPool(size=Config.workers() * 2)

    class Worker(object):
        """Green-thread stand-in for ``threading.Thread`` / ``Process``.

        Only the subset of the Thread interface that this module relies on
        is provided: construction from ``target``/``args``/``kwargs`` and
        ``start()``.

        :param target: callable to run in the pool; ``None`` makes
            ``start()`` a no-op, as with ``threading.Thread``.
        :param args: positional arguments for ``target``; ``None`` means
            no arguments.
        :param kwargs: keyword arguments for ``target``; ``None`` means
            no keyword arguments.
        """

        def __init__(self, target=None, args=None, kwargs=None):
            self._target = target
            # Normalise the None defaults here so start() can always unpack;
            # previously a missing ``args`` made start() raise TypeError.
            self._args = tuple(args) if args is not None else ()
            self._kwargs = dict(kwargs) if kwargs is not None else {}

        def start(self):
            """Schedule the target on the shared green pool."""
            if self._target is None:
                # Nothing to run; mirrors Thread.run() with no target.
                return
            pool.spawn_n(self._target, *self._args, **self._kwargs)

    log.info('Using eventlet')

    # Optional debugging backdoor, enabled only when a port is configured.
    port = Config.eventlet_backdoor()
    if port:
        from eventlet import backdoor
        log.info('Launching eventlet backdoor on port %s', port)
        eventlet.spawn(backdoor.backdoor_server,
                       eventlet.listen(('localhost', port)))
elif Config.is_multi_proc():
    from Queue import Empty, Full
    from multiprocessing import Queue, Process
    Worker = Process
    log.info('Using multiprocessing')
elif Config.is_multi_thread():
    from Queue import Queue, Empty, Full
    from threading import Thread
    Worker = Thread
    log.info('Using threading')
else:
    # No recognised backend: fail at import time rather than at first use.
    raise Exception('Could not determine concurrency style set '
                    'CATTLE_AGENT_MULTI to eventlet, thread, or '
                    'proc')
def spawn(**kw):
    """Build a ``Worker`` from ``kw``, start it as a daemon and return it.

    :param kw: keyword arguments forwarded unchanged to ``Worker``
        (for example ``target`` and ``args``).
    :returns: the started worker object.
    """
    worker = Worker(**kw)
    # The daemon flag is set before start(), as in the original ordering.
    worker.daemon = True
    worker.start()
    return worker
def run(method, *args):
    """Call ``method(*args)`` and wait for it to finish.

    Under the eventlet backend the call is spawned on the shared green
    pool and waited on; otherwise it is invoked directly. The result of
    ``method`` is discarded in both cases.

    :param method: callable to execute.
    :param args: positional arguments passed to ``method``.
    """
    if not Config.is_eventlet():
        method(*args)
        return

    green_thread = pool.spawn(method, *args)
    green_thread.wait()
def blocking(method, *args, **kw):
    """Invoke ``method(*args, **kw)`` and return its result.

    Under the eventlet backend the call is routed through
    ``tpool.execute``; with any other backend it is a plain direct call.

    :param method: callable to execute.
    :param args: positional arguments passed to ``method``.
    :param kw: keyword arguments passed to ``method``.
    :returns: whatever ``method`` returns.
    """
    if not Config.is_eventlet():
        return method(*args, **kw)
    return tpool.execute(method, *args, **kw)