update skills: recording demo, object detection

This commit is contained in:
shalenikol 2024-10-07 17:23:19 +03:00 committed by Bill Finger
parent b42c2e633e
commit ec604bccb8
2 changed files with 59 additions and 41 deletions

View file

@ -3,7 +3,7 @@
recording_demo_lifecycle_node_via_RosBag recording_demo_lifecycle_node_via_RosBag
ROS 2 program for recording a demo via RosBag ROS 2 program for recording a demo via RosBag
@shalenikol release 0.1 @shalenikol release 0.2
""" """
import os import os
import shutil import shutil
@ -46,20 +46,24 @@ class RecordingDemo(Node):
str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
self.skill_cfg = json.loads(str_cfg) self.skill_cfg = json.loads(str_cfg)
sets = self.skill_cfg["Settings"] sets = self.skill_cfg["Settings"]["output"]["params"]
for s in sets: for s in sets:
if s["name"] == "output_path": if s["name"] == "output_path":
self.output_path = s["value"] self.output_path = s["value"]
# check for output folder existence
if os.path.isdir(self.output_path): if os.path.isdir(self.output_path):
shutil.rmtree(self.output_path) x = 1
while os.path.isdir(self.output_path + str(x)):
x += 1
shutil.move(self.output_path, self.output_path + str(x))
self.writer = rosbag2_py.SequentialWriter() self.writer = rosbag2_py.SequentialWriter()
storage_options = rosbag2_py._storage.StorageOptions( # storage_options = rosbag2_py._storage.StorageOptions(
uri=self.output_path, # uri=self.output_path,
storage_id=WRITER_ID) # storage_id=WRITER_ID)
converter_options = rosbag2_py._storage.ConverterOptions("", "") # converter_options = rosbag2_py._storage.ConverterOptions("", "")
self.writer.open(storage_options, converter_options) # self.writer.open(storage_options, converter_options)
def get_list_topics(self) -> list: def get_list_topics(self) -> list:
topic_names_and_types = get_topic_names_and_types(node=self, include_hidden_topics=False) topic_names_and_types = get_topic_names_and_types(node=self, include_hidden_topics=False)
@ -82,6 +86,9 @@ class RecordingDemo(Node):
TransitionCallbackReturn.FAILURE transitions to "unconfigured". TransitionCallbackReturn.FAILURE transitions to "unconfigured".
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing" TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
""" """
storage_options = rosbag2_py._storage.StorageOptions(uri=self.output_path, storage_id=WRITER_ID)
converter_options = rosbag2_py._storage.ConverterOptions("", "")
self.writer.open(storage_options, converter_options)
self.topics = self.get_list_topics() self.topics = self.get_list_topics()
for id, topic in enumerate(self.topics): for id, topic in enumerate(self.topics):
@ -125,6 +132,8 @@ class RecordingDemo(Node):
self.destroy_subscription(sub) self.destroy_subscription(sub)
self._sub.clear() self._sub.clear()
self.writer.close()
self.get_logger().info('on_cleanup() is called.') self.get_logger().info('on_cleanup() is called.')
return TransitionCallbackReturn.SUCCESS return TransitionCallbackReturn.SUCCESS

View file

