|  | 
|  | 1 | +# Copyright (C) 2025 Robotec.AI | 
|  | 2 | +# | 
|  | 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | 4 | +# you may not use this file except in compliance with the License. | 
|  | 5 | +# You may obtain a copy of the License at | 
|  | 6 | +# | 
|  | 7 | +#         http://www.apache.org/licenses/LICENSE-2.0 | 
|  | 8 | +# | 
|  | 9 | +# Unless required by applicable law or agreed to in writing, software | 
|  | 10 | +# distributed under the License is distributed on an "AS IS" BASIS, | 
|  | 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | 12 | +# See the License for the specific language governing permissions and | 
|  | 13 | +# limitations under the License. | 
|  | 14 | + | 
|  | 15 | +from typing import Type | 
|  | 16 | + | 
|  | 17 | +from geometry_msgs.msg import PoseStamped, Quaternion | 
|  | 18 | +from nav2_msgs.action import NavigateToPose | 
|  | 19 | +from pydantic import BaseModel, Field | 
|  | 20 | +from rclpy.action import ActionClient | 
|  | 21 | +from tf_transformations import quaternion_from_euler | 
|  | 22 | + | 
|  | 23 | +from rai.tools.ros2.base import BaseROS2Tool | 
|  | 24 | + | 
|  | 25 | + | 
|  | 26 | +class NavigateToPoseBlockingToolInput(BaseModel): | 
|  | 27 | +    x: float = Field(..., description="The x coordinate of the pose") | 
|  | 28 | +    y: float = Field(..., description="The y coordinate of the pose") | 
|  | 29 | +    z: float = Field(..., description="The z coordinate of the pose") | 
|  | 30 | +    yaw: float = Field(..., description="The yaw angle of the pose") | 
|  | 31 | + | 
|  | 32 | + | 
|  | 33 | +class NavigateToPoseBlockingTool(BaseROS2Tool): | 
|  | 34 | +    name: str = "navigate_to_pose_blocking" | 
|  | 35 | +    description: str = "Navigate to a specific pose" | 
|  | 36 | +    frame_id: str = Field( | 
|  | 37 | +        default="map", description="The frame id of the Nav2 stack (map, odom, etc.)" | 
|  | 38 | +    ) | 
|  | 39 | +    action_name: str = Field( | 
|  | 40 | +        default="navigate_to_pose", description="The name of the Nav2 action" | 
|  | 41 | +    ) | 
|  | 42 | +    args_schema: Type[NavigateToPoseBlockingToolInput] = NavigateToPoseBlockingToolInput | 
|  | 43 | + | 
|  | 44 | +    def _run(self, x: float, y: float, z: float, yaw: float) -> str: | 
|  | 45 | +        action_client = ActionClient( | 
|  | 46 | +            self.connector.node, NavigateToPose, self.action_name | 
|  | 47 | +        ) | 
|  | 48 | + | 
|  | 49 | +        pose = PoseStamped() | 
|  | 50 | +        pose.header.frame_id = self.frame_id | 
|  | 51 | +        pose.header.stamp = self.connector.node.get_clock().now().to_msg() | 
|  | 52 | +        pose.pose.position.x = x | 
|  | 53 | +        pose.pose.position.y = y | 
|  | 54 | +        pose.pose.position.z = z | 
|  | 55 | +        quat = quaternion_from_euler(0, 0, yaw) | 
|  | 56 | +        pose.pose.orientation = Quaternion(x=quat[0], y=quat[1], z=quat[2], w=quat[3]) | 
|  | 57 | + | 
|  | 58 | +        goal = NavigateToPose.Goal() | 
|  | 59 | +        goal.pose = pose | 
|  | 60 | + | 
|  | 61 | +        result = action_client.send_goal(goal) | 
|  | 62 | + | 
|  | 63 | +        if result is None: | 
|  | 64 | +            return "Navigate to pose action failed. Please try again." | 
|  | 65 | + | 
|  | 66 | +        if result.result.error_code != 0: | 
|  | 67 | +            return f"Navigate to pose action failed. Error code: {result.result.error_code}" | 
|  | 68 | + | 
|  | 69 | +        return "Navigate to pose successful." | 
0 commit comments