Skip to content
Merged
2 changes: 2 additions & 0 deletions docs/agents/hwp_gripper.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ An example site-config-file block::
'instance-id': 'hwp-gripper',
'arguments': [ '--mcu_ip', '10.10.10.115',
'--control_port', 8041,
'--warm-grip-distance, 9.2, 10.6, 9.8,
'--adjustment-distance, 0., 0., 0.,
'--supervisor-id', 'hwp-supervisor',
'--no-data-warn-time', 60,
'--no-data-shutdown-time', 300, # 5 minutes
Expand Down
236 changes: 199 additions & 37 deletions socs/agents/hwp_gripper/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def __init__(self, agent, args):
self._initialized = False
self.mcu_ip = args.mcu_ip
self.control_port = args.control_port
self.warm_grip_distance = args.warm_grip_distance
self.adjustment_distance = args.adjustment_distance

self.shutdown_mode = False
self.supervisor_id = args.supervisor_id
Expand Down Expand Up @@ -203,7 +205,7 @@ def move(self, session, params=None):
"""

if self._get_hwp_freq() > 0.1:
self.log.warning("Not moving actuators while HWP is spinning")
self.log.warn("Not moving actuators while HWP is spinning")
return False, "HWP is spinning, not moving actuators"

return_dict = self._run_client_func(
Expand Down Expand Up @@ -315,14 +317,16 @@ def act(self, session, params=None):
def is_cold(self, session, params=None):
"""is_cold(value=False)

**Task** - Set the code to operate in warm/cold grip configuration
**Task** - Set the limit switches to warm/cold grip configuration

Parameters:
value (bool): Set to warm grip (False) or cold grip (True)

Notes:
Configures the software to query the correct set of limit switches. The
maximum extension of the actuators depends on the cryostat temperature.
Configures the software to query the correct set of limit switches.
Warm grip configuration enables both warm and cold limit switches.
Cold grip configuration enables only cold limit switches. The maximum
extension of the actuators depends on the cryostat temperature.

The most recent data collected is stored in session data in the
structure:
Expand Down Expand Up @@ -384,27 +388,11 @@ def shutdown(self, session, params=None):
def grip(self, session, params=None):
"""grip()

**Task** - Series of commands to automatically grip the HWP.
This will return grippers to their home position, then move them each
inwards incrementally. If the HWP is spinning, this will not run.
"""
if self._get_hwp_freq() > 0.1:
return False, "Not gripping HWP because HWP is spinning"

self._grip_hwp(session, check_shutdown=True)
return True, "Finished gripping procedure"

def _grip_hwp(self, session, check_shutdown=True):
"""
Helper function to grip the HWP that can be called by the grip_hwp
task or by the shutdown procedure. This will return grippers to their
home position, then move them each inwards incrementally.

Args
------
session (OpSession):
Session object for current operation. This function will add to
session data.
**Task** - Series of commands to automatically warm grip the HWP. This
assumes that HWP is cold. This will return grippers to their home position,
then move them each inwards incrementally until warm limit switches are
tiggered. If the HWP is spinning, this will not run. If this fails to grip
hwp, this will return grippers to their home position.

Notes:
The most recent data collected is stored in session data in the
Expand All @@ -415,38 +403,94 @@ def _grip_hwp(self, session, check_shutdown=True):
..
{'result': True, 'log': [.., .., etc]}]
"""
if self._get_hwp_freq() > 0.1:
return False, "Not gripping HWP because HWP is spinning"

result, session.data = self._grip_hwp(check_shutdown=True)
return result, "Finished gripping procedure"

def _grip_hwp(self, check_shutdown=True):
"""
Helper function to grip the HWP that can be called by the grip_hwp
task or by the shutdown procedure. This will return grippers to their
home position, then move them each inwards incrementally.
"""
data = {
'responses': [],
}
session.data = data

def run_and_append(func, *args, **kwargs):
return_dict = self._run_client_func(func, *args, **kwargs)
data['responses'].append(return_dict)
return return_dict

# We should check if hwp is already gripper or not
return_dict = run_and_append(self.client.get_state, job='grip',
check_shutdown=check_shutdown)
act_results = return_dict['result']['actuators']
limit_switch_state = act_results[0]['limits']['warm_grip']['state'] | \
act_results[1]['limits']['warm_grip']['state'] | \
act_results[2]['limits']['warm_grip']['state']
if limit_switch_state:
self.log.warn("HWP is already gripped. Do nothing.")
return True, data

# Reset alarms
run_and_append(self.client.reset, job='grip', check_shutdown=check_shutdown)
time.sleep(1)

# Enable power to actuators
run_and_append(self.client.power, True, job='grip', check_shutdown=check_shutdown)
time.sleep(1)

# Disable brake
run_and_append(self.client.brake, False, job='grip', check_shutdown=check_shutdown)
time.sleep(1)

# Ignore limit switches to move grippers backwards
run_and_append(self.client.force, True, job='grip', check_shutdown=check_shutdown)

# Ensure that the limit switches are in warm configuration
run_and_append(self.client.is_cold, False, job='grip', check_shutdown=check_shutdown)

# Send grippers to their home position
run_and_append(self.client.home, job='grip', check_shutdown=check_shutdown)

# Ensure that we do not ignore limit switches
run_and_append(self.client.force, False, job='grip', check_shutdown=check_shutdown)

finished = [False, False, False]
for actuator, _ in enumerate(finished):
while not finished[actuator]:

# Move to the warm_grip_distance - 5 mm.
for actuator in range(3):
distance = self.warm_grip_distance[actuator] - 5.
return_dict = run_and_append(self.client.move, 'POS', actuator + 1, distance,
job='grip', check_shutdown=check_shutdown)
time.sleep(1)

# Move actator inwards by 0.1 mm step, warm_grip_distance + 1 is absolute maximum
for i in range(60):
if all(finished):
break
for actuator, _ in enumerate(finished):
if finished[actuator]:
continue

# Move actuator inwards until warm-limit is hit
run_and_append(self.client.move, 'POS', actuator + 1, 0.2,
job='grip', check_shutdown=check_shutdown)
# If the warm limit is hit, return_dict['result'] will be False
return_dict = run_and_append(self.client.move, 'POS', actuator + 1, 0.1,
job='grip', check_shutdown=check_shutdown)

# Reset alarms. If the warm-limit is hit, the alarm will be triggered
# and return_dict['result'] will be True
return_dict = run_and_append(self.client.reset, job='grip',
check_shutdown=check_shutdown)
if not return_dict['result']:
# Reset alarms.
run_and_append(self.client.reset, job='grip',
check_shutdown=check_shutdown)
time.sleep(1)

if return_dict['result']:
# If the warm-limit is hit, move the actuator outwards bit
# If the warm-limit is hit, move the actuator outwards by 0.5 mm
# to un-trigger the warm-limit. This is because gripper sligthly
# overshoot the limit switches. The outward movement compensates
# for the overshoot and hysteresys of the limit switches.
run_and_append(self.client.is_cold, True, job='grip',
check_shutdown=check_shutdown)
time.sleep(1)
Expand All @@ -460,6 +504,35 @@ def run_and_append(func, *args, **kwargs):

finished[actuator] = True

# Return grippers back to home is something is wrong
if not all(finished):
self.log.error('Failed to grip HWP. Retract grippers.')
run_and_append(self.client.force, True, job='grip',
check_shutdown=check_shutdown)
time.sleep(1)

run_and_append(self.client.home, job='grip',
check_shutdown=check_shutdown)
time.sleep(1)

run_and_append(self.client.force, False, job='grip',
check_shutdown=check_shutdown)

# Adjust gripper position if you need
else:
run_and_append(self.client.is_cold, True, job='grip',
check_shutdown=check_shutdown)
time.sleep(1)

for actuator in range(3):
run_and_append(self.client.move, 'POS', actuator + 1,
self.adjustment_distance[actuator],
job='grip', check_shutdown=check_shutdown)

run_and_append(self.client.is_cold, False, job='grip',
check_shutdown=check_shutdown)
time.sleep(1)

# Enable breaks
run_and_append(self.client.brake, True, job='grip',
check_shutdown=check_shutdown)
Expand All @@ -470,7 +543,87 @@ def run_and_append(func, *args, **kwargs):
check_shutdown=check_shutdown)
time.sleep(1)