@ -13,8 +13,10 @@ import json
import rclpy import rclpy
from rclpy.lifecycle import Node from rclpy.lifecycle import Node
from rclpy.lifecycle import Publisher from rclpy.lifecycle import Publisher
from rclpy.lifecycle import State from rclpy.lifecycle import LifecycleState
from rclpy.lifecycle import TransitionCallbackReturn from rclpy.lifecycle import TransitionCallbackReturn
from lifecycle_msgs.msg import State
# from lifecycle_msgs.srv import GetState
from rclpy.timer import Timer from rclpy.timer import Timer
from ament_index_python.packages import get_package_share_directory from ament_index_python.packages import get_package_share_directory
@ -24,26 +26,17 @@ from rbs_skill_interfaces.srv import RbsBt
from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images
# CAMERA_LINK_DEFAULT = "rgbd_camera"
CONDITION_SRV_NAME = "/condition" CONDITION_SRV_NAME = "/condition"
FILE_DOPE_CONFIG = "od_yolo_config.yaml" FILE_DOPE_CONFIG = "od_yolo_config.yaml"
CAMERA_LINK_DEFAULT = "rgbd_camera"
NODE_NAME_DEFAULT = "lc_yolo" NODE_NAME_DEFAULT = "lc_yolo"
PARAM_SKILL_CFG = "lc_yolo_cfg" PARAM_SKILL_CFG = "lc_yolo_cfg"
OUT_TOPIC_DEFAULT = "/object_detection"
OUT_TOPIC_IMAGE = "/detect_image"
OUT_TOPIC_TYPE = "rbs_skill_interfaces/msg/BoundBox"
class ObjectDetection(Node): class ObjectDetection(Node):
"""Our lifecycle node.""" """Our lifecycle node."""
def _Settings(self):
# Initialization service settings
for prop in self.skill_cfg["Settings"]:
nam = prop["name"]
val = prop["value"]
if nam == "publishDelay":
self.publishDelay = val
elif nam == "is_image_mode":
self._is_image_mode = val
elif nam == "cameraLink":
self._set_camera_topic(val)
def __init__(self, **kwargs): def __init__(self, **kwargs):
"""Construct the node.""" """Construct the node."""
self._count: int = 0 self._count: int = 0
@ -64,13 +57,11 @@ class ObjectDetection(Node):
self.skill_cfg = json.loads(str_cfg) self.skill_cfg = json.loads(str_cfg)
self.nodeName = NODE_NAME_DEFAULT # self.skill_cfg["Module"]["node_name"] #["Launch"]["name"] self.nodeName = NODE_NAME_DEFAULT # self.skill_cfg["Module"]["node_name"] #["Launch"]["name"]
self.camera_link = "" self.topicImage = "" # input topic
self.topicImage = ""
self._set_camera_topic(CAMERA_LINK_DEFAULT)
self.publishDelay = 0.33 self.publishDelay = 0.33
self.topicSrv = self.nodeName + "/" + self.skill_cfg["Interface"]["Output"][0]["name"] self.topicSrv = self.nodeName + OUT_TOPIC_DEFAULT # self.skill_cfg["topicsOut"][0]["name"] # self.nodeName + "/" + self.skill_cfg["Interface"]["Output"][0]["name"]
self.topicDetectImage = self.nodeName + "/detect_image" self.topicDetectImage = self.nodeName + OUT_TOPIC_IMAGE
self._Settings() # self._Settings()
# Used to convert between ROS and OpenCV images # Used to convert between ROS and OpenCV images
self.br = CvBridge() self.br = CvBridge()
@ -82,20 +73,34 @@ class ObjectDetection(Node):
self._srv_condition = self.create_service(RbsBt, NODE_NAME_DEFAULT + CONDITION_SRV_NAME, self.condition_callback) self._srv_condition = self.create_service(RbsBt, NODE_NAME_DEFAULT + CONDITION_SRV_NAME, self.condition_callback)
# , callback_group=self.cb_group) # , callback_group=self.cb_group)
def _Settings(self):
# Initialization service settings
# TODO
# for prop in self.skill_cfg["Settings"]:
# nam = prop["name"]
# val = prop["value"]
# if nam == "publishDelay":
# self.publishDelay = val
# elif nam == "is_image_mode":
# self._is_image_mode = val
for prop in self.skill_cfg["topicsOut"]:
if prop["type"] == OUT_TOPIC_TYPE:
self.topicSrv = prop["name"]
def condition_callback(self, request, response): def condition_callback(self, request, response):
# is_success = self._interface(request.sid, request.action, request.command)
_is = False _is = False
if request.command == "isDetectionRun": if request.command == "isDetectionRun":
_is = True # self._cli_getstate = self.create_client(GetState, f"{self.nodeName}/get_state")
# req = GetState.Request()
# self.get_logger().info(f"condition_callback : before call")
# res = self._cli_getstate.call(req) # the program freezes
id_,_ = self._state_machine.current_state
_is = (id_ == State.PRIMARY_STATE_ACTIVE)
response.ok = _is response.ok = _is
return response return response
def _set_camera_topic(self, camera_link: str): def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
""" Service for camera name topics """
self.camera_link = camera_link
self.topicImage = "/" + camera_link + "/image"
def on_configure(self, state: State) -> TransitionCallbackReturn:
""" """
Configure the node, after a configuring transition is requested. Configure the node, after a configuring transition is requested.
@ -113,7 +118,10 @@ class ObjectDetection(Node):
for par in comm["param"]: for par in comm["param"]:
if par["type"] == "weights": if par["type"] == "weights":
dependency = par["dependency"] dependency = par["dependency"]
assert dependency, "no dependency" elif par["type"] == "topic":
self.topicImage = par["dependency"]["topicOut"]
assert dependency, "no weights dependency"
assert self.topicImage, "no input topic dependency"
self.yolov8_weights_file = dependency["weights_file"] self.yolov8_weights_file = dependency["weights_file"]
if not os.path.isfile(self.yolov8_weights_file): if not os.path.isfile(self.yolov8_weights_file):
@ -136,19 +144,19 @@ class ObjectDetection(Node):
self.get_logger().info("on_configure() is called.") self.get_logger().info("on_configure() is called.")
return TransitionCallbackReturn.SUCCESS return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: State) -> TransitionCallbackReturn: def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('on_activate() is called.') self.get_logger().info('on_activate() is called.')
# Create the main subscriber. # Create the main subscriber.
self._sub = self.create_subscription(Image, self.topicImage, self.listener_callback, 3) self._sub = self.create_subscription(Image, self.topicImage, self.listener_callback, 3)
return super().on_activate(state) return super().on_activate(state)
def on_deactivate(self, state: State) -> TransitionCallbackReturn: def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('on_deactivate() is called.') self.get_logger().info('on_deactivate() is called.')
# Destroy the main subscriber. # Destroy the main subscriber.
self.destroy_subscription(self._sub) self.destroy_subscription(self._sub)
return super().on_deactivate(state) return super().on_deactivate(state)
def on_cleanup(self, state: State) -> TransitionCallbackReturn: def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn:
""" """
Cleanup the node. Cleanup the node.
""" """
@ -160,6 +168,7 @@ class ObjectDetection(Node):
self._is_image_mode = False self._is_image_mode = False
self.image_det = [] self.image_det = []
self.bbox_res = None self.bbox_res = None
self.topicImage = ""
self.destroy_timer(self._timer) self.destroy_timer(self._timer)
self.destroy_publisher(self._pub) self.destroy_publisher(self._pub)
@ -168,7 +177,7 @@ class ObjectDetection(Node):
self.get_logger().info('on_cleanup() is called.') self.get_logger().info('on_cleanup() is called.')
return TransitionCallbackReturn.SUCCESS return TransitionCallbackReturn.SUCCESS
def on_shutdown(self, state: State) -> TransitionCallbackReturn: def on_shutdown(self, state: LifecycleState) -> TransitionCallbackReturn:
""" """
Shutdown the node. Shutdown the node.
""" """