79 lines
2.4 KiB
Python
Executable file
79 lines
2.4 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
# import os
|
|
# from typing import Dict
|
|
|
|
import rclpy
|
|
# import tf2_py
|
|
# import yaml
|
|
# from ament_index_python.packages import get_package_share_directory
|
|
from geometry_msgs.msg import Pose
|
|
from rbs_utils_interfaces.srv import GetRelativeNamedPose
|
|
from rclpy.node import Node
|
|
from rclpy.service import Service
|
|
from tf2_py import TransformException
|
|
from tf2_ros import TransformStamped
|
|
from tf2_ros.buffer import Buffer
|
|
from tf2_ros.transform_listener import TransformListener
|
|
|
|
|
|
class GetNamedPoseService(Node):
|
|
def __init__(self) -> None:
|
|
super().__init__("get_tf_frame_pose_node")
|
|
self.srv: Service = self.create_service(
|
|
GetRelativeNamedPose, "get_tf_frame_pose", self.get_named_pose
|
|
)
|
|
|
|
self.tf_buffer = Buffer()
|
|
self.tf_listner = TransformListener(self.tf_buffer, self)
|
|
|
|
def get_named_pose(
|
|
self,
|
|
request: GetRelativeNamedPose.Request,
|
|
response: GetRelativeNamedPose.Response,
|
|
) -> GetRelativeNamedPose.Response:
|
|
t = TransformStamped()
|
|
|
|
try:
|
|
t = self.tf_buffer.lookup_transform(
|
|
request.relative_to, request.pose_name, self.get_clock().now()
|
|
)
|
|
except TransformException as ex:
|
|
self.get_logger().info(
|
|
f"Could not transform from {request.pose_name} to {request.relative_to}: {ex}"
|
|
)
|
|
response.ok = False
|
|
return response
|
|
|
|
p = Pose()
|
|
p.position.x = t.transform.translation.x
|
|
p.position.y = t.transform.translation.y
|
|
p.position.z = t.transform.translation.z
|
|
|
|
p.orientation.x = t.transform.rotation.x
|
|
p.orientation.y = t.transform.rotation.y
|
|
p.orientation.z = t.transform.rotation.z
|
|
p.orientation.w = t.transform.rotation.w
|
|
|
|
response.named_pose.pose = p
|
|
response.named_pose.relative_to = request.relative_to
|
|
response.named_pose.name = request.pose_name
|
|
response.ok = True
|
|
return response
|
|
|
|
|
|
def main():
|
|
rclpy.init()
|
|
executor = rclpy.executors.SingleThreadedExecutor()
|
|
# executor = rclpy.executors.MultiThreadedExecutor() # can't be used
|
|
i_node = GetNamedPoseService()
|
|
executor.add_node(i_node)
|
|
try:
|
|
executor.spin()
|
|
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
|
|
i_node.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|