1414
1515from subprocess import PIPE , Popen
1616from threading import Timer
17- from typing import List , Literal
17+ from typing import List , Literal , Optional
1818
1919from langchain_core .tools import tool
2020
@@ -55,114 +55,90 @@ def run_command(cmd: List[str], timeout: int = 5):
5555@tool
5656def ros2_action (
5757 command : Literal ["info" , "list" , "type" ],
58- action_name : str = "" ,
58+ arguments : Optional [ List [ str ]] = None ,
5959 timeout : int = 5 ,
6060):
6161 """Run a ROS2 action command
6262 Args:
6363 command: The action command to run (info/list/type)
64- action_name: Name of the action (required for info and type)
64+ arguments: Additional arguments for the command as a list of strings
6565 timeout: Command timeout in seconds
6666 """
67- if command in ["info" , "type" ]:
68- if not action_name :
69- raise ValueError ("Action name is required for info and type commands" )
70-
7167 cmd = ["ros2" , "action" , command ]
72- if action_name :
73- cmd .append ( action_name )
68+ if arguments :
69+ cmd .extend ( arguments )
7470 return run_command (cmd , timeout )
7571
7672
7773@tool
7874def ros2_service (
7975 command : Literal ["call" , "find" , "info" , "list" , "type" ],
80- service_name : str = "" ,
76+ arguments : Optional [ List [ str ]] = None ,
8177 timeout : int = 5 ,
8278):
8379 """Run a ROS2 service command
8480 Args:
8581 command: The service command to run
86- service_name: Name of the service (required for call, info, and type)
82+ arguments: Additional arguments for the command as a list of strings
8783 timeout: Command timeout in seconds
8884 """
89- if command in ["call" , "info" , "type" ]:
90- if not service_name :
91- raise ValueError (
92- "Service name is required for call, info, and type commands"
93- )
94-
9585 cmd = ["ros2" , "service" , command ]
96- if service_name :
97- cmd .append ( service_name )
86+ if arguments :
87+ cmd .extend ( arguments )
9888 return run_command (cmd , timeout )
9989
10090
10191@tool
102- def ros2_node (command : Literal ["info" , "list" ], node_name : str = "" , timeout : int = 5 ):
92+ def ros2_node (
93+ command : Literal ["info" , "list" ],
94+ arguments : Optional [List [str ]] = None ,
95+ timeout : int = 5 ,
96+ ):
10397 """Run a ROS2 node command
10498 Args:
10599 command: The node command to run
106- node_name: Name of the node (required for info)
100+ arguments: Additional arguments for the command as a list of strings
107101 timeout: Command timeout in seconds
108102 """
109- if command == "info" :
110- if not node_name :
111- raise ValueError ("Node name is required for info command" )
112-
113103 cmd = ["ros2" , "node" , command ]
114- if node_name :
115- cmd .append ( node_name )
104+ if arguments :
105+ cmd .extend ( arguments )
116106 return run_command (cmd , timeout )
117107
118108
119109@tool
120110def ros2_param (
121111 command : Literal ["delete" , "describe" , "dump" , "get" , "list" , "set" ],
122- node_name : str = "" ,
123- param_name : str = "" ,
112+ arguments : Optional [List [str ]] = None ,
124113 timeout : int = 5 ,
125114):
126115 """Run a ROS2 parameter command
127116 Args:
128117 command: The parameter command to run
129- node_name: Name of the node
130- param_name: Name of the parameter (required for get, set, delete)
118+ arguments: Additional arguments for the command as a list of strings
131119 timeout: Command timeout in seconds
132120 """
133- if command in ["get" , "set" , "delete" ]:
134- if not param_name :
135- raise ValueError (
136- "Parameter name is required for get, set, and delete commands"
137- )
138-
139121 cmd = ["ros2" , "param" , command ]
140- if node_name :
141- cmd .append (node_name )
142- if param_name :
143- cmd .append (param_name )
122+ if arguments :
123+ cmd .extend (arguments )
144124 return run_command (cmd , timeout )
145125
146126
147127@tool
148128def ros2_interface (
149129 command : Literal ["list" , "package" , "packages" , "proto" , "show" ],
150- interface_name : str = "" ,
130+ arguments : Optional [ List [ str ]] = None ,
151131 timeout : int = 5 ,
152132):
153133 """Run a ROS2 interface command
154134 Args:
155135 command: The interface command to run
156- interface_name: Name of the interface (required for show, proto)
136+ arguments: Additional arguments for the command as a list of strings
157137 timeout: Command timeout in seconds
158138 """
159- if command in ["show" , "proto" ]:
160- if not interface_name :
161- raise ValueError ("Interface name is required for show and proto commands" )
162-
163139 cmd = ["ros2" , "interface" , command ]
164- if interface_name :
165- cmd .append ( interface_name )
140+ if arguments :
141+ cmd .extend ( arguments )
166142 return run_command (cmd , timeout )
167143
168144
@@ -171,7 +147,7 @@ def ros2_topic(
171147 command : Literal [
172148 "bw" , "delay" , "echo" , "find" , "hz" , "info" , "list" , "pub" , "type"
173149 ],
174- topic_name : str = "" ,
150+ arguments : Optional [ List [ str ]] = None ,
175151 timeout : int = 5 ,
176152):
177153 """Run a ROS2 topic command
@@ -186,14 +162,10 @@ def ros2_topic(
186162 - list: Output a list of available topics
187163 - pub: Publish a message to a topic
188164 - type: Print a topic's type
189- topic_name: Name of the topic (required for all commands except list)
165+ arguments: Additional arguments for the command as a list of strings
190166 timeout: Command timeout in seconds
191167 """
192- if command in ["bw" , "delay" , "echo" , "hz" , "info" , "pub" , "type" ]:
193- if not topic_name :
194- raise ValueError ("Topic name is required for all commands except list" )
195-
196168 cmd = ["ros2" , "topic" , command ]
197- if topic_name :
198- cmd .append ( topic_name )
169+ if arguments :
170+ cmd .extend ( arguments )
199171 return run_command (cmd , timeout )
0 commit comments