runtime/rbss_aruco_pe/scripts/aruco_node_lc.py

379 lines
17 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Code taken from:
https://github.com/AIRLab-POLIMI/ros2-aruco-pose-estimation
ROS2 wrapper code taken from:
https://github.com/JMU-ROBOTICS-VIVA/ros2_aruco/tree/main
This node locates Aruco AR markers in images and publishes their ids and poses.
Subscriptions:
/camera/image/image (sensor_msgs.msg.Image)
/camera/image/camera_info (sensor_msgs.msg.CameraInfo)
Published Topics:
/aruco/poses (geometry_msgs.msg.PoseArray)
Pose of all detected markers (suitable for rviz visualization)
/aruco/markers (aruco_interfaces.msg.ArucoMarkers)
Provides an array of all poses along with the corresponding
marker ids.
/aruco/image (sensor_msgs.msg.Image)
Annotated image with marker locations and ids, with markers drawn on it
Parameters:
marker_size - size of the markers in meters (default 0.04)
aruco_dictionary_id - dictionary that was used to generate markers (default DICT_6X6_50)
image_topic - image topic to subscribe to (default /camera/color/image_raw)
camera_info_topic - camera info topic to subscribe to (default /camera/camera_info)
camera_frame - camera optical frame to use (default "camera_depth_optical_frame")
detected_markers_topic - topic to publish detected markers (default /aruco/markers)
markers_visualization_topic - topic to publish markers visualization (default /aruco/poses)
output_image_topic - topic to publish annotated image (default /aruco/image)
Version: 2025-04-16
"""
# ROS2 imports
import rclpy
from rclpy.qos import qos_profile_sensor_data
from cv_bridge import CvBridge
import message_filters
from rclpy.lifecycle import Node
from rclpy.lifecycle import LifecycleState
from rclpy.lifecycle import TransitionCallbackReturn
# Python imports
import numpy as np
import cv2
import json
# Local imports for custom defined functions
from rbss_aruco_pe.utils import ARUCO_DICT
from rbss_aruco_pe.pose_estimation import pose_estimation
# ROS2 message imports
from sensor_msgs.msg import CameraInfo, Image
from geometry_msgs.msg import PoseArray
from rbs_skill_interfaces.msg import ArucoMarkers
from rbs_skill_interfaces.srv import RbsBt
NODE_NAME_DEFAULT = "aruco_pe"
PARAM_SKILL_CFG = "aruco_pe_cfg"
CONDITION_SRV_NAME = "/condition"
DEFAULT_ARUCO_DICTIONARY = "DICT_6X6_50"
DEFAULT_MARKER_SIZE = 0.04
# Input topics
INP_IMAGE = "/camera/image/image"
INP_DEPTH_IMAGE = "/camera/image/depth_image"
INP_CAMERA_INFO = "/camera/image/camera_info"
CAMERA_FRAME = "camera_color_optical_frame" # Camera link frame of reference
# Output topics
OUT_DEFAULT_DETECTED_MARKERS = "/aruco/markers"
OUT_TT_DETECTED_MARKERS = "rbs_skill_interfaces/msg/ArucoMarkers"
OUT_DEFAULT_MARKERS_VISUALISATION = "/aruco/poses"
OUT_TT_MARKERS_VISUALISATION = "geometry_msgs/msg/PoseArray"
OUT_DEFAULT_IMAGE = "/aruco/image"
OUT_TT_IMAGE = "sensor_msgs/msg/Image"
class ArucoLcNode(Node):
"""Our lifecycle node."""
def __init__(self, **kwargs):
# default value
self.dictionary_id_name = DEFAULT_ARUCO_DICTIONARY
self.marker_size = DEFAULT_MARKER_SIZE # Size of the markers in meters
self.use_depth_input = False # Use depth image for 3D pose estimation
self.is_image_mode = False # Publish Image with detected Aruco markers
self.image_topic = INP_IMAGE
self.info_topic = INP_CAMERA_INFO
self.depth_image_topic = INP_DEPTH_IMAGE
self.camera_frame = CAMERA_FRAME
self.detected_markers_topic = OUT_DEFAULT_DETECTED_MARKERS
self.markers_visualization_topic = OUT_DEFAULT_MARKERS_VISUALISATION
self.output_image_topic = OUT_DEFAULT_IMAGE
# for other nodes
kwargs["allow_undeclared_parameters"] = True
kwargs["automatically_declare_parameters_from_overrides"] = True
super().__init__(NODE_NAME_DEFAULT, **kwargs)
str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
self.skill_cfg = json.loads(str_cfg)
self.nodeName = NODE_NAME_DEFAULT
self.initialize_parameters()
# Make sure we have a valid dictionary id:
try:
dictionary_id = cv2.aruco.__getattribute__(self.dictionary_id_name)
# check if the dictionary_id is a valid dictionary inside ARUCO_DICT values
if dictionary_id not in ARUCO_DICT.values():
raise AttributeError
except AttributeError:
self.get_logger().error(f"bad aruco_dictionary_id: {self.dictionary_id_name}")
options = "\n".join([s for s in ARUCO_DICT])
self.get_logger().error(f"valid options: {options}")
return False
self.aruco_dictionary = cv2.aruco.getPredefinedDictionary(dictionary_id)
self.bridge = CvBridge()
# Set up publishers
self.poses_pub = self.create_lifecycle_publisher(PoseArray, self.markers_visualization_topic, 5)
self.markers_pub = self.create_lifecycle_publisher(ArucoMarkers, self.detected_markers_topic, 5)
self.image_pub = self.create_lifecycle_publisher(Image, self.output_image_topic, 5)
self.markers = ArucoMarkers()
self._srv_condition = self.create_service(RbsBt, NODE_NAME_DEFAULT + CONDITION_SRV_NAME, self.condition_callback)
self.clear_param()
def clear_param(self):
# Set up fields for camera parameters
self.info_msg = None
self.intrinsic_mat = None
self.distortion = None
def initialize_parameters(self):
"""
Read parameters from skill config (via interface node)
"""
self._Settings()
for prop in self.skill_cfg["topicsOut"]:
ty = prop["type"]
if ty == OUT_TT_DETECTED_MARKERS:
self.detected_markers_topic = prop["name"]
elif ty == OUT_TT_MARKERS_VISUALISATION:
self.markers_visualization_topic = prop["name"]
elif ty == OUT_TT_IMAGE:
self.output_image_topic = prop["name"]
def _Settings(self):
# Initialization service settings
for prop in self.skill_cfg["Settings"]["output"]["params"]:
nam = prop["name"]
val = prop["value"]
if nam == "aruco_dictionary_id":
self.aruco_dictionary_id = val
elif nam == "marker_size":
self.marker_size = float(val)
elif nam == "camera_frame": # TODO перенести в dependencies для "arucoConfig"
self.camera_frame = val
elif nam == "is_image_mode":
self.is_image_mode = (val == "True")
def condition_callback(self, request, response):
"""
Condition "isAruco" determines whether there is a marker with the given id in the frame
"""
_is = False
if request.command == "isAruco":
json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
self.skill_cfg = json.loads(json_param)
for comm in self.skill_cfg["BTAction"]:
if comm["name"] == "isAruco":
for par in comm["param"]:
if par["type"] == "formBuilder":
aruco_id = int(par["dependency"]["output"]["aruco_id"])
_is = (aruco_id in self.markers.marker_ids) # marker found in the image
self.get_logger().info(f"{aruco_id=} {_is=}")
response.ok = _is
return response
def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
"""
Configure the node, after a configuring transition is requested.
"""
json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
self.skill_cfg = json.loads(json_param)
for comm in self.skill_cfg["BTAction"]:
for par in comm["param"]:
dependency = par["dependency"]
if par["type"] == "topic":
topic_type = dependency["topicType"]
topic_name = dependency["topicOut"]
if "CameraInfo" in topic_type:
self.info_topic = topic_name
elif "Image" in topic_type:
if "depth" in topic_name:
self.depth_image_topic = topic_name
else:
self.image_topic = topic_name
# camera info topic for the camera calibration parameters
self.info_sub = self.create_subscription(CameraInfo, self.info_topic, self.info_callback, qos_profile_sensor_data)
self.get_logger().info("on_configure is called.")
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn:
# select the type of input to use for the pose estimation
if self.use_depth_input:
# use both rgb and depth image topics for the pose estimation
# create a message filter to synchronize the image and depth image topics
self.image_sub = message_filters.Subscriber(self, Image, self.image_topic, qos_profile=qos_profile_sensor_data)
self.depth_image_sub = message_filters.Subscriber(self, Image, self.depth_image_topic, qos_profile=qos_profile_sensor_data)
# create synchronizer between the 2 topics using message filters and approximate time policy
# slop is the maximum time difference between messages that are considered synchronized
self.synchronizer = message_filters.ApproximateTimeSynchronizer(
[self.image_sub, self.depth_image_sub], queue_size=10, slop=0.05
)
self.synchronizer.registerCallback(self.rgb_depth_sync_callback)
else:
# rely only on the rgb image topic for the pose estimation
# create a subscription to the image topic
self.image_sub = self.create_subscription(Image, self.image_topic, self.image_callback, qos_profile_sensor_data)
self.get_logger().info("on_activate is called")
return super().on_activate(state)
def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn:
if self.use_depth_input:
# TODO not tested
self.destroy_subscription(self.image_sub)
self.destroy_subscription(self.depth_image_sub)
else:
# Destroy the Image subscriber.
self.destroy_subscription(self.image_sub)
self.get_logger().info("on_deactivate is called")
return super().on_deactivate(state)
def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn:
self.destroy_subscription(self.info_sub)
self.clear_param()
self.get_logger().info("on_cleanup is called")
return TransitionCallbackReturn.SUCCESS
def on_shutdown(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info("on_shutdown is called")
return TransitionCallbackReturn.SUCCESS
def info_callback(self, info_msg):
self.info_msg = info_msg
# get the intrinsic matrix and distortion coefficients from the camera info
self.intrinsic_mat = np.reshape(np.array(self.info_msg.k), (3, 3))
self.distortion = np.array(self.info_msg.d)
self.get_logger().info("Camera info received.")
self.get_logger().info(f"Intrinsic matrix: {self.intrinsic_mat}")
self.get_logger().info(f"Distortion coefficients: {self.distortion}")
self.get_logger().info(f"Camera frame: {self.info_msg.width}x{self.info_msg.height}")
# Assume that camera parameters will remain the same...
self.destroy_subscription(self.info_sub)
def image_callback(self, img_msg: Image):
if self.info_msg is None:
self.get_logger().warn("No camera info has been received!")
return
# convert the image messages to cv2 format
cv_image = self.bridge.imgmsg_to_cv2(img_msg, desired_encoding="rgb8")
# create the ArucoMarkers and PoseArray messages
self.markers = ArucoMarkers()
pose_array = PoseArray()
# Set the frame id and timestamp for the markers and pose array
if self.camera_frame == "":
self.markers.header.frame_id = self.info_msg.header.frame_id
pose_array.header.frame_id = self.info_msg.header.frame_id
else:
self.markers.header.frame_id = self.camera_frame
pose_array.header.frame_id = self.camera_frame
self.markers.header.stamp = img_msg.header.stamp
pose_array.header.stamp = img_msg.header.stamp
"""
# OVERRIDE: use calibrated intrinsic matrix and distortion coefficients
self.intrinsic_mat = np.reshape([615.95431, 0., 325.26983,
0., 617.92586, 257.57722,
0., 0., 1.], (3, 3))
self.distortion = np.array([0.142588, -0.311967, 0.003950, -0.006346, 0.000000])
"""
# call the pose estimation function
frame, pose_array, self.markers = pose_estimation(rgb_frame=cv_image, depth_frame=None,
aruco_dict=self.aruco_dictionary,
marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
distortion_coefficients=self.distortion, pose_array=pose_array, markers=self.markers)
# frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=None,
# aruco_detector=self.aruco_detector,
# marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
# distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
# if some markers are detected
if len(self.markers.marker_ids) > 0:
# Publish the results with the poses and markes positions
self.poses_pub.publish(pose_array)
self.markers_pub.publish(self.markers)
# publish the image frame with computed markers positions over the image
self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "rgb8"))
def rgb_depth_sync_callback(self, rgb_msg: Image, depth_msg: Image):
# convert the image messages to cv2 format
cv_depth_image = self.bridge.imgmsg_to_cv2(depth_msg, desired_encoding="16UC1")
cv_image = self.bridge.imgmsg_to_cv2(rgb_msg, desired_encoding="rgb8")
# create the ArucoMarkers and PoseArray messages
markers = ArucoMarkers()
pose_array = PoseArray()
# Set the frame id and timestamp for the markers and pose array
if self.camera_frame == "":
markers.header.frame_id = self.info_msg.header.frame_id
pose_array.header.frame_id = self.info_msg.header.frame_id
else:
markers.header.frame_id = self.camera_frame
pose_array.header.frame_id = self.camera_frame
markers.header.stamp = rgb_msg.header.stamp
pose_array.header.stamp = rgb_msg.header.stamp
# call the pose estimation function
frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=cv_depth_image,
aruco_dict=self.aruco_dictionary,
marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
# frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=cv_depth_image,
# aruco_detector=self.aruco_detector,
# marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
# distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
# if some markers are detected
if len(markers.marker_ids) > 0:
# Publish the results with the poses and markes positions
self.poses_pub.publish(pose_array)
self.markers_pub.publish(markers)
# publish the image frame with computed markers positions over the image
self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "rgb8"))
def main():
rclpy.init()
executor = rclpy.executors.SingleThreadedExecutor()
lc_node = ArucoLcNode()
if lc_node:
executor.add_node(lc_node)
try:
executor.spin()
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
lc_node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()