forked from mami-project/pto-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsupervisor.py
177 lines (132 loc) · 6.12 KB
/
supervisor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import argparse
import asyncio
import hmac
import json
import os
import traceback
from functools import partial
from typing import Set

from pymongo.collection import Collection
from pymongo.database import Database

from .agent import OnlineAgent, ModuleAgent
from .analyzerstate import AnalyzerState
from .jsonprotocol import JsonProtocol
from .mongoutils import AutoIncrementFactory
from .coreconfig import CoreConfig
class SupervisorServer(JsonProtocol):
    """Local endpoint that forwards agent requests to the supervisor.

    Each request object must provide the fields ``identifier``, ``token``,
    ``action`` and ``payload``; whatever ``Supervisor.analyzer_request``
    returns is sent back over the same connection.
    """

    def __init__(self, supervisor):
        # the Supervisor whose analyzer_request() authenticates and handles requests
        self.supervisor = supervisor

    def connection_made(self, transport):
        super().connection_made(transport)

    def received(self, obj):
        """Handle one request object and send back the supervisor's answer.

        ``obj`` is presumably the JSON-decoded request delivered by
        JsonProtocol (TODO confirm). If it is not a mapping, or lacks one of
        the required fields, an error object is sent instead.

        :param obj: the incoming request.
        """
        try:
            identifier = str(obj['identifier'])
            token = str(obj['token'])
            action = str(obj['action'])
            payload = obj['payload']
        except (KeyError, TypeError):
            # KeyError: a required field is missing.
            # TypeError: obj is not subscriptable by string keys at all
            # (e.g. a JSON list, string, number or null); reply with the same
            # error instead of letting the exception escape the handler.
            print("request is missing one or more fields: {token, identifier, action, payload}")
            self.send({'error': 'request is missing one or more fields: {token, identifier, action, payload}'})
            return

        ans = self.supervisor.analyzer_request(identifier, token, action, payload)
        self.send(ans)
