runtime/rbss_objectdetection/scripts/od_yolo_lc.py

264 lines
9.7 KiB
Python
Raw Permalink Normal View History

2024-06-25 08:35:54 +00:00
#!/usr/bin/env python3
"""
object_detection_lifecycle_node_via_interface_node
ROS 2 program for Object Detection
@shalenikol release 0.3
"""
from typing import Optional
import os
import json
import rclpy
from rclpy.lifecycle import Node
from rclpy.lifecycle import Publisher
from rclpy.lifecycle import LifecycleState
2024-06-25 08:35:54 +00:00
from rclpy.lifecycle import TransitionCallbackReturn
from lifecycle_msgs.msg import State
# from lifecycle_msgs.srv import GetState
2024-06-25 08:35:54 +00:00
from rclpy.timer import Timer
from ament_index_python.packages import get_package_share_directory
from sensor_msgs.msg import Image
from rbs_skill_interfaces.msg import BoundBox
from rbs_skill_interfaces.srv import RbsBt
2024-06-25 08:35:54 +00:00
from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images
# CAMERA_LINK_DEFAULT = "rgbd_camera"
CONDITION_SRV_NAME = "/condition"
2024-06-25 08:35:54 +00:00
FILE_DOPE_CONFIG = "od_yolo_config.yaml"
NODE_NAME_DEFAULT = "lc_yolo"
2024-06-25 08:35:54 +00:00
PARAM_SKILL_CFG = "lc_yolo_cfg"
OUT_TOPIC_DEFAULT = "/object_detection"
OUT_TOPIC_IMAGE = "/detect_image"
OUT_TOPIC_TYPE = "rbs_skill_interfaces/msg/BoundBox"
2024-06-25 08:35:54 +00:00
class ObjectDetection(Node):
"""Our lifecycle node."""
def __init__(self, **kwargs):
"""Construct the node."""
self._count: int = 0
self._pub: Optional[Publisher] = None
self._timer: Optional[Timer] = None
self._image_cnt: int = 0
self._sub = None
self._is_image_mode = False
self.yolov8_weights_file = ""
self.model = None
# for other nodes
kwargs["allow_undeclared_parameters"] = True
kwargs["automatically_declare_parameters_from_overrides"] = True
super().__init__(NODE_NAME_DEFAULT, **kwargs)
str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
self.skill_cfg = json.loads(str_cfg)
self.nodeName = NODE_NAME_DEFAULT # self.skill_cfg["Module"]["node_name"] #["Launch"]["name"]
2024-06-25 08:35:54 +00:00
self.topicImage = "" # input topic
2024-06-25 08:35:54 +00:00
self.publishDelay = 0.33
self.topicSrv = self.nodeName + OUT_TOPIC_DEFAULT # self.skill_cfg["topicsOut"][0]["name"] # self.nodeName + "/" + self.skill_cfg["Interface"]["Output"][0]["name"]
self.topicDetectImage = self.nodeName + OUT_TOPIC_IMAGE
# self._Settings()
2024-06-25 08:35:54 +00:00
# Used to convert between ROS and OpenCV images
self.br = CvBridge()
self.objName = ""
self.image_det = []
self.bbox_res = None
self._srv_condition = self.create_service(RbsBt, NODE_NAME_DEFAULT + CONDITION_SRV_NAME, self.condition_callback)
# , callback_group=self.cb_group)
def _Settings(self):
# Initialization service settings
# TODO
# for prop in self.skill_cfg["Settings"]:
# nam = prop["name"]
# val = prop["value"]
# if nam == "publishDelay":
# self.publishDelay = val
# elif nam == "is_image_mode":
# self._is_image_mode = val
for prop in self.skill_cfg["topicsOut"]:
if prop["type"] == OUT_TOPIC_TYPE:
self.topicSrv = prop["name"]
def condition_callback(self, request, response):
_is = False
if request.command == "isDetectionRun":
# self._cli_getstate = self.create_client(GetState, f"{self.nodeName}/get_state")
# req = GetState.Request()
# self.get_logger().info(f"condition_callback : before call")
# res = self._cli_getstate.call(req) # the program freezes
id_,_ = self._state_machine.current_state
_is = (id_ == State.PRIMARY_STATE_ACTIVE)
response.ok = _is
return response
def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
2024-06-25 08:35:54 +00:00
"""
Configure the node, after a configuring transition is requested.
:return: The state machine either invokes a transition to the "inactive" state or stays
in "unconfigured" depending on the return value.
TransitionCallbackReturn.SUCCESS transitions to "inactive".
TransitionCallbackReturn.FAILURE transitions to "unconfigured".
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
"""
json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
self.skill_cfg = json.loads(json_param)
self._Settings()
dependency = {}
for comm in self.skill_cfg["BTAction"]:
for par in comm["param"]:
if par["type"] == "weights":
dependency = par["dependency"]
elif par["type"] == "topic":
self.topicImage = par["dependency"]["topicOut"]
assert dependency, "no weights dependency"
assert self.topicImage, "no input topic dependency"
2024-06-25 08:35:54 +00:00
self.yolov8_weights_file = dependency["weights_file"]
if not os.path.isfile(self.yolov8_weights_file):
self.get_logger().warning(f"No weights found <{self.yolov8_weights_file}>")
return TransitionCallbackReturn.FAILURE
self.objName = dependency["object_name"]
# Create the publisher.
if self._is_image_mode:
self._pub = self.create_lifecycle_publisher(Image, self.topicDetectImage, 3)
else:
self._pub = self.create_lifecycle_publisher(BoundBox, self.topicSrv, 10)
self._timer = self.create_timer(self.publishDelay, self.publish)
from ultralytics import YOLO
self.model = YOLO(self.yolov8_weights_file)
self.get_logger().info("on_configure() is called.")
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn:
2024-06-25 08:35:54 +00:00
self.get_logger().info('on_activate() is called.')
# Create the main subscriber.
self._sub = self.create_subscription(Image, self.topicImage, self.listener_callback, 3)
return super().on_activate(state)
def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn:
2024-06-25 08:35:54 +00:00
self.get_logger().info('on_deactivate() is called.')
# Destroy the main subscriber.
self.destroy_subscription(self._sub)
return super().on_deactivate(state)
def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn:
2024-06-25 08:35:54 +00:00
"""
Cleanup the node.
"""
# # очистим параметры
# node_param = rclpy.parameter.Parameter("setting_path", rclpy.Parameter.Type.STRING, "")
# all_node_param = [node_param]
# self.set_parameters(all_node_param)
self._is_image_mode = False
self.image_det = []
self.bbox_res = None
self.topicImage = ""
2024-06-25 08:35:54 +00:00
self.destroy_timer(self._timer)
self.destroy_publisher(self._pub)
self.destroy_subscription(self._sub)
self.get_logger().info('on_cleanup() is called.')
return TransitionCallbackReturn.SUCCESS
def on_shutdown(self, state: LifecycleState) -> TransitionCallbackReturn:
2024-06-25 08:35:54 +00:00
"""
Shutdown the node.
"""
self.destroy_timer(self._timer)
self.destroy_publisher(self._pub)
self.destroy_subscription(self._sub)
self.get_logger().info('on_shutdown() is called.')
return TransitionCallbackReturn.SUCCESS
def publish(self):
"""Publish a new message when enabled."""
self._count += 1
if self._pub is not None and self._pub.is_activated:
# Publish result
if self._is_image_mode:
if len(self.image_det) == 0:
return
# The 'cv2_to_imgmsg' method converts an OpenCV image to a ROS 2 image message
msg = self.br.cv2_to_imgmsg(self.image_det, encoding="bgr8")
else:
if not self.bbox_res:
return
msg = BoundBox()
msg.is_detection = False
#from ultralytics.engine.results
cconf = 0.0 # threshold
bb = None
for c in self.bbox_res.boxes:
idx = int(c.cls)
if self.bbox_res.names[idx] == self.objName:
conf = float(c.conf)
if cconf < conf:
cconf = conf
bb = c
if bb:
msg.is_detection = True
msg.name = self.objName
msg.x = float(bb.xywhn[0,0])
msg.y = float(bb.xywhn[0,1])
msg.w = float(bb.xywhn[0,2])
msg.h = float(bb.xywhn[0,3])
msg.conf = float(bb.conf)
# Publish the message.
self._pub.publish(msg)
def listener_callback(self, data):
"""
Image Callback function.
"""
if self.model:
self._image_cnt += 1
if self._image_cnt % 100 == 1:
self.get_logger().info(f"detection: {self._image_cnt}")
# Convert ROS Image message to OpenCV image
current_frame = self.br.imgmsg_to_cv2(data)
# run inference
results = self.model(current_frame)
if self._is_image_mode:
for r in results:
self.image_det = r.plot() # plot a BGR numpy array of predictions
else:
for r in results:
self.bbox_res = r
# self.get_logger().info(f"detection: end {self._image_cnt}")
def main():
rclpy.init()
executor = rclpy.executors.SingleThreadedExecutor()
lc_node = ObjectDetection()
executor.add_node(lc_node)
try:
executor.spin()
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
lc_node.destroy_node()
if __name__ == '__main__':
main()