Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
865 changes: 633 additions & 232 deletions unilabos/device_comms/opcua_client/client.py

Large diffs are not rendered by default.

176 changes: 150 additions & 26 deletions unilabos/device_comms/opcua_client/node/uniopcua.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from abc import ABC, abstractmethod
from typing import Tuple, Union, Optional, Any, List

from opcua import Client, Node
from opcua import Client, Node, ua
from opcua.ua import NodeId, NodeClass, VariantType


Expand Down Expand Up @@ -47,23 +47,68 @@ def __init__(self, client: Client, name: str, node_id: str, typ: NodeType, data_
def _get_node(self) -> Node:
if self._node is None:
try:
# 检查是否是NumericNodeId(ns=X;i=Y)格式
if "NumericNodeId" in self._node_id:
# 从字符串中提取命名空间和标识符
import re
match = re.search(r'ns=(\d+);i=(\d+)', self._node_id)
if match:
ns = int(match.group(1))
identifier = int(match.group(2))
node_id = NodeId(identifier, ns)
self._node = self._client.get_node(node_id)
# 尝试多种 NodeId 字符串格式解析,兼容不同服务器/库的输出
# 可能的格式示例: 'ns=2;i=1234', 'ns=2;s=SomeString',
# 'StringNodeId(ns=4;s=OPC|变量名)', 'NumericNodeId(ns=2;i=1234)' 等
import re

nid = self._node_id
# 如果已经是 NodeId/Node 对象(库用户可能传入),直接使用
try:
from opcua.ua import NodeId as UaNodeId
if isinstance(nid, UaNodeId):
self._node = self._client.get_node(nid)
return self._node
except Exception:
# 若导入或类型判断失败,则继续下一步
pass

# 直接以字符串形式处理
if isinstance(nid, str):
nid = nid.strip()

# 处理包含类名的格式,如 'StringNodeId(ns=4;s=...)' 或 'NumericNodeId(ns=2;i=...)'
# 提取括号内的内容
match_wrapped = re.match(r'(String|Numeric|Byte|Guid|TwoByteNode|FourByteNode)NodeId\((.*)\)', nid)
if match_wrapped:
# 提取括号内的实际 node_id 字符串
nid = match_wrapped.group(2).strip()

# 常见短格式 'ns=2;i=1234' 或 'ns=2;s=SomeString'
if re.match(r'^ns=\d+;[is]=', nid):
self._node = self._client.get_node(nid)
else:
raise ValueError(f"无法解析节点ID: {self._node_id}")
# 尝试提取 ns 和 i 或 s
# 对于字符串标识符,可能包含特殊字符,使用非贪婪匹配
m_num = re.search(r'ns=(\d+);i=(\d+)', nid)
m_str = re.search(r'ns=(\d+);s=(.+?)(?:\)|$)', nid)
if m_num:
ns = int(m_num.group(1))
identifier = int(m_num.group(2))
node_id = NodeId(identifier, ns)
self._node = self._client.get_node(node_id)
elif m_str:
ns = int(m_str.group(1))
identifier = m_str.group(2).strip()
# 对于字符串标识符,直接使用字符串格式
node_id_str = f"ns={ns};s={identifier}"
self._node = self._client.get_node(node_id_str)
else:
# 回退:尝试直接传入字符串(有些实现接受其它格式)
try:
self._node = self._client.get_node(self._node_id)
except Exception as e:
# 输出更详细的错误信息供调试
print(f"获取节点失败(尝试直接字符串): {self._node_id}, 错误: {e}")
raise
else:
# 直接使用节点ID字符串
# 非字符串,尝试直接使用
self._node = self._client.get_node(self._node_id)
except Exception as e:
print(f"获取节点失败: {self._node_id}, 错误: {e}")
# 添加额外提示,帮助定位 BadNodeIdUnknown 问题
print("提示: 请确认该 node_id 是否来自当前连接的服务器地址空间," \
"以及 CSV/配置中名称与服务器 BrowseName 是否匹配。")
raise
return self._node

Expand Down Expand Up @@ -104,7 +149,56 @@ def read(self) -> Tuple[Any, bool]:

def write(self, value: Any) -> bool:
try:
self._get_node().set_value(value)
# 如果声明了数据类型,则尝试转换并使用对应的 Variant 写入
coerced = value
try:
if self._data_type is not None:
# 基于声明的数据类型做简单类型转换
dt = self._data_type
if dt in (DataType.SBYTE, DataType.BYTE, DataType.INT16, DataType.UINT16,
DataType.INT32, DataType.UINT32, DataType.INT64, DataType.UINT64):
# 数值类型 -> int
if isinstance(value, str):
coerced = int(value)
else:
coerced = int(value)
elif dt in (DataType.FLOAT, DataType.DOUBLE):
if isinstance(value, str):
coerced = float(value)
else:
coerced = float(value)
elif dt == DataType.BOOLEAN:
if isinstance(value, str):
v = value.strip().lower()
if v in ("true", "1", "yes", "on"):
coerced = True
elif v in ("false", "0", "no", "off"):
coerced = False
else:
coerced = bool(value)
else:
coerced = bool(value)
elif dt == DataType.STRING or dt == DataType.BYTESTRING or dt == DataType.DATETIME:
coerced = str(value)

# 使用 ua.Variant 明确指定 VariantType
try:
variant = ua.Variant(coerced, dt.value)
self._get_node().set_value(variant)
except Exception:
# 回退:有些 set_value 实现接受 (value, variant_type)
try:
self._get_node().set_value(coerced, dt.value)
except Exception:
# 最后回退到直接写入(保持兼容性)
self._get_node().set_value(coerced)
else:
# 未声明数据类型,直接写入
self._get_node().set_value(value)
except Exception:
# 若在转换或按数据类型写入失败,尝试直接写入原始值并让上层捕获错误
self._get_node().set_value(value)

return False
except Exception as e:
print(f"写入变量 {self._name} 失败: {e}")
Expand All @@ -120,20 +214,50 @@ def __init__(self, client: Client, name: str, node_id: str, parent_node_id: str,
def _get_parent_node(self) -> Node:
if self._parent_node is None:
try:
# 检查是否是NumericNodeId(ns=X;i=Y)格式
if "NumericNodeId" in self._parent_node_id:
# 从字符串中提取命名空间和标识符
import re
match = re.search(r'ns=(\d+);i=(\d+)', self._parent_node_id)
if match:
ns = int(match.group(1))
identifier = int(match.group(2))
node_id = NodeId(identifier, ns)
self._parent_node = self._client.get_node(node_id)
# 处理父节点ID,使用与_get_node相同的解析逻辑
import re

nid = self._parent_node_id

# 如果已经是 NodeId 对象,直接使用
try:
from opcua.ua import NodeId as UaNodeId
if isinstance(nid, UaNodeId):
self._parent_node = self._client.get_node(nid)
return self._parent_node
except Exception:
pass

# 字符串处理
if isinstance(nid, str):
nid = nid.strip()

# 处理包含类名的格式
match_wrapped = re.match(r'(String|Numeric|Byte|Guid|TwoByteNode|FourByteNode)NodeId\((.*)\)', nid)
if match_wrapped:
nid = match_wrapped.group(2).strip()

# 常见短格式
if re.match(r'^ns=\d+;[is]=', nid):
self._parent_node = self._client.get_node(nid)
else:
raise ValueError(f"无法解析父节点ID: {self._parent_node_id}")
# 提取 ns 和 i 或 s
m_num = re.search(r'ns=(\d+);i=(\d+)', nid)
m_str = re.search(r'ns=(\d+);s=(.+?)(?:\)|$)', nid)
if m_num:
ns = int(m_num.group(1))
identifier = int(m_num.group(2))
node_id = NodeId(identifier, ns)
self._parent_node = self._client.get_node(node_id)
elif m_str:
ns = int(m_str.group(1))
identifier = m_str.group(2).strip()
node_id_str = f"ns={ns};s={identifier}"
self._parent_node = self._client.get_node(node_id_str)
else:
# 回退
self._parent_node = self._client.get_node(self._parent_node_id)
else:
# 直接使用节点ID字符串
self._parent_node = self._client.get_node(self._parent_node_id)
except Exception as e:
print(f"获取父节点失败: {self._parent_node_id}, 错误: {e}")
Expand Down
Empty file.
93 changes: 93 additions & 0 deletions unilabos/devices/workstation/post_process/bottle_carriers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from pylabrobot.resources import create_homogeneous_resources, Coordinate, ResourceHolder, create_ordered_items_2d

from unilabos.resources.itemized_carrier import BottleCarrier
from unilabos.devices.workstation.post_process.bottles import POST_PROCESS_PolymerStation_Reagent_Bottle

# 命名约定:试剂瓶-Bottle,烧杯-Beaker,烧瓶-Flask,小瓶-Vial


# ============================================================================
# 聚合站(PolymerStation)载体定义(统一入口)
# ============================================================================

def POST_PROCESS_Raw_1BottleCarrier(name: str) -> BottleCarrier:
"""聚合站-单试剂瓶载架

参数:
- name: 载架名称前缀
"""

# 载架尺寸 (mm)
carrier_size_x = 127.8
carrier_size_y = 85.5
carrier_size_z = 20.0

# 烧杯/试剂瓶占位尺寸(使用圆形占位)
beaker_diameter = 60.0

# 计算中央位置
center_x = (carrier_size_x - beaker_diameter) / 2
center_y = (carrier_size_y - beaker_diameter) / 2
center_z = 5.0

carrier = BottleCarrier(
name=name,
size_x=carrier_size_x,
size_y=carrier_size_y,
size_z=carrier_size_z,
sites=create_homogeneous_resources(
klass=ResourceHolder,
locations=[Coordinate(center_x, center_y, center_z)],
resource_size_x=beaker_diameter,
resource_size_y=beaker_diameter,
name_prefix=name,
),
model="POST_PROCESS_Raw_1BottleCarrier",
)
carrier.num_items_x = 1
carrier.num_items_y = 1
carrier.num_items_z = 1
# 统一后缀采用 "flask_1" 命名(可按需调整)
carrier[0] = POST_PROCESS_PolymerStation_Reagent_Bottle(f"{name}_flask_1")
return carrier

def POST_PROCESS_Reaction_1BottleCarrier(name: str) -> BottleCarrier:
"""聚合站-单试剂瓶载架

参数:
- name: 载架名称前缀
"""

# 载架尺寸 (mm)
carrier_size_x = 127.8
carrier_size_y = 85.5
carrier_size_z = 20.0

# 烧杯/试剂瓶占位尺寸(使用圆形占位)
beaker_diameter = 60.0

# 计算中央位置
center_x = (carrier_size_x - beaker_diameter) / 2
center_y = (carrier_size_y - beaker_diameter) / 2
center_z = 5.0

carrier = BottleCarrier(
name=name,
size_x=carrier_size_x,
size_y=carrier_size_y,
size_z=carrier_size_z,
sites=create_homogeneous_resources(
klass=ResourceHolder,
locations=[Coordinate(center_x, center_y, center_z)],
resource_size_x=beaker_diameter,
resource_size_y=beaker_diameter,
name_prefix=name,
),
model="POST_PROCESS_Reaction_1BottleCarrier",
)
carrier.num_items_x = 1
carrier.num_items_y = 1
carrier.num_items_z = 1
# 统一后缀采用 "flask_1" 命名(可按需调整)
carrier[0] = POST_PROCESS_PolymerStation_Reagent_Bottle(f"{name}_flask_1")
return carrier
20 changes: 20 additions & 0 deletions unilabos/devices/workstation/post_process/bottles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from unilabos.resources.itemized_carrier import Bottle


def POST_PROCESS_PolymerStation_Reagent_Bottle(
name: str,
diameter: float = 70.0,
height: float = 120.0,
max_volume: float = 500000.0, # 500mL
barcode: str = None,
) -> Bottle:
"""创建试剂瓶"""
return Bottle(
name=name,
diameter=diameter,
height=height,
max_volume=max_volume,
barcode=barcode,
model="POST_PROCESS_PolymerStation_Reagent_Bottle",
)

46 changes: 46 additions & 0 deletions unilabos/devices/workstation/post_process/decks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from os import name
from pylabrobot.resources import Deck, Coordinate, Rotation

from unilabos.devices.workstation.post_process.warehouses import (
post_process_warehouse_4x3x1,
post_process_warehouse_4x3x1_2,
)



class post_process_deck(Deck):
def __init__(
self,
name: str = "post_process_deck",
size_x: float = 2000.0,
size_y: float = 1000.0,
size_z: float = 2670.0,
category: str = "deck",
setup: bool = True,
) -> None:
super().__init__(name=name, size_x=1700.0, size_y=1350.0, size_z=2670.0)
if setup:
self.setup()

def setup(self) -> None:
# 添加仓库
self.warehouses = {
"原料罐堆栈": post_process_warehouse_4x3x1("原料罐堆栈"),
"反应罐堆栈": post_process_warehouse_4x3x1_2("反应罐堆栈"),

}
# warehouse 的位置
self.warehouse_locations = {
"原料罐堆栈": Coordinate(350.0, 55.0, 0.0),
"反应罐堆栈": Coordinate(1000.0, 55.0, 0.0),

}

for warehouse_name, warehouse in self.warehouses.items():
self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name])






Loading
Loading