class Supervisor:
    """Schedule analyzer modules and authenticate requests from their agents.

    The supervisor periodically asks AnalyzerState for planned analyzers,
    starts a ModuleAgent for each one, and listens on localhost
    (``core_config.supervisor_port``) for agent requests, which are accepted
    only with a matching identifier/token pair.
    """

    def __init__(self, core_config: 'CoreConfig', loop=None):
        """Create the supervisor and start its local server.

        :param core_config: core configuration; provides ``idfactory_coll``,
            ``analyzers_coll`` and ``supervisor_port``.
        :param loop: event loop to use; defaults to ``asyncio.get_event_loop()``.
        """
        self.loop = loop or asyncio.get_event_loop()
        self.core_config = core_config

        # the supervisor is the only component generating agent_ids, therefore create_if_missing=True is not a problem.
        idfactory = AutoIncrementFactory(self.core_config.idfactory_coll)
        self._agent_id_creator = idfactory.get_incrementor('agent_id', create_if_missing=True)

        self.analyzer_state = AnalyzerState('supervisor', self.core_config.analyzers_coll)

        # identifier -> agent for every agent whose credentials are currently accepted
        self.agents = {}
        # Strong references to running module-agent tasks: the event loop only
        # keeps weak references, so an otherwise unreferenced task could be
        # garbage collected before it finishes.
        self._tasks = set()
        self.server = None

        # todo delete users and collections

        server_coro = self.loop.create_server(lambda: SupervisorServer(self),
                                              host='localhost',
                                              port=self.core_config.supervisor_port)
        self.server = self.loop.run_until_complete(server_coro)

    def analyzer_request(self, identifier, token, action, payload):
        """Authenticate a request and dispatch it to the owning agent.

        :param identifier: agent identifier the request claims to come from.
        :param token: secret token presented by the requester.
        :param action: action name passed on to the agent.
        :param payload: action payload passed on to the agent.
        :returns: the agent's ``_handle_request`` result, or a dict with an
            ``'error'`` key when the identifier is unknown or the token is wrong.
        """
        try:
            agent = self.agents[identifier]
        except KeyError:
            print("no analyzer with this identifier")
            return {'error': 'authentication failed, analyzer not on record with this identifier'}

        # Constant-time comparison so response timing does not reveal how much
        # of the token was correct. Compare bytes: compare_digest rejects
        # non-ASCII str arguments with TypeError, and the token is untrusted.
        expected = str(agent.token).encode('utf-8')
        presented = str(token).encode('utf-8')
        if hmac.compare_digest(expected, presented):
            return agent._handle_request(action, payload)
        return {'error': 'authentication failed, token incorrect'}

    def shutdown_online_agent(self, agent):
        """Tear down an online agent and stop accepting its credentials."""
        agent.teardown()
        del self.agents[agent.identifier]

    def create_online_agent(self):
        """Create and register a new online agent.

        :returns: ``(credentials, agent)`` where credentials is a dict with
            ``identifier``, ``token``, ``host`` and ``port`` for connecting to
            this supervisor.
        """
        print("creating online supervisor")

        # create agent
        identifier = 'online_'+str(self._agent_id_creator())
        token = os.urandom(16).hex()
        agent = OnlineAgent(identifier, token, self.core_config)
        self.agents[agent.identifier] = agent

        credentials = {'identifier': agent.identifier, 'token': token,
                       'host': 'localhost', 'port': self.core_config.supervisor_port}
        return credentials, agent

    def script_agent_done(self, agent: 'ModuleAgent', fut: asyncio.Future):
        """Done-callback for a module agent's execution task.

        Tears the agent down and unregisters it. Then it moves the analyzer
        from 'executing' to 'executed' on success. If the task raised or was
        cancelled, the analyzer is moved into the error state instead.

        :param agent: the agent whose task finished.
        :param fut: the finished task.
        """
        print("module agent done")
        self._tasks.discard(fut)

        try:
            agent.teardown()
        finally:
            # unregister even if teardown fails, so the agent's token stops being accepted
            self.agents.pop(agent.identifier, None)

        try:
            # raise exceptions that happened in the future
            fut.result()
        except asyncio.CancelledError:
            # Since Python 3.8 CancelledError derives from BaseException, so the
            # generic handler below would not see it and the analyzer would be
            # left in 'executing' forever.
            self.analyzer_state.transition_to_error(agent.analyzer_id,
                                                    "execution of analyzer module was cancelled")
        except Exception:
            # an error happened
            traceback.print_exc()
            self.analyzer_state.transition_to_error(agent.analyzer_id,
                                                    "error when executing analyzer module:\n" + traceback.format_exc())
        else:
            # everything went well, so give to validator
            transition_args = {'execution_result': {
                'temporary_coll': agent.identifier,
                'max_action_id': agent.result_max_action_id,
                'timespans': agent.result_timespans
            }}
            self.analyzer_state.transition(agent.analyzer_id, 'executing', 'executed', transition_args)

    def check_for_work(self):
        """Start a module agent for every planned analyzer not flagged for cancellation."""
        planned = self.analyzer_state.planned_analyzers()
        print("supervisor: check for work")

        for analyzer in planned:
            # check for wish
            if self.analyzer_state.check_wish(analyzer, 'cancel'):
                print("supervisor: cancelled {} upon request".format(analyzer['_id']))
                continue

            print("planned", analyzer)

            # create agent
            identifier = 'module_'+str(self._agent_id_creator())
            token = os.urandom(16).hex()
            agent = ModuleAgent(analyzer['_id'], identifier, token, self.core_config,
                                analyzer['input_formats'], analyzer['input_types'], analyzer['output_types'],
                                analyzer['command_line'], analyzer['working_dir'], analyzer['rebuild_all'])
            self.agents[agent.identifier] = agent

            # change analyzer state
            self.analyzer_state.transition(agent.analyzer_id, 'planned', 'executing')

            # schedule on the supervisor's loop; keep a strong reference until the done-callback runs
            task = self.loop.create_task(agent.execute())
            self._tasks.add(task)
            task.add_done_callback(partial(self.script_agent_done, agent))
            print("module agent started")

    async def run(self, interval=4):
        """Check for work forever, sleeping *interval* seconds between sweeps.

        An exception raised by one sweep is printed and polling continues.
        Without this, one failure would permanently end scheduling while the
        server kept running.

        :param interval: seconds to sleep between sweeps (default 4).
        """
        while True:
            try:
                self.check_for_work()
            except Exception:
                traceback.print_exc()
            await asyncio.sleep(interval)
def main():
    """Command-line entry point: start the supervisor and run it forever.

    Reads the configuration file named on the command line and starts the
    supervisor's local server. It then creates one online agent and prints
    that agent's credentials twice: as JSON, and as an
    ``export PTO_CREDENTIALS=...`` shell line. Finally it runs the
    work-polling loop until the process is stopped.
    """
    desc = 'Manage execution of analyzer modules.'
    parser = argparse.ArgumentParser(description=desc)
    parser.add_argument('config_file', type=argparse.FileType('rt'))
    args = parser.parse_args()

    cc = CoreConfig('supervisor', args.config_file)

    # Create and install a loop explicitly: calling asyncio.get_event_loop()
    # with no running loop is deprecated and raises on newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    sup = Supervisor(cc, loop)

    # create online supervisor and print account details
    credentials, _online_agent = sup.create_online_agent()
    print(json.dumps(credentials))
    print("export PTO_CREDENTIALS=\"{}\"".format(json.dumps(credentials).replace('"', '\\"')))

    # keep a reference: the loop only holds tasks weakly
    run_task = loop.create_task(sup.run())
    loop.run_forever()


if __name__ == "__main__":
    main()