runtime/rbss_objectdetection/scripts/od_yolo_lc.py

242 lines
8.5 KiB
Python
Raw Normal View History

2024-06-25 08:35:54 +00:00
#!/usr/bin/env python3
"""
object_detection_lifecycle_node_via_interface_node
ROS 2 program for Object Detection
@shalenikol release 0.3
"""
from typing import Optional
import os
import json
import rclpy
from rclpy.lifecycle import Node
from rclpy.lifecycle import Publisher
from rclpy.lifecycle import State
from rclpy.lifecycle import TransitionCallbackReturn
from rclpy.timer import Timer
from ament_index_python.packages import get_package_share_directory
from sensor_msgs.msg import Image
from rbs_skill_interfaces.msg import BoundBox
from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images
FILE_DOPE_CONFIG = "od_yolo_config.yaml"
CAMERA_LINK_DEFAULT = "rgbd_camera"
NODE_NAME_DEFAULT = "lc_yolo" # the name doesn't matter in this node (defined in Launch)
PARAM_SKILL_CFG = "lc_yolo_cfg"
class ObjectDetection(Node):
"""Our lifecycle node."""
def _Settings(self):
# Initialization service settings
for prop in self.skill_cfg["Settings"]:
nam = prop["name"]
val = prop["value"]
if nam == "publishDelay":
self.publishDelay = val
elif nam == "is_image_mode":
self._is_image_mode = val
elif nam == "cameraLink":
self._set_camera_topic(val)
def __init__(self, **kwargs):
"""Construct the node."""
self._count: int = 0
self._pub: Optional[Publisher] = None
self._timer: Optional[Timer] = None
self._image_cnt: int = 0
self._sub = None
self._is_image_mode = False
self.yolov8_weights_file = ""
self.model = None
# for other nodes
kwargs["allow_undeclared_parameters"] = True
kwargs["automatically_declare_parameters_from_overrides"] = True
super().__init__(NODE_NAME_DEFAULT, **kwargs)
str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
self.skill_cfg = json.loads(str_cfg)
self.nodeName = self.skill_cfg["Launch"]["name"]
self.camera_link = ""
self.topicImage = ""
self._set_camera_topic(CAMERA_LINK_DEFAULT)
self.publishDelay = 0.33
self.topicSrv = self.nodeName + "/" + self.skill_cfg["Interface"]["Output"][0]["name"]
self.topicDetectImage = self.nodeName + "/detect_image"
self._Settings()
# Used to convert between ROS and OpenCV images
self.br = CvBridge()
self.objName = ""
self.image_det = []
self.bbox_res = None
def _set_camera_topic(self, camera_link: str):
""" Service for camera name topics """
self.camera_link = camera_link
self.topicImage = "/" + camera_link + "/image"
def on_configure(self, state: State) -> TransitionCallbackReturn:
"""
Configure the node, after a configuring transition is requested.
:return: The state machine either invokes a transition to the "inactive" state or stays
in "unconfigured" depending on the return value.
TransitionCallbackReturn.SUCCESS transitions to "inactive".
TransitionCallbackReturn.FAILURE transitions to "unconfigured".
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
"""
json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
self.skill_cfg = json.loads(json_param)
self._Settings()
dependency = {}
for comm in self.skill_cfg["BTAction"]:
for par in comm["param"]:
if par["type"] == "weights":
dependency = par["dependency"]
assert dependency, "no dependency"
self.yolov8_weights_file = dependency["weights_file"]
if not os.path.isfile(self.yolov8_weights_file):
self.get_logger().warning(f"No weights found <{self.yolov8_weights_file}>")
return TransitionCallbackReturn.FAILURE
self.objName = dependency["object_name"]
# Create the publisher.
if self._is_image_mode:
self._pub = self.create_lifecycle_publisher(Image, self.topicDetectImage, 3)
else:
self._pub = self.create_lifecycle_publisher(BoundBox, self.topicSrv, 10)
self._timer = self.create_timer(self.publishDelay, self.publish)
from ultralytics import YOLO
self.model = YOLO(self.yolov8_weights_file)
self.get_logger().info("on_configure() is called.")
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: State) -> TransitionCallbackReturn:
self.get_logger().info('on_activate() is called.')
# Create the main subscriber.
self._sub = self.create_subscription(Image, self.topicImage, self.listener_callback, 3)
return super().on_activate(state)
def on_deactivate(self, state: State) -> TransitionCallbackReturn:
self.get_logger().info('on_deactivate() is called.')
# Destroy the main subscriber.
self.destroy_subscription(self._sub)
return super().on_deactivate(state)
def on_cleanup(self, state: State) -> TransitionCallbackReturn:
"""
Cleanup the node.
"""
# # очистим параметры
# node_param = rclpy.parameter.Parameter("setting_path", rclpy.Parameter.Type.STRING, "")
# all_node_param = [node_param]
# self.set_parameters(all_node_param)
self._is_image_mode = False
self.image_det = []
self.bbox_res = None
self.destroy_timer(self._timer)
self.destroy_publisher(self._pub)
self.destroy_subscription(self._sub)
self.get_logger().info('on_cleanup() is called.')
return TransitionCallbackReturn.SUCCESS
def on_shutdown(self, state: State) -> TransitionCallbackReturn:
"""
Shutdown the node.
"""
self.destroy_timer(self._timer)
self.destroy_publisher(self._pub)
self.destroy_subscription(self._sub)
self.get_logger().info('on_shutdown() is called.')
return TransitionCallbackReturn.SUCCESS
def publish(self):
"""Publish a new message when enabled."""
self._count += 1
if self._pub is not None and self._pub.is_activated:
# Publish result
if self._is_image_mode:
if len(self.image_det) == 0:
return
# The 'cv2_to_imgmsg' method converts an OpenCV image to a ROS 2 image message
msg = self.br.cv2_to_imgmsg(self.image_det, encoding="bgr8")
else:
if not self.bbox_res:
return
msg = BoundBox()
msg.is_detection = False
#from ultralytics.engine.results
cconf = 0.0 # threshold
bb = None
for c in self.bbox_res.boxes:
idx = int(c.cls)
if self.bbox_res.names[idx] == self.objName:
conf = float(c.conf)
if cconf < conf:
cconf = conf
bb = c
if bb:
msg.is_detection = True
msg.name = self.objName
msg.x = float(bb.xywhn[0,0])
msg.y = float(bb.xywhn[0,1])
msg.w = float(bb.xywhn[0,2])
msg.h = float(bb.xywhn[0,3])
msg.conf = float(bb.conf)
# Publish the message.
self._pub.publish(msg)
def listener_callback(self, data):
"""
Image Callback function.
"""
if self.model:
self._image_cnt += 1
if self._image_cnt % 100 == 1:
self.get_logger().info(f"detection: {self._image_cnt}")
# Convert ROS Image message to OpenCV image
current_frame = self.br.imgmsg_to_cv2(data)
# run inference
results = self.model(current_frame)
if self._is_image_mode:
for r in results:
self.image_det = r.plot() # plot a BGR numpy array of predictions
else:
for r in results:
self.bbox_res = r
# self.get_logger().info(f"detection: end {self._image_cnt}")
def main():
rclpy.init()
executor = rclpy.executors.SingleThreadedExecutor()
lc_node = ObjectDetection()
executor.add_node(lc_node)
try:
executor.spin()
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
lc_node.destroy_node()
if __name__ == '__main__':
main()