Cartesian Move_to_pose Skill for BT Interface Node
This commit is contained in:
parent
bf965bb750
commit
b4b452297d
18 changed files with 743 additions and 64 deletions
264
rbss_movetopose/scripts/mtp_cartesian.py
Executable file
264
rbss_movetopose/scripts/mtp_cartesian.py
Executable file
|
@ -0,0 +1,264 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Move_to_pose_cartesian_node_via_interface_node
|
||||
ROS 2 program for Move to Pose skill
|
||||
|
||||
@shalenikol release 0.1
|
||||
"""
|
||||
import json
|
||||
import math
|
||||
import numpy as np
|
||||
from scipy.spatial.transform import Slerp, Rotation as R
|
||||
|
||||
import rclpy
|
||||
from rclpy.action import ActionClient, ActionServer
|
||||
from rclpy.node import Node
|
||||
from rclpy.callback_groups import ReentrantCallbackGroup
|
||||
|
||||
from rcl_interfaces.msg import SetParametersResult
|
||||
from geometry_msgs.msg import Pose, PoseStamped
|
||||
|
||||
from rbs_skill_interfaces.action import MoveitSendPose
|
||||
|
||||
NODE_NAME_DEFAULT = "mtp_cartesian" # the name doesn't matter in this node (defined in Launch)
|
||||
PARAM_SKILL_CFG = "mtp_cartesian_cfg"
|
||||
SERVER_NAME = "cartesian_movetopose"
|
||||
TOPIC_CURRENT_POSE = "/cartesian_motion_controller/current_pose"
|
||||
TOPIC_TARGET_FRAME = "/cartesian_motion_controller/target_frame"
|
||||
SERVER_PAR1_BASE_LINK = "base_link"
|
||||
SERVER_PAR2_ROBOT_NAME = "robot_name"
|
||||
|
||||
class MoveToPoseCartesianSkill(Node):
|
||||
""" <Move to pose> skill node """
|
||||
def _Settings(self):
|
||||
# Initialization service settings
|
||||
for prop in self.skill_cfg["Settings"]:
|
||||
nam = prop["name"]
|
||||
val = prop["value"]
|
||||
if nam == "server_name":
|
||||
self.server_name = val
|
||||
elif nam == "end_effector_velocity":
|
||||
self.end_effector_velocity = val
|
||||
elif nam == "end_effector_acceleration":
|
||||
self.end_effector_acceleration = val
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.end_effector_velocity = 1.0
|
||||
self.end_effector_acceleration = 1.0
|
||||
# for other nodes
|
||||
kwargs["allow_undeclared_parameters"] = True
|
||||
kwargs["automatically_declare_parameters_from_overrides"] = True
|
||||
super().__init__(NODE_NAME_DEFAULT, **kwargs)
|
||||
str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
|
||||
|
||||
self.skill_cfg = json.loads(str_cfg)
|
||||
self.nodeName = self.skill_cfg["Module"]["node_name"] #["Launch"]["name"]
|
||||
|
||||
self.server_name = SERVER_NAME #"cartesian_move_to_pose"
|
||||
self._Settings()
|
||||
|
||||
self._action_client = ActionClient(self, MoveitSendPose, self.server_name) # "/"+ robot_name +
|
||||
|
||||
self._cnt = 0
|
||||
self.add_on_set_parameters_callback(self._on_set_param)
|
||||
|
||||
def _on_set_param(self, parameter_list):
|
||||
self._cnt += 1
|
||||
for parameter in parameter_list:
|
||||
# self.get_logger().info(f'{self._cnt}$ {parameter.name}={parameter.value}')
|
||||
if parameter.name == PARAM_SKILL_CFG:
|
||||
self.skill_cfg = json.loads(parameter.value)
|
||||
|
||||
self._Settings()
|
||||
dependency = {}
|
||||
for comm in self.skill_cfg["BTAction"]:
|
||||
for par in comm["param"]:
|
||||
if par["type"] == "move_to_pose":
|
||||
dependency = par["dependency"]
|
||||
assert dependency, "no dependency"
|
||||
|
||||
self.send_goal(dependency)
|
||||
self.get_logger().info(f"{self._cnt}) dependency = {dependency}")
|
||||
break
|
||||
return SetParametersResult(successful=True)
|
||||
|
||||
def send_goal(self, dep):
|
||||
goal_msg = MoveitSendPose.Goal()
|
||||
pose = dep["pose"]
|
||||
# goal_msg.target_pose
|
||||
goal_msg.target_pose.position.x = pose["position"]["x"]
|
||||
goal_msg.target_pose.position.y = pose["position"]["y"]
|
||||
goal_msg.target_pose.position.z = pose["position"]["z"]
|
||||
|
||||
goal_msg.target_pose.orientation.x = pose["orientation"]["x"]
|
||||
goal_msg.target_pose.orientation.y = pose["orientation"]["y"]
|
||||
goal_msg.target_pose.orientation.z = pose["orientation"]["z"]
|
||||
goal_msg.target_pose.orientation.w = pose["orientation"]["w"]
|
||||
|
||||
goal_msg.robot_name = dep["robot_name"]
|
||||
goal_msg.end_effector_velocity = self.end_effector_velocity
|
||||
goal_msg.end_effector_acceleration = self.end_effector_acceleration
|
||||
|
||||
self._action_client.wait_for_server()
|
||||
|
||||
return self._action_client.send_goal_async(goal_msg)
|
||||
|
||||
class CartesianMoveToPose(Node):
|
||||
""" <Move to pose> action server """
|
||||
def __init__(self, server_name:str = SERVER_NAME):
|
||||
super().__init__("cartesian_server")
|
||||
|
||||
self.declare_parameter(SERVER_PAR1_BASE_LINK, "base_link")
|
||||
self.declare_parameter(SERVER_PAR2_ROBOT_NAME, "")
|
||||
|
||||
self._callback_group = ReentrantCallbackGroup()
|
||||
self._action_server = ActionServer(
|
||||
self,
|
||||
MoveitSendPose,
|
||||
server_name,
|
||||
self.execute_callback, callback_group=self._callback_group)
|
||||
|
||||
self.base_link: str = self.get_parameter(SERVER_PAR1_BASE_LINK).get_parameter_value().string_value
|
||||
# for multirobot setup where each robot name is a namespace
|
||||
self.robot_name: str = self.get_parameter(SERVER_PAR2_ROBOT_NAME).get_parameter_value().string_value
|
||||
self.robot_name = self.robot_name.lstrip('/').rstrip('/')
|
||||
self.robot_name = f"/{self.robot_name}" if self.robot_name else ""
|
||||
self._pub = self.create_publisher(PoseStamped,
|
||||
self.robot_name + TOPIC_TARGET_FRAME, 1,
|
||||
callback_group=self._callback_group)
|
||||
self.current_pose = None
|
||||
self.goal_tolerance = 0.05
|
||||
self.max_speed = 0.1
|
||||
self.max_acceleration = 0.05
|
||||
|
||||
def on_pose_callback(self, msg: PoseStamped):
|
||||
if isinstance(msg, PoseStamped):
|
||||
self.current_pose = msg
|
||||
|
||||
def execute_callback(self, goal_handle):
|
||||
self.get_logger().debug(f"Executing goal {goal_handle.request.target_pose}")
|
||||
target_pose = goal_handle.request.target_pose
|
||||
start_pose = self.current_pose.pose if self.current_pose else None
|
||||
|
||||
if start_pose is None:
|
||||
self.get_logger().error("Current pose is not available")
|
||||
goal_handle.abort()
|
||||
return MoveitSendPose.Result()
|
||||
|
||||
trajectory = self.generate_trajectory(start_pose, target_pose)
|
||||
for point in trajectory:
|
||||
tp = PoseStamped()
|
||||
tp.pose = point
|
||||
tp.header.stamp = self.get_clock().now().to_msg()
|
||||
tp.header.frame_id = self.base_link
|
||||
self._pub.publish(tp)
|
||||
rclpy.spin_once(self, timeout_sec=0.1)
|
||||
|
||||
goal_handle.succeed()
|
||||
|
||||
result = MoveitSendPose.Result()
|
||||
result.success = True
|
||||
return result
|
||||
|
||||
def generate_trajectory(self, start_pose, target_pose):
|
||||
start_position = np.array([
|
||||
start_pose.position.x,
|
||||
start_pose.position.y,
|
||||
start_pose.position.z
|
||||
])
|
||||
target_position = np.array([
|
||||
target_pose.position.x,
|
||||
target_pose.position.y,
|
||||
target_pose.position.z
|
||||
])
|
||||
|
||||
start_orientation = R.from_quat([
|
||||
start_pose.orientation.x,
|
||||
start_pose.orientation.y,
|
||||
start_pose.orientation.z,
|
||||
start_pose.orientation.w
|
||||
])
|
||||
target_orientation = R.from_quat([
|
||||
target_pose.orientation.x,
|
||||
target_pose.orientation.y,
|
||||
target_pose.orientation.z,
|
||||
target_pose.orientation.w
|
||||
])
|
||||
|
||||
distance = np.linalg.norm(target_position - start_position)
|
||||
max_speed = self.max_speed
|
||||
max_acceleration = self.max_acceleration
|
||||
|
||||
t_acc = max_speed / max_acceleration
|
||||
d_acc = 0.5 * max_acceleration * t_acc**2
|
||||
|
||||
if distance < 2 * d_acc:
|
||||
t_acc = math.sqrt(distance / max_acceleration)
|
||||
t_flat = 0
|
||||
else:
|
||||
t_flat = (distance - 2 * d_acc) / max_speed
|
||||
|
||||
total_time = 2 * t_acc + t_flat
|
||||
num_points = int(total_time * 10)
|
||||
trajectory = []
|
||||
|
||||
times = np.linspace(0, total_time, num_points + 1)
|
||||
key_rots = R.from_quat([start_orientation.as_quat(), target_orientation.as_quat()])
|
||||
slerp = Slerp([0, total_time], key_rots)
|
||||
|
||||
for t in times:
|
||||
if t < t_acc:
|
||||
fraction = 0.5 * max_acceleration * t**2 / distance
|
||||
elif t < t_acc + t_flat:
|
||||
fraction = (d_acc + max_speed * (t - t_acc)) / distance
|
||||
else:
|
||||
t_decel = t - t_acc - t_flat
|
||||
fraction = (d_acc + max_speed * t_flat + 0.5 * max_acceleration * t_decel**2) / distance
|
||||
|
||||
intermediate_position = start_position + fraction * (target_position - start_position)
|
||||
intermediate_orientation = slerp([t])[0]
|
||||
|
||||
intermediate_pose = Pose()
|
||||
intermediate_pose.position.x = intermediate_position[0]
|
||||
intermediate_pose.position.y = intermediate_position[1]
|
||||
intermediate_pose.position.z = intermediate_position[2]
|
||||
intermediate_orientation_quat = intermediate_orientation.as_quat()
|
||||
intermediate_pose.orientation.x = intermediate_orientation_quat[0]
|
||||
intermediate_pose.orientation.y = intermediate_orientation_quat[1]
|
||||
intermediate_pose.orientation.z = intermediate_orientation_quat[2]
|
||||
intermediate_pose.orientation.w = intermediate_orientation_quat[3]
|
||||
trajectory.append(intermediate_pose)
|
||||
|
||||
return trajectory
|
||||
|
||||
class PoseSubscriber(Node):
|
||||
def __init__(self, parent: CartesianMoveToPose, robot_name: str):
|
||||
super().__init__("pose_subscriber")
|
||||
self.parent = parent
|
||||
self._sub = self.create_subscription(PoseStamped,
|
||||
robot_name + TOPIC_CURRENT_POSE,
|
||||
self.parent.on_pose_callback, 1,
|
||||
callback_group=self.parent._callback_group)
|
||||
self.get_logger().info("PoseSubscriber node initialized")
|
||||
|
||||
def main():
|
||||
rclpy.init()
|
||||
|
||||
node = MoveToPoseCartesianSkill()
|
||||
cartesian_mtp_node = CartesianMoveToPose(node.server_name)
|
||||
pose_subscriber = PoseSubscriber(parent=cartesian_mtp_node,
|
||||
robot_name=cartesian_mtp_node.robot_name)
|
||||
|
||||
executor = rclpy.executors.MultiThreadedExecutor()
|
||||
executor.add_node(cartesian_mtp_node)
|
||||
executor.add_node(pose_subscriber)
|
||||
executor.add_node(node)
|
||||
try:
|
||||
executor.spin()
|
||||
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
|
||||
pose_subscriber.destroy_node()
|
||||
cartesian_mtp_node.destroy_node()
|
||||
node.destroy_node()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Add table
Add a link
Reference in a new issue