#!/usr/bin/env python3 """ Move_to_pose_cartesian_node_via_interface_node ROS 2 program for Move to Pose skill @shalenikol release 0.3 """ import json import rclpy from rclpy.action import ActionClient from rclpy.node import Node from rcl_interfaces.msg import SetParametersResult from rbs_skill_interfaces.action import MoveitSendPose from rbs_skill_interfaces.srv import RbsBt NODE_NAME_DEFAULT = "mtp_cartesian" # this name must match the name in the description (["Module"]["node_name"]) PARAM_SKILL_CFG = "mtp_cartesian_cfg" COMPLETION_SRV_NAME = "/completion" SERVER_NAME = "cartesian_movetopose" class MoveToPoseCartesianSkill(Node): """ skill node """ def _Settings(self): # Initialization service settings for prop in self.skill_cfg["Settings"]["output"]["params"]: nam = prop["name"] val = prop["value"] if nam == "server_name": self.server_name = val elif nam == "end_effector_velocity": self.end_effector_velocity = float(val) elif nam == "end_effector_acceleration": self.end_effector_acceleration = float(val) # for prop in self.skill_cfg["topicsOut"]: # if prop["type"] == OUT_TOPIC_TYPE: # self.topicSrv = prop["name"] def __init__(self, **kwargs): self._tmode = 0 self._completion = False self.end_effector_velocity = 1.0 self.end_effector_acceleration = 1.0 # for other nodes kwargs["allow_undeclared_parameters"] = True kwargs["automatically_declare_parameters_from_overrides"] = True super().__init__(NODE_NAME_DEFAULT, **kwargs) str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value self.skill_cfg = json.loads(str_cfg) self.nodeName = NODE_NAME_DEFAULT self.server_name = SERVER_NAME self._Settings() self._action_client = ActionClient(self, MoveitSendPose, self.server_name) # "/"+ robot_name + self._srv_completion = self.create_service(RbsBt, NODE_NAME_DEFAULT + COMPLETION_SRV_NAME, self.completion_callback) self._cnt = 0 self.add_on_set_parameters_callback(self._on_set_param) def _on_set_param(self, parameter_list): self._cnt += 1 for parameter in parameter_list: # self.get_logger().info(f'{self._cnt}$ {parameter.name}={parameter.value}') if parameter.name == PARAM_SKILL_CFG: self.skill_cfg = json.loads(parameter.value) self._Settings() self.dependency = {} for comm in self.skill_cfg["BTAction"]: for par in comm["param"]: if par["type"] == "move_to_pose": self.dependency = par["dependency"] assert self.dependency, "no dependency" self._completion = False # run new action self.act_timer = self.create_timer(0.01, self.t_goal) break return SetParametersResult(successful=True) def completion_callback(self, request, response): # if request.command == "isCompletion": response.ok = self._completion return response def t_goal(self): if self._tmode == 0: self.get_logger().info(f"{self._cnt}) dependency = {self.dependency}") goal_msg = self.set_goal_msg(self.dependency) self._action_client.wait_for_server() self.goal_fut = self._action_client.send_goal_async(goal_msg) self.get_logger().info(f"goal {self._cnt}): waiting for completion...") self._tmode = 1 elif self._tmode == 1: if self.goal_fut.result(): result_future = self.goal_fut.result() # ClientGoalHandle # stop timer self.act_timer.cancel() self._tmode = 0 result_exe = result_future.get_result_async() result_exe.add_done_callback(self.get_result_callback) def get_result_callback(self, future): self._completion = True result = future.result().result self.get_logger().info(f"result_callback: goal - {result.success}") def set_goal_msg(self, dep): goal_msg = MoveitSendPose.Goal() pose = dep["pose"] goal_msg.target_pose.position.x = pose["position"]["x"] goal_msg.target_pose.position.y = pose["position"]["y"] goal_msg.target_pose.position.z = pose["position"]["z"] goal_msg.target_pose.orientation.x = pose["orientation"]["x"] goal_msg.target_pose.orientation.y = pose["orientation"]["y"] goal_msg.target_pose.orientation.z = pose["orientation"]["z"] goal_msg.target_pose.orientation.w = pose["orientation"]["w"] goal_msg.robot_name = dep["robot_name"] goal_msg.end_effector_velocity = self.end_effector_velocity goal_msg.end_effector_acceleration = self.end_effector_acceleration return goal_msg def main(): rclpy.init() node = MoveToPoseCartesianSkill() executor = rclpy.executors.SingleThreadedExecutor() executor.add_node(node) try: executor.spin() except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): node.destroy_node() if __name__ == '__main__': main()