add condition "isAruco"

This commit is contained in:
shalenikol 2025-04-16 10:25:09 +03:00
parent 9d0098a115
commit f6ef0f122e

View file

@ -32,12 +32,11 @@ Parameters:
markers_visualization_topic - topic to publish markers visualization (default /aruco/poses) markers_visualization_topic - topic to publish markers visualization (default /aruco/poses)
output_image_topic - topic to publish annotated image (default /aruco/image) output_image_topic - topic to publish annotated image (default /aruco/image)
Version: 2025-04-08 Version: 2025-04-16
""" """
# ROS2 imports # ROS2 imports
import rclpy import rclpy
# import rclpy.node
from rclpy.qos import qos_profile_sensor_data from rclpy.qos import qos_profile_sensor_data
from cv_bridge import CvBridge from cv_bridge import CvBridge
import message_filters import message_filters
@ -58,11 +57,12 @@ from rbss_aruco_pe.pose_estimation import pose_estimation
# ROS2 message imports # ROS2 message imports
from sensor_msgs.msg import CameraInfo, Image from sensor_msgs.msg import CameraInfo, Image
from geometry_msgs.msg import PoseArray from geometry_msgs.msg import PoseArray
# from rcl_interfaces.msg import ParameterDescriptor, ParameterType
from rbs_skill_interfaces.msg import ArucoMarkers from rbs_skill_interfaces.msg import ArucoMarkers
from rbs_skill_interfaces.srv import RbsBt
NODE_NAME_DEFAULT = "aruco_pe" NODE_NAME_DEFAULT = "aruco_pe"
PARAM_SKILL_CFG = "aruco_pe_cfg" PARAM_SKILL_CFG = "aruco_pe_cfg"
CONDITION_SRV_NAME = "/condition"
DEFAULT_ARUCO_DICTIONARY = "DICT_6X6_50" DEFAULT_ARUCO_DICTIONARY = "DICT_6X6_50"
DEFAULT_MARKER_SIZE = 0.04 DEFAULT_MARKER_SIZE = 0.04
@ -129,6 +129,9 @@ class ArucoLcNode(Node):
self.markers_pub = self.create_lifecycle_publisher(ArucoMarkers, self.detected_markers_topic, 5) self.markers_pub = self.create_lifecycle_publisher(ArucoMarkers, self.detected_markers_topic, 5)
self.image_pub = self.create_lifecycle_publisher(Image, self.output_image_topic, 5) self.image_pub = self.create_lifecycle_publisher(Image, self.output_image_topic, 5)
self.markers = ArucoMarkers()
self._srv_condition = self.create_service(RbsBt, NODE_NAME_DEFAULT + CONDITION_SRV_NAME, self.condition_callback)
self.clear_param() self.clear_param()
def clear_param(self): def clear_param(self):
@ -165,6 +168,24 @@ class ArucoLcNode(Node):
elif nam == "is_image_mode": elif nam == "is_image_mode":
self.is_image_mode = (val == "True") self.is_image_mode = (val == "True")
def condition_callback(self, request, response):
"""
Condition "isAruco" determines whether there is a marker with the given id in the frame
"""
_is = False
if request.command == "isAruco":
json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
self.skill_cfg = json.loads(json_param)
for comm in self.skill_cfg["BTAction"]:
if comm["name"] == "isAruco":
for par in comm["param"]:
if par["type"] == "formBuilder":
aruco_id = int(par["dependency"]["output"]["aruco_id"])
_is = (aruco_id in self.markers.marker_ids) # marker found in the image
self.get_logger().info(f"{aruco_id=} {_is=}")
response.ok = _is
return response
def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn: def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
""" """
Configure the node, after a configuring transition is requested. Configure the node, after a configuring transition is requested.
@ -265,18 +286,18 @@ class ArucoLcNode(Node):
cv_image = self.bridge.imgmsg_to_cv2(img_msg, desired_encoding="rgb8") cv_image = self.bridge.imgmsg_to_cv2(img_msg, desired_encoding="rgb8")
# create the ArucoMarkers and PoseArray messages # create the ArucoMarkers and PoseArray messages
markers = ArucoMarkers() self.markers = ArucoMarkers()
pose_array = PoseArray() pose_array = PoseArray()
# Set the frame id and timestamp for the markers and pose array # Set the frame id and timestamp for the markers and pose array
if self.camera_frame == "": if self.camera_frame == "":
markers.header.frame_id = self.info_msg.header.frame_id self.markers.header.frame_id = self.info_msg.header.frame_id
pose_array.header.frame_id = self.info_msg.header.frame_id pose_array.header.frame_id = self.info_msg.header.frame_id
else: else:
markers.header.frame_id = self.camera_frame self.markers.header.frame_id = self.camera_frame
pose_array.header.frame_id = self.camera_frame pose_array.header.frame_id = self.camera_frame
markers.header.stamp = img_msg.header.stamp self.markers.header.stamp = img_msg.header.stamp
pose_array.header.stamp = img_msg.header.stamp pose_array.header.stamp = img_msg.header.stamp
""" """
@ -288,20 +309,20 @@ class ArucoLcNode(Node):
""" """
# call the pose estimation function # call the pose estimation function
frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=None, frame, pose_array, self.markers = pose_estimation(rgb_frame=cv_image, depth_frame=None,
aruco_dict=self.aruco_dictionary, aruco_dict=self.aruco_dictionary,
marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat, marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers) distortion_coefficients=self.distortion, pose_array=pose_array, markers=self.markers)
# frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=None, # frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=None,
# aruco_detector=self.aruco_detector, # aruco_detector=self.aruco_detector,
# marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat, # marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
# distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers) # distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
# if some markers are detected # if some markers are detected
if len(markers.marker_ids) > 0: if len(self.markers.marker_ids) > 0:
# Publish the results with the poses and markes positions # Publish the results with the poses and markes positions
self.poses_pub.publish(pose_array) self.poses_pub.publish(pose_array)
self.markers_pub.publish(markers) self.markers_pub.publish(self.markers)
# publish the image frame with computed markers positions over the image # publish the image frame with computed markers positions over the image
self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "rgb8")) self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "rgb8"))