return data
# We should stop schedule if we have an error in this task
if not all(finished):
self.log.error('Failed to grip HWP. Grippers are retracted.')
return False, data

return True, data

def ungrip(self, session, params=None):
"""ungrip()

**Task** - Series of commands to automatically ungrip the HWP.
This will return grippers to their home position, and retract
grippers as much as possible.

Notes:
The most recent data collected is stored in session data in the
structure:

>>> response.session['responses']
[{'result': True, 'log': [.., .., etc]},
..
{'result': True, 'log': [.., .., etc]}]
"""

result, session.data = self._ungrip_hwp(check_shutdown=True)
return result, "Finished ungripping procedure"

def _ungrip_hwp(self, check_shutdown=True):
"""
Helper function to ungrip the HWP that can be called by the ungrip_hwp
task or by the shutdown procedure. This will return grippers to their
home position, and retract as much as possible.
"""
data = {
'responses': [],
}

def run_and_append(func, *args, **kwargs):
return_dict = self._run_client_func(func, *args, **kwargs)
data['responses'].append(return_dict)
return return_dict

run_and_append(self.client.reset, job='ungrip', check_shutdown=check_shutdown)
# Enable power to actuators
run_and_append(self.client.power, True, job='ungrip', check_shutdown=check_shutdown)
# Disable breaks
run_and_append(self.client.brake, False, job='ungrip', check_shutdown=check_shutdown)
# Ignore limit switches
run_and_append(self.client.force, True, job='ungrip', check_shutdown=check_shutdown)

