#!/usr/bin/python import rclpy from rclpy.action import ActionServer from rclpy.node import Node import numpy as np from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.executors import MultiThreadedExecutor from geometry_msgs.msg import Pose, PoseStamped from rbs_skill_interfaces.action import MoveitSendPose class PoseSubscriber(Node): def __init__(self, parent=None): super().__init__('pose_subscriber') self.parent = parent self._sub = self.create_subscription(PoseStamped, "/cartesian_motion_controller/current_pose", self.parent.on_pose_callback, 1, callback_group=self.parent._callback_group) self.get_logger().info('PoseSubscriber node initialized') class CartesianMoveToPose(Node): def __init__(self): super().__init__('cartesian_move_to_pose') self._callback_group = ReentrantCallbackGroup() self._action_server = ActionServer( self, MoveitSendPose, 'cartesian_move_to_pose', self.execute_callback, callback_group=self._callback_group) self._pub = self.create_publisher(PoseStamped, "/cartesian_motion_controller/target_frame", 1, callback_group=self._callback_group) self.current_pose = None self.goal_tolerance = 0.05 def on_pose_callback(self, msg: PoseStamped): if isinstance(msg, PoseStamped): self.current_pose = msg def execute_callback(self, goal_handle): self.get_logger().debug(f"Executing goal {goal_handle.request.target_pose}") tp = PoseStamped() tp.pose = goal_handle.request.target_pose tp.header.stamp = self.get_clock().now().to_msg() tp.header.frame_id = "base_link" while self.get_distance_to_target(tp.pose) >= self.goal_tolerance: self._pub.publish(tp) goal_handle.succeed() result = MoveitSendPose.Result() result.success = True return result def get_distance_to_target(self, target_pose: Pose): if self.current_pose is None or self.current_pose.pose is None: self.get_logger().warn("Current pose is not available") return None current_pose = self.current_pose.pose current_position = np.array([ current_pose.position.x, current_pose.position.y, current_pose.position.z, current_pose.orientation.x, current_pose.orientation.y, current_pose.orientation.z ]) target_position = np.array([ target_pose.position.x, target_pose.position.y, target_pose.position.z, target_pose.orientation.x, target_pose.orientation.y, target_pose.orientation.z ]) # Проверка на наличие значений в массивах координат if np.any(np.isnan(current_position)) or np.any(np.isnan(target_position)): self.get_logger().error("Invalid coordinates") return None # Вычисляем расстояние между текущей и целевой позициями distance = np.linalg.norm(current_position - target_position) return distance def main(args=None): rclpy.init(args=args) cartesian_move_to_pose = CartesianMoveToPose() pose_subscriber = PoseSubscriber(parent=cartesian_move_to_pose) executor = MultiThreadedExecutor() executor.add_node(cartesian_move_to_pose) executor.add_node(pose_subscriber) executor.spin() rclpy.shutdown() if __name__ == '__main__': main()