Skip to content

Commit

Permalink
Merge pull request #129 from decargroup/modify_residuals_for_mixtures
Browse files Browse the repository at this point in the history
Slight batch modifications + Make PointRelativeMeasurement work with SE2.
  • Loading branch information
vkorotkine authored Jan 16, 2025
2 parents 67a7813 + 42101bf commit a8fe24b
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 87 deletions.
1 change: 1 addition & 0 deletions navlie/batch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
"""

from .estimator import BatchEstimator
from .problem import Problem
2 changes: 1 addition & 1 deletion navlie/batch/gaussian_mixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def mix_errors(

nonlinear_part = np.array(np.log(alpha_max / alpha_k)).reshape(-1)
nonlinear_part = np.sqrt(2) * np.sqrt(nonlinear_part)
e_mix = np.concatenate([linear_part, nonlinear_part])
e_mix = np.concatenate([linear_part.reshape(-1), nonlinear_part])

reused_values = {"alphas": alphas, "res_values": res_values}

Expand Down
32 changes: 25 additions & 7 deletions navlie/batch/residuals.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ def jacobian_fd(self, states: List[State], step_size=1e-6) -> List[np.ndarray]:
Parameters
----------
states : List[State]
Evaluation point of Jacobians, a list of states that
Evaluation point of Jacobians, a list of states that
the residual is a function of.
Returns
-------
List[np.ndarray]
Expand All @@ -78,7 +78,7 @@ def jacobian_fd(self, states: List[State], step_size=1e-6) -> List[np.ndarray]:
w.r.t states[0], the second element is the Jacobian of the residual w.r.t states[1], etc.
"""
jac_list: List[np.ndarray] = [None] * len(states)

# Compute the Jacobian for each state via finite difference
for state_num, X_bar in enumerate(states):
e_bar = self.evaluate(states)
Expand All @@ -100,13 +100,14 @@ def jacobian_fd(self, states: List[State], step_size=1e-6) -> List[np.ndarray]:
jac_list[state_num] = jac_fd

return jac_list

def sqrt_info_matrix(self, states: List[State]):
"""
Returns the information matrix
"""
pass


class PriorResidual(Residual):
"""
A generic prior error.
Expand Down Expand Up @@ -157,6 +158,7 @@ def sqrt_info_matrix(self, states: List[State]):
"""
return self._L


