Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ __pycache__/
.vscode
*.py[cod]
*$py.class
service

# C extensions
*.so
Expand Down
4 changes: 2 additions & 2 deletions unilabos/app/controler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ def job_add(req: JobAddReq) -> JobData:
action_kwargs = {"command": json.dumps(action_kwargs)}
elif "command" in action_kwargs:
action_kwargs = action_kwargs["command"]
print(f"job_add:{req.device_id} {action_name} {action_kwargs}")
HostNode.get_instance().send_goal(req.device_id, action_name=action_name, action_kwargs=action_kwargs, goal_uuid=req.job_id)
# print(f"job_add:{req.device_id} {action_name} {action_kwargs}")
HostNode.get_instance().send_goal(req.device_id, action_name=action_name, action_kwargs=action_kwargs, goal_uuid=req.job_id, server_info=req.server_info)
return JobData(jobId=req.job_id)
5 changes: 3 additions & 2 deletions unilabos/app/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ class Resp(BaseModel):
class JobAddReq(BaseModel):
device_id: str = Field(examples=["Gripper"], description="device id")
data: dict = Field(examples=[{"position": 30, "torque": 5, "action": "push_to"}])
job_id: str = Field(examples=["sfsfsfeq"], description="goal uuid")
node_id: str = Field(examples=["sfsfsfeq"], description="node uuid")
job_id: str = Field(examples=["job_id"], description="goal uuid")
node_id: str = Field(examples=["node_id"], description="node uuid")
server_info: dict = Field(examples=[{"send_timestamp": 1717000000.0}], description="server info")


class JobStepFinishReq(BaseModel):
Expand Down
39 changes: 33 additions & 6 deletions unilabos/app/mq.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import os

from unilabos.config.config import MQConfig
from unilabos.app.controler import devices, job_add
from unilabos.app.controler import job_add
from unilabos.app.model import JobAddReq
from unilabos.utils import logger
from unilabos.utils.type_check import TypeEncoder
Expand Down Expand Up @@ -43,13 +43,10 @@ def _on_log(self, client, userdata, level, buf):
def _on_connect(self, client, userdata, flags, rc, properties=None):
logger.info("[MQTT] Connected with result code " + str(rc))
client.subscribe(f"labs/{MQConfig.lab_id}/job/start/", 0)
isok, data = devices()
if not isok:
logger.error("[MQTT] on_connect ErrorHostNotInit")
return
client.subscribe(f"labs/{MQConfig.lab_id}/pong/", 0)

def _on_message(self, client, userdata, msg) -> None:
logger.info("[MQTT] on_message<<<< " + msg.topic + " " + str(msg.payload))
# logger.info("[MQTT] on_message<<<< " + msg.topic + " " + str(msg.payload))
try:
payload_str = msg.payload.decode("utf-8")
payload_json = json.loads(payload_str)
Expand All @@ -63,6 +60,14 @@ def _on_message(self, client, userdata, msg) -> None:
job_req = JobAddReq.model_validate(payload_json)
data = job_add(job_req)
return
elif msg.topic == f"labs/{MQConfig.lab_id}/pong/":
# 处理pong响应,通知HostNode
from unilabos.ros.nodes.presets.host_node import HostNode

host_instance = HostNode.get_instance(0)
if host_instance:
host_instance.handle_pong_response(payload_json)
return

except json.JSONDecodeError as e:
logger.error(f"[MQTT] JSON 解析错误: {e}")
Expand Down Expand Up @@ -179,6 +184,28 @@ def publish_actions(self, action_id: str, action_info: dict):
self.client.publish(address, json.dumps(action_info), qos=2)
logger.debug(f"Action data published: address: {address}, {action_id}, {action_info}")

def send_ping(self, ping_id: str, timestamp: float):
"""发送ping消息到服务端"""
if self.mqtt_disable:
return
address = f"labs/{MQConfig.lab_id}/ping/"
ping_data = {"ping_id": ping_id, "client_timestamp": timestamp, "type": "ping"}
self.client.publish(address, json.dumps(ping_data), qos=2)

def setup_pong_subscription(self):
"""设置pong消息订阅"""
if self.mqtt_disable:
return
pong_topic = f"labs/{MQConfig.lab_id}/pong/"
self.client.subscribe(pong_topic, 0)
logger.debug(f"Subscribed to pong topic: {pong_topic}")

def handle_pong(self, pong_data: dict):
"""处理pong响应(这个方法会在收到pong消息时被调用)"""
logger.debug(f"Pong received: {pong_data}")
# 这里会被HostNode的ping-pong处理逻辑调用
pass


mqtt_client = MQTTClient()

Expand Down
146 changes: 87 additions & 59 deletions unilabos/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,58 +25,10 @@ def __init__(self, registry_paths=None):
self.ResourceCreateFromOuterEasy = self._replace_type_with_class(
"ResourceCreateFromOuterEasy", "host_node", f"动作 create_resource"
)
self.device_type_registry = {
"host_node": {
"description": "UniLabOS主机节点",
"class": {
"module": "unilabos.ros.nodes.presets.host_node",
"type": "python",
"status_types": {},
"action_value_mappings": {
"create_resource_detailed": {
"type": msg_converter_manager.search_class("ResourceCreateFromOuter"),
"goal": {
"resources": "resources",
"device_ids": "device_ids",
"bind_parent_ids": "bind_parent_ids",
"bind_locations": "bind_locations",
"other_calling_params": "other_calling_params",
},
"feedback": {},
"result": {
"success": "success"
},
"schema": ros_action_to_json_schema(self.ResourceCreateFromOuter)
},
"create_resource": {
"type": msg_converter_manager.search_class("ResourceCreateFromOuterEasy"),
"goal": {
"res_id": "res_id",
"class_name": "class_name",
"parent": "parent",
"device_id": "device_id",
"bind_locations": "bind_locations",
"liquid_input_slot": "liquid_input_slot[]",
"liquid_type": "liquid_type[]",
"liquid_volume": "liquid_volume[]",
"slot_on_deck": "slot_on_deck",
},
"feedback": {},
"result": {
"success": "success"
},
"schema": ros_action_to_json_schema(self.ResourceCreateFromOuterEasy)
}
}
},
"schema": {
"properties": {},
"additionalProperties": False,
"type": "object"
},
"file_path": "/"
}
}
self.EmptyIn = self._replace_type_with_class(
"EmptyIn", "host_node", f""
)
self.device_type_registry = {}
self.resource_type_registry = {}
self._setup_called = False # 跟踪setup是否已调用
# 其他状态变量
Expand All @@ -88,9 +40,70 @@ def setup(self):
logger.critical("[UniLab Registry] setup方法已被调用过,不允许多次调用")
return

