Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrating sel phase optimizer files #29

Merged
merged 10 commits into from
Oct 29, 2024
115 changes: 115 additions & 0 deletions applications/sel_phase_optimizer/sel_phase_linac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import logging
import os
import pathlib
from typing import Optional

import numpy as np
from lcls_tools.common.controls.pyepics.utils import PV
from lcls_tools.common.logger.logger import custom_logger, FORMAT_STRING
from scipy import stats

from utils.sc_linac.cavity import Cavity
from utils.sc_linac.linac import Machine
from utils.sc_linac.linac_utils import RF_MODE_SELAP

MAX_STEP = 5
MULT = -51.0471


class SELCavity(Cavity):
def __init__(
self,
cavity_num,
rack_object,
):
super().__init__(cavity_num=cavity_num, rack_object=rack_object)
self._q_waveform_pv: Optional[PV] = None
self._i_waveform_pv: Optional[PV] = None
self._sel_poff_pv: Optional[PV] = None

self.logger = custom_logger(f"{self} SEL Phase Opt Logger")
file_directory = pathlib.Path(__file__).parent.resolve()
self.logfile = f"{file_directory}/logfiles/cm{self.cryomodule.name}/{self.number}_sel_phase_opt.log"
print(self.logfile)
os.makedirs(os.path.dirname(self.logfile), exist_ok=True)

self.file_handler = logging.FileHandler(self.logfile, mode="a")
self.file_handler.setFormatter(logging.Formatter(FORMAT_STRING))
self.logger.addHandler(self.file_handler)

@property
def sel_poff_pv(self) -> PV:
if not self._sel_poff_pv:
self._sel_poff_pv = PV(self.pv_addr("SEL_POFF"))
return self._sel_poff_pv

@property
def sel_phase_offset(self):
return self.sel_poff_pv.get()

@property
def i_waveform(self):
if not self._i_waveform_pv:
self._i_waveform_pv = PV(self.pv_addr("CTRL:IWF"))
return self._i_waveform_pv.get()

@property
def q_waveform(self):
if not self._q_waveform_pv:
self._q_waveform_pv = PV(self.pv_addr("CTRL:QWF"))
return self._q_waveform_pv.get()

def can_be_straightened(self) -> bool:
return (
self.is_online
and self.is_on
and self.rf_mode == RF_MODE_SELAP
and self.aact > 1
)

def straighten_iq_plot(self) -> float:
"""
TODO make the return value more intuitive
:return: change in SEL phase offset
"""

if not self.can_be_straightened():
return 0

start_val = self.sel_phase_offset
iwf = self.i_waveform
qwf = self.q_waveform

[slop, inter] = stats.siegelslopes(iwf, qwf)

if not np.isnan(slop):
chisum = 0
for nn, yy in enumerate(qwf):
chisum += (yy - (slop * iwf[nn] + inter)) ** 2 / (
slop * iwf[nn] + inter
)

step = slop * MULT
if abs(step) > MAX_STEP:
step = MAX_STEP * np.sign(step)
self.logger.warning(
f"Desired SEL Phase Offset change too large, moving by {step} instead"
)

if start_val + step < -180:
step = step + 360
elif start_val + step > 180:
step = step - 360

self.sel_poff_pv.put(start_val + step)
self.logger.info(
f"Changed SEL Phase Offset by {step:5.2f} with chi^2 {chisum:.2g}"
)
return step

else:
self.logger.warning("IQ slope is NaN, not changing SEL Phase Offset")
return 0


SEL_MACHINE: Machine = Machine(cavity_class=SELCavity)
65 changes: 65 additions & 0 deletions applications/sel_phase_optimizer/sel_phase_optimizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env python

"""
Script to optimize SEL phase offsets
Originally by J. Nelson, refactored by L. Zacarias
"""
import sys
import time
from typing import List

from lcls_tools.common.controls.pyepics.utils import PV, PVInvalidError

sys.path.append("/home/physics/srf/sc_linac_physics")
from applications.sel_phase_optimizer.sel_phase_linac import ( # noqa: E402
SEL_MACHINE,
SELCavity,
MAX_STEP,
)

HEARTBEAT_PV = PV("PHYS:SYS0:1:SC_SEL_PHAS_OPT_HEARTBEAT")


def update_heartbeat(time_to_wait: int):
print(f"Sleeping for {time_to_wait} seconds")
for _ in range(time_to_wait):
try:
HEARTBEAT_PV.put(HEARTBEAT_PV.get() + 1)
except TypeError as e:
print(e)
time.sleep(1)


cavities: List[SELCavity] = list(SEL_MACHINE.all_iterator)


def run():
while True:
num_large_steps = 0

for cavity in cavities:
try:
num_large_steps += 1 if cavity.straighten_iq_plot() >= MAX_STEP else 0
HEARTBEAT_PV.put(HEARTBEAT_PV.get() + 1)
except (PVInvalidError, TypeError) as e:
cavity.logger.error(e)

if num_large_steps > 5:
print(
f"\033[91mPhase change limited to 5 deg {num_large_steps} times."
f" Re-running program.\033[0m"
)
update_heartbeat(5)
else:
timi = time.localtime()
current_time = time.strftime("%m/%d/%y %H:%M:%S", timi)
print(
f"\033[94mThanks for your help! The current date/time is {current_time}\033[0m"
)