class ProcessResidual(Residual):
"""
A generic process residual.
Expand Down Expand Up @@ -211,16 +213,24 @@ def evaluate(
if compute_jacobians:
jac_list = [None] * len(states)
if compute_jacobians[0]:
jac_list[0] = -L.T @ self._process_model.jacobian(
x_km1, self._u, dt
)
jac_list[0] = -L.T @ self._process_model.jacobian(x_km1, self._u, dt)
if compute_jacobians[1]:
jac_list[1] = L.T @ x_k.minus_jacobian(x_k_hat)

return e, jac_list

return e

def sqrt_info_matrix(self, states: List[State]):
"""
Returns the square root of the information matrix
"""
x_km1 = states[0]
x_k = states[1]
dt = x_k.stamp - x_km1.stamp
L = self._process_model.sqrt_information(x_km1, self._u, dt)
return L


class MeasurementResidual(Residual):
"""
Expand Down Expand Up @@ -268,3 +278,11 @@ def evaluate(
return e, jacobians

return e

def sqrt_info_matrix(self, states: List[State]):
"""
Returns the square root of the information matrix
"""
x = states[0]
L = self._y.model.sqrt_information(x)
return L
98 changes: 34 additions & 64 deletions navlie/lib/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from navlie.lib.states import SE2State, SE3State, SE23State
from navlie.types import (
Measurement,
ProcessModel,
Expand Down Expand Up @@ -154,9 +155,7 @@ def __init__(self, Q: np.ndarray):
self._Q = Q
self.dim = int(Q.shape[0] / 2)

def evaluate(
self, x: VectorState, u: VectorInput, dt: float
) -> VectorState:
def evaluate(self, x: VectorState, u: VectorInput, dt: float) -> VectorState:
x = x.copy()
u = u.copy()
Ad = super().jacobian(x, u, dt)
Expand Down Expand Up @@ -244,9 +243,7 @@ def evaluate(
x.value = x.value @ x.group.Exp(u.value * dt)
return x

def jacobian(
self, x: MatrixLieGroupState, u: VectorInput, dt: float
) -> np.ndarray:
def jacobian(self, x: MatrixLieGroupState, u: VectorInput, dt: float) -> np.ndarray:
if x.direction == "right":
return x.group.adjoint(x.group.Exp(-u.value * dt))
elif x.direction == "left":
Expand Down Expand Up @@ -321,16 +318,12 @@ def evaluate(
x.value = x.group.Exp(-u[0] * dt) @ x.value @ x.group.Exp(u[1] * dt)
return x

def jacobian(
self, x: MatrixLieGroupState, u: VectorInput, dt: float
) -> np.ndarray:
def jacobian(self, x: MatrixLieGroupState, u: VectorInput, dt: float) -> np.ndarray:
u = u.value.reshape((2, round(u.value.size / 2)))
if x.direction == "right":
return x.group.adjoint(x.group.Exp(-u[1] * dt))
else:
raise NotImplementedError(
"TODO: left jacobian not yet implemented."
)
raise NotImplementedError("TODO: left jacobian not yet implemented.")

def covariance(
self, x: MatrixLieGroupState, u: VectorInput, dt: float
Expand All @@ -347,9 +340,7 @@ def covariance(
L2 = dt * x.group.left_jacobian(-dt * u2)
return L1 @ self._Q1 @ L1.T + L2 @ self._Q2 @ L2.T
else:
raise NotImplementedError(
"TODO: left covariance not yet implemented."
)
raise NotImplementedError("TODO: left covariance not yet implemented.")


class LinearMeasurement(MeasurementModel):
Expand Down Expand Up @@ -479,16 +470,21 @@ def jacobian(self, x: MatrixLieGroupState) -> np.ndarray:
r_zw_a = x.position.reshape((-1, 1))
C_ab = x.attitude
r_pw_a = self._landmark_position.reshape((-1, 1))
y = C_ab.T @ (r_pw_a - r_zw_a)
y = C_ab.T @ (r_pw_a - r_zw_a).reshape(-1, 1)

if C_ab.shape[0] == 2:
group = SO2
if C_ab.shape[0] == 3:
group = SO3

if x.direction == "right":
return x.jacobian_from_blocks(
attitude=-SO3.odot(y), position=-np.identity(r_zw_a.shape[0])
attitude=-group.odot(y), position=-np.identity(r_zw_a.shape[0])
)

elif x.direction == "left":
return x.jacobian_from_blocks(
attitude=-C_ab.T @ SO3.odot(r_pw_a), position=-C_ab.T
attitude=-C_ab.T @ group.odot(r_pw_a), position=-C_ab.T
)

def covariance(self, x: MatrixLieGroupState) -> np.ndarray:
Expand Down Expand Up @@ -541,7 +537,9 @@ def jacobian(self, x: CompositeState) -> np.ndarray:
r_pw_a = landmark.value.reshape((-1, 1))
y = C_ab.T @ (r_pw_a - r_zw_a)

# Compute Jacobian of measurement model with respect to the state
# Compute Jacobian of measurement model with respect to the stated
# TODO: Implement SE2 version.

if pose.direction == "right":
pose_jacobian = pose.jacobian_from_blocks(
attitude=-SO3.odot(y), position=-np.identity(r_zw_a.shape[0])
Expand Down Expand Up @@ -624,9 +622,7 @@ def jacobian(self, x: CompositeState) -> np.ndarray:

# Compute the Jacobian of the measurement model
if pose.direction == "right":
r_pc_c = self._camera.resolve_landmark_in_cam_frame(
pose, landmark.value
)
r_pc_c = self._camera.resolve_landmark_in_cam_frame(pose, landmark.value)
dg_dr = self._compute_jac_of_projection_model(r_pc_c)
# Extract relevant states
C_bc = self._camera.T_bc.attitude
Expand Down Expand Up @@ -660,15 +656,11 @@ def jacobian(self, x: CompositeState) -> np.ndarray:
def covariance(self, x: CompositeState) -> np.ndarray:
return self._R

def _compute_jac_of_projection_model(
self, r_pc_c: np.ndarray
) -> np.ndarray:
def _compute_jac_of_projection_model(self, r_pc_c: np.ndarray) -> np.ndarray:
x, y, z = r_pc_c.ravel()
fu = self._camera.fu
fv = self._camera.fv
dg_dr = (1.0 / z) * np.array(
[[fu, 0, -fu * x / z], [0, fv, -fv * y / z]]
)
dg_dr = (1.0 / z) * np.array([[fu, 0, -fu * x / z], [0, fv, -fv * y / z]])

return dg_dr

Expand Down Expand Up @@ -805,9 +797,7 @@ class RangePoseToPose(MeasurementModel):
Compatible with ``SE2State, SE3State, SE23State, IMUState``.
"""

def __init__(
self, tag_body_position1, tag_body_position2, state_id1, state_id2, R
):
def __init__(self, tag_body_position1, tag_body_position2, state_id1, state_id2, R):
"""
Parameters
----------
Expand Down Expand Up @@ -842,9 +832,7 @@ def evaluate(self, x: CompositeState) -> np.ndarray:
C_a2 = x2.attitude
r_t1_1 = self.tag_body_position1.reshape((-1, 1))
r_t2_2 = self.tag_body_position2.reshape((-1, 1))
r_t1t2_a: np.ndarray = (C_a1 @ r_t1_1 + r_1w_a) - (
C_a2 @ r_t2_2 + r_2w_a
)
r_t1t2_a: np.ndarray = (C_a1 @ r_t1_1 + r_1w_a) - (C_a2 @ r_t2_2 + r_2w_a)
return np.array(np.linalg.norm(r_t1t2_a.flatten()))

def jacobian(self, x: CompositeState) -> np.ndarray:
Expand All @@ -856,18 +844,16 @@ def jacobian(self, x: CompositeState) -> np.ndarray:
C_a2 = x2.attitude
r_t1_1 = self.tag_body_position1.reshape((-1, 1))
r_t2_2 = self.tag_body_position2.reshape((-1, 1))
r_t1t2_a: np.ndarray = (C_a1 @ r_t1_1 + r_1w_a) - (
C_a2 @ r_t2_2 + r_2w_a
)
r_t1t2_a: np.ndarray = (C_a1 @ r_t1_1 + r_1w_a) - (C_a2 @ r_t2_2 + r_2w_a)

if C_a1.shape == (2, 2):
att_group = SO2
elif C_a1.shape == (3, 3):
att_group = SO3

rho: np.ndarray = (
r_t1t2_a / np.linalg.norm(r_t1t2_a.flatten())
).reshape((-1, 1))
rho: np.ndarray = (r_t1t2_a / np.linalg.norm(r_t1t2_a.flatten())).reshape(
(-1, 1)
)

if x1.direction == "right":
jac1 = x1.jacobian_from_blocks(
Expand All @@ -891,9 +877,7 @@ def jacobian(self, x: CompositeState) -> np.ndarray:
position=-rho.T @ np.identity(r_t2_2.size),
)

return x.jacobian_from_blocks(
{self.state_id1: jac1, self.state_id2: jac2}
)
return x.jacobian_from_blocks({self.state_id1: jac1, self.state_id2: jac2})

def covariance(self, x: CompositeState) -> np.ndarray:
return self._R
Expand Down Expand Up @@ -1061,9 +1045,7 @@ def evaluate(self, x: MatrixLieGroupState):

def jacobian(self, x: MatrixLieGroupState):
if x.direction == "right":
return x.jacobian_from_blocks(
position=x.attitude[2, :].reshape((1, -1))
)
return x.jacobian_from_blocks(position=x.attitude[2, :].reshape((1, -1)))
elif x.direction == "left":
return x.jacobian_from_blocks(
attitude=SO3.odot(x.position)[2, :].reshape((1, -1)),
Expand Down Expand Up @@ -1107,13 +1089,9 @@ def evaluate(self, x: MatrixLieGroupState):

def jacobian(self, x: MatrixLieGroupState):
if x.direction == "right":
return x.jacobian_from_blocks(
attitude=-SO3.odot(x.attitude.T @ self._g_a)
)
return x.jacobian_from_blocks(attitude=-SO3.odot(x.attitude.T @ self._g_a))
elif x.direction == "left":
return x.jacobian_from_blocks(
attitude=x.attitude.T @ -SO3.odot(self._g_a)
)
return x.jacobian_from_blocks(attitude=x.attitude.T @ -SO3.odot(self._g_a))

def covariance(self, x: MatrixLieGroupState) -> np.ndarray:
if np.isscalar(self.R):
Expand Down Expand Up @@ -1156,13 +1134,9 @@ def evaluate(self, x: MatrixLieGroupState):

def jacobian(self, x: MatrixLieGroupState):
if x.direction == "right":
return x.jacobian_from_blocks(
attitude=-SO3.odot(x.attitude.T @ self._m_a)
)
return x.jacobian_from_blocks(attitude=-SO3.odot(x.attitude.T @ self._m_a))
elif x.direction == "left":
return x.jacobian_from_blocks(
attitude=-x.attitude.T @ SO3.odot(self._m_a)
)
return x.jacobian_from_blocks(attitude=-x.attitude.T @ SO3.odot(self._m_a))

def covariance(self, x: MatrixLieGroupState) -> np.ndarray:
if np.isscalar(self.R):
Expand All @@ -1172,9 +1146,7 @@ def covariance(self, x: MatrixLieGroupState) -> np.ndarray:


class _InvariantInnovation(MeasurementModel):
def __init__(
self, y: np.ndarray, model: MeasurementModel, direction="right"
):
def __init__(self, y: np.ndarray, model: MeasurementModel, direction="right"):
self.measurement_model = model
self.y = y.ravel()
self.direction = direction
Expand Down Expand Up @@ -1225,9 +1197,7 @@ def _compute_direction(self, x: MatrixLieGroupState) -> str:
elif x.direction == "right":
direction = "left"
else:
raise ValueError(
"Invalid direction. Must be 'left', 'right' or 'auto'"
)
raise ValueError("Invalid direction. Must be 'left', 'right' or 'auto'")
return direction


Expand Down
Loading

0 comments on commit a8fe24b

Please sign in to comment.