# Send grippers to their home position
run_and_append(self.client.home, job='ungrip', check_shutdown=check_shutdown)

# Move actuator outwards as much as possible
for actuator in range(1, 4):
run_and_append(self.client.move, 'POS', actuator, -1.9,
job='ungrip', check_shutdown=check_shutdown)
time.sleep(1)

# Enable brake
run_and_append(self.client.brake, True, job='ungrip', check_shutdown=check_shutdown)
# Power off actuators
run_and_append(self.client.power, False, job='ungrip', check_shutdown=check_shutdown)
# Enable limit switches
run_and_append(self.client.force, False, job='ungrip', check_shutdown=check_shutdown)
time.sleep(1)

# check limit switch state
return_dict = run_and_append(self.client.get_state, job='ungrip',
check_shutdown=check_shutdown)
act_results = return_dict['result']['actuators']
limit_switch_state = act_results[0]['limits']['warm_grip']['state'] | \
act_results[1]['limits']['warm_grip']['state'] | \
act_results[2]['limits']['warm_grip']['state']

# Stop schedule if the limit switch state is wrong
if limit_switch_state:
self.log.error("Failed to ungrip HWP. Limit switch state is not as expected.")
return False, data

return True, data

def cancel_shutdown(self, session, params=None):
"""cancel_shutdown()
Expand Down Expand Up @@ -659,6 +812,14 @@ def make_parser(parser=None):
help='IP of Gripper Beaglebone')
pgroup.add_argument('--control-port', type=int, default=8041,
help='Port for actuator control as set by the Beaglebone code')
pgroup.add_argument('--warm-grip-distance', action='store', type=float, nargs=3,
default=[10.0, 10.0, 10.0],
help='Nominal distance for warm grip position (mm). This needs'
'to be multiple of 0.1')
pgroup.add_argument('--adjustment-distance', action='store', type=float, nargs=3,
default=[0, 0, 0],
help='Adjustment distance to compensate the misalignment of '
'limit switches (mm). This needs to be multiple of 0.1')
pgroup.add_argument('--supervisor-id', type=str,
help='Instance ID for HWP Supervisor agent')
pgroup.add_argument('--no-data-warn-time', type=float, default=60,
Expand Down Expand Up @@ -698,6 +859,7 @@ def main(args=None):
agent.register_task('force', gripper_agent.force)
agent.register_task('shutdown', gripper_agent.shutdown)
agent.register_task('grip', gripper_agent.grip)
agent.register_task('ungrip', gripper_agent.ungrip)
agent.register_task('cancel_shutdown', gripper_agent.cancel_shutdown)

runner.run(agent, auto_reconnect=True)
Expand Down