Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyctiarbin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .cycler_interface import CyclerInterface
from .channel_interface import ChannelInterface
from .messages import Msg
from .messages import MessageABC
from .messages import MessageABC
2 changes: 1 addition & 1 deletion pyctiarbin/arbinspoofer/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .arbin_spoofer import ArbinSpoofer
from .arbin_spoofer import ArbinSpoofer
9 changes: 7 additions & 2 deletions pyctiarbin/arbinspoofer/arbin_spoofer.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ def __process_client_msg(self, rx_msg):

if cmd_code == Msg.Login.Client.command_code:
rx_msg_dict = Msg.Login.Client.unpack(rx_msg)
tx_msg = Msg.Login.Server.pack({'num_channels':self.__channel_data.num_channels})
tx_msg = Msg.Login.Server.pack(
{'num_channels': self.__channel_data.num_channels})
elif cmd_code == Msg.ChannelInfo.Client.command_code:
rx_msg_dict = Msg.ChannelInfo.Client.unpack(rx_msg)
channel_values = self.__channel_data.fetch_channel_readings(
Expand All @@ -191,6 +192,10 @@ def __process_client_msg(self, rx_msg):
rx_msg_dict = Msg.StopSchedule.Client.unpack(rx_msg)
tx_msg = Msg.StopSchedule.Server.pack(
{'channel': rx_msg_dict['channel']})
elif cmd_code == Msg.JumpChannel.Client.command_code:
rx_msg_dict = Msg.JumpChannel.Client.unpack(rx_msg)
tx_msg = Msg.JumpChannel.Server.pack(
{'channel': rx_msg_dict['channel']})
elif cmd_code == Msg.SetMetaVariable.Client.command_code:
rx_msg_dict = Msg.SetMetaVariable.Client.unpack(rx_msg)
tx_msg = Msg.SetMetaVariable.Server.pack(
Expand Down Expand Up @@ -325,4 +330,4 @@ def stop(self):
self.__server_thread.join()

def __del__(self):
self.stop()
self.stop()
37 changes: 36 additions & 1 deletion pyctiarbin/channel_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,40 @@ def stop_test(self) -> bool:

return success

def jump_channel(self, step_num: int) -> bool:
"""
Jumps the test on the channel specified in the config to the passed step number `step_num`.
If `step_num` is greater than the number of steps in the schedule, test will stop.

Parameters
----------
step_num: int
Step number in the schedule to jump the test to.
Returns
-------
success: bool
True/False based on whether the test successfully jumped step.
"""
success = False

updated_msg_vals = {}
updated_msg_vals['channel'] = self.__config.channel
updated_msg_vals['step_num'] = step_num

jump_channel_tx_bin = Msg.JumpChannel.Client.pack(updated_msg_vals)
response_msg_bin = self._send_receive_msg(jump_channel_tx_bin)

if response_msg_bin:
jump_channel_msg_rx_dict = Msg.JumpChannel.Server.unpack(response_msg_bin)
if jump_channel_msg_rx_dict["response"] == "success":
success = True
logger.info(f"Successfully jumped channel to step {step_num}")
logger.debug(jump_channel_msg_rx_dict)
else:
logger.error(f"Failed to jump channel to step {step_num}! Issue: {jump_channel_msg_rx_dict['result']}")

return success

def set_meta_variable(self, mv_num: int, mv_value: float) -> bool:
"""
Sets the passed meta variable number `mv_num` to the passed value `mv_value`
Expand Down Expand Up @@ -197,6 +231,7 @@ def set_meta_variable(self, mv_num: int, mv_value: float) -> bool:

return success


class ChannelInterfaceConfig(BaseModel):
'''
Holds channel config information for the CyclerInterface class.
Expand Down Expand Up @@ -231,4 +266,4 @@ class ChannelInterfaceConfig(BaseModel):
def username_alphanumeric(cls, v):
if v < 1:
raise ValueError('Channel must be greater than zero!')
return v-1
return v-1
93 changes: 91 additions & 2 deletions pyctiarbin/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def unpack(cls, msg_bin: bytearray) -> dict:

# Create a template to unpack message with
template = {**deepcopy(cls.base_template),
**deepcopy(cls.msg_specific_template)}
**deepcopy(cls.msg_specific_template)}

for item_name, item in template.items():
start_idx = item['start_byte']
Expand Down Expand Up @@ -111,7 +111,7 @@ def pack(cls, msg_values={}) -> bytearray:
"""
# Create a template to build messages from
template = {**deepcopy(cls.base_template),
**deepcopy(cls.msg_specific_template)}
**deepcopy(cls.msg_specific_template)}

# Update the template with message specific length and command code
template['msg_length']['value'] = cls.msg_length
Expand Down Expand Up @@ -1083,6 +1083,95 @@ def unpack(cls, msg_bin: bytearray) -> dict:
ord(msg_dict['result'])]
return msg_dict

