runtime/rbs_perception/scripts/detection_service.py

133 lines
4 KiB
Python
Executable file

#!/usr/bin/env python3
"""
detection_service
ROS 2 program for Object Detection
25.04.2023 @shalenikol release 0.1
"""
# Import the necessary libraries
import rclpy # Python library for ROS 2
from rclpy.node import Node # Handles the creation of nodes
from sensor_msgs.msg import Image # Image is the message type
from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images
import cv2 # OpenCV library
import subprocess
import os
import json
from ament_index_python.packages import get_package_share_directory
class ImageSubscriber(Node):
"""
Create an ImageSubscriber class, which is a subclass of the Node class.
"""
def _InitService(self):
# Initialization service data
p = os.path.join(get_package_share_directory("rbs_perception"), "config", "detection_config.json")
# load config
with open(p, "r") as f:
y = json.load(f)
for name, val in y.items():
if name == "topicName":
self.topicName = val
elif name == "topicImage":
self.topicImage = val
elif name == "weightsFile":
self.weightsFile = val
elif name == "topicPubName":
self.topicPubName = val
def __init__(self):
"""
Class constructor to set up the node
"""
self.topicName = "image_subscriber"
self.topicImage = "/ground_true/camera_node"
self.weightsFile = "yolov4_objs2_final.weights"
self.topicPubName = "detection_images"
self._InitService()
# Initiate the Node class's constructor and give it a name
super().__init__(self.topicName)
# Create the subscriber. This subscriber will receive an Image
# from the video_frames topic. The queue size is 10 messages.
self.subscription = self.create_subscription(
Image,
self.topicImage,
self.listener_callback,
3)
self.subscription # prevent unused variable warning
# Create the publisher. This publisher will publish an Image
# to the detection_frames topic. The queue size is 5 messages.
self.publisher_ = self.create_publisher(Image, self.topicPubName, 5)
# Used to convert between ROS and OpenCV images
self.br = CvBridge()
self.cnt = 0
def listener_callback(self, data):
"""
Callback function.
"""
# Display the message on the console
self.get_logger().info("Receiving video frame")
# Convert ROS Image message to OpenCV image
current_frame = self.br.imgmsg_to_cv2(data)
# Display image
#cv2.imshow("camera", current_frame)
frame_jpg = "cur_frame.jpg"
result_json = "res.json"
cv2.imwrite(frame_jpg, current_frame)
self.cnt += 1
# object detection
self.get_logger().info(f"darknet: begin {self.cnt}") # __file__ = <<<{__file__}>>>
subprocess.run(["darknet", "detector", "test",
"yolov4_objs2.data",
"yolov4_objs2.cfg",
self.weightsFile,
"-dont_show", "-ext_output",
"-out", result_json, frame_jpg]
) # f"res{self.cnt}.json"
# удалим временный файл darknet
f = "bad.list"
if os.path.isfile(f):
os.remove(f)
self.get_logger().info(f"darknet: end {self.cnt}")
f = "predictions.jpg"
if os.path.isfile(f):
frame = cv2.imread(f)
# Publish the image.
# The 'cv2_to_imgmsg' method converts an OpenCV
# image to a ROS 2 image message
self.publisher_.publish(self.br.cv2_to_imgmsg(frame, encoding="rgb8"))
cv2.waitKey(1)
def main(args=None):
# Initialize the rclpy library
rclpy.init(args=args)
# Create the node
image_subscriber = ImageSubscriber()
# Spin the node so the callback function is called.
rclpy.spin(image_subscriber)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
image_subscriber.destroy_node()
# Shutdown the ROS client library for Python
rclpy.shutdown()
if __name__ == '__main__':
main()