Skip to content

Added support for baked collision checking. #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 12, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 42 additions & 34 deletions src/or_trajopt/planning.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
IkParameterizationType)
import os
import prpy.util
from prpy.collision import DefaultRobotCollisionCheckerFactory
from prpy.planning.retimer import HauserParabolicSmoother
from prpy.planning.base import (BasePlanner,
MetaPlanner,
Expand Down Expand Up @@ -58,18 +59,23 @@ class CostType(enum.Enum):


class TrajoptWrapper(MetaPlanner):
def __init__(self, planner):
def __init__(self, planner, robot_checker_factory=None):
"""
Create a PrPy binding that wraps an existing planner and calls
its planning methods followed by Trajopt's OptimizeTrajectory.

@param planner the PrPy plan wrapper that will be wrapped
"""
assert planner

if robot_checker_factory is None:
robot_checker_factory = DefaultRobotCollisionCheckerFactory

# TODO: this should be revisited once the MetaPlanners are not assuming
# self._planners must exist.
self._planners = (planner,)
self._trajopt = TrajoptPlanner()
self._trajopt = TrajoptPlanner(
robot_checker_factory=robot_checker_factory)
self._simplifier = HauserParabolicSmoother(timelimit=0.25)

def __str__(self):
Expand Down Expand Up @@ -116,7 +122,7 @@ def plan(self, method, args, kwargs):


class TrajoptPlanner(BasePlanner):
def __init__(self):
def __init__(self, robot_checker_factory=None):
"""
Create a PrPy binding to the Trajopt motion optimization package.

Expand All @@ -125,6 +131,11 @@ def __init__(self):
"""
super(TrajoptPlanner, self).__init__()

if robot_checker_factory is None:
robot_checker_factory = DefaultRobotCollisionCheckerFactory

self.robot_checker_factory = robot_checker_factory

def __str__(self):
return 'Trajopt'

Expand Down Expand Up @@ -164,7 +175,7 @@ def _addFunction(problem, timestep, i_dofs, n_dofs, fnargs):
ValueError('Invalid cost or constraint type: {:s}'
.format(str(fntype)))

def _Plan(self, robot, request,
def _Plan(self, robot, robot_checker, request,
Copy link
Member

@psigen psigen Aug 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do the internal methods all pass robot_checker, but the public methods all use the self reference?

Copy link
Member Author

@mkoval mkoval Aug 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.robot_collision_checker is a factory that creates a robot_checker context manager. This additional level of indirection allows us to do baking in __enter__ and cleanup baking in __exit__. It also lets us wrap the __enter__ and __exit__ methods of a CollisionOptionsStateSaver to enable CollisionOptions.ActiveDOFs.

All of the internal methods take robot_checker so we only do the baking once per query. The usage looks pattern looks like this:

with self.robot_collision_checker(robot) as robot_checker:
    return self._Plan(robot, robot_checker, request, **kwargs)
    # _Plan internally forwards robot_checker to other functions.

The name robot_collision_checker and robot_checker is confusing, but is chosen to be consistent with the other planners in PrPy. @cdellin and I discussed this in personalrobotics/prpy#343, but did not come up with a better name. I would prefer to stick with these names for now and refactor everything (including all of the planners in PrPy) at once.

traj_constraints=(), goal_constraints=(),
traj_costs=(), goal_costs=(),
interactive=False, constraint_threshold=1e-4,
Expand Down Expand Up @@ -291,7 +302,8 @@ def _Plan(self, robot, request,
sampling_func=sampling_func)

for _, q_check in checkpoints:
self._checkCollisionForIKSolutions(robot, [q_check])
self._checkCollisionForIKSolutions(
robot, robot_checker, [q_check])

# Convert the waypoints to a trajectory.
prpy.util.SetTrajectoryTags(traj, {
Expand Down Expand Up @@ -350,7 +362,9 @@ def PlanToConfiguration(self, robot, goal, **kwargs):
"endpoint": goal.tolist()
}
}
return self._Plan(robot, request, **kwargs)

with self.robot_checker_factory(robot) as robot_checker:
return self._Plan(robot, robot_checker, request, **kwargs)

@ClonedPlanningMethod
def PlanToIK(self, robot, pose, **kwargs):
Expand All @@ -367,7 +381,8 @@ def PlanToIK(self, robot, pose, **kwargs):
or press escape to disable further plotting
@return traj a trajectory from current configuration to specified pose
"""
return self._PlanToIK(robot, pose, **kwargs)
with self.robot_checker_factory(robot) as robot_checker:
return self._PlanToIK(robot, robot_checker, pose, **kwargs)

@ClonedPlanningMethod
def PlanToEndEffectorPose(self, robot, pose, **kwargs):
Expand All @@ -382,19 +397,17 @@ def PlanToEndEffectorPose(self, robot, pose, **kwargs):
or press escape to disable further plotting
@return traj a trajectory from current configuration to specified pose
"""
return self._PlanToIK(robot, pose, **kwargs)

def _PlanToIK(self, robot, pose,
ranker=None, **kwargs):
with self.robot_checker_factory(robot) as robot_checker:
return self._PlanToIK(robot, robot_checker, pose, **kwargs)

def _PlanToIK(self, robot, robot_checker, pose, ranker=None, **kwargs):
# Plan using the active manipulator.
with robot.GetEnv():
manipulator = robot.GetActiveManipulator()
manipulator = robot.GetActiveManipulator()

# Distance from current configuration is default ranking.
if ranker is None:
from prpy.ik_ranking import NominalConfiguration
ranker = NominalConfiguration(manipulator.GetArmDOFValues())
# Distance from current configuration is default ranking.
if ranker is None:
from prpy.ik_ranking import NominalConfiguration
ranker = NominalConfiguration(manipulator.GetArmDOFValues())

# Find initial collision-free IK solution.
ik_param = IkParameterization(
Expand All @@ -403,7 +416,7 @@ def _PlanToIK(self, robot, pose,
ik_param, IkFilterOptions.CheckEnvCollisions)
if not len(ik_solutions):
# Identify collision and raise error.
self._raiseCollisionErrorForPose(robot, pose)
self._raiseCollisionErrorForPose(robot, robot_checker, pose)

# Sort the IK solutions in ascending order by the costs returned by the
# ranker. Lower cost solutions are better and infinite cost solutions
Expand Down Expand Up @@ -461,11 +474,11 @@ def _PlanToIK(self, robot, pose,
}
}

# Set active DOFs to match active manipulator and plan.

p = openravepy.KinBody.SaveParameters
with robot.CreateRobotStateSaver(p.ActiveDOF):
robot.SetActiveDOFs(manipulator.GetArmIndices())
return self._Plan(robot, request, **kwargs)
return self._Plan(robot, robot_checker, request, **kwargs)

@ClonedPlanningMethod
def OptimizeTrajectory(self, robot, traj,
Expand Down Expand Up @@ -519,7 +532,8 @@ def OptimizeTrajectory(self, robot, traj,
"data": waypoints
}
}
return self._Plan(robot, request, **kwargs)
with self.robot_collision_checker(robot) as robot_checker:
return self._Plan(robot, robot_checker, request, **kwargs)

def _WaypointsToTraj(self, robot, waypoints):
"""Converts a list of waypoints to an OpenRAVE trajectory."""
Expand All @@ -530,7 +544,7 @@ def _WaypointsToTraj(self, robot, waypoints):
traj.Insert(i, waypoint)
return traj

def _raiseCollisionErrorForPose(self, robot, pose):
def _raiseCollisionErrorForPose(self, robot, robot_checker, pose):
""" Identify collision for pose and raise error.
It should be called only when there is no IK solution and collision is expected.
"""
Expand All @@ -556,23 +570,17 @@ def _raiseCollisionErrorForPose(self, robot, pose):
releasegil = True)
raise PlanningError(str(ik_return.GetAction())) #this most likely is JointLimit

self._checkCollisionForIKSolutions(robot, map(lambda x: x.GetSolution(), ik_returns))
self._checkCollisionForIKSolutions(robot, robot_checker,
map(lambda x: x.GetSolution(), ik_returns))
raise Exception('Collision/JointLimit error expected but not found.')

def _checkCollisionForIKSolutions(self, robot, ik_solutions):
def _checkCollisionForIKSolutions(self, robot, robot_checker, ik_solutions):
""" Raise collision/joint limit error if there is one in ik_solutions
Should be called while saving robot's current state
"""
from openravepy import CollisionReport
manipulator = robot.GetActiveManipulator()
p = openravepy.KinBody.SaveParameters

with robot.CreateRobotStateSaver(p.LinkTransformation):
for q in ik_solutions:
robot.SetActiveDOFValues(q)
report = CollisionReport()
env = robot.GetEnv()
if env.CheckCollision(robot, report=report):
raise CollisionPlanningError.FromReport(report)
elif robot.CheckSelfCollision(report=report):
raise SelfCollisionPlanningError.FromReport(report)
for q in ik_solutions:
robot.SetActiveDOFValues(q)
robot_checker.VerifyCollisionFree()