#!/usr/bin/env python3 """ object_detection_lifecycle_node_via_interface_node ROS 2 program for Object Detection @shalenikol release 0.3 """ from typing import Optional import os import json import rclpy from rclpy.lifecycle import Node from rclpy.lifecycle import Publisher from rclpy.lifecycle import State from rclpy.lifecycle import TransitionCallbackReturn from rclpy.timer import Timer from ament_index_python.packages import get_package_share_directory from sensor_msgs.msg import Image from rbs_skill_interfaces.msg import BoundBox from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images FILE_DOPE_CONFIG = "od_yolo_config.yaml" CAMERA_LINK_DEFAULT = "rgbd_camera" NODE_NAME_DEFAULT = "lc_yolo" # the name doesn't matter in this node (defined in Launch) PARAM_SKILL_CFG = "lc_yolo_cfg" class ObjectDetection(Node): """Our lifecycle node.""" def _Settings(self): # Initialization service settings for prop in self.skill_cfg["Settings"]: nam = prop["name"] val = prop["value"] if nam == "publishDelay": self.publishDelay = val elif nam == "is_image_mode": self._is_image_mode = val elif nam == "cameraLink": self._set_camera_topic(val) def __init__(self, **kwargs): """Construct the node.""" self._count: int = 0 self._pub: Optional[Publisher] = None self._timer: Optional[Timer] = None self._image_cnt: int = 0 self._sub = None self._is_image_mode = False self.yolov8_weights_file = "" self.model = None # for other nodes kwargs["allow_undeclared_parameters"] = True kwargs["automatically_declare_parameters_from_overrides"] = True super().__init__(NODE_NAME_DEFAULT, **kwargs) str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value self.skill_cfg = json.loads(str_cfg) self.nodeName = self.skill_cfg["Launch"]["name"] self.camera_link = "" self.topicImage = "" self._set_camera_topic(CAMERA_LINK_DEFAULT) self.publishDelay = 0.33 self.topicSrv = self.nodeName + "/" + self.skill_cfg["Interface"]["Output"][0]["name"] self.topicDetectImage = self.nodeName + "/detect_image" self._Settings() # Used to convert between ROS and OpenCV images self.br = CvBridge() self.objName = "" self.image_det = [] self.bbox_res = None def _set_camera_topic(self, camera_link: str): """ Service for camera name topics """ self.camera_link = camera_link self.topicImage = "/" + camera_link + "/image" def on_configure(self, state: State) -> TransitionCallbackReturn: """ Configure the node, after a configuring transition is requested. :return: The state machine either invokes a transition to the "inactive" state or stays in "unconfigured" depending on the return value. TransitionCallbackReturn.SUCCESS transitions to "inactive". TransitionCallbackReturn.FAILURE transitions to "unconfigured". TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing" """ json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value self.skill_cfg = json.loads(json_param) self._Settings() dependency = {} for comm in self.skill_cfg["BTAction"]: for par in comm["param"]: if par["type"] == "weights": dependency = par["dependency"] assert dependency, "no dependency" self.yolov8_weights_file = dependency["weights_file"] if not os.path.isfile(self.yolov8_weights_file): self.get_logger().warning(f"No weights found <{self.yolov8_weights_file}>") return TransitionCallbackReturn.FAILURE self.objName = dependency["object_name"] # Create the publisher. if self._is_image_mode: self._pub = self.create_lifecycle_publisher(Image, self.topicDetectImage, 3) else: self._pub = self.create_lifecycle_publisher(BoundBox, self.topicSrv, 10) self._timer = self.create_timer(self.publishDelay, self.publish) from ultralytics import YOLO self.model = YOLO(self.yolov8_weights_file) self.get_logger().info("on_configure() is called.") return TransitionCallbackReturn.SUCCESS def on_activate(self, state: State) -> TransitionCallbackReturn: self.get_logger().info('on_activate() is called.') # Create the main subscriber. self._sub = self.create_subscription(Image, self.topicImage, self.listener_callback, 3) return super().on_activate(state) def on_deactivate(self, state: State) -> TransitionCallbackReturn: self.get_logger().info('on_deactivate() is called.') # Destroy the main subscriber. self.destroy_subscription(self._sub) return super().on_deactivate(state) def on_cleanup(self, state: State) -> TransitionCallbackReturn: """ Cleanup the node. """ # # очистим параметры # node_param = rclpy.parameter.Parameter("setting_path", rclpy.Parameter.Type.STRING, "") # all_node_param = [node_param] # self.set_parameters(all_node_param) self._is_image_mode = False self.image_det = [] self.bbox_res = None self.destroy_timer(self._timer) self.destroy_publisher(self._pub) self.destroy_subscription(self._sub) self.get_logger().info('on_cleanup() is called.') return TransitionCallbackReturn.SUCCESS def on_shutdown(self, state: State) -> TransitionCallbackReturn: """ Shutdown the node. """ self.destroy_timer(self._timer) self.destroy_publisher(self._pub) self.destroy_subscription(self._sub) self.get_logger().info('on_shutdown() is called.') return TransitionCallbackReturn.SUCCESS def publish(self): """Publish a new message when enabled.""" self._count += 1 if self._pub is not None and self._pub.is_activated: # Publish result if self._is_image_mode: if len(self.image_det) == 0: return # The 'cv2_to_imgmsg' method converts an OpenCV image to a ROS 2 image message msg = self.br.cv2_to_imgmsg(self.image_det, encoding="bgr8") else: if not self.bbox_res: return msg = BoundBox() msg.is_detection = False #from ultralytics.engine.results cconf = 0.0 # threshold bb = None for c in self.bbox_res.boxes: idx = int(c.cls) if self.bbox_res.names[idx] == self.objName: conf = float(c.conf) if cconf < conf: cconf = conf bb = c if bb: msg.is_detection = True msg.name = self.objName msg.x = float(bb.xywhn[0,0]) msg.y = float(bb.xywhn[0,1]) msg.w = float(bb.xywhn[0,2]) msg.h = float(bb.xywhn[0,3]) msg.conf = float(bb.conf) # Publish the message. self._pub.publish(msg) def listener_callback(self, data): """ Image Callback function. """ if self.model: self._image_cnt += 1 if self._image_cnt % 100 == 1: self.get_logger().info(f"detection: {self._image_cnt}") # Convert ROS Image message to OpenCV image current_frame = self.br.imgmsg_to_cv2(data) # run inference results = self.model(current_frame) if self._is_image_mode: for r in results: self.image_det = r.plot() # plot a BGR numpy array of predictions else: for r in results: self.bbox_res = r # self.get_logger().info(f"detection: end {self._image_cnt}") def main(): rclpy.init() executor = rclpy.executors.SingleThreadedExecutor() lc_node = ObjectDetection() executor.add_node(lc_node) try: executor.spin() except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): lc_node.destroy_node() if __name__ == '__main__': main()