update_heartbeat(600)


if __name__ == "__main__":
HEARTBEAT_PV.put(0)
run()
Empty file.
117 changes: 117 additions & 0 deletions tests/applications/sel_phase_optimizer/test_sel_phase_linac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from random import randint, choice
from unittest.mock import MagicMock

import pytest
from lcls_tools.common.controls.pyepics.utils import make_mock_pv

from applications.sel_phase_optimizer.sel_phase_linac import SEL_MACHINE, SELCavity
from utils.sc_linac.linac_utils import (
HW_MODE_ONLINE_VALUE,
RF_MODE_SELAP,
HW_MODE_MAINTENANCE_VALUE,
HW_MODE_OFFLINE_VALUE,
HW_MODE_MAIN_DONE_VALUE,
HW_MODE_READY_VALUE,
RF_MODE_SELA,
RF_MODE_SEL,
RF_MODE_SEL_RAW,
RF_MODE_PULSE,
RF_MODE_CHIRP,
)

non_hl_iterator = SEL_MACHINE.non_hl_iterator


@pytest.fixture
def cavity() -> SELCavity:
yield next(non_hl_iterator)


def test_sel_poff_pv(cavity):
cavity._sel_poff_pv = make_mock_pv()
assert cavity.sel_poff_pv == cavity._sel_poff_pv


def test_sel_phase_offset(cavity):
offset = randint(0, 100)
cavity._sel_poff_pv = make_mock_pv(get_val=offset)
assert cavity.sel_phase_offset == offset


def test_i_waveform(cavity):
wf = [i for i in range(randint(1, 100))]
cavity._i_waveform_pv = make_mock_pv(get_val=wf)
assert cavity.i_waveform == wf


def test_q_waveform(cavity):
wf = [i for i in range(randint(1, 100))]
cavity._q_waveform_pv = make_mock_pv(get_val=wf)
assert cavity.q_waveform == wf


def test_can_be_straightened(cavity):
cavity._hw_mode_pv_obj = make_mock_pv(get_val=HW_MODE_ONLINE_VALUE)
cavity._rf_state_pv_obj = make_mock_pv(get_val=1)
cavity._rf_mode_pv_obj = make_mock_pv(get_val=RF_MODE_SELAP)
cavity._aact_pv_obj = make_mock_pv(get_val=randint(5, 21))
assert cavity.can_be_straightened()


def test_cannot_be_straightened_hw_mode(cavity):
cavity._hw_mode_pv_obj = make_mock_pv(
get_val=choice(
[
HW_MODE_MAINTENANCE_VALUE,
HW_MODE_OFFLINE_VALUE,
HW_MODE_MAIN_DONE_VALUE,
HW_MODE_READY_VALUE,
]
)
)
cavity._rf_state_pv_obj = make_mock_pv(get_val=1)
cavity._rf_mode_pv_obj = make_mock_pv(get_val=RF_MODE_SELAP)
cavity._aact_pv_obj = make_mock_pv(get_val=randint(5, 21))
assert not cavity.can_be_straightened()


def test_cannot_be_straightened_rf_state(cavity):
cavity._hw_mode_pv_obj = make_mock_pv(get_val=HW_MODE_ONLINE_VALUE)
cavity._rf_state_pv_obj = make_mock_pv(get_val=0)
cavity._rf_mode_pv_obj = make_mock_pv(get_val=RF_MODE_SELAP)
cavity._aact_pv_obj = make_mock_pv(get_val=randint(5, 21))
assert not cavity.can_be_straightened()


def test_cannot_be_straightened_rf_mode(cavity):
cavity._hw_mode_pv_obj = make_mock_pv(get_val=HW_MODE_ONLINE_VALUE)
cavity._rf_state_pv_obj = make_mock_pv(get_val=1)
cavity._rf_mode_pv_obj = make_mock_pv(
get_val=choice(
[RF_MODE_SELA, RF_MODE_SEL, RF_MODE_SEL_RAW, RF_MODE_PULSE, RF_MODE_CHIRP]
)
)
cavity._aact_pv_obj = make_mock_pv(get_val=randint(5, 21))
assert not cavity.can_be_straightened()


def test_cannot_be_straightened_aact(cavity):
cavity._hw_mode_pv_obj = make_mock_pv(get_val=HW_MODE_ONLINE_VALUE)
cavity._rf_state_pv_obj = make_mock_pv(get_val=1)
cavity._rf_mode_pv_obj = make_mock_pv(get_val=RF_MODE_SELAP)
cavity._aact_pv_obj = make_mock_pv(get_val=1)
assert not cavity.can_be_straightened()


def test_should_not_straighten_iq_plot(cavity):
cavity.can_be_straightened = MagicMock(return_value=False)
assert cavity.straighten_iq_plot() == 0


def test_straighten_iq_plot(cavity):
cavity.can_be_straightened = MagicMock(return_value=True)
wf = [i for i in range(1, randint(2, 100))]
cavity._i_waveform_pv = make_mock_pv(get_val=wf)
cavity._q_waveform_pv = make_mock_pv(get_val=wf)
cavity._sel_poff_pv = make_mock_pv(get_val=randint(0, 360))
assert cavity.straighten_iq_plot() != 0
Loading