class JumpChannel:
'''
Message for jumping channel to a different step in its schedule file.
THIRD_PARTY_JUMP_CHANNEL/THIRD_PARTY_JUMP_CHANNEL_FEEDBACK
in Arbin docs for more info.
'''
class Client(MessageABC):
msg_length = 119
command_code = 0xBB320005

msg_specific_template = {
'step_num': {
'format': '<L',
'start_byte': 20,
'value': 0,
},
'channel': {
'format': '<L',
'start_byte': 24,
'value': 0
},
'reserved': {
'format': '101s',
'start_byte': 28,
'value': ''.join(['\0' for i in range(101)]),
'text_encoding': 'utf-8',
},
}

class Server(MessageABC):
msg_length = 128
command_code = 0xBB230005

msg_specific_template = {
'channel': {
'format': '<I',
'start_byte': 20,
'value': 0
},
'result': {
'format': 'c',
'start_byte': 24,
'value': '\0',
'text_encoding': 'utf-8',
},
'reserved': {
'format': '101s',
'start_byte': 25,
'value': ''.join(['\0' for i in range(101)]),
'text_encoding': 'utf-8',
},
}

jump_channel_feedback_codes = {
0: 'success',
17: 'Someone else is using the monitor window at the moment',
18: 'Channel not running',
19: 'Channel not connected to DAQ',
20: 'Invalid Schedule',
21: 'No schedule assigned',
22: 'Invalid schedule version',
25: 'Schedule cannot contain over 200 steps',
33: 'DAQ still downloading schedule',
36: 'Schedule contains invalid step limit setting',
37: 'Invalid parallel setting',
38: 'Schedule safety check not safe',
}

@classmethod
def unpack(cls, msg_bin: bytearray) -> dict:
"""
Same as the parent method, but converts the result based on the
jump_channel_feedback_codes.

Parameters
----------
msg_bin : bytearray
The message to unpack.

Returns
-------
msg_dict : dict
The message with items decoded into a dictionary
"""
msg_dict = super().unpack(msg_bin)
msg_dict['result'] = cls.jump_channel_feedback_codes[
ord(msg_dict['result'])]
return msg_dict

class SetMetaVariable:
'''
Message for setting meta variables.
Expand Down
Binary file added tests/example_messages/client_jump_channel_msg.bin
Binary file not shown.
9 changes: 9 additions & 0 deletions tests/example_messages/client_jump_channel_msg.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"header": 1287429013477645789,
"msg_length": 119,
"command_code": 3140616197,
"extended_command_code": 0,
"step_num": 5,
"channel": 13,
"reserved": ""
}
Binary file added tests/example_messages/server_jump_channel_msg.bin
Binary file not shown.
9 changes: 9 additions & 0 deletions tests/example_messages/server_jump_channel_msg.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"header": 1287429013477645789,
"msg_length": 128,
"command_code": 3139633157,
"extended_command_code": 0,
"channel": 13,
"result": "success",
"reserved": ""
}
2 changes: 2 additions & 0 deletions tests/test_arbinspoofer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def test_messages():
Msg.StartSchedule.Server.pack()),
(Msg.StopSchedule.Client.pack(),
Msg.StopSchedule.Server.pack()),
(Msg.JumpChannel.Client.pack(),
Msg.JumpChannel.Server.pack()),
(Msg.SetMetaVariable.Client.pack(),
Msg.SetMetaVariable.Server.pack()),
]
Expand Down
46 changes: 46 additions & 0 deletions tests/test_jump_channel_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import pytest
import os
import copy
from pyctiarbin import Msg
from helper_test_utils import message_file_loader

MSG_DIR = os.path.join(os.path.dirname(__file__), 'example_messages')


@pytest.mark.messages
def test_jump_channel_client_message():
'''
Test packing/parsing a client jump channel request message
'''

example_msg_name = 'client_jump_channel_msg'
(msg_bin, msg_dict) = message_file_loader(MSG_DIR, example_msg_name)

# Pack the msg_dict and check if it matches the example binary message
packed_msg = Msg.JumpChannel.Client.pack(msg_dict)
assert (packed_msg == msg_bin)

# Checking parsing the example message binary and that it matches the msg_dict
parsed_msg = Msg.JumpChannel.Client.unpack(msg_bin)
assert (parsed_msg == msg_dict)


@pytest.mark.messages
def test_jump_channel_server_message():
'''
Test parsing/packing a server jump channel response message
'''
example_msg_name = 'server_jump_channel_msg'
(msg_bin, msg_dict) = message_file_loader(MSG_DIR, example_msg_name)

# Check that the parsed binary message matches the msg_dict
parsed_msg = Msg.JumpChannel.Server.unpack(msg_bin)
assert (parsed_msg == msg_dict)

# Check packing own version of message from msg_dict
buildable_msg_dict = copy.deepcopy(msg_dict)
# Need to re-code this from login_result_decoder
buildable_msg_dict['result'] = '\0'
packed_msg = Msg.JumpChannel.Server.pack(buildable_msg_dict)
parsed_msg = Msg.JumpChannel.Server.unpack(packed_msg)
assert (parsed_msg == msg_dict)