Skip to content

Commit

Permalink
Opendrive map support from SUMO (#744)
Browse files Browse the repository at this point in the history
* sumo mpa support

* format

* add doc string

* format
  • Loading branch information
QuanyiLi authored Jul 21, 2024
1 parent 05ec6c8 commit fa62fc6
Show file tree
Hide file tree
Showing 5 changed files with 452 additions and 1 deletion.
2 changes: 1 addition & 1 deletion metadrive/component/map/scenario_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ScenarioMap(BaseMap):
def __init__(self, map_index, map_data, random_seed=None):
self.map_index = map_index
self.map_data = map_data
self.need_lane_localization = self.engine.global_config["need_lane_localization"]
self.need_lane_localization = self.engine.global_config.get("need_lane_localization", False)
super(ScenarioMap, self).__init__(dict(id=self.map_index), random_seed=random_seed)

def show_coordinates(self):
Expand Down
42 changes: 42 additions & 0 deletions metadrive/manager/sumo_map_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from metadrive.component.map.scenario_map import ScenarioMap
from metadrive.manager.base_manager import BaseManager
from metadrive.utils.sumo.map_utils import extract_map_features, RoadLaneJunctionGraph


class SumoMapManager(BaseManager):
"""
It currently only support load one map into the simulation.
"""
PRIORITY = 0 # Map update has the most high priority

def __init__(self, sumo_map_path):
"""
Init the map manager. It can be extended to manage more maps
"""
super(SumoMapManager, self).__init__()
self.current_map = None
self.graph = RoadLaneJunctionGraph(sumo_map_path)
self.map_feature = extract_map_features(self.graph)

def destroy(self):
"""
Delete the map manager
"""
self.current_map.destroy()
super(SumoMapManager, self).destroy()
self.current_map = None

def before_reset(self):
"""
Detach existing maps
"""
if self.current_map:
self.current_map.detach_from_world()

def reset(self):
"""
Rebuild the map and load it into the scene
"""
if not self.current_map:
self.current_map = ScenarioMap(map_index=0, map_data=self.map_feature)
self.current_map.attach_to_world()
50 changes: 50 additions & 0 deletions metadrive/tests/vis_env/vis_sumo_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""use netconvert --opendrive-files CARLA_town01.net.xml first"""

from metadrive.envs import BaseEnv
from metadrive.obs.observation_base import DummyObservation
import logging
from metadrive.manager.sumo_map_manager import SumoMapManager
from metadrive.engine.asset_loader import AssetLoader


class MyEnv(BaseEnv):
def reward_function(self, agent):
"""Dummy reward function."""
return 0, {}

def cost_function(self, agent):
"""Dummy cost function."""
return 0, {}

def done_function(self, agent):
"""Dummy done function."""
return False, {}

def get_single_observation(self):
"""Dummy observation function."""
return DummyObservation()

def setup_engine(self):
"""Register the map manager"""
super().setup_engine()
map_path = AssetLoader.file_path("carla", "CARLA_town01.net.xml", unix_style=False)
self.engine.register_manager("map_manager", SumoMapManager(map_path))


if __name__ == "__main__":
# create env
env = MyEnv(
dict(
use_render=True,
# if you have a screen and OpenGL suppor, you can set use_render=True to use 3D rendering
vehicle_config={"spawn_position_heading": [(0, 0), 0]},
manual_control=True, # we usually manually control the car to test environment
use_mesh_terrain=True,
log_level=logging.CRITICAL
)
) # suppress logging message
env.reset()
for i in range(10000):
# step
obs, reward, termination, truncate, info = env.step(env.action_space.sample())
env.close()
Empty file.
Loading

0 comments on commit fa62fc6

Please sign in to comment.