#!/usr/bin/env python3 """ pose_estimation_lifecycle_node ROS 2 program for 6D Pose Estimation @shalenikol release 0.3 """ from typing import Optional import os import subprocess import shutil import json import tempfile from pathlib import Path import transforms3d as t3d import rclpy from rclpy.lifecycle import Node from rclpy.lifecycle import Publisher from rclpy.lifecycle import State from rclpy.lifecycle import TransitionCallbackReturn from rclpy.timer import Timer from ament_index_python.packages import get_package_share_directory from sensor_msgs.msg import Image, CameraInfo from geometry_msgs.msg import Pose, TransformStamped from tf2_ros import TransformBroadcaster, TransformException from tf2_ros.buffer import Buffer from tf2_ros.transform_listener import TransformListener from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images import cv2 # OpenCV library class PoseEstimator(Node): """Our lifecycle node.""" def _InitService(self): # Initialization service data p = os.path.join(get_package_share_directory("rbs_perception"), "config", "pose_estimation_config.json") # load config with open(p, "r") as f: y = json.load(f) for name, val in y.items(): if name == "nodeName": self.nodeName = val elif name == "cameraLink": self.camera_link = val elif name == "topicImage": self.topicImage = val elif name == "topicCameraInfo": self.topicCameraInfo = val elif name == "topicDepth": self.topicDepth = val elif name == "publishDelay": self.publishDelay = val elif name == "topicSrv": self.topicSrv = val elif name == "tf2_send_pose": self.tf2_send_pose = val elif name == "mesh_scale": self.mesh_scale = val def __init__(self, node_name, **kwargs): """Construct the node.""" self._count: int = 0 self._pub: Optional[Publisher] = None self._timer: Optional[Timer] = None self._image_cnt: int = 0 self._sub = None self._sub_info = None self._sub_depth = None self._is_camerainfo = False self._K = [[0., 0., 0.], [0., 0., 0.], [0., 0., 0.]] self._res = [0, 0] self._pose = [[1., 0., 0., 0.], [0., 0., 0.]] # pose in format "TWO" (megapose) self.tf2_send_pose = 0 self.mesh_scale = 1.0 self.quat_rotate_xyz = [0.5, 0.5, -0.5, -0.5] self.megapose_model = None self.darknet_path = "" self.nodeName = node_name self.camera_link = "outer_rgbd_camera" self.topicImage = "/outer_rgb_camera/image" self.topicCameraInfo = "/outer_rgb_camera/camera_info" self.topicDepth = "/outer_rgbd_camera/depth_image" self.publishDelay = 2.0 self.topicSrv = self.nodeName + "/detect6Dpose" self._InitService() self.camera_pose = Pose() #self.get_camera_pose() self.tmpdir = tempfile.gettempdir() self.mytemppath = Path(self.tmpdir) / "rbs_per" self.mytemppath.mkdir(exist_ok=True) # for other nodes kwargs["allow_undeclared_parameters"] = True kwargs["automatically_declare_parameters_from_overrides"] = True super().__init__(self.nodeName, **kwargs) self.declare_parameter("mesh_path", rclpy.Parameter.Type.STRING) self.tf_buffer = Buffer() self.tf_listener = TransformListener(self.tf_buffer, self) # Initialize the transform broadcaster self.tf_broadcaster = TransformBroadcaster(self) # Used to convert between ROS and OpenCV images self.br = CvBridge() self.objName = "" self.objMeshFile = "" self.objPath = "" def publish(self): """Publish a new message when enabled.""" self._count += 1 if self._pub is not None and self._pub.is_activated: # опубликуем результат оценки позы # publish pose estimation result q = self._pose[0] t = self._pose[1] # transform from View space in World space # rotate q_mir = t3d.quaternions.qmult(self.quat_rotate_xyz, q) # translate (xyz = +z-x-y) t_mir = [t[2], -t[0], -t[1]] cam_p = self.camera_pose.position #cam_Q = self.camera_pose.orientation # q_rot = [msgQ.w, msgQ.x, msgQ.y, msgQ.z] # q = t3d.quaternions.qinverse(q) #q[0] = -q[0] # q_mir = t3d.quaternions.qmult(q_rot,q) #q_mir = q p = Pose() p.position.x = t_mir[0] + cam_p.x p.position.y = t_mir[1] + cam_p.y p.position.z = t_mir[2] + cam_p.z p.orientation.w = q_mir[0] p.orientation.x = q_mir[1] p.orientation.y = q_mir[2] p.orientation.z = q_mir[3] self._pub.publish(p) if self.tf2_send_pose: self.tf_obj_pose(p.position,q_mir) #(self._pose) def tf_obj_pose(self, tr, q): """ Передача позиции объекта в tf2 """ t = TransformStamped() # assign pose to corresponding tf variables t.header.stamp = self.get_clock().now().to_msg() t.header.frame_id = 'world' t.child_frame_id = self.objName # coordinates #tr = pose[1] t.transform.translation.x = tr.x #[0] t.transform.translation.y = tr.y #[1] t.transform.translation.z = tr.z #[2] # rotation #q = pose[0] t.transform.rotation.w = q[0] t.transform.rotation.x = q[1] t.transform.rotation.y = q[2] t.transform.rotation.z = q[3] # Send the transformation self.tf_broadcaster.sendTransform(t) def on_configure(self, state: State) -> TransitionCallbackReturn: """ Configure the node, after a configuring transition is requested. :return: The state machine either invokes a transition to the "inactive" state or stays in "unconfigured" depending on the return value. TransitionCallbackReturn.SUCCESS transitions to "inactive". TransitionCallbackReturn.FAILURE transitions to "unconfigured". TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing" """ gtpose = None str_param = self.get_parameter("mesh_path").get_parameter_value().string_value if str_param[0] == "!": # filename json json_file = str_param[1:] if not os.path.isfile(json_file): self.get_logger().info("not JSon '"+ json_file +"'") return TransitionCallbackReturn.FAILURE str_param = Path(json_file).read_text() try: y = json.loads(str_param) except json.JSONDecodeError as e: self.get_logger().info(f"JSon error: {e}") return TransitionCallbackReturn.FAILURE if "mesh_path" not in y: self.get_logger().info("JSon 'mesh_path' not set") return TransitionCallbackReturn.FAILURE mesh_path = y["mesh_path"] if "gtpose" in y: gtpose = y["gtpose"] if "darknet_path" in y: self.darknet_path = y["darknet_path"] else: mesh_path = str_param if not os.path.isfile(mesh_path): self.get_logger().info("Parameter 'mesh_path' not set") return TransitionCallbackReturn.FAILURE data = os.path.basename(mesh_path) self.objName = os.path.splitext(data)[0] self.objMeshFile = mesh_path self.objPath = self.mytemppath / "examples" self.objPath.mkdir(exist_ok=True) self.objPath /= self.objName self.objPath.mkdir(exist_ok=True) tPath = self.objPath / "inputs" tPath.mkdir(exist_ok=True) tPath = self.objPath / "meshes" tPath.mkdir(exist_ok=True) tPath /= self.objName tPath.mkdir(exist_ok=True) shutil.copyfile(self.objMeshFile, str(tPath / (self.objName+".ply"))) # Create the subscribers. self._sub_info = self.create_subscription(CameraInfo, self.topicCameraInfo, self.listener_camera_info, 2) self._sub_depth = self.create_subscription(Image, self.topicDepth, self.listener_depth, 3) # Create the publisher. self._pub = self.create_lifecycle_publisher(Pose, self.topicSrv, 10) self._timer = self.create_timer(self.publishDelay, self.publish) if gtpose == None: # Preload Megapose model from megapose.scripts.run_inference_on_example import ModelPreload self.megapose_model = ModelPreload(self.objPath,"megapose-1.0-RGB-multi-hypothesis") else: self._pose = [t3d.euler.euler2quat(gtpose[3], gtpose[4], gtpose[5]), gtpose[:3]] self.get_logger().info('on_configure() is called.') return TransitionCallbackReturn.SUCCESS def on_activate(self, state: State) -> TransitionCallbackReturn: # Log self.get_logger().info('on_activate() is called.') # Create the main subscriber. self._sub = self.create_subscription(Image, self.topicImage, self.listener_callback, 3) return super().on_activate(state) def on_deactivate(self, state: State) -> TransitionCallbackReturn: # Log self.get_logger().info('on_deactivate() is called.') # Destroy the main subscriber. self.destroy_subscription(self._sub) return super().on_deactivate(state) def on_cleanup(self, state: State) -> TransitionCallbackReturn: """ Cleanup the node. :return: The state machine either invokes a transition to the "unconfigured" state or stays in "inactive" depending on the return value. TransitionCallbackReturn.SUCCESS transitions to "unconfigured". TransitionCallbackReturn.FAILURE transitions to "inactive". TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing" """ # очистим параметры node_param = rclpy.parameter.Parameter("mesh_path", rclpy.Parameter.Type.STRING, "") all_node_param = [node_param] self.set_parameters(all_node_param) self._is_camerainfo = False self.destroy_timer(self._timer) self.destroy_publisher(self._pub) self.destroy_subscription(self._sub) self.destroy_subscription(self._sub_info) self.destroy_subscription(self._sub_depth) self.get_logger().info('on_cleanup() is called.') return TransitionCallbackReturn.SUCCESS def on_shutdown(self, state: State) -> TransitionCallbackReturn: """ Shutdown the node. :return: The state machine either invokes a transition to the "finalized" state or stays in the current state depending on the return value. TransitionCallbackReturn.SUCCESS transitions to "unconfigured". TransitionCallbackReturn.FAILURE transitions to "inactive". TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing" """ self.destroy_timer(self._timer) self.destroy_publisher(self._pub) self.destroy_subscription(self._sub) self.destroy_subscription(self._sub_info) self.destroy_subscription(self._sub_depth) self.get_logger().info('on_shutdown() is called.') return TransitionCallbackReturn.SUCCESS def listener_camera_info(self, data): """ CameraInfo callback function. """ if self._is_camerainfo: # повторно инфо камеры не читаем return self._res = [data.height, data.width] k_ = data.k self._K = [ [k_[0], k_[1], k_[2]], [k_[3], k_[4], k_[5]], [k_[6], k_[7], k_[8]] ] # self._K = [ # [k_[0]*2.0, k_[1], data.width / 2.0], # k_[2]], # # [k_[3], k_[4]*2.0, data.height / 2.0], # k_[5]], # # [k_[6], k_[7], k_[8]] #self.mesh_scale] # ] # *** for test tPath = self.objPath / "inputs" output_fn = tPath / "object_data.json" output_json_dict = { "label": self.objName, "bbox_modal": [168,173,260,252] # 0.2 0 0 -> [288,170,392,253] } data = [] data.append(output_json_dict) output_fn.write_text(json.dumps(data)) # *** #{"K": [[25.0, 0.0, 8.65], [0.0, 25.0, 6.5], [0.0, 0.0, 1.0]], "resolution": [480, 640]} output_fn = self.objPath / "camera_data.json" output_json_dict = { "K": self._K, "resolution": self._res } data = [] data.append(output_json_dict) output_fn.write_text(json.dumps(output_json_dict)) # установим признак получения инфо камеры self._is_camerainfo = True def listener_depth(self, data): """ Depth image callback function. """ # Convert ROS Image message to OpenCV image current_frame = self.br.imgmsg_to_cv2(data) # Save depth image for Megapose cv2.imwrite(str(self.objPath / "image_depth.png"), current_frame) def load_result(self, example_dir: Path, json_name = "object_data.json"): f = example_dir / "outputs" / json_name if os.path.isfile(f): data = f.read_text() else: data = "No result file: '" + str(f) + "'" return data def rel2bbox(self, rel_coord): bb_w = rel_coord["width"] bb_h = rel_coord["height"] x = int((rel_coord["center_x"] - bb_w/2.) * self._res[1]) y = int((rel_coord["center_y"] - bb_h/2.) * self._res[0]) w = int(bb_w * self._res[1]) h = int(bb_h * self._res[0]) return [x,y,w,h] def yolo2megapose(self, res_json: str, path_to: Path) -> bool: str_param = Path(res_json).read_text() y = json.loads(str_param)[0] conf = 0.5 # threshold of detection found_coord = None for detections in y["objects"]: if detections["name"] == self.objName: c_conf = detections["confidence"] if c_conf > conf: conf = c_conf found_coord = detections["relative_coordinates"] if found_coord: bbox = self.rel2bbox(found_coord) else: bbox = [2, 2, self._res[1]-4, self._res[0]-4] #tPath = path_to / "inputs" output_fn = path_to / "inputs/object_data.json" output_json_dict = { "label": self.objName, "bbox_modal": bbox #[288,170,392,253] } data = [] data.append(output_json_dict) output_fn.write_text(json.dumps(data)) return bool(found_coord) def listener_callback(self, data): """ Image Callback function. """ if not self._is_camerainfo: self.get_logger().warning("No data from CameraInfo") return # get camera pose camera_name = self.camera_link #self.topicImage.split('/')[1] try: t = self.tf_buffer.lookup_transform("world", camera_name, rclpy.time.Time()) except TransformException as ex: self.get_logger().info(f'Could not transform {camera_name} to world: {ex}') return self.camera_pose.position.x = t.transform.translation.x self.camera_pose.position.y = t.transform.translation.y self.camera_pose.position.z = t.transform.translation.z self.camera_pose.orientation.w = t.transform.rotation.w self.camera_pose.orientation.x = t.transform.rotation.x self.camera_pose.orientation.y = t.transform.rotation.y self.camera_pose.orientation.z = t.transform.rotation.z # Convert ROS Image message to OpenCV image current_frame = self.br.imgmsg_to_cv2(data) # Save image for Megapose frame_im = str(self.objPath / "image_rgb.png") cv2.imwrite(frame_im, current_frame) self._image_cnt += 1 detected = False darknet_bin = os.path.join(self.darknet_path, "darknet") if os.path.isfile(darknet_bin): # object detection (YoloV4 - darknet) self.get_logger().info(f"darknet: begin {self._image_cnt}") result_json = str(self.objPath / "res.json") subprocess.run([darknet_bin, "detector", "test", "yolov4_objs2.data", "yolov4_objs2.cfg", "yolov4.weights", "-dont_show", "-ext_output", # -thresh 0.5 "-out", result_json, frame_im], cwd = self.darknet_path #, stdout = subprocess.PIPE ) detected = self.yolo2megapose(result_json, self.objPath) self.get_logger().info(f"darknet: end {self._image_cnt}") if detected and self.megapose_model: # 6D pose estimation self.get_logger().info(f"megapose: begin {self._image_cnt} {self.objPath}") from megapose.scripts.run_inference_on_example import run_inference_rbs run_inference_rbs(self.megapose_model) data = self.load_result(self.objPath) if data[0] == "[": y = json.loads(data)[0] self._pose = y["TWO"] self.get_logger().info(f"megapose: end {self._image_cnt}") def main(): rclpy.init() executor = rclpy.executors.SingleThreadedExecutor() lc_node = PoseEstimator("lc_pose_estimator") executor.add_node(lc_node) try: executor.spin() except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): lc_node.destroy_node() if __name__ == '__main__': main()