Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions ReflectometryServer/ioc_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"CorrectedReadbackUpdate",
[
"value", # The new (corrected) readback value of the axis (float)
"alarm_severity", # The alarm severity of the axis, represented as an integer (see Channel Access doc)

Check failure on line 42 in ReflectometryServer/ioc_driver.py

View workflow job for this annotation

GitHub Actions / call-workflow / ruff

Ruff (E501)

ReflectometryServer\ioc_driver.py:42:101: E501 Line too long (111 > 100)
"alarm_status",
],
) # The alarm status of the axis, represented as an integer (see Channel Access doc)
Expand Down Expand Up @@ -72,28 +72,28 @@
synchronised: bool = True,
engineering_correction: Optional[EngineeringCorrection] = None,
pv_wrapper_for_parameter: Optional[PVWrapperForParameter] = None,
ignore_soft_limits: bool = False
):
clamp_to_soft_limits: bool = False,
) -> None:
"""
Drive the IOC based on a component
Args:
component: Component for IOC driver
motor_axis: The PV that this driver controls.
out_of_beam_positions: Provides the out of beam status as configured for this pv_wrapper.

Check failure on line 82 in ReflectometryServer/ioc_driver.py

View workflow job for this annotation

GitHub Actions / call-workflow / ruff

Ruff (E501)

ReflectometryServer\ioc_driver.py:82:101: E501 Line too long (101 > 100)
synchronised: If True then axes will set their velocities so they arrive at the end point at the same

Check failure on line 83 in ReflectometryServer/ioc_driver.py

View workflow job for this annotation

GitHub Actions / call-workflow / ruff

Ruff (E501)

ReflectometryServer\ioc_driver.py:83:101: E501 Line too long (113 > 100)
time; if false they will move at their current speed.
engineering_correction: the engineering correction to apply to the value from the component before it is

Check failure on line 85 in ReflectometryServer/ioc_driver.py

View workflow job for this annotation

GitHub Actions / call-workflow / ruff

Ruff (E501)

ReflectometryServer\ioc_driver.py:85:101: E501 Line too long (116 > 100)
sent to the pv. None for no correction
pv_wrapper_for_parameter: change the pv wrapper based on the value of a parameter
ignore_soft_limits: ignore soft limits for this axis when performing a compound move.
clamp_to_soft_limits: ignore soft limits for this axis when performing a compound move.
"""
self.component = component
self.component_axis = component_axis
self._default_motor_axis = motor_axis
self._motor_axis = None
self._set_motor_axis(motor_axis, False)
self._pv_wrapper_for_parameter = pv_wrapper_for_parameter
self._ignore_soft_limits = ignore_soft_limits
self._clamp_to_soft_limits = clamp_to_soft_limits
if pv_wrapper_for_parameter is not None:
pv_wrapper_for_parameter.parameter.add_listener(
ParameterSetpointReadbackUpdate, self._on_parameter_update
Expand All @@ -103,7 +103,7 @@
self._out_of_beam_lookup = None
else:
try:
# Check type here - this can be a list of OutOfBeamPositions or just an int/float to make things neater.

Check failure on line 106 in ReflectometryServer/ioc_driver.py

View workflow job for this annotation

GitHub Actions / call-workflow / ruff

Ruff (E501)

ReflectometryServer\ioc_driver.py:106:101: E501 Line too long (120 > 100)
if isinstance(out_of_beam_positions, (int, float)):
out_of_beam_positions = [OutOfBeamPosition(out_of_beam_positions)]

Expand Down Expand Up @@ -165,15 +165,15 @@
if trigger_update:
self._retrigger_motor_axis_updates(None)

def set_observe_mode_change_on(self, mode_changer):
def set_observe_mode_change_on(self, mode_changer) -> None:

Check failure on line 168 in ReflectometryServer/ioc_driver.py

View workflow job for this annotation

GitHub Actions / call-workflow / ruff

Ruff (ANN001)

ReflectometryServer\ioc_driver.py:168:42: ANN001 Missing type annotation for function argument `mode_changer`
"""
Allow this driver to listen to mode change events from the mode_changer. It signs up the engineering correction.

Check failure on line 170 in ReflectometryServer/ioc_driver.py

View workflow job for this annotation

GitHub Actions / call-workflow / ruff

Ruff (E501)

ReflectometryServer\ioc_driver.py:170:101: E501 Line too long (120 > 100)
Args:
mode_changer: object that can be observed for mode change events
"""
self._engineering_correction.set_observe_mode_change_on(mode_changer)

def _on_define_value_as(self, new_event):
def _on_define_value_as(self, new_event) -> None:

Check failure on line 176 in ReflectometryServer/ioc_driver.py

View workflow job for this annotation

GitHub Actions / call-workflow / ruff

Ruff (ANN001)

ReflectometryServer\ioc_driver.py:176:35: ANN001 Missing type annotation for function argument `new_event`
"""
When a define value as occurs then set the value on the axis

Expand All @@ -194,9 +194,9 @@
)
self._motor_axis.define_position_as(correct_position)

def _on_correction_update(self, new_correction_value):
def _on_correction_update(self, new_correction_value) -> None:

Check failure on line 197 in ReflectometryServer/ioc_driver.py

View workflow job for this annotation

GitHub Actions / call-workflow / ruff

Ruff (ANN001)

ReflectometryServer\ioc_driver.py:197:37: ANN001 Missing type annotation for function argument `new_correction_value`
"""
When a correction update is got from the engineering correction then trigger our own correction update after

Check failure on line 199 in ReflectometryServer/ioc_driver.py

View workflow job for this annotation

GitHub Actions / call-workflow / ruff

Ruff (E501)

ReflectometryServer\ioc_driver.py:199:101: E501 Line too long (116 > 100)
updating description.
Args:
new_correction_value (CorrectionUpdate): the new correction value
Expand All @@ -206,12 +206,12 @@
)
self.trigger_listeners(CorrectionUpdate(new_correction_value.correction, description))

def __repr__(self):
def __repr__(self) -> str:
return "{} for axis pv {} and component {}".format(
self.__class__.__name__, self._motor_axis.name, self.component.name
)

def initialise(self):
def initialise(self) -> None:
"""
Post monitors and read initial value from the axis.
"""
Expand All @@ -221,7 +221,7 @@
motor_axis.initialise()
self.initialise_setpoint()

def initialise_setpoint(self):
def initialise_setpoint(self) -> None:
"""
Initialise the setpoint beam model in the component layer with an initial value read from the motor axis.
"""
Expand Down Expand Up @@ -362,7 +362,7 @@
)
return duration

def perform_move(self, move_duration, force=False):
def perform_move(self, move_duration, force=False) -> None:
"""
Tells the driver to perform a move to the component set points within a given duration.
The axis will update the set point cache when it is changed so don't need to do it here
Expand All @@ -381,6 +381,11 @@
)
else:
self._motor_axis.record_no_cache_velocity()
if self._clamp_to_soft_limits:
if component_sp >= self._motor_axis.hlm:
self._motor_axis.sp = self._motor_axis.hlm
elif component_sp <= self._motor_axis.llm:
self._motor_axis.sp = self._motor_axis.llm

self._motor_axis.sp = self._engineering_correction.to_axis(component_sp)
elif self.at_target_setpoint():
Expand Down Expand Up @@ -484,7 +489,7 @@
displacement = out_of_beam_position.get_final_position()
return displacement, is_to_from_park

def _on_update_rbv(self, update):
def _on_update_rbv(self, update) -> None:
"""
Listener to trigger on a change of the readback value of the underlying motor.

Expand All @@ -499,7 +504,7 @@
CorrectedReadbackUpdate(corrected_new_value, update.alarm_severity, update.alarm_status)
)

def _retrigger_motor_axis_updates(self, _):
def _retrigger_motor_axis_updates(self, _) -> None:
"""
Something about the axis has changed, e.g. engineering correction or motor axis change, so we should recalcuate
anything that depends on the axis bc
Expand All @@ -514,7 +519,7 @@
if last_changing is not None:
self._on_update_is_changing(last_changing)

def _propagate_rbv_change(self, update):
def _propagate_rbv_change(self, update) -> None:
"""
Signal that the motor readback value has changed to the middle component layer. Subclass must implement this
method.
Expand Down Expand Up @@ -545,7 +550,7 @@
in_beam_status, is_at_sequence_position = True, False
return in_beam_status, is_at_sequence_position

def _on_update_sp(self, update):
def _on_update_sp(self, update) -> None:
"""
Updates the cached set point from the axis with a new value.

Expand All @@ -556,7 +561,7 @@
update.value, self._get_component_sp(True)
)

def _on_update_is_changing(self, update):
def _on_update_is_changing(self, update) -> None:
"""
Updates the cached is_moving field for the motor record with a new value if the underlying motor rbv is changing

Expand Down Expand Up @@ -590,14 +595,15 @@
"""
llm = self._motor_axis.llm
hlm = self._motor_axis.hlm

component_sp = self._get_component_sp()
if self._sp_cache is None or component_sp is None:
return True, self._sp_cache, hlm, llm

if self._ignore_soft_limits:
if self._clamp_to_soft_limits:
# if clamping, we will always be within limits, so return true here.
return True, component_sp, hlm, llm

if self._sp_cache is None or component_sp is None:
return True, self._sp_cache, hlm, llm

inside_limits = llm <= component_sp <= hlm
return inside_limits, component_sp, hlm, llm

Expand All @@ -622,5 +628,5 @@
if self._motor_axis != new_axis:
self._set_motor_axis(new_axis, True)

def _move_to_parking_sequence(self, _: ParkingSequenceUpdate):
def _move_to_parking_sequence(self, _: ParkingSequenceUpdate) -> None:
self.perform_move(0)
Loading