Skip to content

Commit

Permalink
Add new generic junction and connecting road operators
Browse files Browse the repository at this point in the history
Also do some refactoring along the way.
  • Loading branch information
johschmitz committed Oct 3, 2022
1 parent 95710dc commit db4c2e4
Show file tree
Hide file tree
Showing 34 changed files with 1,155 additions and 306 deletions.
33 changes: 24 additions & 9 deletions addon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import os

from . export import DSC_OT_export
from . junction import DSC_OT_junction
from . junction_four_way import DSC_OT_junction_four_way
from . modal_junction_generic import DSC_OT_junction_generic
from . junction_connection import DSC_OT_junction_connection
from . object_bicycle import DSC_OT_object_bicycle
from . object_car import DSC_OT_object_car
from . object_motorbike import DSC_OT_object_motorbike
Expand Down Expand Up @@ -65,8 +67,9 @@ def draw(self, context):
global custom_icons

layout = self.layout
layout.label(text='OpenDRIVE')
box = layout.box()
box.label(text='Road primitives (OpenDRIVE)')
box.label(text='Roads')
row = box.row(align=True)
row.operator('dsc.road_properties_popup', text='Straight',
icon_value=custom_icons['road_straight'].icon_id).operator = 'road_straight'
Expand All @@ -83,11 +86,20 @@ def draw(self, context):
row.operator('dsc.road_parametric_polynomial', text='Parametric polynomial',
icon_value=custom_icons['road_parametric_polynomial'].icon_id)
row = box.row(align=True)
row.operator('dsc.junction', text='Junction', icon_value=custom_icons['junction'].icon_id)
row.label(text='Junctions')
row = box.row(align=True)
row.operator('dsc.junction_four_way', text='4-way junction',
icon_value=custom_icons['junction_4way'].icon_id)
row = box.row(align=True)
row.operator('dsc.junction_generic', text='Generic junction (area)',
icon_value=custom_icons['junction_area'].icon_id)
row = box.row(align=True)
row.operator('dsc.road_properties_popup', text='Junction connection',
icon_value=custom_icons['junction_connection'].icon_id).operator = 'junction_connection'

layout.label(text='OpenSCENARIO')
box = layout.box()
box.label(text='Objects (OpenSCENARIO)')
box.label(text='Objects')
row = box.row(align=True)
row.operator('dsc.object_properties_popup', text='Car').operator = 'object_car'
row = box.row(align=True)
Expand All @@ -99,15 +111,14 @@ def draw(self, context):
row = box.row(align=True)
row.operator('dsc.object_pedestrian')

box = layout.box()
box.label(text='Trajectories (OpenSCENARIO)')
box.label(text='Trajectories')
row = box.row(align=True)
row.operator('dsc.trajectory_polyline', icon_value=custom_icons['trajectory_polyline'].icon_id)
row = box.row(align=True)
row.operator('dsc.trajectory_nurbs', icon_value=custom_icons['trajectory_nurbs'].icon_id)

layout.label(text='Export (Track, Scenario, Mesh)')
box = layout.box()
box.label(text='Export (Track, Scenario, Mesh)')
row = box.row(align=True)
row.operator('dsc.export_driving_scenario', icon='EXPORT')

Expand All @@ -117,7 +128,9 @@ def menu_func_export(self, context):
classes = (
DSC_enum_lane,
DSC_OT_export,
DSC_OT_junction,
DSC_OT_junction_four_way,
DSC_OT_junction_generic,
DSC_OT_junction_connection,
DSC_OT_object_bicycle,
DSC_OT_object_car,
DSC_OT_object_motorbike,
Expand Down Expand Up @@ -145,7 +158,9 @@ def register():
custom_icons.load('road_arc', os.path.join(icons_dir, 'road_arc.png'), 'IMAGE')
custom_icons.load('road_clothoid', os.path.join(icons_dir, 'road_clothoid.png'), 'IMAGE')
custom_icons.load('road_parametric_polynomial', os.path.join(icons_dir, 'road_parametric_polynomial.png'), 'IMAGE')
custom_icons.load('junction', os.path.join(icons_dir, 'junction.png'), 'IMAGE')
custom_icons.load('junction_4way', os.path.join(icons_dir, 'junction_4way.png'), 'IMAGE')
custom_icons.load('junction_area', os.path.join(icons_dir, 'junction_area.png'), 'IMAGE')
custom_icons.load('junction_connection', os.path.join(icons_dir, 'junction_connection.png'), 'IMAGE')
custom_icons.load('trajectory_nurbs', os.path.join(icons_dir, 'trajectory_nurbs.png'), 'IMAGE')
custom_icons.load('trajectory_polyline', os.path.join(icons_dir, 'trajectory_polyline.png'), 'IMAGE')

Expand Down
146 changes: 105 additions & 41 deletions addon/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from scenariogeneration import xosc
from scenariogeneration import xodr
from scenariogeneration import ScenarioGenerator

from mathutils import Vector
from math import pi
Expand Down Expand Up @@ -69,6 +68,13 @@
'car': xosc.VehicleCategory.car,
}

