Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 282 additions & 0 deletions unilabos/devices/ChinWe/chinwe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
import sys
import threading
import serial
import serial.tools.list_ports
import re
import time
from typing import Optional, List, Dict, Tuple

class ChinweDevice:
"""
ChinWe设备控制类
提供串口通信、电机控制、传感器数据读取等功能
"""

def __init__(self, port: str, baudrate: int = 115200, debug: bool = False):
"""
初始化ChinWe设备

Args:
port: 串口名称,如果为None则自动检测
baudrate: 波特率,默认115200
"""
self.debug = debug
self.port = port
self.baudrate = baudrate
self.serial_port: Optional[serial.Serial] = None
self._voltage: float = 0.0
self._ec_value: float = 0.0
self._ec_adc_value: int = 0
self._is_connected = False
self.connect()

@property
def is_connected(self) -> bool:
"""获取连接状态"""
return self._is_connected and self.serial_port and self.serial_port.is_open

@property
def voltage(self) -> float:
"""获取电源电压值"""
return self._voltage

@property
def ec_value(self) -> float:
"""获取电导率值 (ms/cm)"""
return self._ec_value

@property
def ec_adc_value(self) -> int:
"""获取EC ADC原始值"""
return self._ec_adc_value


@property
def device_status(self) -> Dict[str, any]:
"""
获取设备状态信息

Returns:
包含设备状态的字典
"""
return {
"connected": self.is_connected,
"port": self.port,
"baudrate": self.baudrate,
"voltage": self.voltage,
"ec_value": self.ec_value,
"ec_adc_value": self.ec_adc_value
}

def connect(self, port: Optional[str] = None, baudrate: Optional[int] = None) -> bool:
"""
连接到串口设备

Args:
port: 串口名称,如果为None则使用初始化时的port或自动检测
baudrate: 波特率,如果为None则使用初始化时的baudrate

Returns:
连接是否成功
"""
if self.is_connected:
return True

target_port = port or self.port
target_baudrate = baudrate or self.baudrate

try:
self.serial_port = serial.Serial(target_port, target_baudrate, timeout=0.5)
self._is_connected = True
self.port = target_port
self.baudrate = target_baudrate
connect_allow_times = 5
while not self.serial_port.is_open and connect_allow_times > 0:
time.sleep(0.5)
connect_allow_times -= 1
print(f"尝试连接到 {target_port} @ {target_baudrate},剩余尝试次数: {connect_allow_times}", self.debug)
raise ValueError("串口未打开,请检查设备连接")
print(f"已连接到 {target_port} @ {target_baudrate}", self.debug)
threading.Thread(target=self._read_data, daemon=True).start()
return True
except Exception as e:
print(f"ChinweDevice连接失败: {e}")
self._is_connected = False
return False

def disconnect(self) -> bool:
"""
断开串口连接

Returns:
断开是否成功
"""
if self.serial_port and self.serial_port.is_open:
try:
self.serial_port.close()
self._is_connected = False
print("已断开串口连接")
return True
except Exception as e:
print(f"断开连接失败: {e}")
return False
return True

def _send_motor_command(self, command: str) -> bool:
"""
发送电机控制命令

Args:
command: 电机命令字符串,例如 "M 1 CW 1.5"

Returns:
发送是否成功
"""
if not self.is_connected:
print("设备未连接")
return False

try:
self.serial_port.write((command + "\n").encode('utf-8'))
print(f"发送命令: {command}")
return True
except Exception as e:
print(f"发送命令失败: {e}")
return False

def rotate_motor(self, motor_id: int, turns: float, clockwise: bool = True) -> bool:
"""
使电机转动指定圈数

Args:
motor_id: 电机ID(1, 2, 3...)
turns: 转动圈数,支持小数
clockwise: True为顺时针,False为逆时针

Returns:
命令发送是否成功
"""
if clockwise:
command = f"M {motor_id} CW {turns}"
else:
command = f"M {motor_id} CCW {turns}"
return self._send_motor_command(command)

def set_motor_speed(self, motor_id: int, speed: float) -> bool:
"""
设置电机转速(如果设备支持)

Args:
motor_id: 电机ID(1, 2, 3...)
speed: 转速值

Returns:
命令发送是否成功
"""
command = f"M {motor_id} SPEED {speed}"
return self._send_motor_command(command)

