Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions lib/bb/sensor/mimic.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# SPDX-FileCopyrightText: 2026 James Harton
#
# SPDX-License-Identifier: Apache-2.0

defmodule BB.Sensor.Mimic do
@moduledoc """
A sensor that derives joint state from another joint.

Subscribes to sensor messages from a source joint and re-publishes
transformed messages for the mimic joint. This is useful for modelling
parallel jaw grippers and other mechanically-linked joint pairs.

## Options

* `:source` - (required) The name of the source joint to follow
* `:multiplier` - (optional, default 1.0) Scale factor applied to position values
* `:offset` - (optional, default 0.0) Constant offset added after scaling
* `:message_types` - (optional, default [JointState]) List of message types to forward

For JointState messages: `mimic_position = source_position * multiplier + offset`

## Example

joint :right_finger do
type(:prismatic)
sensor(:mimic, {BB.Sensor.Mimic,
source: :left_finger,
multiplier: 1.0,
message_types: [JointState]
})
end

## URDF Mimic Joints

This sensor implements the equivalent of URDF mimic joints:

<joint name="right_finger_joint" type="prismatic">
<mimic joint="left_finger_joint" multiplier="1" offset="0"/>
</joint>

Forward kinematics and visualisation automatically work since they
consume JointState messages published by this sensor.
"""

use BB.Sensor,
options_schema: [
source: [type: :atom, required: true, doc: "Name of the source joint to follow"],
multiplier: [type: :float, default: 1.0, doc: "Scale factor for position values"],
offset: [type: :float, default: 0.0, doc: "Constant offset added after scaling"],
message_types: [
type: {:list, :atom},
default: [BB.Message.Sensor.JointState],
doc: "Message types to forward"
]
]

alias BB.Message
alias BB.Message.Sensor.JointState

@impl BB.Sensor
def init(opts) do
{:ok, state} = build_state(opts)

BB.subscribe(state.bb.robot, [:sensor | state.source_path],
message_types: state.message_types
)

{:ok, state}
end

defp build_state(opts) do
opts = Map.new(opts)
[sensor_name, joint_name | rest] = Enum.reverse(opts.bb.path)

source_path = build_source_path(rest, opts.source)

state = %{
bb: opts.bb,
sensor_name: sensor_name,
joint_name: joint_name,
source: opts.source,
source_path: source_path,
multiplier: Map.get(opts, :multiplier, 1.0),
offset: Map.get(opts, :offset, 0.0),
message_types: Map.get(opts, :message_types, [JointState])
}

{:ok, state}
end

defp build_source_path(parent_path, source_joint) do
Enum.reverse(parent_path) ++ [source_joint]
end

@impl BB.Sensor
def handle_info(
{:bb, _source_path, %Message{payload: %JointState{} = payload} = message},
state
) do
transformed_payload = transform_joint_state(payload, state)
transformed_message = %{message | payload: transformed_payload}
BB.publish(state.bb.robot, [:sensor | state.bb.path], transformed_message)
{:noreply, state}
end

def handle_info({:bb, _source_path, %Message{} = message}, state) do
BB.publish(state.bb.robot, [:sensor | state.bb.path], message)
{:noreply, state}
end

defp transform_joint_state(%JointState{} = payload, state) do
positions =
Enum.map(payload.positions, fn pos ->
pos * state.multiplier + state.offset
end)

names =
Enum.map(payload.names, fn _name ->
state.joint_name
end)

%{payload | positions: positions, names: names}
end
end
204 changes: 204 additions & 0 deletions test/bb/sensor/mimic_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
# SPDX-FileCopyrightText: 2026 James Harton
#
# SPDX-License-Identifier: Apache-2.0

defmodule BB.Sensor.MimicTest do
use ExUnit.Case, async: true
use Mimic

alias BB.Message
alias BB.Message.Sensor.JointState
alias BB.Sensor.Mimic

@source_joint :left_finger
@mimic_joint :right_finger
@sensor_name :mimic

defp joint_state_message(names, positions) do
Message.new!(JointState, :gripper,
names: names,
positions: positions
)
end

defp default_bb_context do
%{robot: TestRobot, path: [:gripper_link, @mimic_joint, @sensor_name]}
end

defp init_sensor(opts \\ []) do
stub(BB.PubSub, :subscribe, fn _robot, _path, _opts -> :ok end)

default_opts = [bb: default_bb_context(), source: @source_joint]
{:ok, state} = Mimic.init(Keyword.merge(default_opts, opts))

