runtime/rbs_skill_servers/scripts/move_to_pose.py

109 lines
3.8 KiB
Python
Raw Normal View History

2024-04-22 15:01:40 +03:00
#!/usr/bin/python
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
import numpy as np
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from geometry_msgs.msg import Pose, PoseStamped
from rbs_skill_interfaces.action import MoveitSendPose
class PoseSubscriber(Node):
def __init__(self, parent=None):
super().__init__('pose_subscriber')
self.parent = parent
self._sub = self.create_subscription(PoseStamped,
"/cartesian_motion_controller/current_pose",
self.parent.on_pose_callback, 1,
callback_group=self.parent._callback_group)
self.get_logger().info('PoseSubscriber node initialized')
class CartesianMoveToPose(Node):
def __init__(self):
super().__init__('cartesian_move_to_pose')
self._callback_group = ReentrantCallbackGroup()
self._action_server = ActionServer(
self,
MoveitSendPose,
'cartesian_move_to_pose',
self.execute_callback, callback_group=self._callback_group)
self._pub = self.create_publisher(PoseStamped,
"/cartesian_motion_controller/target_frame", 1,
callback_group=self._callback_group)
self.current_pose = None
self.goal_tolerance = 0.05
def on_pose_callback(self, msg: PoseStamped):
if isinstance(msg, PoseStamped):
self.current_pose = msg
def execute_callback(self, goal_handle):
self.get_logger().debug(f"Executing goal {goal_handle.request.target_pose}")
tp = PoseStamped()
tp.pose = goal_handle.request.target_pose
tp.header.stamp = self.get_clock().now().to_msg()
tp.header.frame_id = "base_link"
while self.get_distance_to_target(tp.pose) >= self.goal_tolerance:
self._pub.publish(tp)
goal_handle.succeed()
result = MoveitSendPose.Result()
result.success = True
return result
def get_distance_to_target(self, target_pose: Pose):
if self.current_pose is None or self.current_pose.pose is None:
self.get_logger().warn("Current pose is not available")
return None
current_pose = self.current_pose.pose
current_position = np.array([
current_pose.position.x,
current_pose.position.y,
current_pose.position.z,
current_pose.orientation.x,
current_pose.orientation.y,
current_pose.orientation.z
])
target_position = np.array([
target_pose.position.x,
target_pose.position.y,
target_pose.position.z,
target_pose.orientation.x,
target_pose.orientation.y,
target_pose.orientation.z
])
# Проверка на наличие значений в массивах координат
if np.any(np.isnan(current_position)) or np.any(np.isnan(target_position)):
self.get_logger().error("Invalid coordinates")
return None
# Вычисляем расстояние между текущей и целевой позициями
distance = np.linalg.norm(current_position - target_position)
return distance
def main(args=None):
rclpy.init(args=args)
cartesian_move_to_pose = CartesianMoveToPose()
pose_subscriber = PoseSubscriber(parent=cartesian_move_to_pose)
executor = MultiThreadedExecutor()
executor.add_node(cartesian_move_to_pose)
executor.add_node(pose_subscriber)
executor.spin()
rclpy.shutdown()
if __name__ == '__main__':
main()