197 lines
7.3 KiB
Python
Executable file
197 lines
7.3 KiB
Python
Executable file
#!/usr/bin/python
|
|
import rclpy
|
|
from rclpy.action import ActionServer
|
|
from rclpy.node import Node
|
|
import numpy as np
|
|
from rclpy.callback_groups import ReentrantCallbackGroup
|
|
from rclpy.executors import MultiThreadedExecutor
|
|
import math
|
|
from geometry_msgs.msg import Pose, PoseStamped
|
|
from rbs_skill_interfaces.action import MoveitSendPose
|
|
from scipy.spatial.transform import Rotation as R
|
|
from scipy.spatial.transform import Slerp
|
|
|
|
|
|
class CartesianMoveToPose(Node):
|
|
|
|
def __init__(self):
|
|
super().__init__('cartesian_move_to_pose') # pyright: ignore[]
|
|
|
|
self.declare_parameter("base_link", "base_link")
|
|
self.declare_parameter("robot_name", "")
|
|
|
|
self._callback_group = ReentrantCallbackGroup()
|
|
self._action_server = ActionServer(
|
|
self,
|
|
MoveitSendPose,
|
|
'cartesian_move_to_pose',
|
|
self.execute_callback, callback_group=self._callback_group)
|
|
# for multirobot setup where each robot name is a namespace
|
|
self.robot_name: str = self.get_parameter("robot_name").get_parameter_value().string_value
|
|
self.robot_name = self.robot_name.lstrip('/').rstrip('/')
|
|
self.robot_name = f"/{self.robot_name}" if self.robot_name else ""
|
|
self._pub = self.create_publisher(PoseStamped,
|
|
f"{self.robot_name}/cartesian_motion_controller/target_frame", 1,
|
|
callback_group=self._callback_group)
|
|
self.current_pose = None
|
|
self.goal_tolerance = 0.05
|
|
self.max_speed = 0.1
|
|
self.max_acceleration = 0.05
|
|
self.base_link = self.get_parameter("base_link").get_parameter_value().string_value
|
|
|
|
|
|
def on_pose_callback(self, msg: PoseStamped):
|
|
if isinstance(msg, PoseStamped):
|
|
self.current_pose = msg
|
|
|
|
def execute_callback(self, goal_handle):
|
|
self.get_logger().debug(f"Executing goal {goal_handle.request.target_pose}")
|
|
target_pose = goal_handle.request.target_pose
|
|
start_pose = self.current_pose.pose if self.current_pose else None
|
|
result = MoveitSendPose.Result()
|
|
|
|
if start_pose is None:
|
|
self.get_logger().error("Current pose is not available")
|
|
goal_handle.abort()
|
|
result.success = False
|
|
return result
|
|
|
|
trajectory = self.generate_trajectory(start_pose, target_pose)
|
|
for point in trajectory:
|
|
tp = PoseStamped()
|
|
tp.pose = point
|
|
tp.header.stamp = self.get_clock().now().to_msg()
|
|
tp.header.frame_id = self.base_link
|
|
self._pub.publish(tp)
|
|
rclpy.spin_once(self, timeout_sec=0.1)
|
|
|
|
goal_handle.succeed()
|
|
result.success = True
|
|
return result
|
|
|
|
def generate_trajectory(self, start_pose, target_pose):
|
|
start_position = np.array([
|
|
start_pose.position.x,
|
|
start_pose.position.y,
|
|
start_pose.position.z
|
|
])
|
|
target_position = np.array([
|
|
target_pose.position.x,
|
|
target_pose.position.y,
|
|
target_pose.position.z
|
|
])
|
|
|
|
start_orientation = R.from_quat([
|
|
start_pose.orientation.x,
|
|
start_pose.orientation.y,
|
|
start_pose.orientation.z,
|
|
start_pose.orientation.w
|
|
])
|
|
target_orientation = R.from_quat([
|
|
target_pose.orientation.x,
|
|
target_pose.orientation.y,
|
|
target_pose.orientation.z,
|
|
target_pose.orientation.w
|
|
])
|
|
|
|
distance = np.linalg.norm(target_position - start_position)
|
|
max_speed = self.max_speed
|
|
max_acceleration = self.max_acceleration
|
|
|
|
t_acc = max_speed / max_acceleration
|
|
d_acc = 0.5 * max_acceleration * t_acc**2
|
|
|
|
if distance < 2 * d_acc:
|
|
t_acc = math.sqrt(distance / max_acceleration)
|
|
t_flat = 0
|
|
else:
|
|
t_flat = (distance - 2 * d_acc) / max_speed
|
|
|
|
total_time = 2 * t_acc + t_flat
|
|
num_points = int(total_time * 10)
|
|
trajectory = []
|
|
|
|
times = np.linspace(0, total_time, num_points + 1)
|
|
key_rots = R.from_quat([start_orientation.as_quat(), target_orientation.as_quat()])
|
|
slerp = Slerp([0, total_time], key_rots)
|
|
|
|
for t in times:
|
|
if t < t_acc:
|
|
fraction = 0.5 * max_acceleration * t**2 / distance
|
|
elif t < t_acc + t_flat:
|
|
fraction = (d_acc + max_speed * (t - t_acc)) / distance
|
|
else:
|
|
t_decel = t - t_acc - t_flat
|
|
fraction = (d_acc + max_speed * t_flat + 0.5 * max_acceleration * t_decel**2) / distance
|
|
|
|
intermediate_position = start_position + fraction * (target_position - start_position)
|
|
intermediate_orientation = slerp([t])[0]
|
|
|
|
intermediate_pose = Pose()
|
|
intermediate_pose.position.x = intermediate_position[0]
|
|
intermediate_pose.position.y = intermediate_position[1]
|
|
intermediate_pose.position.z = intermediate_position[2]
|
|
intermediate_orientation_quat = intermediate_orientation.as_quat()
|
|
intermediate_pose.orientation.x = intermediate_orientation_quat[0]
|
|
intermediate_pose.orientation.y = intermediate_orientation_quat[1]
|
|
intermediate_pose.orientation.z = intermediate_orientation_quat[2]
|
|
intermediate_pose.orientation.w = intermediate_orientation_quat[3]
|
|
trajectory.append(intermediate_pose)
|
|
|
|
return trajectory
|
|
|
|
def get_distance_to_target(self, target_pose: Pose):
|
|
if self.current_pose is None or self.current_pose.pose is None:
|
|
self.get_logger().warn("Current pose is not available")
|
|
return None
|
|
|
|
current_pose = self.current_pose.pose
|
|
|
|
current_position = np.array([
|
|
current_pose.position.x,
|
|
current_pose.position.y,
|
|
current_pose.position.z
|
|
])
|
|
|
|
target_position = np.array([
|
|
target_pose.position.x,
|
|
target_pose.position.y,
|
|
target_pose.position.z
|
|
])
|
|
|
|
if np.any(np.isnan(current_position)) or np.any(np.isnan(target_position)):
|
|
self.get_logger().error("Invalid coordinates")
|
|
return None
|
|
|
|
distance = np.linalg.norm(current_position - target_position)
|
|
|
|
return distance
|
|
|
|
|
|
class PoseSubscriber(Node):
|
|
def __init__(self, parent: CartesianMoveToPose, robot_name: str):
|
|
super().__init__('pose_subscriber') # pyright: ignore[]
|
|
self.parent = parent
|
|
self._sub = self.create_subscription(PoseStamped,
|
|
f"{robot_name}/cartesian_motion_controller/current_pose",
|
|
self.parent.on_pose_callback, 1,
|
|
callback_group=self.parent._callback_group)
|
|
self.get_logger().info('PoseSubscriber node initialized')
|
|
|
|
def main(args=None):
|
|
rclpy.init(args=args)
|
|
|
|
cartesian_move_to_pose = CartesianMoveToPose()
|
|
pose_subscriber = PoseSubscriber(parent=cartesian_move_to_pose,
|
|
robot_name=cartesian_move_to_pose.robot_name)
|
|
|
|
executor = MultiThreadedExecutor()
|
|
executor.add_node(cartesian_move_to_pose)
|
|
executor.add_node(pose_subscriber)
|
|
|
|
executor.spin()
|
|
|
|
rclpy.shutdown()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|