#!/usr/bin/env python3 """ detection_service ROS 2 program for 6D Pose Estimation @shalenikol release 0.1 """ # Import the necessary libraries import rclpy # Python library for ROS 2 from rclpy.node import Node # Handles the creation of nodes from sensor_msgs.msg import Image # Image is the message type from geometry_msgs.msg import Quaternion from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images import cv2 # OpenCV library from rbs_skill_interfaces.srv import DetectObject from rbs_skill_interfaces.msg import ObjectInfo #import subprocess import os import shutil import json import tempfile from pathlib import Path import numpy as np from ament_index_python.packages import get_package_share_directory import megapose from megapose.scripts.run_inference_on_example import run_inference """ # encoder for numpy array def np_encoder(object): if isinstance(object, (np.generic, np.ndarray)): return object.item() """ class ImageSubscriber(Node): """ Create an ImageSubscriber class, which is a subclass of the Node class. """ def _InitService(self): # Initialization service data p = os.path.join(get_package_share_directory("rbs_perception"), "config", "pose_estimation_config.json") # load config with open(p, "r") as f: y = json.load(f) for name, val in y.items(): if name == "topicName": self.topicName = val elif name == "topicImage": self.topicImage = val elif name == "topicPubName": self.topicPubName = val elif name == "topicSrv": self.topicSrv = val def _getCameraParam(self): # {"K": [[924.855, 0.0, 320.0], [0.0, 923.076, 240.0], [0.0, 0.0, 1.0]], "resolution": [480, 640]} """ focal length: 25 mm (field of view: 38.1712°) sensor type: Micro Four Thirds System (Стандарт MFT микро 4/3) sensor_width: 17.3 mm sensor_height: 13 mm """ resolution = [480, 640] #np.array([480, 640]) intrinsic_matrix = [ #np.array([ [924.855, 0.0, resolution[1] / 2.0], [0.0, 923.076, resolution[0] / 2.0], [0.0, 0.0, 1.0] ] #) return intrinsic_matrix, resolution def __init__(self): """ Class constructor to set up the node """ self.topicName = "image_sub2" self.topicImage = "/ground_true/camera_node" self.topicPubName = "pose6D_images" self.topicSrv = "detect6Dpose" self._InitService() self.tmpdir = tempfile.gettempdir() self.mytemppath = Path(self.tmpdir) / "rbs_per" self.mytemppath.mkdir(exist_ok=True) self.K_, self.res_ = self._getCameraParam() #os.environ["MEGAPOSE_DATA_DIR"] = str(self.mytemppath) # Initiate the Node class's constructor and give it a name super().__init__(self.topicName) self.subscription = None self.objName = "" self.objMeshFile = "" self.objPath = "" # Used to convert between ROS and OpenCV images self.br = CvBridge() self.cnt = 0 #self.get_logger().info(f"__init__ : __file__ = {__file__} tmpdir = {self.tmpdir}") self.service = self.create_service(DetectObject, self.topicSrv, self.service_callback) def service_callback(self, request, response): self.get_logger().info(f"Incoming request for object detection ObjectInfo(name: {request.object.name}, mesh_path: {request.object.mesh_path})") if not os.path.isfile(request.object.mesh_path): response.call_status = False response.error_msg = f"{request.object.mesh_path}: no such file" return response if request.object.id == -1: self.subscription = None # ? сброс подписки response.call_status = True return response if self.subscription == None: self.objName = request.object.name self.objMeshFile = request.object.mesh_path self.objPath = self.mytemppath / "examples" self.objPath.mkdir(exist_ok=True) self.objPath /= self.objName self.objPath.mkdir(exist_ok=True) tPath = self.objPath / "inputs" tPath.mkdir(exist_ok=True) #{"label": "fork", "bbox_modal": [329, 189, 430, 270]} output_fn = tPath / "object_data.json" output_json_dict = { "label": self.objName, "bbox_modal": [2,2,self.res_[1]-1,self.res_[0]-1] } data = [] data.append(output_json_dict) output_fn.write_text(json.dumps(data)) tPath = self.objPath / "meshes" tPath.mkdir(exist_ok=True) tPath /= self.objName tPath.mkdir(exist_ok=True) shutil.copyfile(self.objMeshFile, str(tPath / (self.objName+".ply"))) #{"K": [[25.0, 0.0, 8.65], [0.0, 25.0, 6.5], [0.0, 0.0, 1.0]], "resolution": [480, 640]} output_fn = self.objPath / "camera_data.json" output_json_dict = { "K": self.K_, "resolution": self.res_ } data = [] data.append(output_json_dict) output_fn.write_text(json.dumps(data)) # Create the subscriber. This subscriber will receive an Image from the video_frames topic. The queue size is 3 messages. self.subscription = self.create_subscription(Image, self.topicImage, self.listener_callback, 3) # Create the publisher. This publisher will publish an Quaternion to the 'pose6D_' topic. The queue size is 10 messages. self.publisher = self.create_publisher(Quaternion, "pose6D_"+self.objName, 10) response.call_status = True else: response.call_status = True return response def load_result(self, example_dir: Path, json_name = "object_data.json"): f = example_dir / "outputs" / json_name if os.path.isfile(f): data = f.read_text() else: data = "No result file: '" + str(f) + "'" return data def listener_callback(self, data): """ Callback function. """ # Display the message on the console self.get_logger().info("Receiving video frame") # Convert ROS Image message to OpenCV image current_frame = self.br.imgmsg_to_cv2(data) # Save image for Megapose cv2.imwrite(str(self.objPath / "image_rgb.png"), current_frame) self.cnt += 1 # 6D pose estimation self.get_logger().info(f"megapose: begin {self.cnt}") run_inference(self.objPath,"megapose-1.0-RGB-multi-hypothesis") # опубликуем результат оценки позы data = self.load_result(self.objPath) if data[0] == "[": y = json.loads(data)[0] pose = y['TWO'] quat = pose[0] #pose[1] - 3D перемещение self.publisher.publish(Quaternion(x=quat[1],y=quat[2],z=quat[3],w=quat[0])) self.get_logger().info(f"megapose: end {self.cnt}") cv2.waitKey(1) def main(args=None): # Initialize the rclpy library rclpy.init(args=args) # Create the node image_subscriber = ImageSubscriber() # Spin the node so the callback function is called. rclpy.spin(image_subscriber) # Destroy the node explicitly # (optional - otherwise it will be done automatically # when the garbage collector destroys the node object) image_subscriber.destroy_node() # Shutdown the ROS client library for Python rclpy.shutdown() if __name__ == '__main__': main()