#!/usr/bin/env python3 """ object_detection_lifecycle_node ROS 2 program for Object Detection @shalenikol release 0.1 """ from typing import Optional import os import json from pathlib import Path import rclpy from rclpy.lifecycle import Node from rclpy.lifecycle import Publisher from rclpy.lifecycle import State from rclpy.lifecycle import TransitionCallbackReturn from rclpy.timer import Timer from ament_index_python.packages import get_package_share_directory from sensor_msgs.msg import Image from rbs_skill_interfaces.msg import BoundBox from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images #import cv2 # OpenCV library class Detection(Node): """Our lifecycle node.""" def _InitService(self): # Initialization service data p = os.path.join(get_package_share_directory("rbs_perception"), "config", "detection_config.json") # load config with open(p, "r") as f: y = json.load(f) for name, val in y.items(): if name == "nodeName": self.nodeName = val elif name == "cameraLink": self.camera_link = val elif name == "topicImage": self.topicImage = val elif name == "publishDelay": self.publishDelay = val elif name == "topicSrv": self.topicSrv = val def __init__(self, node_name, **kwargs): """Construct the node.""" self._count: int = 0 self._pub: Optional[Publisher] = None self._timer: Optional[Timer] = None self._image_cnt: int = 0 self._sub = None self._is_image_mode = False self.yolov8_weights_file = "" self.model = None self.nodeName = node_name self.camera_link = "outer_rgbd_camera" self.topicImage = "/outer_rgb_camera/image" self.publishDelay = 0.33 self.topicSrv = self.nodeName + "/object_detect" self.topicDetectImage = self.nodeName + "/detect_image" self._InitService() # for other nodes kwargs["allow_undeclared_parameters"] = True kwargs["automatically_declare_parameters_from_overrides"] = True super().__init__(self.nodeName, **kwargs) self.declare_parameter("setting_path", rclpy.Parameter.Type.STRING) # Used to convert between ROS and OpenCV images self.br = CvBridge() self.objName = "" self.image_det = [] self.bbox_res = None def on_configure(self, state: State) -> TransitionCallbackReturn: """ Configure the node, after a configuring transition is requested. :return: The state machine either invokes a transition to the "inactive" state or stays in "unconfigured" depending on the return value. TransitionCallbackReturn.SUCCESS transitions to "inactive". TransitionCallbackReturn.FAILURE transitions to "unconfigured". TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing" """ str_param = self.get_parameter("setting_path").get_parameter_value().string_value if str_param[0] == "!": # filename json json_file = str_param[1:] if not os.path.isfile(json_file): self.get_logger().info("not JSon '"+ json_file +"'") return TransitionCallbackReturn.FAILURE str_param = Path(json_file).read_text() try: y = json.loads(str_param) except json.JSONDecodeError as e: self.get_logger().info(f"JSon error: {e}") return TransitionCallbackReturn.FAILURE if "yolov8_weights_file" in y: self.yolov8_weights_file = y["yolov8_weights_file"] if "objName" in y: self.objName = y["objName"] if "mode" in y: self._is_image_mode = (y["mode"] == "image_res") else: self.yolov8_weights_file = str_param if not os.path.isfile(self.yolov8_weights_file): self.get_logger().info(f"Parameter 'yolov8_weights_file' not valid: {self.yolov8_weights_file}") return TransitionCallbackReturn.FAILURE # Create the publisher. if self._is_image_mode: self._pub = self.create_lifecycle_publisher(Image, self.topicDetectImage, 3) else: self._pub = self.create_lifecycle_publisher(BoundBox, self.topicSrv, 10) self._timer = self.create_timer(self.publishDelay, self.publish) from ultralytics import YOLO self.model = YOLO(self.yolov8_weights_file) self.get_logger().info("on_configure() is called.") return TransitionCallbackReturn.SUCCESS def on_activate(self, state: State) -> TransitionCallbackReturn: # Log self.get_logger().info('on_activate() is called.') # Create the main subscriber. self._sub = self.create_subscription(Image, self.topicImage, self.listener_callback, 3) return super().on_activate(state) def on_deactivate(self, state: State) -> TransitionCallbackReturn: # Log self.get_logger().info('on_deactivate() is called.') # Destroy the main subscriber. self.destroy_subscription(self._sub) return super().on_deactivate(state) def on_cleanup(self, state: State) -> TransitionCallbackReturn: """ Cleanup the node. :return: The state machine either invokes a transition to the "unconfigured" state or stays in "inactive" depending on the return value. TransitionCallbackReturn.SUCCESS transitions to "unconfigured". TransitionCallbackReturn.FAILURE transitions to "inactive". TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing" """ # очистим параметры node_param = rclpy.parameter.Parameter("setting_path", rclpy.Parameter.Type.STRING, "") all_node_param = [node_param] self.set_parameters(all_node_param) self._is_image_mode = False self.image_det = [] self.bbox_res = None self.destroy_timer(self._timer) self.destroy_publisher(self._pub) self.destroy_subscription(self._sub) self.get_logger().info('on_cleanup() is called.') return TransitionCallbackReturn.SUCCESS def on_shutdown(self, state: State) -> TransitionCallbackReturn: """ Shutdown the node. :return: The state machine either invokes a transition to the "finalized" state or stays in the current state depending on the return value. TransitionCallbackReturn.SUCCESS transitions to "unconfigured". TransitionCallbackReturn.FAILURE transitions to "inactive". TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing" """ self.destroy_timer(self._timer) self.destroy_publisher(self._pub) self.destroy_subscription(self._sub) self.get_logger().info('on_shutdown() is called.') return TransitionCallbackReturn.SUCCESS def publish(self): """Publish a new message when enabled.""" self._count += 1 if self._pub is not None and self._pub.is_activated: # Publish result if self._is_image_mode: if len(self.image_det) == 0: return # The 'cv2_to_imgmsg' method converts an OpenCV image to a ROS 2 image message msg = self.br.cv2_to_imgmsg(self.image_det, encoding="bgr8") else: if not self.bbox_res: return msg = BoundBox() msg.is_detection = False #from ultralytics.engine.results cconf = 0.0 # threshold bb = None for c in self.bbox_res.boxes: idx = int(c.cls) if self.bbox_res.names[idx] == self.objName: conf = float(c.conf) if cconf < conf: cconf = conf bb = c if bb: msg.is_detection = True msg.name = self.objName msg.x = float(bb.xywhn[0,0]) msg.y = float(bb.xywhn[0,1]) msg.w = float(bb.xywhn[0,2]) msg.h = float(bb.xywhn[0,3]) msg.conf = float(bb.conf) # Publish the message. self._pub.publish(msg) def listener_callback(self, data): """ Image Callback function. """ if self.model: self._image_cnt += 1 self.get_logger().info(f"detection: begin {self._image_cnt}") # Convert ROS Image message to OpenCV image current_frame = self.br.imgmsg_to_cv2(data) # run inference results = self.model(current_frame) if self._is_image_mode: for r in results: self.image_det = r.plot() # plot a BGR numpy array of predictions else: for r in results: self.bbox_res = r self.get_logger().info(f"detection: end {self._image_cnt}") def main(): rclpy.init() executor = rclpy.executors.SingleThreadedExecutor() lc_node = Detection("lc_detection") executor.add_node(lc_node) try: executor.spin() except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): lc_node.destroy_node() if __name__ == '__main__': main()