#!/usr/bin/python import rclpy from rclpy.action import ActionServer from rclpy.node import Node import numpy as np from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.executors import MultiThreadedExecutor import math from geometry_msgs.msg import Pose, PoseStamped from rbs_skill_interfaces.action import MoveitSendPose from scipy.spatial.transform import Rotation as R from scipy.spatial.transform import Slerp class CartesianMoveToPose(Node): def __init__(self): super().__init__('cartesian_move_to_pose') # pyright: ignore[] self.declare_parameter("base_link", "base_link") self.declare_parameter("robot_name", "") self._callback_group = ReentrantCallbackGroup() self._action_server = ActionServer( self, MoveitSendPose, 'cartesian_move_to_pose', self.execute_callback, callback_group=self._callback_group) # for multirobot setup where each robot name is a namespace self.robot_name: str = self.get_parameter("robot_name").get_parameter_value().string_value self.robot_name = self.robot_name.lstrip('/').rstrip('/') self.robot_name = f"/{self.robot_name}" if self.robot_name else "" self._pub = self.create_publisher(PoseStamped, f"{self.robot_name}/cartesian_motion_controller/target_frame", 1, callback_group=self._callback_group) self.current_pose = None self.goal_tolerance = 0.05 self.max_speed = 0.1 self.max_acceleration = 0.05 self.base_link = self.get_parameter("base_link").get_parameter_value().string_value def on_pose_callback(self, msg: PoseStamped): if isinstance(msg, PoseStamped): self.current_pose = msg def execute_callback(self, goal_handle): self.get_logger().debug(f"Executing goal {goal_handle.request.target_pose}") target_pose = goal_handle.request.target_pose start_pose = self.current_pose.pose if self.current_pose else None result = MoveitSendPose.Result() if start_pose is None: self.get_logger().error("Current pose is not available") goal_handle.abort() result.success = False return result trajectory = self.generate_trajectory(start_pose, target_pose) for point in trajectory: tp = PoseStamped() tp.pose = point tp.header.stamp = self.get_clock().now().to_msg() tp.header.frame_id = self.base_link self._pub.publish(tp) rclpy.spin_once(self, timeout_sec=0.1) goal_handle.succeed() result.success = True return result def generate_trajectory(self, start_pose, target_pose): start_position = np.array([ start_pose.position.x, start_pose.position.y, start_pose.position.z ]) target_position = np.array([ target_pose.position.x, target_pose.position.y, target_pose.position.z ]) start_orientation = R.from_quat([ start_pose.orientation.x, start_pose.orientation.y, start_pose.orientation.z, start_pose.orientation.w ]) target_orientation = R.from_quat([ target_pose.orientation.x, target_pose.orientation.y, target_pose.orientation.z, target_pose.orientation.w ]) distance = np.linalg.norm(target_position - start_position) max_speed = self.max_speed max_acceleration = self.max_acceleration t_acc = max_speed / max_acceleration d_acc = 0.5 * max_acceleration * t_acc**2 if distance < 2 * d_acc: t_acc = math.sqrt(distance / max_acceleration) t_flat = 0 else: t_flat = (distance - 2 * d_acc) / max_speed total_time = 2 * t_acc + t_flat num_points = int(total_time * 10) trajectory = [] times = np.linspace(0, total_time, num_points + 1) key_rots = R.from_quat([start_orientation.as_quat(), target_orientation.as_quat()]) slerp = Slerp([0, total_time], key_rots) for t in times: if t < t_acc: fraction = 0.5 * max_acceleration * t**2 / distance elif t < t_acc + t_flat: fraction = (d_acc + max_speed * (t - t_acc)) / distance else: t_decel = t - t_acc - t_flat fraction = (d_acc + max_speed * t_flat + 0.5 * max_acceleration * t_decel**2) / distance intermediate_position = start_position + fraction * (target_position - start_position) intermediate_orientation = slerp([t])[0] intermediate_pose = Pose() intermediate_pose.position.x = intermediate_position[0] intermediate_pose.position.y = intermediate_position[1] intermediate_pose.position.z = intermediate_position[2] intermediate_orientation_quat = intermediate_orientation.as_quat() intermediate_pose.orientation.x = intermediate_orientation_quat[0] intermediate_pose.orientation.y = intermediate_orientation_quat[1] intermediate_pose.orientation.z = intermediate_orientation_quat[2] intermediate_pose.orientation.w = intermediate_orientation_quat[3] trajectory.append(intermediate_pose) return trajectory def get_distance_to_target(self, target_pose: Pose): if self.current_pose is None or self.current_pose.pose is None: self.get_logger().warn("Current pose is not available") return None current_pose = self.current_pose.pose current_position = np.array([ current_pose.position.x, current_pose.position.y, current_pose.position.z ]) target_position = np.array([ target_pose.position.x, target_pose.position.y, target_pose.position.z ]) if np.any(np.isnan(current_position)) or np.any(np.isnan(target_position)): self.get_logger().error("Invalid coordinates") return None distance = np.linalg.norm(current_position - target_position) return distance class PoseSubscriber(Node): def __init__(self, parent: CartesianMoveToPose, robot_name: str): super().__init__('pose_subscriber') # pyright: ignore[] self.parent = parent self._sub = self.create_subscription(PoseStamped, f"{robot_name}/cartesian_motion_controller/current_pose", self.parent.on_pose_callback, 1, callback_group=self.parent._callback_group) self.get_logger().info('PoseSubscriber node initialized') def main(args=None): rclpy.init(args=args) cartesian_move_to_pose = CartesianMoveToPose() pose_subscriber = PoseSubscriber(parent=cartesian_move_to_pose, robot_name=cartesian_move_to_pose.robot_name) executor = MultiThreadedExecutor() executor.add_node(cartesian_move_to_pose) executor.add_node(pose_subscriber) executor.spin() rclpy.shutdown() if __name__ == '__main__': main()