def _read_data(self) -> List[str]:
"""
读取串口数据并解析

Returns:
读取到的数据行列表
"""
print("开始读取串口数据...")
if not self.is_connected:
return []

data_lines = []
try:
while self.serial_port.in_waiting:
time.sleep(0.1) # 等待数据稳定
try:
line = self.serial_port.readline().decode('utf-8', errors='ignore').strip()
if line:
data_lines.append(line)
self._parse_sensor_data(line)
except Exception as ex:
print(f"解码数据错误: {ex}")
except Exception as e:
print(f"读取串口数据错误: {e}")

return data_lines

def _parse_sensor_data(self, line: str) -> None:
"""
解析传感器数据

Args:
line: 接收到的数据行
"""
# 解析电源电压
if "电源电压" in line:
try:
val = float(line.split(":")[1].replace("V", "").strip())
self._voltage = val
if self.debug:
print(f"电源电压更新: {val}V")
except Exception:
pass

# 解析电导率和ADC原始值(支持两种格式)
if "电导率" in line and "ADC原始值" in line:
try:
# 支持格式如:电导率:2.50ms/cm, ADC原始值:2052
ec_match = re.search(r"电导率[::]\s*([\d\.]+)", line)
adc_match = re.search(r"ADC原始值[::]\s*(\d+)", line)
if ec_match:
ec_val = float(ec_match.group(1))
self._ec_value = ec_val
if self.debug:
print(f"电导率更新: {ec_val:.2f} ms/cm")
if adc_match:
adc_val = int(adc_match.group(1))
self._ec_adc_value = adc_val
if self.debug:
print(f"EC ADC原始值更新: {adc_val}")
except Exception:
pass
# 仅电导率,无ADC原始值
elif "电导率" in line:
try:
val = float(line.split(":")[1].replace("ms/cm", "").strip())
self._ec_value = val
if self.debug:
print(f"电导率更新: {val:.2f} ms/cm")
except Exception:
pass
# 仅ADC原始值(如有分开回传场景)
elif "ADC原始值" in line:
try:
adc_val = int(line.split(":")[1].strip())
self._ec_adc_value = adc_val
if self.debug:
print(f"EC ADC原始值更新: {adc_val}")
except Exception:
pass

def spin_when_ec_ge_0():
pass


def main():
"""测试函数"""
print("=== ChinWe设备测试 ===")

# 创建设备实例
device = ChinweDevice("/dev/tty.usbserial-A5069RR4", debug=True)
try:
# 测试5: 发送电机命令
print("\n5. 发送电机命令测试:")
print(" 5.3 使用通用函数控制电机20顺时针转2圈:")
device.rotate_motor(2, 20.0, clockwise=True)
time.sleep(0.5)
finally:
time.sleep(10)
# 测试7: 断开连接
print("\n7. 断开连接:")
device.disconnect()
if __name__ == "__main__":
main()
6 changes: 6 additions & 0 deletions unilabos/devices/pump_and_valve/runze_backbone.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from enum import Enum
from dataclasses import dataclass
import time
import traceback
from typing import Any, Union, Optional, overload

import serial.tools.list_ports
Expand Down Expand Up @@ -386,3 +387,8 @@ def close(self):
def list():
for item in serial.tools.list_ports.comports():
yield RunzeSyringePumpInfo(port=item.device)


if __name__ == "__main__":
r = RunzeSyringePump("/dev/tty.usbserial-D30JUGG5", "1", 25.0)
r.initialize()
2 changes: 2 additions & 0 deletions unilabos/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ def load_device_types(self, path: os.PathLike, complete_registry: bool):
if complete_registry:
device_config["class"]["status_types"].clear()
enhanced_info = get_enhanced_class_info(device_config["class"]["module"], use_dynamic=True)
if not enhanced_info.get("dynamic_import_success", False):
continue
device_config["class"]["status_types"].update(
{k: v["return_type"] for k, v in enhanced_info["status_methods"].items()}
)
Expand Down
3 changes: 3 additions & 0 deletions unilabos/ros/nodes/presets/protocol_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ def __init__(
if device_config.get("type", "device") != "device":
continue
# 设置硬件接口代理
if device_id not in self.sub_devices:
self.lab_logger().error(f"[Protocol Node] {device_id} 还没有正确初始化,跳过...")
continue
d = self.sub_devices[device_id]
if d:
hardware_interface = d.ros_node_instance._hardware_interface
Expand Down
Loading