runtime/rbs_perception/scripts/detection_lifecycle.py

258 lines
No EOL
9.5 KiB
Python
Executable file

#!/usr/bin/env python3
"""
object_detection_lifecycle_node
ROS 2 program for Object Detection
@shalenikol release 0.1
"""
from typing import Optional
import os
import json
from pathlib import Path
import rclpy
from rclpy.lifecycle import Node
from rclpy.lifecycle import Publisher
from rclpy.lifecycle import State
from rclpy.lifecycle import TransitionCallbackReturn
from rclpy.timer import Timer
from ament_index_python.packages import get_package_share_directory
from sensor_msgs.msg import Image
from rbs_skill_interfaces.msg import BoundBox
from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images
#import cv2 # OpenCV library
class Detection(Node):
"""Our lifecycle node."""
def _InitService(self):
# Initialization service data
p = os.path.join(get_package_share_directory("rbs_perception"), "config", "detection_config.json")
# load config
with open(p, "r") as f:
y = json.load(f)
for name, val in y.items():
if name == "nodeName":
self.nodeName = val
elif name == "cameraLink":
self.camera_link = val
elif name == "topicImage":
self.topicImage = val
elif name == "publishDelay":
self.publishDelay = val
elif name == "topicSrv":
self.topicSrv = val
def __init__(self, node_name, **kwargs):
"""Construct the node."""
self._count: int = 0
self._pub: Optional[Publisher] = None
self._timer: Optional[Timer] = None
self._image_cnt: int = 0
self._sub = None
self._is_image_mode = False
self.yolov8_weights_file = ""
self.model = None
self.nodeName = node_name
self.camera_link = "outer_rgbd_camera"
self.topicImage = "/outer_rgb_camera/image"
self.publishDelay = 0.33
self.topicSrv = self.nodeName + "/object_detect"
self.topicDetectImage = self.nodeName + "/detect_image"
self._InitService()
# for other nodes
kwargs["allow_undeclared_parameters"] = True
kwargs["automatically_declare_parameters_from_overrides"] = True
super().__init__(self.nodeName, **kwargs)
self.declare_parameter("setting_path", rclpy.Parameter.Type.STRING)
# Used to convert between ROS and OpenCV images
self.br = CvBridge()
self.objName = ""
self.image_det = []
self.bbox_res = None
def on_configure(self, state: State) -> TransitionCallbackReturn:
"""
Configure the node, after a configuring transition is requested.
:return: The state machine either invokes a transition to the "inactive" state or stays
in "unconfigured" depending on the return value.
TransitionCallbackReturn.SUCCESS transitions to "inactive".
TransitionCallbackReturn.FAILURE transitions to "unconfigured".
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
"""
str_param = self.get_parameter("setting_path").get_parameter_value().string_value
if str_param[0] == "!": # filename json
json_file = str_param[1:]
if not os.path.isfile(json_file):
self.get_logger().info("not JSon '"+ json_file +"'")
return TransitionCallbackReturn.FAILURE
str_param = Path(json_file).read_text()
try:
y = json.loads(str_param)
except json.JSONDecodeError as e:
self.get_logger().info(f"JSon error: {e}")
return TransitionCallbackReturn.FAILURE
if "yolov8_weights_file" in y:
self.yolov8_weights_file = y["yolov8_weights_file"]
if "objName" in y:
self.objName = y["objName"]
if "mode" in y:
self._is_image_mode = (y["mode"] == "image_res")
else:
self.yolov8_weights_file = str_param
if not os.path.isfile(self.yolov8_weights_file):
self.get_logger().info(f"Parameter 'yolov8_weights_file' not valid: {self.yolov8_weights_file}")
return TransitionCallbackReturn.FAILURE
# Create the publisher.
if self._is_image_mode:
self._pub = self.create_lifecycle_publisher(Image, self.topicDetectImage, 3)
else:
self._pub = self.create_lifecycle_publisher(BoundBox, self.topicSrv, 10)
self._timer = self.create_timer(self.publishDelay, self.publish)
from ultralytics import YOLO
self.model = YOLO(self.yolov8_weights_file)
self.get_logger().info("on_configure() is called.")
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: State) -> TransitionCallbackReturn:
# Log
self.get_logger().info('on_activate() is called.')
# Create the main subscriber.
self._sub = self.create_subscription(Image, self.topicImage, self.listener_callback, 3)
return super().on_activate(state)
def on_deactivate(self, state: State) -> TransitionCallbackReturn:
# Log
self.get_logger().info('on_deactivate() is called.')
# Destroy the main subscriber.
self.destroy_subscription(self._sub)
return super().on_deactivate(state)
def on_cleanup(self, state: State) -> TransitionCallbackReturn:
"""
Cleanup the node.
:return: The state machine either invokes a transition to the "unconfigured" state or stays
in "inactive" depending on the return value.
TransitionCallbackReturn.SUCCESS transitions to "unconfigured".
TransitionCallbackReturn.FAILURE transitions to "inactive".
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
"""
# очистим параметры
node_param = rclpy.parameter.Parameter("setting_path", rclpy.Parameter.Type.STRING, "")
all_node_param = [node_param]
self.set_parameters(all_node_param)
self._is_image_mode = False
self.image_det = []
self.bbox_res = None
self.destroy_timer(self._timer)
self.destroy_publisher(self._pub)
self.destroy_subscription(self._sub)
self.get_logger().info('on_cleanup() is called.')
return TransitionCallbackReturn.SUCCESS
def on_shutdown(self, state: State) -> TransitionCallbackReturn:
"""
Shutdown the node.
:return: The state machine either invokes a transition to the "finalized" state or stays
in the current state depending on the return value.
TransitionCallbackReturn.SUCCESS transitions to "unconfigured".
TransitionCallbackReturn.FAILURE transitions to "inactive".
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
"""
self.destroy_timer(self._timer)
self.destroy_publisher(self._pub)
self.destroy_subscription(self._sub)
self.get_logger().info('on_shutdown() is called.')
return TransitionCallbackReturn.SUCCESS
def publish(self):
"""Publish a new message when enabled."""
self._count += 1
if self._pub is not None and self._pub.is_activated:
# Publish result
if self._is_image_mode:
if len(self.image_det) == 0:
return
# The 'cv2_to_imgmsg' method converts an OpenCV image to a ROS 2 image message
msg = self.br.cv2_to_imgmsg(self.image_det, encoding="bgr8")
else:
if not self.bbox_res:
return
msg = BoundBox()
msg.is_detection = False
#from ultralytics.engine.results
cconf = 0.0 # threshold
bb = None
for c in self.bbox_res.boxes:
idx = int(c.cls)
if self.bbox_res.names[idx] == self.objName:
conf = float(c.conf)
if cconf < conf:
cconf = conf
bb = c
if bb:
msg.is_detection = True
msg.name = self.objName
msg.x = float(bb.xywhn[0,0])
msg.y = float(bb.xywhn[0,1])
msg.w = float(bb.xywhn[0,2])
msg.h = float(bb.xywhn[0,3])
msg.conf = float(bb.conf)
# Publish the message.
self._pub.publish(msg)
def listener_callback(self, data):
"""
Image Callback function.
"""
if self.model:
self._image_cnt += 1
self.get_logger().info(f"detection: begin {self._image_cnt}")
# Convert ROS Image message to OpenCV image
current_frame = self.br.imgmsg_to_cv2(data)
# run inference
results = self.model(current_frame)
if self._is_image_mode:
for r in results:
self.image_det = r.plot() # plot a BGR numpy array of predictions
else:
for r in results:
self.bbox_res = r
self.get_logger().info(f"detection: end {self._image_cnt}")
def main():
rclpy.init()
executor = rclpy.executors.SingleThreadedExecutor()
lc_node = Detection("lc_detection")
executor.add_node(lc_node)
try:
executor.spin()
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
lc_node.destroy_node()
if __name__ == '__main__':
main()