|
| 1 | +"""Muon specific bluesky device helpers.""" |
| 2 | + |
| 3 | +import asyncio |
| 4 | +import logging |
| 5 | +import typing |
| 6 | + |
| 7 | +import lmfit |
| 8 | +import numpy as np |
| 9 | +import numpy.typing as npt |
| 10 | +import scipp as sc |
| 11 | +from lmfit import Model |
| 12 | +from lmfit.model import ModelResult |
| 13 | +from numpy.typing import NDArray |
| 14 | +from ophyd_async.core import ( |
| 15 | + Device, |
| 16 | + StandardReadable, |
| 17 | + soft_signal_r_and_setter, |
| 18 | +) |
| 19 | + |
| 20 | +from ibex_bluesky_core.devices.dae import Dae, DaeSpectra |
| 21 | +from ibex_bluesky_core.devices.simpledae import Reducer |
| 22 | +from ibex_bluesky_core.utils import calculate_polarisation |
| 23 | + |
| 24 | +logger = logging.getLogger(__name__) |
| 25 | + |
| 26 | +__all__ = [ |
| 27 | + "MuonAsymmetryReducer", |
| 28 | + "damped_oscillator", |
| 29 | + "double_damped_oscillator", |
| 30 | +] |
| 31 | + |
| 32 | + |
| 33 | +def damped_oscillator( |
| 34 | + t: NDArray[np.floating], |
| 35 | + B: float, # noqa: N803 |
| 36 | + A_0: float, # noqa: N803 |
| 37 | + omega_0: float, |
| 38 | + phi_0: float, |
| 39 | + lambda_0: float, |
| 40 | +) -> NDArray[np.floating]: |
| 41 | + r"""Equation for a damped oscillator with an offset, as a function of time :math:`t`. |
| 42 | +
|
| 43 | + .. math:: |
| 44 | +
|
| 45 | + B + A_0 \cos(\omega_0 t + \phi_0) e^{-\lambda_0 t} |
| 46 | + """ |
| 47 | + return B + A_0 * np.cos(omega_0 * t + phi_0) * np.exp(-t * lambda_0) |
| 48 | + |
| 49 | + |
| 50 | +def double_damped_oscillator( # noqa: PLR0913 PLR0917 (model is just this complex) |
| 51 | + t: NDArray[np.floating], |
| 52 | + B: float, # noqa: N803 |
| 53 | + A_0: float, # noqa: N803 |
| 54 | + omega_0: float, |
| 55 | + phi_0: float, |
| 56 | + lambda_0: float, |
| 57 | + A_1: float, # noqa: N803 |
| 58 | + omega_1: float, |
| 59 | + phi_1: float, |
| 60 | + lambda_1: float, |
| 61 | +) -> NDArray[np.floating]: |
| 62 | + r"""Equation for two damped oscillators with an offset, as a function of time :math:`t`. |
| 63 | +
|
| 64 | + .. math:: |
| 65 | +
|
| 66 | + B + A_0 \cos(\omega_0 t + \phi_0) e^{-\lambda_0 t} |
| 67 | + + A_1 \cos(\omega_1 t + \phi_1) e^{-\lambda_1 t} |
| 68 | + """ |
| 69 | + return ( |
| 70 | + B |
| 71 | + + A_0 * np.cos(omega_0 * t + phi_0) * np.exp(-t * lambda_0) |
| 72 | + + A_1 * np.cos(omega_1 * t + phi_1) * np.exp(-t * lambda_1) |
| 73 | + ) |
| 74 | + |
| 75 | + |
| 76 | +class MuonAsymmetryReducer(Reducer, StandardReadable): |
| 77 | + r"""DAE reducer which exposes a fitted asymmetry quantity. |
| 78 | +
|
| 79 | + This reducer takes two lists of detectors; a forward scattering set of detectors, |
| 80 | + :math:`F`, and a backward scattering set, :math:`B`. |
| 81 | +
|
| 82 | + The spin-asymmetry is computed with: |
| 83 | +
|
| 84 | + .. math:: |
| 85 | +
|
| 86 | + a = \frac{F - \alpha B}{F + \alpha B} |
| 87 | +
|
| 88 | + Where :math:`\alpha` is a user-specified scalar constant, :math:`F` is an array of |
| 89 | + total forward-scattering detector counts against time, and :math:`B` is an array of |
| 90 | + backward-scattering detector counts against time. This results in an array of |
| 91 | + asymmetry (:math:`a`) against time. |
| 92 | +
|
| 93 | + Finally, the array of asymmetry (:math:`a`) against time (:math:`t`, in nanoseconds) |
| 94 | + is fitted using a user-specified model - for example, one of the two models below |
| 95 | + (which are implemented by |
| 96 | + :py:obj:`damped_oscillator <ibex_bluesky_core.devices.muon.damped_oscillator>` and |
| 97 | + :py:obj:`double_damped_oscillator <ibex_bluesky_core.devices.muon.double_damped_oscillator>`). |
| 98 | +
|
| 99 | + .. math:: |
| 100 | +
|
| 101 | + a = B + A_0 cos({ω_0} {t} + {φ_0}) e^{-λ_0 t} |
| 102 | +
|
| 103 | + a = B + A_0 cos({ω_0} {t} + {φ_0}) e^{-λ_0 t} + A_1 cos({ω_1} {t} + {φ_1}) e^{-λ_1 t} |
| 104 | +
|
| 105 | + The resulting fit parameters, along with their uncertainties, are exposed as |
| 106 | + signals from this reducer. For example, for a model like: |
| 107 | +
|
| 108 | + .. code-block:: python |
| 109 | +
|
| 110 | + def my_model(t, m, c): |
| 111 | + return m * t + c |
| 112 | +
|
| 113 | + model = lmfit.Model(my_model) |
| 114 | +
|
| 115 | + The exposed signals will include ``m``, ``m_err``, ``c``, and ``c_err``. |
| 116 | +
|
| 117 | + .. note:: |
| 118 | +
|
| 119 | + The independent variable must be called `t` (time). |
| 120 | +
|
| 121 | + An example setup showing how to fit a linear model to asymmetry using this |
| 122 | + reducer is: |
| 123 | +
|
| 124 | + .. code-block:: python |
| 125 | +
|
| 126 | + def linear(t, m, c): |
| 127 | + return m * t + c |
| 128 | +
|
| 129 | + # lmfit Parameters describing initial guesses and fitting constraints |
| 130 | + parameters = lmfit.Parameters() |
| 131 | + parameters.add("m", 0) |
| 132 | + parameters.add("c", 0, min=0, max=1000) |
| 133 | +
|
| 134 | + controller = RunPerPointController(save_run=True) |
| 135 | + waiter = PeriodGoodFramesWaiter(500) |
| 136 | + reducer = MuonAsymmetryReducer( |
| 137 | + prefix=prefix, |
| 138 | + # Selects spectra 1-4 for forwards-scattering, spectra 5-8 for backwards-scattering |
| 139 | + forward_detectors=np.array([1, 2, 3, 4]), |
| 140 | + backward_detectors=np.array([5, 6, 7, 8]), |
| 141 | + # Optional: rebin the muon data to these time bins before fitting. |
| 142 | + time_bin_edges=sc.linspace( |
| 143 | + start=0, stop=200, num=100, unit=sc.units.ns, dtype="float64", dim="tof" |
| 144 | + ), |
| 145 | + # Scalar multiplier applied to backwards detectors in asymmetry calculation. |
| 146 | + alpha=1.0, |
| 147 | + model=lmfit.Model(linear), |
| 148 | + fit_parameters=parameters, |
| 149 | + ) |
| 150 | +
|
| 151 | + dae = SimpleDae( |
| 152 | + prefix=prefix, |
| 153 | + controller=controller, |
| 154 | + waiter=waiter, |
| 155 | + reducer=reducer, |
| 156 | + ) |
| 157 | +
|
| 158 | + """ |
| 159 | + |
| 160 | + def __init__( # noqa: PLR0913 (complex function, mitigated by kw-only arguments) |
| 161 | + self, |
| 162 | + *, |
| 163 | + prefix: str, |
| 164 | + forward_detectors: npt.NDArray[np.int32], |
| 165 | + backward_detectors: npt.NDArray[np.int32], |
| 166 | + alpha: float = 1.0, |
| 167 | + time_bin_edges: sc.Variable | None = None, |
| 168 | + model: Model, |
| 169 | + fit_parameters: lmfit.Parameters, |
| 170 | + ) -> None: |
| 171 | + """Create a new Muon asymmetry reducer. |
| 172 | +
|
| 173 | + Args: |
| 174 | + prefix: PV prefix for the |
| 175 | + :py:obj:`SimpleDae <ibex_bluesky_core.devices.simpledae.SimpleDae>`. |
| 176 | + forward_detectors: numpy :external+numpy:py:obj:`array <numpy.array>` of detector |
| 177 | + spectra to select for forward-scattering. |
| 178 | + For example, ``np.array([1, 2, 3])`` selects spectra 1-3 inclusive. |
| 179 | + All detectors in this list are assumed to have the same time |
| 180 | + channel boundaries. |
| 181 | + backward_detectors: numpy :external+numpy:py:obj:`array <numpy.array>` of detector |
| 182 | + spectra to select for backward-scattering. |
| 183 | + alpha: Scaling factor used in asymmetry calculation, applied to backward detector |
| 184 | + counts. Defaults to 1. |
| 185 | + time_bin_edges: Optional scipp :external+scipp:py:obj:`Variable <scipp.Variable>` |
| 186 | + describing bin-edges for rebinning the data before fitting. |
| 187 | + This must be bin edge coordinates, aligned along a scipp dimension label of |
| 188 | + "tof", have a unit of time, for example nanoseconds, and must be strictly ascending. |
| 189 | + Use :py:obj:`None` to not apply any rebinning to the data. |
| 190 | + model: :external:py:obj:`lmfit.model.Model` object describing the model to fit to |
| 191 | + the muon data. The independent variable must be :math:`t` (time, in nanoseconds). |
| 192 | + fit_parameters: :external:py:obj:`lmfit.parameter.Parameters` object describing |
| 193 | + the initial parameters (and contraints) for each fit parameter. |
| 194 | +
|
| 195 | + """ |
| 196 | + self._forward_detectors = forward_detectors |
| 197 | + self._backward_detectors = backward_detectors |
| 198 | + self._alpha = alpha |
| 199 | + self._model = model |
| 200 | + self._time_bin_edges = time_bin_edges |
| 201 | + |
| 202 | + self._first_det = DaeSpectra( |
| 203 | + dae_prefix=prefix + "DAE:", spectra=int(forward_detectors[0]), period=0 |
| 204 | + ) |
| 205 | + # ask for independent variables which should be a single T |
| 206 | + |
| 207 | + self._fit_parameters = fit_parameters |
| 208 | + self._parameter_setters = {} |
| 209 | + self._parameter_error_setters = {} |
| 210 | + |
| 211 | + missing = set(model.param_names) - set(fit_parameters.keys()) |
| 212 | + if missing: |
| 213 | + raise ValueError(f"Missing parameters: {missing}") |
| 214 | + |
| 215 | + for param in model.param_names: |
| 216 | + signal, setter = soft_signal_r_and_setter(float, 0.0) |
| 217 | + setattr(self, param, signal) |
| 218 | + self._parameter_setters[param] = setter |
| 219 | + |
| 220 | + error_signal, error_setter = soft_signal_r_and_setter(float, 0.0) |
| 221 | + setattr(self, f"{param}_err", error_signal) |
| 222 | + self._parameter_error_setters[param] = error_setter |
| 223 | + |
| 224 | + super().__init__(name="") |
| 225 | + |
| 226 | + def _rebin_and_sum(self, counts: NDArray[np.int32], time_coord: sc.Variable) -> sc.DataArray: |
| 227 | + da = sc.DataArray( |
| 228 | + data=sc.array( |
| 229 | + dims=["spec", "tof"], |
| 230 | + values=counts, |
| 231 | + variances=counts, |
| 232 | + unit=sc.units.counts, |
| 233 | + dtype="float64", |
| 234 | + ), |
| 235 | + coords={ |
| 236 | + "tof": time_coord, |
| 237 | + }, |
| 238 | + ) |
| 239 | + da = da.sum(dim="spec") |
| 240 | + |
| 241 | + if self._time_bin_edges is not None: |
| 242 | + da = da.rebin({"tof": self._time_bin_edges}) |
| 243 | + |
| 244 | + return da |
| 245 | + |
| 246 | + def _fit_data(self, asymmetry: sc.DataArray) -> ModelResult | None: |
| 247 | + bin_edges = asymmetry.coords["tof"].to(unit=sc.units.ns, dtype="float64").values |
| 248 | + bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 |
| 249 | + |
| 250 | + result = self._model.fit( |
| 251 | + asymmetry.values, |
| 252 | + t=bin_centers, |
| 253 | + weights=1.0 / (asymmetry.variances**0.5), |
| 254 | + params=self._fit_parameters, |
| 255 | + nan_policy="omit", |
| 256 | + ) |
| 257 | + |
| 258 | + return result |
| 259 | + |
| 260 | + def _calculate_asymmetry( |
| 261 | + self, current_period_data: NDArray[np.int32], first_spec_dataarray: sc.DataArray |
| 262 | + ) -> sc.DataArray: |
| 263 | + forward = self._rebin_and_sum( |
| 264 | + current_period_data[self._forward_detectors, :], first_spec_dataarray.coords["tof"] |
| 265 | + ) |
| 266 | + backward = self._rebin_and_sum( |
| 267 | + current_period_data[self._backward_detectors, :], first_spec_dataarray.coords["tof"] |
| 268 | + ) |
| 269 | + forward.variances += 0.5 |
| 270 | + backward.variances += 0.5 |
| 271 | + return calculate_polarisation(forward, backward, self._alpha) |
| 272 | + |
| 273 | + async def reduce_data(self, dae: Dae) -> None: |
| 274 | + """Fitting asymmetry to a set of DAE data.""" |
| 275 | + logger.info("starting reduction reads") |
| 276 | + ( |
| 277 | + current_period_data, |
| 278 | + first_spec_dataarray, |
| 279 | + ) = await asyncio.gather( |
| 280 | + dae.trigger_and_get_specdata(), |
| 281 | + self._first_det.read_spectrum_dataarray(), |
| 282 | + ) |
| 283 | + |
| 284 | + logger.info("starting reduction") |
| 285 | + |
| 286 | + asymmetry = self._calculate_asymmetry(current_period_data, first_spec_dataarray) |
| 287 | + fit_result = self._fit_data(asymmetry) |
| 288 | + |
| 289 | + if fit_result is None: |
| 290 | + raise ValueError( |
| 291 | + "MuonAsymmetryReducer failed to fit asymmetry model to muon data.\n" |
| 292 | + "Check beamline setup." |
| 293 | + ) |
| 294 | + |
| 295 | + for param in self._parameter_setters: |
| 296 | + result = fit_result.params[param] |
| 297 | + |
| 298 | + self._parameter_setters[param](result.value) |
| 299 | + self._parameter_error_setters[param](result.stderr) |
| 300 | + |
| 301 | + logger.info("reduction complete") |
| 302 | + |
| 303 | + def additional_readable_signals(self, dae: Dae) -> list[Device]: |
| 304 | + """Publish interesting signals derived or used by this reducer.""" |
| 305 | + signal_values = [] |
| 306 | + signal_errors = [] |
| 307 | + |
| 308 | + for param in self._model.param_names: |
| 309 | + signal_values.append(getattr(self, param)) |
| 310 | + signal_errors.append(getattr(self, f"{param}_err")) |
| 311 | + |
| 312 | + return signal_values + signal_errors |
| 313 | + |
| 314 | + # As we have dynamic attributes, tell pyright that __getattr__ may return any type. |
| 315 | + __getattr__: typing.Callable[[str], typing.Any] |
0 commit comments