-
Notifications
You must be signed in to change notification settings - Fork 97
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Hello] Add continuous trajectory tracking & teleop (#1371)
* Velocity control * Add keyboard teleop script * Debug * Hack * Debug controller * debug controller * Disable stow * Fix teleop script * Debug teleop * Wheel control test * tmp * Fix teleop once and for all * Clean up, adapt ros version * Fix * Pub to cmd_vel * Debug * debug * remove debug print * remove debug print * Clean up teleop API * Expose vel and rvel * Clean up teleop API * Initial goto controller impl * Add params * Yaw tracking modes * typing * Set tolerances * Make static methods * Add tolerances to control * Debug * Tune heading tracking * Remove acceleration and tol * Debug * Tune tol * Add turn rate limit and update tol * Tune controller * Update comments * Improve teleop * Add comment * Add docstrings and input checks * Debug docstring * Improve yaw tracking performance * Add option to pause controller * Tune teleop * Tune rate * Tune hz
- Loading branch information
Showing
5 changed files
with
318 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
import sys | ||
import time | ||
|
||
from pynput import keyboard | ||
import numpy as np | ||
|
||
|
||
UP = keyboard.Key.up | ||
DOWN = keyboard.Key.down | ||
LEFT = keyboard.Key.left | ||
RIGHT = keyboard.Key.right | ||
ESC = keyboard.Key.esc | ||
|
||
HZ_DEFAULT = 15 | ||
|
||
# 6 * v_max + w_max <= 1.8 (computed from max wheel vel & vel diff required for w) | ||
VEL_MAX_DEFAULT = 0.20 | ||
RVEL_MAX_DEFAULT = 0.45 | ||
|
||
|
||
class RobotController: | ||
def __init__( | ||
self, | ||
mover, | ||
vel_max=None, | ||
rvel_max=None, | ||
hz=HZ_DEFAULT, | ||
): | ||
# Params | ||
self.dt = 1.0 / hz | ||
self.vel_max = vel_max or VEL_MAX_DEFAULT | ||
self.rvel_max = rvel_max or RVEL_MAX_DEFAULT | ||
|
||
# Robot | ||
print("Connecting to robot...") | ||
self.robot = mover.bot | ||
print("Connected.") | ||
|
||
# Keyboard | ||
self.key_states = {key: 0 for key in [UP, DOWN, LEFT, RIGHT]} | ||
|
||
# Controller states | ||
self.alive = True | ||
self.vel = 0 | ||
self.rvel = 0 | ||
|
||
def on_press(self, key): | ||
if key in self.key_states: | ||
self.key_states[key] = 1 | ||
elif key == ESC: | ||
self.alive = False | ||
return False # returning False from a callback stops listener | ||
|
||
def on_release(self, key): | ||
if key in self.key_states: | ||
self.key_states[key] = 0 | ||
|
||
def run(self): | ||
print( | ||
"(+[__]o) Teleoperation started. Use arrow keys to control robot, press ESC to exit. ^o^" | ||
) | ||
while self.alive: | ||
# Map keystrokes | ||
vert_sign = self.key_states[UP] - self.key_states[DOWN] | ||
hori_sign = self.key_states[LEFT] - self.key_states[RIGHT] | ||
|
||
# Compute velocity commands | ||
self.vel = self.vel_max * vert_sign | ||
self.rvel = self.rvel_max * hori_sign | ||
|
||
# Command robot | ||
self.robot.set_velocity(self.vel, self.rvel) | ||
|
||
# Spin | ||
time.sleep(self.dt) | ||
|
||
|
||
def run_teleop(mover, vel=None, rvel=None): | ||
robot_controller = RobotController(mover, vel, rvel) | ||
listener = keyboard.Listener( | ||
on_press=robot_controller.on_press, | ||
on_release=robot_controller.on_release, | ||
suppress=True, # suppress terminal outputs | ||
) | ||
|
||
# Start teleop | ||
listener.start() | ||
robot_controller.run() | ||
|
||
# Cleanup | ||
listener.join() | ||
print("(+[__]o) Teleoperation ended. =_=") | ||
|
||
|
||
if __name__ == "__main__": | ||
from droidlet.lowlevel.hello_robot.hello_robot_mover import HelloRobotMover | ||
|
||
mover = HelloRobotMover(ip=sys.argv[1]) | ||
run_teleop(mover) |
157 changes: 157 additions & 0 deletions
157
droidlet/lowlevel/hello_robot/remote/goto_controller.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
from typing import List, Optional | ||
import time | ||
import threading | ||
|
||
import numpy as np | ||
import rospy | ||
|
||
V_MAX_DEFAULT = 0.15 # base.params["motion"]["default"]["vel_m"] | ||
W_MAX_DEFAULT = 0.45 # (vel_m_max - vel_m_default) / wheel_separation_m | ||
|
||
|
||
class GotoVelocityController: | ||
def __init__( | ||
self, | ||
robot, | ||
hz: float, | ||
v_max: Optional[float] = None, | ||
w_max: Optional[float] = None, | ||
): | ||
self.robot = robot | ||
self.hz = hz | ||
self.dt = 1.0 / self.hz | ||
|
||
# Params | ||
self.v_max = v_max or V_MAX_DEFAULT | ||
self.w_max = w_max or W_MAX_DEFAULT | ||
self.lin_error_tol = self.v_max / hz | ||
self.ang_error_tol = self.w_max / hz | ||
|
||
# Initialize | ||
self.loop_thr = None | ||
self.control_lock = threading.Lock() | ||
self.active = False | ||
|
||
self.xyt_err = np.zeros(3) | ||
self.track_yaw = True | ||
|
||
@staticmethod | ||
def _error_velocity_multiplier(x_err, tol=0.0): | ||
""" | ||
Computes velocity multiplier based on distance from target. | ||
Used for both linear and angular motion. | ||
Current implementation: Simple thresholding | ||
Output = 1 if linear error is larger than the tolerance, 0 otherwise. | ||
""" | ||
assert x_err >= 0.0 | ||
return float(x_err - tol > 0) | ||
|
||
@staticmethod | ||
def _projection_velocity_multiplier(theta_err, tol=0.0): | ||
""" | ||
Compute velocity muliplier based on yaw (faster if facing towards target). | ||
Used to control linear motion. | ||
Current implementation: | ||
Output = 1 when facing target, gradually decreases to 0 when angle to target is pi/3. | ||
""" | ||
assert theta_err >= 0.0 | ||
return 1.0 - np.sin(min(max(theta_err - tol, 0.0) * 2.0, np.pi / 3.0)) | ||
|
||
@staticmethod | ||
def _turn_rate_limit(w_max, lin_err, heading_err): | ||
""" | ||
Computed velocity limit based on the turning radius required to reach goal. | ||
""" | ||
assert lin_err >= 0.0 | ||
assert heading_err >= 0.0 | ||
return w_max * lin_err / np.sin(heading_err) + 1e-5 / 2.0 | ||
|
||
def _integrate_state(self, v, w): | ||
""" | ||
Predict error in the next timestep with current commanded velocity | ||
""" | ||
dx = v * self.dt | ||
dtheta = w * self.dt | ||
|
||
x_err_f0 = self.xyt_err[0] - dx * np.cos(dtheta / 2.0) | ||
y_err_f0 = self.xyt_err[1] - dx * np.sin(dtheta / 2.0) | ||
ct = np.cos(-dtheta) | ||
st = np.sin(-dtheta) | ||
|
||
self.xyt_err[0] = ct * x_err_f0 - st * y_err_f0 | ||
self.xyt_err[1] = st * x_err_f0 + ct * y_err_f0 | ||
self.xyt_err[2] = self.xyt_err[2] - dtheta if self.track_yaw else 0.0 | ||
|
||
def _run(self): | ||
rate = rospy.Rate(self.hz) | ||
|
||
while True: | ||
v_cmd = w_cmd = 0 | ||
|
||
lin_err_abs = np.linalg.norm(self.xyt_err[0:2]) | ||
ang_err = self.xyt_err[2] | ||
ang_err_abs = abs(ang_err) | ||
|
||
# Go to goal XY position if not there yet | ||
if lin_err_abs > self.lin_error_tol: | ||
heading_err = np.arctan2(self.xyt_err[1], self.xyt_err[0]) | ||
heading_err_abs = abs(heading_err) | ||
|
||
# Compute linear velocity | ||
k_t = self._error_velocity_multiplier(lin_err_abs, tol=self.lin_error_tol) | ||
k_p = self._projection_velocity_multiplier(heading_err_abs, tol=self.ang_error_tol) | ||
v_limit = self._turn_rate_limit(self.w_max, lin_err_abs, heading_err_abs) | ||
v_cmd = min(k_t * k_p * self.v_max, v_limit) | ||
|
||
# Compute angular velocity | ||
k_t_ang = self._error_velocity_multiplier(heading_err_abs, tol=self.ang_error_tol) | ||
w_cmd = np.sign(heading_err) * k_t_ang * self.w_max | ||
|
||
# Rotate to correct yaw if yaw tracking is on and XY position is at goal | ||
elif ang_err_abs > self.ang_error_tol: | ||
# Compute angular velocity | ||
k_t_ang = self._error_velocity_multiplier(ang_err_abs, tol=self.ang_error_tol) | ||
w_cmd = np.sign(ang_err) * k_t_ang * self.w_max | ||
|
||
# Command robot | ||
with self.control_lock: | ||
self.robot.set_velocity(v_cmd, w_cmd) | ||
|
||
# Update odometry prediction | ||
self._integrate_state(v_cmd, w_cmd) | ||
|
||
# Spin | ||
rate.sleep() | ||
|
||
def check_at_goal(self) -> bool: | ||
xy_fulfilled = np.linalg.norm(self.xyt_err[0:2]) <= self.lin_error_tol | ||
|
||
t_fulfilled = True | ||
if self.track_yaw: | ||
t_fulfilled = abs(self.xyt_err[2]) <= self.ang_error_tol | ||
|
||
return xy_fulfilled and t_fulfilled | ||
|
||
def set_goal( | ||
self, | ||
xyt_position: List[float], | ||
): | ||
self.xyt_err = xyt_position | ||
if not self.track_yaw: | ||
self.xyt_err[2] = 0.0 | ||
|
||
def enable_yaw_tracking(self, value: bool = True): | ||
self.track_yaw = value | ||
|
||
def start(self): | ||
if self.loop_thr is None: | ||
self.loop_thr = threading.Thread(target=self._run) | ||
self.loop_thr.start() | ||
self.active = True | ||
|
||
def pause(self): | ||
self.active = False | ||
with self.control_lock: | ||
self.robot.set_velocity(0.0, 0.0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters