Skip to content

Commit

Permalink
[fix] check if tescan stage moves to the requested position otherwise…
Browse files Browse the repository at this point in the history
… raise an error
  • Loading branch information
K4rishma committed Nov 25, 2024
1 parent a698f2d commit 80d7b17
Showing 1 changed file with 72 additions and 7 deletions.
79 changes: 72 additions & 7 deletions src/odemis/driver/tescan.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,25 @@
Odemis. If not, see http://www.gnu.org/licenses/.
'''

import queue
import gc
import logging
import math
import numpy
from odemis import model, util
from odemis.model import (HwError, isasync, CancellableThreadPoolExecutor,
roattribute, oneway)
from odemis.util import TimeoutError
import queue
import re
import socket
from tescan import sem, CancelledError
import threading
import time
import weakref
from typing import Dict

import numpy
from tescan import sem, CancelledError

from odemis import model, util
from odemis.model import (HwError, isasync, CancellableThreadPoolExecutor,
roattribute, oneway)
from odemis.util import TimeoutError
from odemis.util.driver import isNearPosition

ACQ_CMD_UPD = 1
ACQ_CMD_TERM = 2
Expand Down Expand Up @@ -1027,6 +1031,12 @@ def _waitSync(self):
self._evtq.get()


STAGE_WAIT_DURATION = 20e-03 # s
STAGE_FRACTION_TOTAL_MOVE = 0.01 # tolerance of the requested stage movement
STAGE_TOL_LINEAR = 1e-06 # in m, minimum tolerance
STAGE_TOL_ROTATION = 0.00436 # in radians (0.25 degrees), minimum tolerance


class Stage(model.Actuator):
"""
This is an extension of the model.Actuator class. It provides functions for
Expand Down Expand Up @@ -1096,6 +1106,58 @@ def _pollXYZ(self):
except Exception:
logging.exception("Unexpected failure during XYZ polling")

def _checkPosition(self, ref_pos: Dict[str, float], pos: Dict[str, float], rel: bool = False):
"""
Checks if the current stage position is equal to the given position.
:param ref_pos: stage position before movement
:param pos: requested stage position in relative or absolute coordinates
:param rel: By default False, requested stage position is in absolution position otherwise relative
"""
# Get the target position in absolute coordinates
if rel:
target_pos = {ax: val + pos[ax] for ax, val in pos.items()}
else:
target_pos = {ax: pos[ax] for ax, val in pos.items()}

# Drop axes from the original position, which are not important because they have not moved
orig_pos = {a: ref_pos[a] for a, nv in target_pos.items() if nv != ref_pos[a]}

# TODO: base the timeout on stage movement time estimation, needs to be checked on hardware
wait_stage_move = 5 # s
expected_end_time = time.time() + wait_stage_move # s

# Update rotational and linear tolerances according to the magnitude of requested change
linear_axes_to_check = {"x", "y", "z"}.intersection(target_pos.keys())
rotational_axes_to_check = {"rx", "rz"}.intersection(target_pos.keys())
current_pos = self._position
movement_req = {ax: STAGE_FRACTION_TOTAL_MOVE * abs(target_pos[ax] - current_pos[ax]) for ax in
target_pos.keys()}
tol_linear = STAGE_TOL_LINEAR
tol_rotation = STAGE_TOL_ROTATION
if linear_axes_to_check:
movement_req_linear = [movement_req[ax] for ax in linear_axes_to_check]
tol_linear = max(min(movement_req_linear), STAGE_TOL_LINEAR) # m

if rotational_axes_to_check:
movement_req_rotational = [movement_req[ax] for ax in rotational_axes_to_check]
tol_rotation = max(min(movement_req_rotational), STAGE_TOL_ROTATION) # radians

axes_to_check = linear_axes_to_check | rotational_axes_to_check

while not isNearPosition(current_pos=current_pos, target_position=target_pos,
axes=axes_to_check, rot_axes=rotational_axes_to_check,
atol_linear=tol_linear, atol_rotation=tol_rotation):
time.sleep(STAGE_WAIT_DURATION)
self._updatePosition()
current_pos = self._position
if time.time() > expected_end_time:
raise ValueError(
"Stage position after + %s s is %s instead of requested position: %s. Start position:%s"
"Aborting move.", wait_stage_move, current_pos, target_pos, orig_pos)
else:
logging.debug("Position has updated fully: from %s -> %s", orig_pos,
current_pos)

def _updatePosition(self):
"""
update the position VA
Expand Down Expand Up @@ -1140,6 +1202,8 @@ def _doMoveAbs(self, pos: dict):

current_pos.update(req_pos)

pos_before_move = self._position

# always issue a move command containing values for all 5 axes so
# there is no separate code needed for backward compatibility
self.parent._device.StgMoveTo(current_pos["x"],
Expand All @@ -1158,6 +1222,7 @@ def _doMoveAbs(self, pos: dict):

logging.debug("Stage move to %s completed", pos)
self._updatePosition()
self._checkPosition(pos_before_move, pos, rel=False)

def _doMoveRel(self, shift: dict):
"""
Expand Down

0 comments on commit 80d7b17

Please sign in to comment.