101 lines
3.7 KiB
Python
Executable file
101 lines
3.7 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import os
|
|
from typing import Dict
|
|
|
|
import rclpy
|
|
import yaml
|
|
from ament_index_python.packages import get_package_share_directory
|
|
from geometry_msgs.msg import Pose
|
|
from rbs_skill_interfaces.srv import GetPickPlacePoses
|
|
from rclpy.node import Node
|
|
from rclpy.service import Service
|
|
|
|
|
|
class GetGraspPickPoses(Node):
|
|
def __init__(self) -> None:
|
|
super().__init__("get_grasp_pick_poses")
|
|
self.srv: Service = self.create_service(
|
|
GetPickPlacePoses, "get_pick_place_poses", self.get_grasp_pick_poses
|
|
)
|
|
yaml_file_path: str = os.path.join(
|
|
get_package_share_directory("rbs_mill_assist"), "config", "grasping.yaml"
|
|
)
|
|
with open(yaml_file_path, "r", encoding="utf-8") as file:
|
|
self.grasping: Dict = yaml.safe_load(file)
|
|
|
|
def create_pose(self, pose_data: Dict) -> Pose:
|
|
"""
|
|
Helper function to create a Pose from the given pose data.
|
|
"""
|
|
pose = Pose()
|
|
if pose_data:
|
|
pose.position.x = pose_data.get("position", {}).get("x", 0.0)
|
|
pose.position.y = pose_data.get("position", {}).get("y", 0.0)
|
|
pose.position.z = pose_data.get("position", {}).get("z", 0.0)
|
|
pose.orientation.x = pose_data.get("orientation", {}).get("x", 0.0)
|
|
pose.orientation.y = pose_data.get("orientation", {}).get("y", 0.0)
|
|
pose.orientation.z = pose_data.get("orientation", {}).get("z", 0.0)
|
|
pose.orientation.w = pose_data.get("orientation", {}).get("w", 1.0)
|
|
return pose
|
|
|
|
def get_grasp_pick_poses(
|
|
self, request: GetPickPlacePoses.Request, response: GetPickPlacePoses.Response
|
|
) -> GetPickPlacePoses.Response:
|
|
if request.action_name == "pick":
|
|
pregrasp_pose = self.grasping.get("pregrasp_pose", None)
|
|
grasp_pose = self.grasping.get("grasp_pose", None)
|
|
postgrasp_pose = self.grasping.get("postgrasp_pose", None)
|
|
|
|
if None in [pregrasp_pose, grasp_pose, postgrasp_pose]:
|
|
response.ok = False
|
|
self.get_logger().error(
|
|
"Missing one or more of the pregrasp, grasp, or postgrasp poses."
|
|
)
|
|
return response
|
|
|
|
# Create Pose messages
|
|
response.grasp = [
|
|
self.create_pose(pregrasp_pose),
|
|
self.create_pose(grasp_pose),
|
|
self.create_pose(postgrasp_pose),
|
|
]
|
|
|
|
elif request.action_name == "place":
|
|
preplace_pose = self.grasping.get("preplace_pose", None)
|
|
place_pose = self.grasping.get("place_pose", None)
|
|
postplace_pose = self.grasping.get("postplace_pose", None)
|
|
|
|
if None in [preplace_pose, place_pose, postplace_pose]:
|
|
response.ok = False
|
|
self.get_logger().error(
|
|
"Missing one or more of the preplace, place, or postplace poses."
|
|
)
|
|
return response
|
|
|
|
# Create Pose messages for placing
|
|
response.place = [
|
|
self.create_pose(preplace_pose),
|
|
self.create_pose(place_pose),
|
|
self.create_pose(postplace_pose),
|
|
]
|
|
|
|
response.ok = True
|
|
self.get_logger().info(f"Handled action: {request.action_name}")
|
|
return response
|
|
|
|
|
|
def main():
|
|
rclpy.init()
|
|
executor = rclpy.executors.SingleThreadedExecutor()
|
|
# executor = rclpy.executors.MultiThreadedExecutor() # can't be used
|
|
i_node = GetGraspPickPoses()
|
|
executor.add_node(i_node)
|
|
try:
|
|
executor.spin()
|
|
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
|
|
i_node.destroy_node()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|