refactor: get poses from TF

This commit is contained in:
Ilya Uraev 2025-04-07 12:43:31 +03:00
parent 4849bb8c09
commit 21baf934a9
6 changed files with 212 additions and 181 deletions

View file

@ -5,6 +5,7 @@ find_package(rbs_skill_interfaces REQUIRED)
find_package(rbs_utils_interfaces REQUIRED) find_package(rbs_utils_interfaces REQUIRED)
find_package(geometry_msgs REQUIRED) find_package(geometry_msgs REQUIRED)
find_package(std_srvs REQUIRED) find_package(std_srvs REQUIRED)
find_package(rbs_mill_interfaces REQUIRED)
# Behaviortree interfaces # Behaviortree interfaces
set(dependencies set(dependencies
@ -14,13 +15,17 @@ set(dependencies
behaviortree_ros2 behaviortree_ros2
rbs_utils_interfaces rbs_utils_interfaces
std_srvs std_srvs
rbs_mill_interfaces
) )
add_library(vacuum_gripper_toggle SHARED plugins/vacuum_gripper_toggle.cpp) add_library(vacuum_gripper_toggle SHARED plugins/vacuum_gripper_toggle.cpp)
list(APPEND plugin_libs vacuum_gripper_toggle) list(APPEND plugin_libs vacuum_gripper_toggle)
add_library(get_grasp_place_pose SHARED plugins/get_grasp_place_pose.cpp) add_library(get_grasp_poses SHARED plugins/get_grasp_pose.cpp)
list(APPEND plugin_libs get_grasp_place_pose) list(APPEND plugin_libs get_grasp_poses)
add_library(get_place_poses SHARED plugins/get_place_pose.cpp)
list(APPEND plugin_libs get_place_poses)
add_library(get_named_pose SHARED plugins/get_named_pose.cpp) add_library(get_named_pose SHARED plugins/get_named_pose.cpp)
list(APPEND plugin_libs get_named_pose) list(APPEND plugin_libs get_named_pose)

View file

@ -2,7 +2,10 @@
#include <filesystem> #include <filesystem>
#include <rclcpp/logger.hpp> #include <rclcpp/logger.hpp>
#include <rclcpp/node.hpp> #include <rclcpp/node.hpp>
#include <string>
#include <tf2/LinearMath/Quaternion.hpp> #include <tf2/LinearMath/Quaternion.hpp>
#include <utility>
#include <vector>
#include <yaml-cpp/node/node.h> #include <yaml-cpp/node/node.h>
#include <yaml-cpp/node/parse.h> #include <yaml-cpp/node/parse.h>
#include <yaml-cpp/yaml.h> #include <yaml-cpp/yaml.h>
@ -25,12 +28,16 @@ public:
inline std::vector<std::string> getModelNames() { inline std::vector<std::string> getModelNames() {
return _model_names; return _model_names;
} }
inline std::vector<std::string> getModelParents() {
return _model_parents;
}
private: private:
std::vector<std::string> _model_names; std::vector<std::string> _model_names;
std::vector<std::string> _model_mesh_filenames; std::vector<std::string> _model_mesh_filenames;
std::vector<geometry_msgs::msg::Pose> _model_poses; std::vector<geometry_msgs::msg::Pose> _model_poses;
std::vector<std::string> _model_mesh_filepaths; std::vector<std::string> _model_mesh_filepaths;
std::vector<std::string> _model_parents;
bool isQuat(const YAML::Node &node) { bool isQuat(const YAML::Node &node) {
if (node["x"] && node["y"] && node["z"] && node["w"]) { if (node["x"] && node["y"] && node["z"] && node["w"]) {
@ -81,11 +88,16 @@ private:
} }
std::string model_name = model["name"].as<std::string>(); std::string model_name = model["name"].as<std::string>();
std::string model_parent = "world";
if (model["relative_to"].IsDefined()) {
model_parent = model["relative_to"].as<std::string>();
}
std::string model_mesh_filename = std::string model_mesh_filename =
model["mesh_filename"].as<std::string>(); model["mesh_filename"].as<std::string>();
_model_names.push_back(std::move(model_name)); _model_names.push_back(std::move(model_name));
_model_mesh_filenames.push_back(std::move(model_mesh_filename)); _model_mesh_filenames.push_back(std::move(model_mesh_filename));
_model_parents.push_back(std::move(model_parent));
if (!model_dir.empty()) { if (!model_dir.empty()) {
std::filesystem::path mesh_path = std::filesystem::path(model_dir) / std::filesystem::path mesh_path = std::filesystem::path(model_dir) /

View file

@ -1,7 +1,8 @@
from launch.event_handlers import OnExecutionComplete, OnProcessExit, OnProcessStart
from launch_param_builder import get_package_share_directory from launch_param_builder import get_package_share_directory
from launch_ros.actions import Node from launch_ros.actions import Node
from launch import LaunchDescription from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction from launch.actions import DeclareLaunchArgument, OpaqueFunction, RegisterEventHandler, TimerAction
from launch.substitutions import ( from launch.substitutions import (
LaunchConfiguration, LaunchConfiguration,
) )
@ -117,7 +118,6 @@ def launch_setup(context, *args, **kwargs):
( (
moveit_config.planning_pipelines["ompl"] moveit_config.planning_pipelines["ompl"]
.setdefault("arm", {}) .setdefault("arm", {})
# .setdefault("planner_configs", {})
.update( .update(
{"planner_configs":planners_ids} {"planner_configs":planners_ids}
) )
@ -138,6 +138,33 @@ def launch_setup(context, *args, **kwargs):
parameters=[moveit_config.to_dict(), {"use_sim_time": True}, {"places_config": places_config_filepath}], parameters=[moveit_config.to_dict(), {"use_sim_time": True}, {"places_config": places_config_filepath}],
) )
ref_frames_config_filepath = os.path.join(
get_package_share_directory("rbs_mill_assist"),
"world/config",
"ref_frames.yaml",
)
places_frame_publisher = Node(
package="rbs_mill_assist",
executable="places_frame_publisher",
parameters=[
{"use_sim_time": use_sim_time},
{"places_config_filepath": places_config_filepath},
{"ref_frames_config_filepath": ref_frames_config_filepath},
],
)
handler = RegisterEventHandler(
OnProcessStart(
target_action=places_frame_publisher,
on_start=[planning_scene_publisher],
)
)
rviz_node = Node( rviz_node = Node(
package="rviz2", package="rviz2",
executable="rviz2", executable="rviz2",
@ -155,10 +182,18 @@ def launch_setup(context, *args, **kwargs):
], ],
) )
delay_planning_scene = TimerAction(
actions=[planning_scene_publisher],
period=5.0
)
nodes_to_start = [ nodes_to_start = [
move_group_node, move_group_node,
planning_scene_publisher,
rviz_node, rviz_node,
places_frame_publisher,
planning_scene_publisher,
# delay_planning_scene
# handler
] ]
return nodes_to_start return nodes_to_start

View file

@ -1,5 +1,6 @@
import os import os
from launch.conditions import IfCondition, UnlessCondition
import xacro import xacro
from ament_index_python.packages import get_package_share_directory from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription from launch import LaunchDescription
@ -175,7 +176,8 @@ def launch_setup(context, *args, **kwargs):
"-x", "0.0", "-x", "0.0",
"-z", "0.0", "-z", "0.0",
"-y", "0.0", "-y", "0.0",
"-topic", "/robot_description", # "-topic", "/robot_description",
"-string", robot_description_content
], ],
output="screen", output="screen",
parameters=[{"use_sim_time": True}], parameters=[{"use_sim_time": True}],
@ -191,42 +193,19 @@ def launch_setup(context, *args, **kwargs):
grasping_service = Node(package="rbs_mill_assist", executable="grasping_service.py") grasping_service = Node(package="rbs_mill_assist", executable="grasping_service.py")
get_named_pose_service = Node( get_named_pose_service = Node(
package="rbs_mill_assist", executable="get_key_pose_frame.py" package="rbs_mill_assist",
executable="get_key_pose_frame.py",
parameters=[{"use_sim_time": use_sim_time}],
) )
# get_workspace = Node(
# package="rbs_mill_assist",
# executable="get_workspace",
# parameters=[
# {
# "urdf_path": os.path.join(
# get_package_share_directory("rbs_mill_assist"),
# "urdf",
# "current.urdf",
# ),
# "ee_link": ee_link_name,
# "use_sim_time": True,
# "robot_position": [0.0, 0.0, 0.0],
# }
# ],
# )
#
# publish_ee_pose = Node(
# package="rbs_mill_assist",
# executable="publish_ee_pose_node",
# parameters=[
# {"use_sim_time": use_sim_time},
# {"base_link": base_link_name},
# {"ee_link": ee_link_name},
# ],
# )
places_config_filepath = os.path.join( places_config_filepath = os.path.join(
get_package_share_directory("rbs_mill_assist"), "world/config", "places.yaml" get_package_share_directory("rbs_mill_assist"), "world/config", "places.yaml"
) )
ref_frames_config_filepath = os.path.join( ref_frames_config_filepath = os.path.join(
get_package_share_directory("rbs_mill_assist"), "world/config", "slots.yaml" get_package_share_directory("rbs_mill_assist"),
"world/config",
"ref_frames.yaml",
) )
places_frame_publisher = Node( places_frame_publisher = Node(
package="rbs_mill_assist", package="rbs_mill_assist",
@ -234,8 +213,9 @@ def launch_setup(context, *args, **kwargs):
parameters=[ parameters=[
{"use_sim_time": use_sim_time}, {"use_sim_time": use_sim_time},
{"places_config_filepath": places_config_filepath}, {"places_config_filepath": places_config_filepath},
{"ref_frames_config_filepath": ref_frames_config_filepath} {"ref_frames_config_filepath": ref_frames_config_filepath},
], ],
condition=UnlessCondition(use_moveit)
) )
nodes_to_start = [ nodes_to_start = [

View file

@ -1,57 +1,64 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os # import os
from typing import Dict # from typing import Dict
import rclpy import rclpy
import yaml # import tf2_py
from ament_index_python.packages import get_package_share_directory # import yaml
# from ament_index_python.packages import get_package_share_directory
from geometry_msgs.msg import Pose from geometry_msgs.msg import Pose
from rbs_utils_interfaces.srv import GetNamedPose from rbs_utils_interfaces.srv import GetRelativeNamedPose
from rclpy.node import Node from rclpy.node import Node
from rclpy.service import Service from rclpy.service import Service
from tf2_py import TransformException
from tf2_ros import TransformStamped
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener
class GetNamedPoseService(Node): class GetNamedPoseService(Node):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__("get_named_pose_service") super().__init__("get_tf_frame_pose_node")
self.srv: Service = self.create_service( self.srv: Service = self.create_service(
GetNamedPose, "get_named_pose", self.get_named_pose GetRelativeNamedPose, "get_tf_frame_pose", self.get_named_pose
) )
yaml_file_path: str = os.path.join(
get_package_share_directory("rbs_mill_assist"), "config", "key_poses.yaml"
)
with open(yaml_file_path, "r", encoding="utf-8") as file:
self.key_poses: Dict = yaml.safe_load(file)
def create_pose(self, pose_data: Dict) -> Pose: self.tf_buffer = Buffer()
""" self.tf_listner = TransformListener(self.tf_buffer, self)
Helper function to create a Pose from the given pose data.
"""
pose = Pose()
if pose_data:
pose.position.x = pose_data.get("position", {}).get("x", 0.0)
pose.position.y = pose_data.get("position", {}).get("y", 0.0)
pose.position.z = pose_data.get("position", {}).get("z", 0.0)
pose.orientation.x = pose_data.get("orientation", {}).get("x", 0.0)
pose.orientation.y = pose_data.get("orientation", {}).get("y", 0.0)
pose.orientation.z = pose_data.get("orientation", {}).get("z", 0.0)
pose.orientation.w = pose_data.get("orientation", {}).get("w", 1.0)
return pose
def get_named_pose( def get_named_pose(
self, request: GetNamedPose.Request, response: GetNamedPose.Response self,
) -> GetNamedPose.Response: request: GetRelativeNamedPose.Request,
response: GetRelativeNamedPose.Response,
) -> GetRelativeNamedPose.Response:
t = TransformStamped()
key_pose = self.key_poses.get(request.pose_name, None) try:
if key_pose is None: t = self.tf_buffer.lookup_transform(
request.relative_to, request.pose_name, self.get_clock().now()
)
except TransformException as ex:
self.get_logger().info(
f"Could not transform from {request.pose_name} to {request.relative_to}: {ex}"
)
response.ok = False response.ok = False
return response return response
response.named_pose.name = request.pose_name p = Pose()
response.named_pose.pose = self.create_pose(key_pose) p.position.x = t.transform.translation.x
response.ok = True p.position.y = t.transform.translation.y
p.position.z = t.transform.translation.z
p.orientation.x = t.transform.rotation.x
p.orientation.y = t.transform.rotation.y
p.orientation.z = t.transform.rotation.z
p.orientation.w = t.transform.rotation.w
response.named_pose.pose = p
response.named_pose.relative_to = request.relative_to
response.named_pose.name = request.pose_name
response.ok = True
return response return response

View file

@ -1,140 +1,132 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os
from typing import Dict
import rclpy import rclpy
import yaml import math
from ament_index_python.packages import get_package_share_directory from tf2_ros import Buffer, TransformListener, TransformException, TransformStamped
from geometry_msgs.msg import Pose from geometry_msgs.msg import Pose, Quaternion, Point
from rbs_skill_interfaces.srv import GetPickPlacePoses from rbs_mill_interfaces.srv import GetGraspingPose
from rclpy.node import Node from rclpy.node import Node
from rclpy.service import Service from rclpy.service import Service
from scipy.spatial.transform import Rotation as R
class GetGraspPickPoses(Node): class GetGraspPickPoses(Node):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__("get_grasp_pick_poses") super().__init__("get_grasp_pick_poses")
self.srv: Service = self.create_service( self.srv: Service = self.create_service(
GetPickPlacePoses, "get_pick_place_poses", self.get_grasp_pick_poses GetGraspingPose, "get_grasping_pose", self.get_grasp_pick_poses
) )
yaml_file_path: str = os.path.join(
get_package_share_directory("rbs_mill_assist"), "config", "grasping.yaml"
)
with open(yaml_file_path, "r", encoding="utf-8") as file:
self.grasping: Dict = yaml.safe_load(file)
def create_pose(self, pose_data: Dict) -> Pose: self.tf_buffer = Buffer()
""" self.tf_listener = TransformListener(self.tf_buffer, self)
Helper function to create a Pose from the given pose data.
""" # Precompute the rotation correction quaternion
pose = Pose() self.rotation_correction = self.euler_to_quat(0, math.pi, math.pi/2)
if pose_data:
pose.position.x = pose_data.get("position", {}).get("x", 0.0)
pose.position.y = pose_data.get("position", {}).get("y", 0.0)
pose.position.z = pose_data.get("position", {}).get("z", 0.0)
pose.orientation.x = pose_data.get("orientation", {}).get("x", 0.0)
pose.orientation.y = pose_data.get("orientation", {}).get("y", 0.0)
pose.orientation.z = pose_data.get("orientation", {}).get("z", 0.0)
pose.orientation.w = pose_data.get("orientation", {}).get("w", 1.0)
return pose
def get_grasp_pick_poses( def get_grasp_pick_poses(
self, request: GetPickPlacePoses.Request, response: GetPickPlacePoses.Response self,
) -> GetPickPlacePoses.Response: request: GetGraspingPose.Request,
if request.action_name == "to_graver.pick": response: GetGraspingPose.Response,
to_graver = self.grasping.get("to_graver", None) ) -> GetGraspingPose.Response:
pregrasp_pose = to_graver.get("pregrasp_pose", None) if request.action_type not in ("pick", "place"):
grasp_pose = to_graver.get("grasp_pose", None) self.get_logger().error(f"Unsupported action type: {request.action_type}")
postgrasp_pose = to_graver.get("postgrasp_pose", None) response.ok = False
return response
if None in [pregrasp_pose, grasp_pose, postgrasp_pose]: # Common logic for pick/place
response.ok = False tf = self.get_tf_pose(request.pose_name, request.relative_to)
self.get_logger().error( #tf_base = self.get_tf_pose(request.pose_name, "base_link")
"Missing one or more of the pregrasp, grasp, or postgrasp poses." if tf is None:
) response.ok = False
return response return response
# Create Pose messages # Apply transformations
response.grasp = [ main_pose, pre_pose, post_pose = self.process_tf_and_generate_poses(tf)
self.create_pose(pregrasp_pose),
self.create_pose(grasp_pose),
self.create_pose(postgrasp_pose),
]
elif request.action_name == "to_graver.place":
to_graver = self.grasping.get("to_graver", None)
preplace_pose = to_graver.get("preplace_pose", None)
place_pose = to_graver.get("place_pose", None)
postplace_pose = to_graver.get("postplace_pose", None)
if None in [preplace_pose, place_pose, postplace_pose]: if request.action_type == "pick":
response.ok = False response.grasp_poses.pose = main_pose
self.get_logger().error( response.grasp_poses.pre_pose = pre_pose
"Missing one or more of the preplace, place, or postplace poses." response.grasp_poses.post_pose = post_pose
) else:
return response response.place_poses.pose = main_pose
response.place_poses.pre_pose = pre_pose
# Create Pose messages for placing response.place_poses.post_pose = post_pose
response.place = [
self.create_pose(preplace_pose),
self.create_pose(place_pose),
self.create_pose(postplace_pose),
]
elif request.action_name == "from_graver.pick":
from_graver = self.grasping.get("from_graver", None)
pregrasp_pose = from_graver.get("pregrasp_pose", None)
grasp_pose = from_graver.get("grasp_pose", None)
postgrasp_pose = from_graver.get("postgrasp_pose", None)
if None in [pregrasp_pose, grasp_pose, postgrasp_pose]:
response.ok = False
self.get_logger().error(
"Missing one or more of the pregrasp, grasp, or postgrasp poses."
)
return response
# Create Pose messages
response.grasp = [
self.create_pose(pregrasp_pose),
self.create_pose(grasp_pose),
self.create_pose(postgrasp_pose),
]
elif request.action_name == "from_graver.place":
from_graver = self.grasping.get("from_graver", None)
preplace_pose = from_graver.get("preplace_pose", None)
place_pose = from_graver.get("place_pose", None)
postplace_pose = from_graver.get("postplace_pose", None)
if None in [preplace_pose, place_pose, postplace_pose]:
response.ok = False
self.get_logger().error(
"Missing one or more of the preplace, place, or postplace poses."
)
return response
# Create Pose messages for placing
response.place = [
self.create_pose(preplace_pose),
self.create_pose(place_pose),
self.create_pose(postplace_pose),
]
response.ok = True response.ok = True
self.get_logger().info(f"Handled action: {request.action_name}")
return response return response
def process_tf_and_generate_poses(self, tf: TransformStamped) -> tuple[Pose, Pose, Pose]:
"""Processes TF and generates main poses."""
# Apply rotation correction
tf.transform.rotation = self.multiply_quaternions(
tf.transform.rotation,
self.rotation_correction
)
pose = self.get_pose_from_tf(tf)
return pose, self.add_offset(pose, z=0.1), self.add_offset(pose, z=0.1)
def multiply_quaternions(self, q1: Quaternion, q2: Quaternion) -> Quaternion:
"""Multiplies two quaternions using scipy."""
r1 = R.from_quat([q1.x, q1.y, q1.z, q1.w])
r2 = R.from_quat([q2.x, q2.y, q2.z, q2.w])
result = (r1 * r2).as_quat() # type: ignore
return Quaternion(x=result[0], y=result[1], z=result[2], w=result[3])
def add_offset(self, pose: Pose, x: float = 0.0, y: float = 0.0, z: float = 0.0) -> Pose:
"""Creates a new pose with offset."""
return Pose(
position=Point(
x=pose.position.x + x,
y=pose.position.y + y,
z=pose.position.z + z
),
orientation=pose.orientation
)
def euler_to_quat(self, roll: float, pitch: float, yaw: float) -> Quaternion:
"""Converts Euler angles to quaternion."""
q = R.from_euler("xyz", [roll, pitch, yaw], degrees=False).as_quat()
return Quaternion(x=q[0], y=q[1], z=q[2], w=q[3])
def get_pose_from_tf(self, tf: TransformStamped) -> Pose:
"""Creates Pose from TransformStamped."""
return Pose(
position=Point(
x=tf.transform.translation.x,
y=tf.transform.translation.y,
z=tf.transform.translation.z
),
orientation=Quaternion(
x=tf.transform.rotation.x,
y=tf.transform.rotation.y,
z=tf.transform.rotation.z,
w=tf.transform.rotation.w
)
)
def get_tf_pose(self, source_frame: str, target_frame: str) -> TransformStamped | None:
"""Gets coordinate transform between frames."""
try:
return self.tf_buffer.lookup_transform(
target_frame,
source_frame,
rclpy.time.Time()
)
except TransformException as ex:
self.get_logger().error(
f"Transform error: {target_frame} -> {source_frame}: {ex}"
)
return None
def main(): def main():
rclpy.init() rclpy.init()
executor = rclpy.executors.SingleThreadedExecutor() node = GetGraspPickPoses()
# executor = rclpy.executors.MultiThreadedExecutor() # can't be used
i_node = GetGraspPickPoses()
executor.add_node(i_node)
try: try:
executor.spin() rclpy.spin(node)
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): except KeyboardInterrupt:
i_node.destroy_node() pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__": if __name__ == "__main__":
main() main()