diff --git a/config/level0.toml b/config/level0.toml index d6f67b51..cbec9007 100644 --- a/config/level0.toml +++ b/config/level0.toml @@ -6,6 +6,15 @@ [controller] file = "trajectory_controller.py" # Put your controller file name here. Specifying a controller as argument to scripts will override this setting. Controllers are located in `lsy_drone_racing/control/` +[deploy] +### Settings only relevant for deployment +# Whether to check if gate and obstacle positions observed by vicon are within the limits defined down below. +check_race_track = true +# Whether to check if the drone start position is within the limits specified down below. +check_drone_start_pos = true +# Lets you practice your controller without putting up gates & obstacles, assumes nominal positions given below. +practice_without_track_objects = false + [sim] # Physics options: # "pyb": PyBullet diff --git a/config/level1.toml b/config/level1.toml index 4be5bc6e..bc02b77e 100644 --- a/config/level1.toml +++ b/config/level1.toml @@ -6,6 +6,15 @@ [controller] file = "trajectory_controller.py" # Put your controller file name here. Specifying a controller as argument to scripts will override this setting. Controllers are located in `lsy_drone_racing/control/` +[deploy] +### Settings only relevant for deployment +# Whether to check if gate and obstacle positions observed by vicon are within the limits defined down below. +check_race_track = true +# Whether to check if the drone start position is within the limits specified down below. +check_drone_start_pos = true +# Lets you practice your controller without putting up gates & obstacles, assumes nominal positions given below. +practice_without_track_objects = false + [sim] # Physics options: # "pyb": PyBullet diff --git a/config/level2.toml b/config/level2.toml index ea49bdb3..cababf59 100644 --- a/config/level2.toml +++ b/config/level2.toml @@ -6,6 +6,15 @@ [controller] file = "trajectory_controller.py" # Put your controller file name here. Specifying a controller as argument to scripts will override this setting. Controllers are located in `lsy_drone_racing/control/` +[deploy] +### Settings only relevant for deployment +# Whether to check if gate and obstacle positions observed by vicon are within the limits defined down below. +check_race_track = true +# Whether to check if the drone start position is within the limits specified down below. +check_drone_start_pos = true +# Lets you practice your controller without putting up gates & obstacles, assumes nominal positions given below. +practice_without_track_objects = false + [sim] # Physics options: # "pyb": PyBullet diff --git a/config/level3.toml b/config/level3.toml index 91e560f1..db710a78 100644 --- a/config/level3.toml +++ b/config/level3.toml @@ -4,7 +4,16 @@ # | :-----------------: | :-----------------------: | :-------------------------: | :--------------------: | :---------------: | # | `level3.toml` | **Yes** | **Yes** | **Yes** | Robustness | [controller] -file = "thrust_controller.py" # Put your controller file name here. Specifying a controller as argument to scripts will override this setting. Controllers are located in `lsy_drone_racing/control/` trajectory_controller or thrust_controller.py +file = "trajectory_controller.py" # Put your controller file name here. Specifying a controller as argument to scripts will override this setting. Controllers are located in `lsy_drone_racing/control/` + +[deploy] +### Settings only relevant for deployment +# Whether to check if gate and obstacle positions observed by vicon are within the limits defined down below. +check_race_track = true +# Whether to check if the drone start position is within the limits specified down below. +check_drone_start_pos = true +# Lets you practice your controller without putting up gates & obstacles, assumes nominal positions given below. +practice_without_track_objects = false [sim] # Physics options: @@ -38,12 +47,12 @@ std = 0.01 # high = [0.001, 0.001, 0.001, 0.001, 0.001, 0.001, 0.001, 0.001, 0.001, 0.001, 0.001, 0.001] [env] -id = "DroneRacingThrust-v0" # Either "DroneRacing-v0" or "DroneRacingThrust-v0". If using "DroneRacingThrust-v0", the drone will use the thrust controller instead of the position controller. -reseed = false # Whether to re-seed the random number generator between episodes -seed = 1337 # Random seed -freq = 60 # Frequency of the environment's step function, in Hz -symbolic = false # Whether to include symbolic expressions in the info dict. Note: This can interfere with multiprocessing! If you want to parallelize your training, set this to false. -sensor_range = 0.45 # Range at which the exact location of gates and obstacles become visible to the drone. Objects that are not in the drone's sensor range report their nominal position. +id = "DroneRacing-v0" # Either "DroneRacing-v0" or "DroneRacingThrust-v0". If using "DroneRacingThrust-v0", the drone will use the thrust controller instead of the position controller. +reseed = false # Whether to re-seed the random number generator between episodes +seed = 1337 # Random seed +freq = 60 # Frequency of the environment's step function, in Hz +symbolic = false # Whether to include symbolic expressions in the info dict. Note: This can interfere with multiprocessing! If you want to parallelize your training, set this to false. +sensor_range = 0.45 # Range at which the exact location of gates and obstacles become visible to the drone. Objects that are not in the drone's sensor range report their nominal position. [env.track] # Tall gates: 1.0m height. Short gates: 0.525m height. Height is measured from the ground to the diff --git a/lsy_drone_racing/control/closing_controller.py b/lsy_drone_racing/control/closing_controller.py index 8cba544f..f915dee8 100644 --- a/lsy_drone_racing/control/closing_controller.py +++ b/lsy_drone_racing/control/closing_controller.py @@ -8,6 +8,8 @@ from __future__ import annotations # Python 3.10 type hints +import logging +from enum import Enum from typing import TYPE_CHECKING, Union import numpy as np @@ -19,6 +21,17 @@ if TYPE_CHECKING: from numpy.typing import NDArray +logger = logging.getLogger("rosout." + __name__) # Get ROS compatible logger + + +class Mode(Enum): + """Enum class for the different modes of the controller.""" + + STOP = 0 + HOVER = 1 + RETURN = 2 + LAND = 3 + class ClosingController(BaseController): """Controller that creates and follows a braking and homing trajectory.""" @@ -36,28 +49,18 @@ def __init__(self, initial_obs: dict[str, NDArray[np.floating]], initial_info: d self._tick = 0 self.info = initial_info self._freq = self.info["env_freq"] - self._t_step_ctrl = 1/self._freq # control step + self._t_step_ctrl = 1 / self._freq # control step - self.debug = False # print statements + makes the tracks plot in sim - self._obs_x = [] - self._obs_v = [] - self._cmd_x = [] - self._cmd_v = [] - self._cmd_a = [] + self.debug = False # Plot the trajectories in sim. TODO: Make configurable + self._return_height = 2.0 # height of the return path - self._x_end = np.linalg.norm(initial_obs["vel"]) - self._x_end = np.clip(self._x_end, 0.5, 2.0) # distance the drone is supposed to stop behind the last gate - self._z_homing = 2.0 # height of the return path - - self._mode = 0 # 0=stopping, 1=hover, 2=RTH => order will be 0-1-2-1 self._target_pos = initial_obs["pos"] - self._trajectory, self._t_brake = self._generate_stop_trajectory(initial_obs) + self._brake_trajectory, self._t_brake = self._generate_brake_trajectory(initial_obs) + self._return_trajectory = None # Gets generated on the fly based on the current state - self._t_hover = 2.0 # time the drone hovers before RTH and before landing - self._t_RTH = 9.0 # time it takes to get back to start (return to home) - - self.t_total = self._t_brake + self._t_hover + self._t_RTH + self._t_hover - + self._t_hover = 2.0 # time the drone hovers before returning home and before landing + self._t_return = 9.0 # time it takes to get back to start (return to home) + self.t_total = self._t_brake + self._t_hover + self._t_return + self._t_hover def compute_control( self, obs: dict[str, NDArray[np.floating]], info: dict | None = None @@ -73,51 +76,42 @@ def compute_control( The drone state [x, y, z, vx, vy, vz, ax, ay, az, yaw, rrate, prate, yrate] as a numpy array. """ - # t = min(self._tick * self._t_step_ctrl, self._t_total) t = self._tick * self._t_step_ctrl - - # check if we need to switch modes - if t <= self._t_brake and self._mode != 0: - self._mode = 0 - elif t > self._t_brake and t <= self._t_brake+self._t_hover and self._mode != 1: - self._mode = 1 - elif t > self._t_brake+self._t_hover and t <= self._t_brake+self._t_hover+self._t_RTH and self._mode != 2: - self._trajectory = self._generate_RTH_trajectory(obs, t) - self._mode = 2 - elif t > self._t_brake+self._t_hover+self._t_RTH and self._mode != 3: - self._mode = 3 - - # Sample the correct trajectory - if self._mode == 0: # stopping - target_pos = self._trajectory(t, order=0) - target_vel = self._trajectory(t, order=1) - target_acc = self._trajectory(t, order=2) - self._target_pos = obs["pos"] # store for a switch to mode 1 - elif self._mode == 1: # hover + if self.mode == Mode.STOP: + target_pos = self._brake_trajectory(t, order=0) + target_vel = self._brake_trajectory(t, order=1) + target_acc = self._brake_trajectory(t, order=2) + self._target_pos = obs["pos"] # store for hover mode + elif self.mode == Mode.HOVER: target_pos = self._target_pos - target_vel = [0,0,0] - target_acc = [0,0,0] - elif self._mode == 2: # RTH - target_pos = self._trajectory(t) - trajectory_v = self._trajectory.derivative() + target_vel = [0, 0, 0] + target_acc = [0, 0, 0] + elif self.mode == Mode.RETURN: + if self._return_trajectory is None: + self._return_trajectory = self._generate_return_trajectory(obs, t) + target_pos = self._return_trajectory(t) + trajectory_v = self._return_trajectory.derivative() trajectory_a = trajectory_v.derivative() - target_vel = trajectory_v(t)*0 - target_acc = trajectory_a(t)*0 - elif self._mode == 3: # hover over landing pos + target_vel = trajectory_v(t) * 0 + target_acc = trajectory_a(t) * 0 + elif self.mode == Mode.LAND: target_pos = np.array(self.info["drone_start_pos"]) + np.array([0, 0, 0.25]) - target_vel = [0,0,0] - target_acc = [0,0,0] - - if self.debug: - self._obs_x.append(obs["pos"]) - self._obs_v.append(obs["vel"]) - self._cmd_x.append(target_pos) - self._cmd_v.append(target_vel) - self._cmd_a.append(target_acc) - - # return np.concatenate((target_pos, np.zeros(10))) + target_vel = [0, 0, 0] + target_acc = [0, 0, 0] return np.concatenate((target_pos, target_vel, target_acc, np.zeros(4))) + @property + def mode(self) -> Mode: + """Return the current mode of the controller.""" + t = self._tick * self._t_step_ctrl + if t <= self._t_brake: + return Mode.STOP + if self._t_brake < t <= self._t_brake + self._t_hover: + return Mode.HOVER + if self._t_brake + self._t_hover < t <= self._t_brake + self._t_hover + self._t_return: + return Mode.RETURN + return Mode.LAND + def step_callback( self, action: NDArray[np.floating], @@ -134,204 +128,193 @@ def episode_reset(self): """Reset the time step counter.""" self._tick = 0 - def _generate_stop_trajectory(self, obs: dict[str, NDArray[np.floating]])->Union[QuinticSpline,np.floating]: + def _generate_brake_trajectory( + self, obs: dict[str, NDArray[np.floating]] + ) -> Union[QuinticSpline, np.floating]: start_pos = obs["pos"] start_vel = obs["vel"] start_acc = obs["acc"] - direction = start_vel/np.linalg.norm(start_vel) # unit vector in the direction of travel - direction_angle = np.arccos( (np.dot(direction, [0,0,1])) / (1*1) ) - direction_angle = -(direction_angle-np.pi/2) # angle to the floor => negative means v_z < 0 - - # drone can actually go further to no reach the x_end limit if angle!=0 - self._x_end = self._x_end/np.cos(direction_angle) - # check if drone would crash into floor or ceiling - x_end_z = start_pos[2] + self._x_end*np.sin(direction_angle) - if x_end_z < 0.2: - if self.debug: - print("x_end_z<0.2") - self._x_end = (0.2 - start_pos[2])/np.sin(direction_angle) - elif x_end_z > 2.5: - if self.debug: - print("x_end_z>2.5") - self._x_end = (2.5 - start_pos[2])/np.sin(direction_angle) - - if self.debug: - print(f"start_pos_z={start_pos[2]}, x_end_z={start_pos[2] + self._x_end*np.sin(direction_angle)}, x_end={self._x_end}") - print(f"direction_angle={direction_angle*180/np.pi}°") - - const_acc = np.linalg.norm(start_vel)**2/(self._x_end) # this is just an estimate of what constant deceleration is necessary - t_brake_max = np.sqrt(4*self._x_end/const_acc) # the time it takes to brake completely - t_brake_max = max(1, t_brake_max) # minimum time in case the drone reaches the gate super slowly - t_brake = np.arange(0, t_brake_max, self._t_step_ctrl) + direction = start_vel / np.linalg.norm(start_vel) # Unit vector in the direction of travel + # Angle to the floor => negative means v_z < 0 + direction_angle = -(np.arccos((np.dot(direction, [0, 0, 1]))) - np.pi / 2) + # Calculate the distance the drone can travel before stopping + dist = np.clip(np.linalg.norm(start_vel), 0.5, 2.0) / np.cos(direction_angle) + # Check if drone would crash into floor or ceiling, and clip the values if necessary + pos_z = start_pos[2] + dist * np.sin(direction_angle) + dist = (np.clip(pos_z, 0.2, 2.5) - start_pos[2]) / np.sin(direction_angle) + + pos_z = start_pos[2] + dist * np.sin(direction_angle) # Recalculate pos_z after clipping + logger.debug(f"Start height: {start_pos[2]:.2f}, End height: {pos_z:.2f}, Dist: {dist:.2f}") + logger.debug(f"Angle: {direction_angle*180/np.pi:.0f}°") + + # Estimate the constant deceleration that is necessary for braking + const_acc = np.linalg.norm(start_vel) ** 2 / (dist) + # The time it takes to brake completely. Clip to 1s minimum to avoid short braking times + t_brake_max = max(1.0, np.sqrt(4 * dist / const_acc)) + + logger.debug(f"Gate velocity: {start_vel}, Braking time: {t_brake_max:.2f}s") + + brake_trajectory = QuinticSpline( + 0, + t_brake_max, + start_pos, + start_vel, + start_acc, + start_pos + direction * dist, + start_vel * 0, + start_acc * 0, + ) if self.debug: - print(f"t_brake={t_brake_max}") - print(f"v_gate = {start_vel}") - - trajectory_stop = QuinticSpline(0, t_brake_max, start_pos, start_vel, start_acc, - start_pos+direction*self._x_end, start_vel*0, start_acc*0) - ref_pos_stop = trajectory_stop(t_brake, order=0) + plot_trajectory(brake_trajectory, color=[0, 1, 0]) + return brake_trajectory, t_brake_max - if self.debug: - try: - step = 5 - for i in np.arange(0, len(ref_pos_stop[:,0]) - step, step): - p.addUserDebugLine( - ref_pos_stop[i,:], - ref_pos_stop[i + step,:], - lineColorRGB=[0, 1, 0], - lineWidth=2, - lifeTime=0, # 0 means the line persists indefinitely - physicsClientId=0, - ) - except p.error: - print("PyBullet not available") # Ignore errors if PyBullet is not available - - return trajectory_stop, t_brake_max - - def _generate_RTH_trajectory(self, obs: dict[str, NDArray[np.floating]], t: np.floating)->CubicSpline: + def _generate_return_trajectory( + self, obs: dict[str, NDArray[np.floating]], t: np.floating + ) -> CubicSpline: start_pos = obs["pos"] + # Landing position is 0.25m above the ground + landing_pos = np.array(self.info["drone_start_pos"]) + np.array([0, 0, 0.25]) - landing_pos = np.array(self.info["drone_start_pos"]) + np.array([0, 0, 0.25]) # 0.25m above actual landing pos - - intermed_delta = landing_pos - start_pos - intermed_pos1 = [start_pos[0] + intermed_delta[0]/4, start_pos[1] + intermed_delta[1]/4, self._z_homing] - intermed_pos2 = [intermed_pos1[0] + intermed_delta[0]/2, intermed_pos1[1] + intermed_delta[1]/2, intermed_pos1[2]] - intermed_pos3 = [0,0,0] - intermed_pos3[0] = (5*landing_pos[0]+intermed_pos2[0])/6 - intermed_pos3[1] = (5*landing_pos[1]+intermed_pos2[1])/6 - intermed_pos3[2] = (landing_pos[2]+intermed_pos2[2])/2 - - waypoints = np.array([ - start_pos, - intermed_pos1, - intermed_pos2, - intermed_pos3, - landing_pos, - ]) - trajectory_RTH = CubicSpline(np.linspace(t, t + self._t_RTH, len(waypoints)), waypoints, bc_type=((1, [0,0,0]), (1, [0,0,0]))) # bc type set boundary conditions for the derivative (here 1) - - if self.debug: - t_plot = np.arange(t, t + self._t_RTH, self._t_step_ctrl) # just for plotting - ref_pos_return = trajectory_RTH(t_plot) - try: - step = 5 - for i in np.arange(0, len(ref_pos_return) - step, step): - p.addUserDebugLine( - ref_pos_return[i], - ref_pos_return[i + step], - lineColorRGB=[0, 0, 1], - lineWidth=2, - lifeTime=0, # 0 means the line persists indefinitely - physicsClientId=0, - ) - p.addUserDebugText("x", start_pos, textColorRGB=[0,1,0]) - p.addUserDebugText("x", intermed_pos1, textColorRGB=[0,0,1]) - p.addUserDebugText("x", intermed_pos2, textColorRGB=[0,0,1]) - p.addUserDebugText("x", intermed_pos3, textColorRGB=[0,0,1]) - p.addUserDebugText("x", landing_pos, textColorRGB=[0,0,1]) - except p.error: - print("PyBullet not available") # Ignore errors if PyBullet is not available - - return trajectory_RTH + delta_pos = landing_pos - start_pos + intermed_pos1 = start_pos + delta_pos / 4 + intermed_pos1[2] = self._return_height + intermed_pos2 = intermed_pos1 + delta_pos / 2 + intermed_pos2[2] = self._return_height + + intermed_pos3 = 5 * landing_pos / 6 + intermed_pos2 / 6 + intermed_pos3[2] = landing_pos[2] / 2 + intermed_pos2[2] / 2 + + waypoints = np.array([start_pos, intermed_pos1, intermed_pos2, intermed_pos3, landing_pos]) + bc_type = ((1, [0, 0, 0]), (1, [0, 0, 0])) # Set boundary conditions for the derivative (1) + t = np.linspace(t, t + self._t_return, len(waypoints)) + return_trajectory = CubicSpline(t, waypoints, bc_type=bc_type) + + if self.debug: + plot_trajectory(return_trajectory) + return return_trajectory + + +def plot_trajectory(spline: CubicSpline | QuinticSpline, color: list[float] = [0, 0, 1]): + """Plot the drone's and the controller's trajectories.""" + n_segments = 20 + t = np.linspace(spline.x[0], spline.x[-1], n_segments) + pos = spline(t) + try: + for i in range(len(pos)): + p.addUserDebugLine( + pos[max(i - 1, 0)], + pos[i], + lineColorRGB=color, + lineWidth=2, + lifeTime=0, # 0 means the line persists indefinitely + physicsClientId=0, + ) + for x in spline(spline.x): + p.addUserDebugText("x", x, textColorRGB=color) + except p.error: + logger.warning("PyBullet not available") # Ignore errors if PyBullet is not available class QuinticSpline: """This class and the code below is mostly written by ChatGPT 4.0.""" - def __init__(self, t0: np.floating, t1: np.floating, x0: np.floating, v0: np.floating, a0: np.floating, x1: np.floating, v1: np.floating, a1: np.floating): + + def __init__( + self, + t0: np.floating, + t1: np.floating, + x0: np.floating, + v0: np.floating, + a0: np.floating, + x1: np.floating, + v1: np.floating, + a1: np.floating, + ): """Initialize the quintic spline for multidimensional conditions. - - Parameters: - - t0, t1: Start and end times - - x0, v0, a0: Lists of initial positions, velocities, and accelerations - - x1, v1, a1: Lists of final positions, velocities, and accelerations + + Params: + t0: Start time. + t1: End time. + x0: Initial position. + v0: Initial velocity. + a0: Initial acceleration. + x1: Final position. + v1: Final velocity. + a1: Final acceleration. """ self.t_points = (t0, t1) # Start and end time points self.dimensions = len(x0) # Number of dimensions - self.boundary_conditions = np.array([x0, v0, a0, x1, v1, a1]) # Boundary conditions per dimension - self.splines = [self._compute_spline(i) for i in range(self.dimensions)] + # Boundary conditions per dimension + self.boundary_conditions = np.array([x0, v0, a0, x1, v1, a1]) + self.splines = np.array([self._compute_spline(i) for i in range(self.dimensions)]) + self.x = np.array([t0, t1]) # Make compatible to CubicSpline API def _compute_spline(self, dim: int) -> NDArray: t0, t1 = self.t_points x0, v0, a0, x1, v1, a1 = self.boundary_conditions[:, dim] # Constructing the coefficient matrix for the quintic polynomial - M = np.array([ - [1, t0, t0**2, t0**3, t0**4, t0**5], # position @ t0 - [0, 1, 2*t0, 3*t0**2, 4*t0**3, 5*t0**4], # velocity @ t0 - [0, 0, 2, 6*t0, 12*t0**2, 20*t0**3], # acceleration @ t0 - [1, t1, t1**2, t1**3, t1**4, t1**5], # position @ t1 - [0, 1, 2*t1, 3*t1**2, 4*t1**3, 5*t1**4], # velocity @ t1 - [0, 0, 2, 6*t1, 12*t1**2, 20*t1**3], # acceleration @ t1 - ]) - + M = np.array( + [ + [1, t0, t0**2, t0**3, t0**4, t0**5], # position @ t0 + [0, 1, 2 * t0, 3 * t0**2, 4 * t0**3, 5 * t0**4], # velocity @ t0 + [0, 0, 2, 6 * t0, 12 * t0**2, 20 * t0**3], # acceleration @ t0 + [1, t1, t1**2, t1**3, t1**4, t1**5], # position @ t1 + [0, 1, 2 * t1, 3 * t1**2, 4 * t1**3, 5 * t1**4], # velocity @ t1 + [0, 0, 2, 6 * t1, 12 * t1**2, 20 * t1**3], # acceleration @ t1 + ] + ) # Construct the boundary condition vector b = np.array([x0, v0, a0, x1, v1, a1]) - # Solve for coefficients of the quintic polynomial coefficients = np.linalg.solve(M, b) return coefficients - def __call__(self, t: np.floating, order: int=0) -> NDArray: + def __call__(self, t: np.floating, order: int = 0) -> NDArray: """Evaluate the quintic spline or its derivatives at a given time t for all dimensions. - - Parameters: - - t: Time at which to evaluate - - order: Derivative order (0=position, 1=velocity, 2=acceleration) - - Returns: - - A list of evaluated values for each dimension - """ - results = [] - for coeffs in self.splines: - if order == 0: # Position - results.append(sum(c * t**i for i, c in enumerate(coeffs))) - elif order == 1: # Velocity - results.append(sum(i * c * t**(i-1) for i, c in enumerate(coeffs) if i >= 1)) - elif order == 2: # Acceleration - results.append(sum(i * (i-1) * c * t**(i-2) for i, c in enumerate(coeffs) if i >= 2)) - else: - raise ValueError("Only orders 0 (position), 1 (velocity), and 2 (acceleration) are supported.") - return np.array(results).T # Transpose to make time the first index + Params: + t: Time at which to evaluate + order: Derivative order (0=position, 1=velocity, 2=acceleration) -""" -Example code how to use the QuinticSpline class -""" -# # Define the boundary conditions for a 2D case -# t0, t1 = 0, 1 -# x0, v0, a0 = [0.0, 0.0], [0.0, 1.0], [0.0, 0.0] # Initial conditions for x and y -# x1, v1, a1 = [1.0, 1.0], [0.0, 0.0], [0.0, 0.0] # Final conditions for x and y - -# # Create the QuinticSpline object -# spline = QuinticSpline(t0, t1, x0, v0, a0, x1, v1, a1) - -# # Evaluate the spline and its derivatives -# t_values = np.linspace(t0, t1, 100) -# pos_values = np.array([spline(t, order=0) for t in t_values]) # Position -# vel_values = np.array([spline(t, order=1) for t in t_values]) # Velocity -# acc_values = np.array([spline(t, order=2) for t in t_values]) # Acceleration - -# # Plot the results for each dimension -# fig, axes = plt.subplots(3, 1, figsize=(10, 8)) - -# axes[0].plot(t_values, pos_values[:, 0], label="Position X") -# axes[0].plot(t_values, pos_values[:, 1], label="Position Y") -# axes[0].set_ylabel("Position") -# axes[0].legend() - -# axes[1].plot(t_values, vel_values[:, 0], label="Velocity X") -# axes[1].plot(t_values, vel_values[:, 1], label="Velocity Y") -# axes[1].set_ylabel("Velocity") -# axes[1].legend() - -# axes[2].plot(t_values, acc_values[:, 0], label="Acceleration X") -# axes[2].plot(t_values, acc_values[:, 1], label="Acceleration Y") -# axes[2].set_ylabel("Acceleration") -# axes[2].set_xlabel("Time") -# axes[2].legend() - -# plt.tight_layout() -# plt.show() + Examples: + >>> t0, t1 = 0, 1 + >>> x0, v0, a0 = [0.0, 0.0], [0.0, 1.0], [0.0, 0.0] # Initial conditions for x and y + >>> x1, v1, a1 = [1.0, 1.0], [0.0, 0.0], [0.0, 0.0] # Final conditions for x and y + >>> spline = QuinticSpline(t0, t1, x0, v0, a0, x1, v1, a1) + >>> t_values = np.linspace(t0, t1, 100) + >>> pos_values = spline(t_values, order=0) # Position + >>> vel_values = np.array([spline(t, order=1) for t in t_values]) # Velocity + >>> acc_values = np.array([spline(t, order=2) for t in t_values]) # Acceleration + + Returns: + A list of evaluated values for each dimension + """ + t = np.atleast_1d(t) + if order == 0: + return self._position(t).squeeze() + elif order == 1: + return self._velocity(t).squeeze() + elif order == 2: + return self._acceleration(t).squeeze() + raise ValueError(f"Spline orders (0, 1, 2) are supported, got {order}") + + def _position(self, t: NDArray[np.floating]) -> NDArray[np.floating]: + powers = t[:, None] ** np.arange(len(self.splines[0])) + return np.dot(powers, self.splines.T) + + def _velocity(self, t: NDArray[np.floating]) -> NDArray[np.floating]: + mult = np.arange(len(self.splines[0])) + powers = np.zeros((len(t), len(self.splines[0]))) + powers[1:, :] = t[1:, None] ** (mult - 1) + coeffs = self.splines * mult[None, :] # Multiply by i + return np.dot(powers, coeffs.T) + + def _acceleration(self, t: NDArray[np.floating]) -> NDArray[np.floating]: + mult = np.arange(len(self.splines[0])) + powers = np.zeros((len(t), len(self.splines[0]))) + powers[1:, :] = t[1:, None] ** (mult - 2) + coeffs = self.splines * (mult * (mult - 1))[None, :] # Multiply by i*(i-1) + return np.dot(powers, coeffs.T) diff --git a/lsy_drone_racing/envs/drone_racing_deploy_env.py b/lsy_drone_racing/envs/drone_racing_deploy_env.py index 87d57666..5b43fc7c 100644 --- a/lsy_drone_racing/envs/drone_racing_deploy_env.py +++ b/lsy_drone_racing/envs/drone_racing_deploy_env.py @@ -106,8 +106,10 @@ def __init__(self, config: dict | Munch): # pycrazyswarm expects strings, not Path objects, so we need to convert it first swarm = pycrazyswarm.Crazyswarm(str(crazyswarm_config_path)) self.cf = swarm.allcfs.crazyflies[0] - names = [f"gate{g}" for g in range(1, len(config.env.track.gates) + 1)] - names += [f"obstacle{g}" for g in range(1, len(config.env.track.obstacles) + 1)] + names = [] + if not self.config.deploy.practice_without_track_objects: + names += [f"gate{g}" for g in range(1, len(config.env.track.gates) + 1)] + names += [f"obstacle{g}" for g in range(1, len(config.env.track.obstacles) + 1)] self.vicon = Vicon(track_names=names, timeout=5) self.symbolic = None if config.env.symbolic: @@ -122,6 +124,13 @@ def __init__(self, config: dict | Munch): self.symbolic = sim.symbolic() self._last_pos = np.zeros(3) + self.gates_visited = np.array([False] * len(config.env.track.gates)) + self.obstacles_visited = np.array([False] * len(config.env.track.obstacles)) + + # Use internal variable to store results of self.obs that is updated every time + # self.obs is invoked in order to prevent calling it more often than necessary. + self._obs = None + def reset( self, *, seed: int | None = None, options: dict | None = None ) -> tuple[dict[str, NDArray[np.floating]], dict]: @@ -130,8 +139,13 @@ def reset( We cannot reset the track in the real world. Instead, we check if the gates, obstacles and drone are positioned within tolerances. """ - check_race_track(self.config) - check_drone_start_pos(self.config) + if ( + self.config.deploy.check_race_track + and not self.config.deploy.practice_without_track_objects + ): + check_race_track(self.config) + if self.config.deploy.check_drone_start_pos: + check_drone_start_pos(self.config) self._last_pos[:] = self.vicon.pos[self.vicon.drone_name] self.target_gate = 0 info = self.info @@ -165,19 +179,20 @@ def step( def close(self): """Close the environment by stopping the drone and landing back at the starting position.""" - return_home = True # makes the drone simulate the return to home after stopping + return_home = True # makes the drone simulate the return to home after stopping if return_home: # This is done to run the closing controller at a different frequency than the controller before # Does not influence other code, since this part is already in closing! # WARNING: When changing the frequency, you must also change the current _step!!! - freq_new = 100 # Hz - self._steps = int( self._steps / self.config.env.freq * freq_new ) + freq_new = 100 # Hz self.config.env.freq = freq_new - t_step_ctrl = 1/self.config.env.freq - + t_step_ctrl = 1 / self.config.env.freq + obs = self.obs - obs["acc"] = np.array([0,0,0]) # TODO, use actual value when avaiable or do one step to calculate from velocity + obs["acc"] = np.array( + [0, 0, 0] + ) # TODO, use actual value when avaiable or do one step to calculate from velocity info = self.info info["env_freq"] = self.config.env.freq info["drone_start_pos"] = self.config.env.track.drone.pos @@ -185,20 +200,25 @@ def close(self): controller = ClosingController(obs, info) t_total = controller.t_total - for i in np.arange(int(t_total/t_step_ctrl)): # hover for some more time + for i in np.arange(int(t_total / t_step_ctrl)): # hover for some more time action = controller.compute_control(obs) action = action.astype(np.float64) # Drone firmware expects float64 - pos, vel, acc, yaw, rpy_rate = action[:3], action[3:6], action[6:9], action[9], action[10:] + pos, vel, acc, yaw, rpy_rate = ( + action[:3], + action[3:6], + action[6:9], + action[9], + action[10:], + ) self.cf.cmdFullState(pos, vel, acc, yaw, rpy_rate) obs = self.obs - obs["acc"] = np.array([0,0,0]) + obs["acc"] = np.array([0, 0, 0]) controller.step_callback(action, obs, 0, True, False, info) - time.sleep(t_step_ctrl) + time.sleep(t_step_ctrl) self.cf.notifySetpointsStop() self.cf.land(0.05, 2.0) - @property def obs(self) -> dict: """Return the observation of the environment.""" @@ -216,28 +236,42 @@ def obs(self) -> dict: n_gates = len(self.config.env.track.gates) obs["target_gate"] = self.target_gate if self.target_gate < n_gates else -1 - # Add the gate and obstacle poses to the info. If gates or obstacles are in sensor range, - # use the actual pose, otherwise use the nominal pose. + drone_pos = self.vicon.pos[self.vicon.drone_name] + gates_pos = np.array([g.pos for g in self.config.env.track.gates]) - gate_names = [f"gate{g}" for g in range(1, len(gates_pos) + 1)] - real_gates_pos = np.array([self.vicon.pos[g] for g in gate_names]) - in_range = np.linalg.norm(real_gates_pos - drone_pos, axis=1) < sensor_range - gates_pos[in_range] = real_gates_pos[in_range] gates_rpy = np.array([g.rpy for g in self.config.env.track.gates]) - real_gates_rpy = np.array([self.vicon.rpy[g] for g in gate_names]) - gates_rpy[in_range] = real_gates_rpy[in_range] - obs["gates_pos"] = gates_pos.astype(np.float32) - obs["gates_rpy"] = gates_rpy.astype(np.float32) - obs["gates_in_range"] = in_range + gate_names = [f"gate{g}" for g in range(1, len(gates_pos) + 1)] obstacles_pos = np.array([o.pos for o in self.config.env.track.obstacles]) obstacle_names = [f"obstacle{g}" for g in range(1, len(obstacles_pos) + 1)] - real_obstacles_pos = np.array([self.vicon.pos[o] for o in obstacle_names]) - in_range = np.linalg.norm(real_obstacles_pos - drone_pos, axis=1) < sensor_range - obstacles_pos[in_range] = real_obstacles_pos[in_range] + + # Update objects position with vicon data if not in practice mode and object + # either is in range or was in range previously. + if not self.config.deploy.practice_without_track_objects: + real_gates_pos = np.array([self.vicon.pos[g] for g in gate_names]) + real_gates_rpy = np.array([self.vicon.rpy[g] for g in gate_names]) + real_obstacles_pos = np.array([self.vicon.pos[o] for o in obstacle_names]) + + # Use x-y distance to calucate sensor range, otherwise it would depend on the height of the drone + # and obstacle how early the obstacle is in range. + in_range = np.linalg.norm(real_gates_pos[:, :2] - drone_pos[:2], axis=1) < sensor_range + self.gates_visited = np.logical_or(self.gates_visited, in_range) + gates_pos[self.gates_visited] = real_gates_pos[self.gates_visited] + gates_rpy[self.gates_visited] = real_gates_rpy[self.gates_visited] + obs["gates_in_range"] = in_range + + in_range = ( + np.linalg.norm(real_obstacles_pos[:, :2] - drone_pos[:2], axis=1) < sensor_range + ) + self.obstacles_visited = np.logical_or(self.obstacles_visited, in_range) + obstacles_pos[self.obstacles_visited] = real_obstacles_pos[self.obstacles_visited] + obs["obstacles_in_range"] = in_range + + obs["gates_pos"] = gates_pos.astype(np.float32) + obs["gates_rpy"] = gates_rpy.astype(np.float32) obs["obstacles_pos"] = obstacles_pos.astype(np.float32) - obs["obstacles_in_range"] = in_range + self._obs = obs return obs @property @@ -254,12 +288,10 @@ def gate_passed(self, pos: NDArray[np.floating], prev_pos: NDArray[np.floating]) """ n_gates = len(self.config.env.track.gates) if self.target_gate < n_gates and self.target_gate != -1: - # Gate IDs go from 0 to N-1, but names go from 1 to N - gate_id = "gate" + str(self.target_gate + 1) # Real gates measure 0.4m x 0.4m, we account for meas. error gate_size = (0.56, 0.56) - gate_pos = self.vicon.pos[gate_id] - gate_rot = R.from_euler("xyz", self.vicon.rpy[gate_id]) + gate_pos = self._obs["gates_pos"][self.target_gate] + gate_rot = R.from_euler("xyz", self._obs["gates_rpy"][self.target_gate]) return check_gate_pass(gate_pos, gate_rot, gate_size, pos, prev_pos) return False diff --git a/lsy_drone_racing/envs/drone_racing_env.py b/lsy_drone_racing/envs/drone_racing_env.py index e166b6aa..07f14eb0 100644 --- a/lsy_drone_racing/envs/drone_racing_env.py +++ b/lsy_drone_racing/envs/drone_racing_env.py @@ -153,7 +153,7 @@ def reset( info = self.info info["sim_freq"] = self.config.sim.sim_freq info["low_level_ctrl_freq"] = self.config.sim.ctrl_freq - info["drone_mass"] = self.sim.drone.params.mass + info["drone_mass"] = self.sim.drone.nominal_params.mass info["env_freq"] = self.config.env.freq return self.obs, info @@ -224,7 +224,6 @@ def obs(self) -> dict[str, NDArray[np.floating]]: "vel": obs_drone[6:9], "ang_vel": obs_drone[9:], } - obs["ang_vel"][:] = R.from_euler("xyz", obs["rpy"]).apply(obs["ang_vel"], inverse=True) gates = self.sim.gates @@ -305,19 +304,21 @@ def gate_passed(self) -> bool: def close(self): """Close the environment by stopping the drone and landing back at the starting position.""" - return_home = False # makes the drone simulate the return to home after stopping + return_home = True # makes the drone simulate the return to home after stopping if return_home: # This is done to run the closing controller at a different frequency than the controller before # Does not influence other code, since this part is already in closing! # WARNING: When changing the frequency, you must also change the current _step!!! - freq_new = 100 # Hz - self._steps = int( self._steps / self.config.env.freq * freq_new ) + freq_new = 100 # Hz + self._steps = int(self._steps / self.config.env.freq * freq_new) self.config.env.freq = freq_new - t_step_ctrl = 1/self.config.env.freq - + t_step_ctrl = 1 / self.config.env.freq + obs = self.obs - obs["acc"] = np.array([0,0,0]) # TODO, use actual value when avaiable or do one step to calculate from velocity + obs["acc"] = np.array( + [0, 0, 0] + ) # TODO, use actual value when avaiable or do one step to calculate from velocity info = self.info info["env_freq"] = self.config.env.freq info["drone_start_pos"] = self.config.env.track.drone.pos @@ -325,21 +326,27 @@ def close(self): controller = ClosingController(obs, info) t_total = controller.t_total - for i in np.arange(int(t_total/t_step_ctrl)): # hover for some more time + for i in np.arange(int(t_total / t_step_ctrl)): # hover for some more time action = controller.compute_control(obs) action = action.astype(np.float64) # Drone firmware expects float64 if self.config.sim.physics == PhysicsMode.SYS_ID: print("[Warning] sys_id model not supported by return home script") break - pos, vel, acc, yaw, rpy_rate = action[:3], action[3:6], action[6:9], action[9], action[10:] + pos, vel, acc, yaw, rpy_rate = ( + action[:3], + action[3:6], + action[6:9], + action[9], + action[10:], + ) self.sim.drone.full_state_cmd(pos, vel, acc, yaw, rpy_rate) collision = self._inner_step_loop() terminated = self.terminated or collision obs = self.obs - obs["acc"] = np.array([0,0,0]) + obs["acc"] = np.array([0, 0, 0]) controller.step_callback(action, obs, self.reward, terminated, False, info) if self.config.sim.gui: # Only sync if gui is active - time.sleep(t_step_ctrl) + time.sleep(t_step_ctrl) self.sim.close() diff --git a/lsy_drone_racing/sim/noise.py b/lsy_drone_racing/sim/noise.py index 5b01efd3..f4a10e78 100644 --- a/lsy_drone_racing/sim/noise.py +++ b/lsy_drone_racing/sim/noise.py @@ -59,11 +59,7 @@ class UniformNoise(Noise): """I.i.d uniform noise ~ U(low, high) per time step.""" def __init__( - self, - dim: int, - mask: NDArray[np.bool] | None = None, - low: float = 0.0, - high: float = 1.0 + self, dim: int, mask: NDArray[np.bool] | None = None, low: float = 0.0, high: float = 1.0 ): """Initialize the uniform noise. diff --git a/lsy_drone_racing/utils/ros_utils.py b/lsy_drone_racing/utils/ros_utils.py index 52d197b7..241ef9f7 100644 --- a/lsy_drone_racing/utils/ros_utils.py +++ b/lsy_drone_racing/utils/ros_utils.py @@ -29,7 +29,7 @@ def check_race_track(config: Munch): if not rng_info: logger.error("Randomization info not found in the configuration.") raise RuntimeError("Randomization info not found in the configuration.") - ang_tol = 0.3 # TODO: Adapt value based on experience in the lab + ang_tol = config.env.track.gates[0].rpy[2] # Assume all gates have the same rotation assert rng_info.gate_pos.type == "uniform", "Race track checks expect uniform distributions" assert rng_info.obstacle_pos.type == "uniform", "Race track checks expect uniform distributions" for i, gate in enumerate(config.env.track.gates): diff --git a/lsy_drone_racing/vicon.py b/lsy_drone_racing/vicon.py index 95a109ce..2fc8bd4d 100644 --- a/lsy_drone_racing/vicon.py +++ b/lsy_drone_racing/vicon.py @@ -51,6 +51,7 @@ def __init__( except rospy.exceptions.ROSException: ... # ROS node is already running which is fine for us self.drone_name = None + self.auto_track_drone = auto_track_drone if auto_track_drone: with open(get_ros_package_path("crazyswarm") / "launch/crazyflies.yaml", "r") as f: config = yaml.load(f, yaml.SafeLoader) @@ -65,9 +66,10 @@ def __init__( self.time: dict[str, float] = {} self.tf_sub = rospy.Subscriber("/tf", TFMessage, self.tf_callback) - self.estimator_sub = rospy.Subscriber( - "/estimated_state", StateVector, self.estimator_callback - ) + if auto_track_drone: + self.estimator_sub = rospy.Subscriber( + "/estimated_state", StateVector, self.estimator_callback + ) if timeout: tstart = time.time() @@ -146,4 +148,5 @@ def active(self) -> bool: def close(self): """Unregister the ROS subscribers.""" self.tf_sub.unregister() - self.estimator_sub.unregister() + if self.auto_track_drone: + self.estimator_sub.unregister() diff --git a/scripts/deploy.py b/scripts/deploy.py index a50c71b4..7ab794cb 100644 --- a/scripts/deploy.py +++ b/scripts/deploy.py @@ -53,7 +53,7 @@ def main(config: str = "level3.toml", controller: str | None = None): start_time = time.perf_counter() while not rospy.is_shutdown(): t_loop = time.perf_counter() - obs, info = env.obs, env.info + obs, info = env.unwrapped.obs, env.unwrapped.info action = controller.compute_control(obs, info) next_obs, reward, terminated, truncated, info = env.step(action) controller.step_callback(action, next_obs, reward, terminated, truncated, info) diff --git a/tests/unit_tests/sim/test_noise.py b/tests/unit_tests/sim/test_noise.py index b393e3fe..14565a07 100644 --- a/tests/unit_tests/sim/test_noise.py +++ b/tests/unit_tests/sim/test_noise.py @@ -76,9 +76,9 @@ def test_noise_list_from_specs(): ] noise_list = NoiseList.from_specs(specs) - assert len(noise_list.noises) == 2 - assert isinstance(noise_list.noises[0], UniformNoise) - assert isinstance(noise_list.noises[1], GaussianNoise) + assert len(noise_list) == 2 + assert isinstance(noise_list[0], UniformNoise) + assert isinstance(noise_list[1], GaussianNoise) @pytest.mark.unit