mapping_contact_point = {
'cp_start_l': xodr.ContactPoint.start,
'cp_start_r': xodr.ContactPoint.start,
'cp_end_l': xodr.ContactPoint.end,
'cp_end_r': xodr.ContactPoint.end,
}

class DSC_OT_export(bpy.types.Operator):
bl_idname = 'dsc.export_driving_scenario'
bl_label = 'Export driving scenario'
Expand Down Expand Up @@ -276,7 +282,7 @@ def export_openscenario(self):
cp_type = xodr.ContactPoint.end
else:
cp_type = None
if not 'id_xodr_direct_junction_start' in obj:
if not 'id_direct_junction_start' in obj:
road.add_predecessor(element_type, obj['link_predecessor_id_l'], cp_type)
if 'link_predecessor_id_r' in obj:
element_type = self.get_element_type_by_id(obj['link_predecessor_id_r'])
Expand All @@ -288,7 +294,7 @@ def export_openscenario(self):
cp_type = xodr.ContactPoint.end
else:
cp_type = None
if not 'id_xodr_direct_junction_start' in obj:
if not 'id_direct_junction_start' in obj:
road.add_predecessor(element_type, obj['link_predecessor_id_r'], cp_type)
if 'link_successor_id_l' in obj:
element_type = self.get_element_type_by_id(obj['link_successor_id_l'])
Expand All @@ -300,7 +306,7 @@ def export_openscenario(self):
cp_type = xodr.ContactPoint.end
else:
cp_type = None
if not 'id_xodr_direct_junction_end' in obj:
if not 'id_direct_junction_end' in obj:
road.add_successor(element_type, obj['link_successor_id_l'], cp_type)
if 'link_successor_id_r' in obj:
element_type = self.get_element_type_by_id(obj['link_successor_id_r'])
Expand All @@ -312,14 +318,14 @@ def export_openscenario(self):
cp_type = xodr.ContactPoint.end
else:
cp_type = None
if not 'id_xodr_direct_junction_end' in obj:
if not 'id_direct_junction_end' in obj:
road.add_successor(element_type, obj['link_successor_id_r'], cp_type)
if 'id_xodr_direct_junction_start' in obj:
if 'id_direct_junction_start' in obj:
# Connect to direction junction attached to the other (split) road
road.add_predecessor(xodr.ElementType.junction, obj['id_xodr_direct_junction_start'])
if 'id_xodr_direct_junction_end' in obj:
road.add_predecessor(xodr.ElementType.junction, obj['id_direct_junction_start'])
if 'id_direct_junction_end' in obj:
# Connect to direction junction attached to the other (split) road
road.add_successor(xodr.ElementType.junction, obj['id_xodr_direct_junction_end'])
road.add_successor(xodr.ElementType.junction, obj['id_direct_junction_end'])
print('Add road with ID', obj['id_xodr'])
odr.add_road(road)
roads.append(road)
Expand All @@ -330,15 +336,15 @@ def export_openscenario(self):
if ('link_predecessor_id_l' in obj and 'link_predecessor_id_r' in obj) \
or ('link_successor_id_l' in obj and 'link_successor_id_r' in obj):
if obj['road_split_type'] == 'end':
junction_id = obj['id_xodr_direct_junction_end']
junction_id = obj['id_direct_junction_end']
road_out_id_l = obj['link_successor_id_l']
road_out_cp_l = obj['link_successor_cp_l']
road_out_id_r = obj['link_successor_id_r']
road_out_cp_r = obj['link_successor_cp_r']
road_in_cp_l = 'cp_end_l'
road_in_cp_r = 'cp_end_r'
elif obj['road_split_type'] == 'start':
junction_id = obj['id_xodr_direct_junction_start']
junction_id = obj['id_direct_junction_start']
road_out_id_l = obj['link_predecessor_id_l']
road_out_cp_l = obj['link_predecessor_cp_l']
road_out_id_r = obj['link_predecessor_id_r']
Expand Down Expand Up @@ -368,13 +374,15 @@ def export_openscenario(self):
num_junctions = 0
if helpers.collection_exists(['OpenDRIVE']):
for obj in bpy.data.collections['OpenDRIVE'].objects:
if obj.name.startswith('junction'):
# Export simple 4-way junctions
# TODO later unify with generic junction
if obj.name.startswith('junction_4way'):
incoming_roads = []
angles = []
junction_id = obj['id_xodr']
# Create junction roads based on incoming road angles (simple 4-way for now)
# 0 angle road must point in 'right' direction
for idx,cp in enumerate(['cp_right','cp_up','cp_left','cp_down']):
for idx, cp in enumerate(['cp_right','cp_up','cp_left','cp_down']):
if cp in obj['incoming_roads']:
inc_road = xodr.get_road_by_id(roads, obj['incoming_roads'][cp])
if(inc_road != None):
Expand All @@ -383,36 +391,11 @@ def export_openscenario(self):
else:
self.report({'WARNING'}, 'Junction with ID {} is missing a connection.'.format(obj['id_xodr']))
# Create connecting roads and link them to incoming roads
# TODO use API to calculate junction road parameters
junction_roads = xodr.create_junction_roads_standalone(angles, 3.75, junction_id,
spiral_part=0.01, arc_part=0.99, startnum=1000+6*num_junctions, lane_width=3.75)
self.add_junction_roads_elevation(junction_roads, obj['elevation_level'])
i = 0
for j in range(len(incoming_roads) - 1):
for k in range(j + 1, len(incoming_roads)):
# FIXME this will create problems when a single road is
# connected to a junction twice
if incoming_roads[j].predecessor:
if incoming_roads[j].predecessor.element_id == junction_id:
cp_type_j = xodr.ContactPoint.start
if incoming_roads[j].successor:
if incoming_roads[j].successor.element_id == junction_id:
cp_type_j = xodr.ContactPoint.end
if incoming_roads[k].predecessor:
if incoming_roads[k].predecessor.element_id == junction_id:
cp_type_k = xodr.ContactPoint.start
if incoming_roads[k].successor:
if incoming_roads[k].successor.element_id == junction_id:
cp_type_k = xodr.ContactPoint.end
# Link incoming with connecting road
junction_roads[i].add_predecessor(
xodr.ElementType.road, incoming_roads[j].id, cp_type_j)
# FIXME is redundant lane linking needed?
xodr.create_lane_links(junction_roads[i], incoming_roads[j])
junction_roads[i].add_successor(
xodr.ElementType.road, incoming_roads[k].id, cp_type_k)
# FIXME is redundant lane linking needed?
xodr.create_lane_links(junction_roads[i], incoming_roads[k])
i += 1
self.add_junction_roads_connections_4way(incoming_roads, junction_roads, junction_id)
# Finally create the junction
junction = xodr.create_junction(
junction_roads, junction_id, incoming_roads, 'junction_' + str(junction_id))
Expand All @@ -421,6 +404,54 @@ def export_openscenario(self):
odr.add_junction(junction)
for road in junction_roads:
odr.add_road(road)
# Export generic junctions
if obj.name.startswith('junction_area'):
incoming_roads = []
junction_id = obj['id_xodr']
for joint in obj['joints']:
inc_road = xodr.get_road_by_id(roads, joint['id_incoming'])
if(inc_road != None):
incoming_roads.append(inc_road)
else:
self.report({'WARNING'}, 'Junction with ID {} is missing a connection.'.format(obj['id_xodr']))
# Find and export connecting roads of this junction
junction_roads = []
for obj_jcr in bpy.data.collections['OpenDRIVE'].objects:
if obj_jcr.name.startswith('junction_connecting_road'):
if obj_jcr['id_junction'] == junction_id:
# Create a junction connecting road
# TODO for now we use a single spiral, later we should use arc - spiral - arc
planview = xodr.PlanView()
planview.set_start_point(obj_jcr['geometry']['point_start'][0],
obj_jcr['geometry']['point_start'][1],obj_jcr['geometry']['heading_start'])
geometry = xodr.Spiral(obj_jcr['geometry']['curvature_start'],
obj_jcr['geometry']['curvature_end'], length=obj_jcr['geometry']['length'])
planview.add_geometry(geometry)
lanes = self.create_lanes(obj_jcr)
road = xodr.Road(obj_jcr['id_xodr'],planview,lanes, road_type=junction_id)
self.add_elevation_profiles(obj_jcr, road)
# Connect the junction connecting road to incoming and connecting roads
incoming_road = self.get_road_by_id(roads, obj_jcr['link_predecessor_id_l'])
contact_point = mapping_contact_point[obj_jcr['link_predecessor_cp_l']]
road.add_predecessor(xodr.ElementType.road, incoming_road.id, contact_point)
# FIXME esmini currently requires redundant lane linking (in road and junction)
xodr.create_lane_links(road, incoming_road)
incoming_road = self.get_road_by_id(roads, obj_jcr['link_successor_id_l'])
contact_point = mapping_contact_point[obj_jcr['link_successor_cp_l']]
road.add_successor(xodr.ElementType.road, incoming_road.id, contact_point)
# FIXME esmini currently requires redundant lane linking (in road and junction)
xodr.create_lane_links(road, incoming_road)
junction_roads.append(road)
# Create lane links with incoming roads
# Finally create the junction
junction = xodr.create_junction(
junction_roads, junction_id, incoming_roads, 'junction_' + str(junction_id))
num_junctions += 1
print('Add junction with ID', junction_id)
odr.add_junction(junction)
# Junction connecting roads also need to be registered as "normal" roads
for road in junction_roads:
odr.add_road(road)
odr.adjust_startpoints()
odr.write_xml(str(xodr_path))

Expand Down Expand Up @@ -726,7 +757,6 @@ def add_elevation_profiles(self, obj, road):
Add elevation profiles to road
'''
z_global = obj['geometry']['point_start'][2]
print("z_global:", z_global)
for profile in obj['geometry']['elevation']:
# Shift each elevation profile to start at s=0 (use substitution)
# SageMath code:
Expand All @@ -748,6 +778,40 @@ def add_junction_roads_elevation(self, junction_roads, elevation_level):
for road in junction_roads:
road.add_elevation(s=0, a=elevation_level, b=0, c=0, d=0)

def add_junction_roads_connections_4way(self, incoming_roads, junction_roads, junction_id):
'''
Connectin all incoming roads with each other
'''
i = 0
for j in range(len(incoming_roads) - 1):
for k in range(j + 1, len(incoming_roads)):
# FIXME this will create problems when a single road is
# connected to a junction twice
if incoming_roads[j].predecessor:
if incoming_roads[j].predecessor.element_id == junction_id:
cp_type_j = xodr.ContactPoint.start
if incoming_roads[j].successor:
if incoming_roads[j].successor.element_id == junction_id:
cp_type_j = xodr.ContactPoint.end
if incoming_roads[k].predecessor:
if incoming_roads[k].predecessor.element_id == junction_id:
cp_type_k = xodr.ContactPoint.start
if incoming_roads[k].successor:
if incoming_roads[k].successor.element_id == junction_id:
cp_type_k = xodr.ContactPoint.end
# Link incoming with connecting road
junction_roads[i].add_predecessor(
xodr.ElementType.road, incoming_roads[j].id, cp_type_j)
# FIXME is redundant lane linking needed?
xodr.create_lane_links(junction_roads[i], incoming_roads[j])
junction_roads[i].add_successor(
xodr.ElementType.road, incoming_roads[k].id, cp_type_k)
# FIXME is redundant lane linking needed?
xodr.create_lane_links(junction_roads[i], incoming_roads[k])
i += 1



def get_lane_offset(self, road_obj, id_split_road):
'''
Return lane offset of road connected to the split road via direct
Expand Down
Loading

0 comments on commit db4c2e4

Please sign in to comment.