# 标记setup已被调用
self._setup_called = True
from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type

self.device_type_registry.update(
{
"host_node": {
"description": "UniLabOS主机节点",
"class": {
"module": "unilabos.ros.nodes.presets.host_node",
"type": "python",
"status_types": {},
"action_value_mappings": {
"create_resource_detailed": {
"type": self.ResourceCreateFromOuter,
"goal": {
"resources": "resources",
"device_ids": "device_ids",
"bind_parent_ids": "bind_parent_ids",
"bind_locations": "bind_locations",
"other_calling_params": "other_calling_params",
},
"feedback": {},
"result": {"success": "success"},
"schema": ros_action_to_json_schema(self.ResourceCreateFromOuter),
"goal_default": yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuter.Goal))
),
},
"create_resource": {
"type": self.ResourceCreateFromOuterEasy,
"goal": {
"res_id": "res_id",
"class_name": "class_name",
"parent": "parent",
"device_id": "device_id",
"bind_locations": "bind_locations",
"liquid_input_slot": "liquid_input_slot[]",
"liquid_type": "liquid_type[]",
"liquid_volume": "liquid_volume[]",
"slot_on_deck": "slot_on_deck",
},
"feedback": {},
"result": {"success": "success"},
"schema": ros_action_to_json_schema(self.ResourceCreateFromOuterEasy),
"goal_default": yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal))
),
},
"test_latency": {
"type": self.EmptyIn,
"goal": {},
"feedback": {},
"result": {"latency_ms": "latency_ms", "time_diff_ms": "time_diff_ms"},
"schema": ros_action_to_json_schema(self.EmptyIn),
"goal_default": {},
},
},
},
"icon": "icon_device.webp",
"registry_type": "device",
"schema": {"properties": {}, "additionalProperties": False, "type": "object"},
"file_path": "/",
}
}
)
logger.debug(f"[UniLab Registry] ----------Setup----------")
self.registry_paths = [Path(path).absolute() for path in self.registry_paths]
for i, path in enumerate(self.registry_paths):
Expand All @@ -100,6 +113,8 @@ def setup(self):
self.load_device_types(path)
self.load_resource_types(path)
logger.info("[UniLab Registry] 注册表设置完成")
# 标记setup已被调用
self._setup_called = True

def load_resource_types(self, path: os.PathLike):
abs_path = Path(path).absolute()
Expand All @@ -115,6 +130,9 @@ def load_resource_types(self, path: os.PathLike):
resource_info["file_path"] = str(file.absolute()).replace("\\", "/")
if "description" not in resource_info:
resource_info["description"] = ""
if "icon" not in resource_info:
resource_info["icon"] = ""
resource_info["registry_type"] = "resource"
self.resource_type_registry.update(data)
logger.debug(
f"[UniLab Registry] Resource-{current_resource_number} File-{i+1}/{len(files)} "
Expand Down Expand Up @@ -164,6 +182,7 @@ def load_device_types(self, path: os.PathLike):
)
current_device_number = len(self.device_type_registry) + 1
from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type

for i, file in enumerate(files):
data = yaml.safe_load(open(file, encoding="utf-8"))
if data:
Expand All @@ -173,6 +192,9 @@ def load_device_types(self, path: os.PathLike):
device_config["file_path"] = str(file.absolute()).replace("\\", "/")
if "description" not in device_config:
device_config["description"] = ""
if "icon" not in device_config:
device_config["icon"] = ""
device_config["registry_type"] = "device"
if "class" in device_config:
# 处理状态类型
if "status_types" in device_config["class"]:
Expand All @@ -189,7 +211,9 @@ def load_device_types(self, path: os.PathLike):
action_config["type"], device_id, f"动作 {action_name}"
)
if action_config["type"] is not None:
action_config["goal_default"] = yaml.safe_load(io.StringIO(get_yaml_from_goal_type(action_config["type"].Goal)))
action_config["goal_default"] = yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(action_config["type"].Goal))
)
action_config["schema"] = ros_action_to_json_schema(action_config["type"])
else:
logger.warning(
Expand All @@ -212,13 +236,17 @@ def load_device_types(self, path: os.PathLike):
def obtain_registry_device_info(self):
devices = []
for device_id, device_info in self.device_type_registry.items():
msg = {
"id": device_id,
**device_info
}
msg = {"id": device_id, **device_info}
devices.append(msg)
return devices

def obtain_registry_resource_info(self):
resources = []
for resource_id, resource_info in self.resource_type_registry.items():
msg = {"id": resource_id, **resource_info}
resources.append(msg)
return resources


# 全局单例实例
lab_registry = Registry()
Expand Down
Loading