-
Notifications
You must be signed in to change notification settings - Fork 6
/
ScenarioRunner.py
170 lines (143 loc) · 5.52 KB
/
ScenarioRunner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import threading
import time
from logging import Logger
from typing import List, Optional, Tuple
from apollo.ApolloContainer import ApolloContainer
from apollo.ApolloRunner import ApolloRunner
from apollo.CyberBridge import Topics
from apollo.MessageBroker import MessageBroker
from apollo.utils import clean_appolo_dir
from config import SCENARIO_UPPER_LIMIT
from framework.scenario import Scenario
from framework.scenario.ad_agents import ADAgent
from framework.scenario.PedestrianManager import PedestrianManager
from framework.scenario.TrafficControlManager import TrafficControlManager
from modules.map.proto.map_pb2 import Map
from utils import (get_logger, get_scenario_logger, random_numeric_id,
save_record_files_and_chromosome)
class ScenarioRunner:
    """
    Executes a scenario based on the specification

    Every constructed runner registers itself as the instance returned by
    :meth:`get_instance`, so the most recently created runner wins.

    :param List[ApolloContainer] containers: containers to be used for scenario
    """
    logger: Logger
    containers: List['ApolloContainer']
    curr_scenario: Optional['Scenario']
    pm: Optional['PedestrianManager']
    tm: Optional['TrafficControlManager']
    is_initialized: bool
    __instance = None
    __runners: List['ApolloRunner']

    def __init__(self, containers: List['ApolloContainer']) -> None:
        """
        Constructor

        :param List[ApolloContainer] containers: containers to be used for scenario
        """
        self.logger = get_logger('ScenarioRunner')
        self.containers = containers
        self.curr_scenario = None
        # Give every declared attribute a value up front so that reading it
        # before init_scenario() does not raise AttributeError.
        self.pm = None
        self.tm = None
        self.__runners = []
        self.is_initialized = False
        ScenarioRunner.__instance = self

    @staticmethod
    def get_instance() -> Optional['ScenarioRunner']:
        """
        Returns the singleton instance

        :returns: the most recently constructed runner, or ``None`` when no
            runner has been constructed yet
        :rtype: Optional[ScenarioRunner]
        """
        return ScenarioRunner.__instance

    def set_scenario(self, s: 'Scenario'):
        """
        Set the scenario for this runner

        Resets the initialized flag, so :meth:`init_scenario` must be called
        again before :meth:`run_scenario`.

        :param Scenario s: scenario representation
        """
        self.curr_scenario = s
        self.is_initialized = False

    def init_scenario(self):
        """
        Initialize the scenario

        Creates one ``ApolloRunner`` per (id, container, agent) triple,
        initializes all of them in parallel threads, cleans the Apollo
        directory, and builds the pedestrian and traffic control managers
        from the current scenario.
        """
        adcs = self.curr_scenario.ad_section.adcs
        nids = random_numeric_id(len(adcs))

        # zip() stops at its shortest input, so agents beyond the number of
        # available containers get no runner.
        self.__runners = []
        for nid, ctn, adc in zip(nids, self.containers, adcs):
            adc.apollo_container = ctn.container_name
            self.__runners.append(
                ApolloRunner(
                    nid=nid,
                    ctn=ctn,
                    start=adc.initial_position,
                    waypoints=adc.waypoints,
                    start_time=adc.start_t
                )
            )

        # initialize Apollo instances in parallel and wait for all of them
        threads = [
            threading.Thread(target=runner.initialize)
            for runner in self.__runners
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # remove Apollo logs
        clean_appolo_dir()

        # initialize pedestrian and traffic control manager
        self.pm = PedestrianManager(self.curr_scenario.pd_section)
        self.tm = TrafficControlManager(self.curr_scenario.tc_section)
        self.is_initialized = True

    def run_scenario(self, generation_name: str, scenario_name: str, save_record=False) -> Optional[List[Tuple['ApolloRunner', 'ADAgent']]]:
        """
        Execute the scenario based on the specification

        :param str generation_name: name of the generation
        :param str scenario_name: name of the scenario
        :param bool save_record: whether to save records or not
        :returns: (runner, agent) pairs for the executed scenario, or ``None``
            when no scenario is set or it has not been initialized
        :rtype: Optional[List[Tuple[ApolloRunner, ADAgent]]]
        """
        # Validate state before touching the scenario: dereferencing
        # curr_scenario first would raise AttributeError when it is None.
        if self.curr_scenario is None or not self.is_initialized:
            self.logger.error('Error: No chromosome or not initialized')
            return None

        num_adc = len(self.curr_scenario.ad_section.adcs)
        self.logger.info(
            f'{num_adc} agents running a scenario G{self.curr_scenario.gid}S{self.curr_scenario.cid}.'
        )

        mbk = MessageBroker(self.__runners)
        mbk.spin()

        # scenario clock in milliseconds, advanced by 100 per cycle
        runner_time = 0

        try:
            scenario_logger = get_scenario_logger()

            # starting scenario
            if save_record:
                for r in self.__runners:
                    r.container.start_recorder(scenario_name)

            # Begin Scenario Cycle
            while True:
                # Publish TrafficLight
                tld = self.tm.get_traffic_configuration(runner_time / 1000)
                mbk.broadcast(Topics.TrafficLight, tld.SerializeToString())

                # Send Routing
                for ar in self.__runners:
                    if ar.should_send_routing(runner_time / 1000):
                        ar.send_routing()

                # Print Scenario Time (with a 100 ms step this holds on
                # every cycle)
                if runner_time % 100 == 0:
                    scenario_logger.info(
                        f'Scenario time: {round(runner_time / 1000, 1)}.')

                # Check if scenario exceeded upper limit
                if runner_time / 1000 >= SCENARIO_UPPER_LIMIT:
                    scenario_logger.info('\n')
                    break

                time.sleep(0.1)
                runner_time += 100

            if save_record:
                for r in self.__runners:
                    r.container.stop_recorder()
                # buffer period for recorders to stop
                time.sleep(2)
                save_record_files_and_chromosome(
                    generation_name, scenario_name, self.curr_scenario.to_dict())
        finally:
            # scenario ended: always stop the broker and the runners, even
            # when the cycle raised, so nothing is left running.
            mbk.stop()
            for runner in self.__runners:
                runner.stop('MAIN')
            self.is_initialized = False

        self.logger.debug(
            f'Scenario ended. Length: {round(runner_time/1000, 2)} seconds.')
        return list(zip(self.__runners, self.curr_scenario.ad_section.adcs))