#!/usr/bin/env python3 """ Move_to_pose_cartesian_node_via_interface_node ROS 2 program for Move to Pose skill @shalenikol release 0.2 """ import json import math import numpy as np from scipy.spatial.transform import Slerp, Rotation as R import rclpy from rclpy.action import ActionClient, ActionServer from rclpy.node import Node from rclpy.callback_groups import ReentrantCallbackGroup from rcl_interfaces.msg import SetParametersResult from geometry_msgs.msg import Pose, PoseStamped from rbs_skill_interfaces.action import MoveitSendPose NODE_NAME_DEFAULT = "mtp_cartesian" # this name must match the name in the description (["Module"]["node_name"]) PARAM_SKILL_CFG = "mtp_cartesian_cfg" SERVER_NAME = "cartesian_movetopose" TOPIC_CURRENT_POSE = "/cartesian_motion_controller/current_pose" TOPIC_TARGET_FRAME = "/cartesian_motion_controller/target_frame" SERVER_PAR1_BASE_LINK = "base_link" SERVER_PAR2_ROBOT_NAME = "robot_name" class MoveToPoseCartesianSkill(Node): """ skill node """ def _Settings(self): # Initialization service settings for prop in self.skill_cfg["Settings"]["output"]["params"]: nam = prop["name"] val = prop["value"] if nam == "server_name": self.server_name = val elif nam == "end_effector_velocity": self.end_effector_velocity = float(val) elif nam == "end_effector_acceleration": self.end_effector_acceleration = float(val) # for prop in self.skill_cfg["topicsOut"]: # if prop["type"] == OUT_TOPIC_TYPE: # self.topicSrv = prop["name"] def __init__(self, **kwargs): self.end_effector_velocity = 1.0 self.end_effector_acceleration = 1.0 # for other nodes kwargs["allow_undeclared_parameters"] = True kwargs["automatically_declare_parameters_from_overrides"] = True super().__init__(NODE_NAME_DEFAULT, **kwargs) str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value self.skill_cfg = json.loads(str_cfg) self.nodeName = NODE_NAME_DEFAULT self.server_name = SERVER_NAME self._Settings() self._action_client = ActionClient(self, MoveitSendPose, self.server_name) # "/"+ robot_name + self._cnt = 0 self.add_on_set_parameters_callback(self._on_set_param) def _on_set_param(self, parameter_list): self._cnt += 1 for parameter in parameter_list: # self.get_logger().info(f'{self._cnt}$ {parameter.name}={parameter.value}') if parameter.name == PARAM_SKILL_CFG: self.skill_cfg = json.loads(parameter.value) self._Settings() dependency = {} for comm in self.skill_cfg["BTAction"]: for par in comm["param"]: if par["type"] == "move_to_pose": dependency = par["dependency"] assert dependency, "no dependency" self.send_goal(dependency) self.get_logger().info(f"{self._cnt}) dependency = {dependency}") break return SetParametersResult(successful=True) def send_goal(self, dep): goal_msg = MoveitSendPose.Goal() pose = dep["pose"] # goal_msg.target_pose goal_msg.target_pose.position.x = pose["position"]["x"] goal_msg.target_pose.position.y = pose["position"]["y"] goal_msg.target_pose.position.z = pose["position"]["z"] goal_msg.target_pose.orientation.x = pose["orientation"]["x"] goal_msg.target_pose.orientation.y = pose["orientation"]["y"] goal_msg.target_pose.orientation.z = pose["orientation"]["z"] goal_msg.target_pose.orientation.w = pose["orientation"]["w"] goal_msg.robot_name = dep["robot_name"] goal_msg.end_effector_velocity = self.end_effector_velocity goal_msg.end_effector_acceleration = self.end_effector_acceleration self._action_client.wait_for_server() return self._action_client.send_goal_async(goal_msg) class CartesianMoveToPose(Node): """ action server """ def __init__(self, server_name:str = SERVER_NAME): super().__init__("cartesian_server") self.declare_parameter(SERVER_PAR1_BASE_LINK, "base_link") self.declare_parameter(SERVER_PAR2_ROBOT_NAME, "") self._callback_group = ReentrantCallbackGroup() self._action_server = ActionServer( self, MoveitSendPose, server_name, self.execute_callback, callback_group=self._callback_group) self.base_link: str = self.get_parameter(SERVER_PAR1_BASE_LINK).get_parameter_value().string_value # for multirobot setup where each robot name is a namespace self.robot_name: str = self.get_parameter(SERVER_PAR2_ROBOT_NAME).get_parameter_value().string_value self.robot_name = self.robot_name.lstrip('/').rstrip('/') self.robot_name = f"/{self.robot_name}" if self.robot_name else "" self._pub = self.create_publisher(PoseStamped, self.robot_name + TOPIC_TARGET_FRAME, 1, callback_group=self._callback_group) self.current_pose = None self.goal_tolerance = 0.05 self.max_speed = 0.1 self.max_acceleration = 0.05 def on_pose_callback(self, msg: PoseStamped): if isinstance(msg, PoseStamped): self.current_pose = msg def execute_callback(self, goal_handle): self.get_logger().debug(f"Executing goal {goal_handle.request.target_pose}") target_pose = goal_handle.request.target_pose start_pose = self.current_pose.pose if self.current_pose else None if start_pose is None: self.get_logger().error("Current pose is not available") goal_handle.abort() return MoveitSendPose.Result() trajectory = self.generate_trajectory(start_pose, target_pose) for point in trajectory: tp = PoseStamped() tp.pose = point tp.header.stamp = self.get_clock().now().to_msg() tp.header.frame_id = self.base_link self._pub.publish(tp) rclpy.spin_once(self, timeout_sec=0.1) goal_handle.succeed() result = MoveitSendPose.Result() result.success = True return result def generate_trajectory(self, start_pose, target_pose): start_position = np.array([ start_pose.position.x, start_pose.position.y, start_pose.position.z ]) target_position = np.array([ target_pose.position.x, target_pose.position.y, target_pose.position.z ]) start_orientation = R.from_quat([ start_pose.orientation.x, start_pose.orientation.y, start_pose.orientation.z, start_pose.orientation.w ]) target_orientation = R.from_quat([ target_pose.orientation.x, target_pose.orientation.y, target_pose.orientation.z, target_pose.orientation.w ]) distance = np.linalg.norm(target_position - start_position) max_speed = self.max_speed max_acceleration = self.max_acceleration t_acc = max_speed / max_acceleration d_acc = 0.5 * max_acceleration * t_acc**2 if distance < 2 * d_acc: t_acc = math.sqrt(distance / max_acceleration) t_flat = 0 else: t_flat = (distance - 2 * d_acc) / max_speed total_time = 2 * t_acc + t_flat num_points = int(total_time * 10) trajectory = [] times = np.linspace(0, total_time, num_points + 1) key_rots = R.from_quat([start_orientation.as_quat(), target_orientation.as_quat()]) slerp = Slerp([0, total_time], key_rots) for t in times: if t < t_acc: fraction = 0.5 * max_acceleration * t**2 / distance elif t < t_acc + t_flat: fraction = (d_acc + max_speed * (t - t_acc)) / distance else: t_decel = t - t_acc - t_flat fraction = (d_acc + max_speed * t_flat + 0.5 * max_acceleration * t_decel**2) / distance intermediate_position = start_position + fraction * (target_position - start_position) intermediate_orientation = slerp([t])[0] intermediate_pose = Pose() intermediate_pose.position.x = intermediate_position[0] intermediate_pose.position.y = intermediate_position[1] intermediate_pose.position.z = intermediate_position[2] intermediate_orientation_quat = intermediate_orientation.as_quat() intermediate_pose.orientation.x = intermediate_orientation_quat[0] intermediate_pose.orientation.y = intermediate_orientation_quat[1] intermediate_pose.orientation.z = intermediate_orientation_quat[2] intermediate_pose.orientation.w = intermediate_orientation_quat[3] trajectory.append(intermediate_pose) return trajectory class PoseSubscriber(Node): def __init__(self, parent: CartesianMoveToPose, robot_name: str): super().__init__("pose_subscriber") self.parent = parent self._sub = self.create_subscription(PoseStamped, robot_name + TOPIC_CURRENT_POSE, self.parent.on_pose_callback, 1, callback_group=self.parent._callback_group) self.get_logger().info("PoseSubscriber node initialized") def main(): rclpy.init() node = MoveToPoseCartesianSkill() cartesian_mtp_node = CartesianMoveToPose(node.server_name) pose_subscriber = PoseSubscriber(parent=cartesian_mtp_node, robot_name=cartesian_mtp_node.robot_name) executor = rclpy.executors.MultiThreadedExecutor() executor.add_node(cartesian_mtp_node) executor.add_node(pose_subscriber) executor.add_node(node) try: executor.spin() except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): pose_subscriber.destroy_node() cartesian_mtp_node.destroy_node() node.destroy_node() if __name__ == '__main__': main()