state
end

describe "init/1" do
test "subscribes to source joint's sensor topic" do
test_pid = self()

expect(BB.PubSub, :subscribe, fn robot, path, opts ->
send(test_pid, {:subscribed, robot, path, opts})
:ok
end)

opts = [bb: default_bb_context(), source: @source_joint]
{:ok, _state} = Mimic.init(opts)

assert_receive {:subscribed, TestRobot, [:sensor, :gripper_link, @source_joint], opts}
assert opts[:message_types] == [JointState]
end

test "defaults to multiplier of 1.0" do
state = init_sensor()

assert state.multiplier == 1.0
end

test "defaults to offset of 0.0" do
state = init_sensor()

assert state.offset == 0.0
end

test "accepts custom multiplier" do
state = init_sensor(multiplier: -1.0)

assert state.multiplier == -1.0
end

test "accepts custom offset" do
state = init_sensor(offset: 0.01)

assert state.offset == 0.01
end

test "stores source joint name" do
state = init_sensor()

assert state.source == @source_joint
end

test "stores mimic joint name" do
state = init_sensor()

assert state.joint_name == @mimic_joint
end
end

describe "handle_info/2 with JointState" do
test "transforms position with default multiplier and offset" do
state = init_sensor()
test_pid = self()

expect(BB.PubSub, :publish, fn robot, path, message ->
send(test_pid, {:published, robot, path, message})
:ok
end)

source_message = joint_state_message([@source_joint], [0.025])

{:noreply, _state} =
Mimic.handle_info({:bb, [:sensor, :gripper_link, @source_joint], source_message}, state)

assert_receive {:published, TestRobot, [:sensor, :gripper_link, @mimic_joint, @sensor_name],
message}

assert message.payload.names == [@mimic_joint]
assert message.payload.positions == [0.025]
end

test "applies multiplier to position" do
state = init_sensor(multiplier: -1.0)
test_pid = self()

expect(BB.PubSub, :publish, fn _robot, _path, message ->
send(test_pid, {:position, message.payload.positions})
:ok
end)

source_message = joint_state_message([@source_joint], [0.025])

{:noreply, _state} =
Mimic.handle_info({:bb, [:sensor, :gripper_link, @source_joint], source_message}, state)

assert_receive {:position, [-0.025]}
end

test "applies offset to position" do
state = init_sensor(offset: 0.01)
test_pid = self()

expect(BB.PubSub, :publish, fn _robot, _path, message ->
send(test_pid, {:position, message.payload.positions})
:ok
end)

source_message = joint_state_message([@source_joint], [0.025])

{:noreply, _state} =
Mimic.handle_info({:bb, [:sensor, :gripper_link, @source_joint], source_message}, state)

assert_receive {:position, [0.035]}
end

test "applies both multiplier and offset correctly" do
state = init_sensor(multiplier: 2.0, offset: 0.01)
test_pid = self()

expect(BB.PubSub, :publish, fn _robot, _path, message ->
send(test_pid, {:position, message.payload.positions})
:ok
end)

source_message = joint_state_message([@source_joint], [0.025])

{:noreply, _state} =
Mimic.handle_info({:bb, [:sensor, :gripper_link, @source_joint], source_message}, state)

assert_receive {:position, [position]}
assert_in_delta position, 0.06, 0.0001
end

test "replaces joint name with mimic joint name" do
state = init_sensor()
test_pid = self()

expect(BB.PubSub, :publish, fn _robot, _path, message ->
send(test_pid, {:names, message.payload.names})
:ok
end)

source_message = joint_state_message([@source_joint], [0.025])

{:noreply, _state} =
Mimic.handle_info({:bb, [:sensor, :gripper_link, @source_joint], source_message}, state)

assert_receive {:names, [@mimic_joint]}
end
end

describe "handle_info/2 with non-JointState messages" do
test "forwards message without transformation" do
state = init_sensor(message_types: [JointState, SomeOtherType])
test_pid = self()

other_message = %Message{
timestamp: System.monotonic_time(:nanosecond),
frame_id: :gripper,
payload: %{__struct__: SomeOtherType, value: 42}
}

expect(BB.PubSub, :publish, fn _robot, _path, message ->
send(test_pid, {:forwarded, message})
:ok
end)

{:noreply, _state} =
Mimic.handle_info({:bb, [:sensor, :gripper_link, @source_joint], other_message}, state)

assert_receive {:forwarded, ^other_message}
end
end
end