#!/usr/bin/env python3 """ object_detection_lifecycle_node_via_interface_node ROS 2 program for Object Detection @shalenikol release 0.3 """ from typing import Optional import os import json import rclpy from rclpy.lifecycle import Node from rclpy.lifecycle import Publisher from rclpy.lifecycle import LifecycleState from rclpy.lifecycle import TransitionCallbackReturn from lifecycle_msgs.msg import State # from lifecycle_msgs.srv import GetState from rclpy.timer import Timer from ament_index_python.packages import get_package_share_directory from sensor_msgs.msg import Image from rbs_skill_interfaces.msg import BoundBox from rbs_skill_interfaces.srv import RbsBt from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images # CAMERA_LINK_DEFAULT = "rgbd_camera" CONDITION_SRV_NAME = "/condition" FILE_DOPE_CONFIG = "od_yolo_config.yaml" NODE_NAME_DEFAULT = "lc_yolo" PARAM_SKILL_CFG = "lc_yolo_cfg" OUT_TOPIC_DEFAULT = "/object_detection" OUT_TOPIC_IMAGE = "/detect_image" OUT_TOPIC_TYPE = "rbs_skill_interfaces/msg/BoundBox" class ObjectDetection(Node): """Our lifecycle node.""" def __init__(self, **kwargs): """Construct the node.""" self._count: int = 0 self._pub: Optional[Publisher] = None self._timer: Optional[Timer] = None self._image_cnt: int = 0 self._sub = None self._is_image_mode = False self.yolov8_weights_file = "" self.model = None # for other nodes kwargs["allow_undeclared_parameters"] = True kwargs["automatically_declare_parameters_from_overrides"] = True super().__init__(NODE_NAME_DEFAULT, **kwargs) str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value self.skill_cfg = json.loads(str_cfg) self.nodeName = NODE_NAME_DEFAULT # self.skill_cfg["Module"]["node_name"] #["Launch"]["name"] self.topicImage = "" # input topic self.publishDelay = 0.33 self.topicSrv = self.nodeName + OUT_TOPIC_DEFAULT # self.skill_cfg["topicsOut"][0]["name"] # self.nodeName + "/" + self.skill_cfg["Interface"]["Output"][0]["name"] self.topicDetectImage = self.nodeName + OUT_TOPIC_IMAGE # self._Settings() # Used to convert between ROS and OpenCV images self.br = CvBridge() self.objName = "" self.image_det = [] self.bbox_res = None self._srv_condition = self.create_service(RbsBt, NODE_NAME_DEFAULT + CONDITION_SRV_NAME, self.condition_callback) # , callback_group=self.cb_group) def _Settings(self): # Initialization service settings # TODO # for prop in self.skill_cfg["Settings"]: # nam = prop["name"] # val = prop["value"] # if nam == "publishDelay": # self.publishDelay = val # elif nam == "is_image_mode": # self._is_image_mode = val for prop in self.skill_cfg["topicsOut"]: if prop["type"] == OUT_TOPIC_TYPE: self.topicSrv = prop["name"] def condition_callback(self, request, response): _is = False if request.command == "isDetectionRun": # self._cli_getstate = self.create_client(GetState, f"{self.nodeName}/get_state") # req = GetState.Request() # self.get_logger().info(f"condition_callback : before call") # res = self._cli_getstate.call(req) # the program freezes id_,_ = self._state_machine.current_state _is = (id_ == State.PRIMARY_STATE_ACTIVE) response.ok = _is return response def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn: """ Configure the node, after a configuring transition is requested. :return: The state machine either invokes a transition to the "inactive" state or stays in "unconfigured" depending on the return value. TransitionCallbackReturn.SUCCESS transitions to "inactive". TransitionCallbackReturn.FAILURE transitions to "unconfigured". TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing" """ json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value self.skill_cfg = json.loads(json_param) self._Settings() dependency = {} for comm in self.skill_cfg["BTAction"]: for par in comm["param"]: if par["type"] == "weights": dependency = par["dependency"] elif par["type"] == "topic": self.topicImage = par["dependency"]["topicOut"] assert dependency, "no weights dependency" assert self.topicImage, "no input topic dependency" self.yolov8_weights_file = dependency["weights_file"] if not os.path.isfile(self.yolov8_weights_file): self.get_logger().warning(f"No weights found <{self.yolov8_weights_file}>") return TransitionCallbackReturn.FAILURE self.objName = dependency["object_name"] # Create the publisher. if self._is_image_mode: self._pub = self.create_lifecycle_publisher(Image, self.topicDetectImage, 3) else: self._pub = self.create_lifecycle_publisher(BoundBox, self.topicSrv, 10) self._timer = self.create_timer(self.publishDelay, self.publish) from ultralytics import YOLO self.model = YOLO(self.yolov8_weights_file) self.get_logger().info("on_configure() is called.") return TransitionCallbackReturn.SUCCESS def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn: self.get_logger().info('on_activate() is called.') # Create the main subscriber. self._sub = self.create_subscription(Image, self.topicImage, self.listener_callback, 3) return super().on_activate(state) def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn: self.get_logger().info('on_deactivate() is called.') # Destroy the main subscriber. self.destroy_subscription(self._sub) return super().on_deactivate(state) def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn: """ Cleanup the node. """ # # очистим параметры # node_param = rclpy.parameter.Parameter("setting_path", rclpy.Parameter.Type.STRING, "") # all_node_param = [node_param] # self.set_parameters(all_node_param) self._is_image_mode = False self.image_det = [] self.bbox_res = None self.topicImage = "" self.destroy_timer(self._timer) self.destroy_publisher(self._pub) self.destroy_subscription(self._sub) self.get_logger().info('on_cleanup() is called.') return TransitionCallbackReturn.SUCCESS def on_shutdown(self, state: LifecycleState) -> TransitionCallbackReturn: """ Shutdown the node. """ self.destroy_timer(self._timer) self.destroy_publisher(self._pub) self.destroy_subscription(self._sub) self.get_logger().info('on_shutdown() is called.') return TransitionCallbackReturn.SUCCESS def publish(self): """Publish a new message when enabled.""" self._count += 1 if self._pub is not None and self._pub.is_activated: # Publish result if self._is_image_mode: if len(self.image_det) == 0: return # The 'cv2_to_imgmsg' method converts an OpenCV image to a ROS 2 image message msg = self.br.cv2_to_imgmsg(self.image_det, encoding="bgr8") else: if not self.bbox_res: return msg = BoundBox() msg.is_detection = False #from ultralytics.engine.results cconf = 0.0 # threshold bb = None for c in self.bbox_res.boxes: idx = int(c.cls) if self.bbox_res.names[idx] == self.objName: conf = float(c.conf) if cconf < conf: cconf = conf bb = c if bb: msg.is_detection = True msg.name = self.objName msg.x = float(bb.xywhn[0,0]) msg.y = float(bb.xywhn[0,1]) msg.w = float(bb.xywhn[0,2]) msg.h = float(bb.xywhn[0,3]) msg.conf = float(bb.conf) # Publish the message. self._pub.publish(msg) def listener_callback(self, data): """ Image Callback function. """ if self.model: self._image_cnt += 1 if self._image_cnt % 100 == 1: self.get_logger().info(f"detection: {self._image_cnt}") # Convert ROS Image message to OpenCV image current_frame = self.br.imgmsg_to_cv2(data) # run inference results = self.model(current_frame) if self._is_image_mode: for r in results: self.image_det = r.plot() # plot a BGR numpy array of predictions else: for r in results: self.bbox_res = r # self.get_logger().info(f"detection: end {self._image_cnt}") def main(): rclpy.init() executor = rclpy.executors.SingleThreadedExecutor() lc_node = ObjectDetection() executor.add_node(lc_node) try: executor.spin() except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): lc_node.destroy_node() if __name__ == '__main__': main()