add rbss_aruco_pe skill package

shalenikol 2025-04-08 10:41:00 +03:00
parent 58151ffea8
commit 1c4778e667
10 changed files with 1061 additions and 0 deletions

rbss_aruco_pe/CMakeLists.txt Normal file
View file

@@ -0,0 +1,51 @@
cmake_minimum_required(VERSION 3.8)
project(rbss_aruco_pe)

# Default to C++17
if(NOT CMAKE_CXX_STANDARD)
  set(CMAKE_CXX_STANDARD 17)
endif()

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# Find dependencies
find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)
find_package(rclpy REQUIRED)
#find_package(open3d REQUIRED)
#find_package(numpy REQUIRED)
#find_package(cv2 REQUIRED)
#find_package(cv_bridge REQUIRED)
#find_package(tf_transformations REQUIRED)
find_package(rbs_skill_interfaces REQUIRED)

# Install Python modules
ament_python_install_package(${PROJECT_NAME})

# Install Python executables
install(PROGRAMS
  scripts/aruco_node_lc.py
  DESTINATION lib/${PROJECT_NAME}
)

# Install directories
install(DIRECTORY
  launch config
  DESTINATION share/${PROJECT_NAME}
)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

ament_package()
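Note: with this CMakeLists the package builds as a regular ament_cmake package with an installed Python module, so the usual workflow applies: colcon build --packages-select rbss_aruco_pe, then source install/setup.bash, then ros2 run rbss_aruco_pe aruco_node_lc.py (the node additionally expects the aruco_pe_cfg string parameter described with the skill config below).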

rbss_aruco_pe/config/aruco_parameters.yaml Normal file
View file

@@ -0,0 +1,15 @@
/aruco_node:
  ros__parameters:
    # Aruco detection parameters
    marker_size: 0.04 # Size of the markers in meters
    aruco_dictionary_id: DICT_6X6_50 # Aruco dictionary type
    # Input topics
    image_topic: /rgbd_camera/image/image # Input image topic
    use_depth_input: false # Use depth image for 3D pose estimation
    depth_image_topic: /camera/aligned_depth_to_color/image_raw # Input depth image topic
    camera_info_topic: /rgbd_camera/image/camera_info # Input camera info topic with camera intrinsic parameters
    camera_frame: camera_color_optical_frame # Camera link frame of reference
    # Output topics
    detected_markers_topic: /aruco/markers # Output topic with detected markers (aruco poses + ids)
    markers_visualization_topic: /aruco/poses # Output topic with visualization of detected markers as pose array
    output_image_topic: /aruco/image # Output topic with visualization of detected markers drawn on image
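For reference, a minimal sketch of reading this file the way the (disabled) launch file below does; the path goes through the installed share directory, so it assumes the package has been built and sourced:

import os
import yaml
from ament_index_python.packages import get_package_share_directory

# locate the installed copy of the parameters file
params_file = os.path.join(
    get_package_share_directory("rbss_aruco_pe"),
    "config", "aruco_parameters.yaml")
with open(params_file, "r") as f:
    config = yaml.safe_load(f)

# parameters are nested under the node name and the ros__parameters key
params = config["/aruco_node"]["ros__parameters"]
print(params["marker_size"], params["aruco_dictionary_id"])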

View file

@@ -0,0 +1,171 @@
# The skill must be launched via the interface node; this launch file
# therefore intentionally starts nothing.
from launch import LaunchDescription


def generate_launch_description():
    print("The skill must be launched via the interface node")
    return LaunchDescription([])
# # ROS2 imports
# from launch.substitutions import PathJoinSubstitution, LaunchConfiguration
# # from launch.actions import IncludeLaunchDescription
# # from launch.launch_description_sources import PythonLaunchDescriptionSource
# from launch import LaunchDescription
# from launch_ros.actions import Node
# from launch.actions import DeclareLaunchArgument
# # from launch_ros.substitutions import FindPackageShare
# # from launch.conditions import IfCondition, UnlessCondition
# import os
# from ament_index_python.packages import get_package_share_directory
# import yaml
# def generate_launch_description():
# aruco_params_file = os.path.join(
# get_package_share_directory('rbss_aruco_pe'),
# 'config',
# 'aruco_parameters.yaml'
# )
# with open(aruco_params_file, 'r') as file:
# config = yaml.safe_load(file)
# config = config["/aruco_node"]["ros__parameters"]
# # declare configuration parameters
# marker_size_arg = DeclareLaunchArgument(
# name='marker_size',
# default_value=str(config['marker_size']),
# description='Size of the aruco marker in meters',
# )
# aruco_dictionary_id_arg = DeclareLaunchArgument(
# name='aruco_dictionary_id',
# default_value=config['aruco_dictionary_id'],
# description='ID of the aruco dictionary to use',
# )
# image_topic_arg = DeclareLaunchArgument(
# name='image_topic',
# default_value=config['image_topic'],
# description='Name of the image RGB topic to subscribe to',
# )
# use_depth_input_arg = DeclareLaunchArgument(
# name='use_depth_input',
# default_value=str(config['use_depth_input']),
# description='Use depth input for pose estimation',
# choices=['true', 'false', 'True', 'False']
# )
# depth_image_topic_arg = DeclareLaunchArgument(
# name='depth_image_topic',
# default_value=config['depth_image_topic'],
# description='Name of the depth image topic to subscribe to',
# )
# camera_info_topic_arg = DeclareLaunchArgument(
# name='camera_info_topic',
# default_value=config['camera_info_topic'],
# description='Name of the camera info topic to subscribe to',
# )
# camera_frame_arg = DeclareLaunchArgument(
# name='camera_frame',
# default_value=config['camera_frame'],
# description='Name of the camera frame where the estimated pose will be',
# )
# detected_markers_topic_arg = DeclareLaunchArgument(
# name='detected_markers_topic',
# default_value=config['detected_markers_topic'],
# description='Name of the topic to publish the detected markers messages',
# )
# markers_visualization_topic_arg = DeclareLaunchArgument(
# name='markers_visualization_topic',
# default_value=config['markers_visualization_topic'],
# description='Name of the topic to publish the pose array for visualization of the markers',
# )
# output_image_topic_arg = DeclareLaunchArgument(
# name='output_image_topic',
# default_value=config['output_image_topic'],
# description='Name of the topic to publish the image with the detected markers',
# )
# aruco_node = Node(
# package='rbss_aruco_pe',
# executable='aruco_node.py',
# parameters=[{
# "marker_size": LaunchConfiguration('marker_size'),
# "aruco_dictionary_id": LaunchConfiguration('aruco_dictionary_id'),
# "image_topic": LaunchConfiguration('image_topic'),
# "use_depth_input": LaunchConfiguration('use_depth_input'),
# "depth_image_topic": LaunchConfiguration('depth_image_topic'),
# "camera_info_topic": LaunchConfiguration('camera_info_topic'),
# "camera_frame": LaunchConfiguration('camera_frame'),
# "detected_markers_topic": LaunchConfiguration('detected_markers_topic'),
# "markers_visualization_topic": LaunchConfiguration('markers_visualization_topic'),
# "output_image_topic": LaunchConfiguration('output_image_topic'),
# }],
# output='screen',
# emulate_tty=True
# )
# # # launch realsense camera node
# # cam_feed_launch_file = PathJoinSubstitution(
# # [FindPackageShare("realsense2_camera"), "launch", "rs_launch.py"]
# # )
# # camera_feed_depth_node = IncludeLaunchDescription(
# # PythonLaunchDescriptionSource(cam_feed_launch_file),
# # launch_arguments={
# # "pointcloud.enable": "true",
# # "enable_rgbd": "true",
# # "enable_sync": "true",
# # "align_depth.enable": "true",
# # "enable_color": "true",
# # "enable_depth": "true",
# # }.items(),
# # condition=IfCondition(LaunchConfiguration('use_depth_input'))
# # )
# # camera_feed_node = IncludeLaunchDescription(
# # PythonLaunchDescriptionSource(cam_feed_launch_file),
# # launch_arguments={
# # "pointcloud.enable": "true",
# # "enable_color": "true",
# # }.items(),
# # condition=UnlessCondition(LaunchConfiguration('use_depth_input'))
# # )
# # rviz_file = PathJoinSubstitution([
# # FindPackageShare('rbss_aruco_pe'),
# # 'rviz',
# # 'cam_detect.rviz'
# # ])
# # rviz2_node = Node(
# # package='rviz2',
# # executable='rviz2',
# # arguments=['-d', rviz_file]
# # )
# return LaunchDescription([
# # Arguments
# marker_size_arg,
# aruco_dictionary_id_arg,
# image_topic_arg,
# use_depth_input_arg,
# depth_image_topic_arg,
# camera_info_topic_arg,
# camera_frame_arg,
# detected_markers_topic_arg,
# markers_visualization_topic_arg,
# output_image_topic_arg,
# # Nodes
# aruco_node,
# # camera_feed_depth_node,
# # camera_feed_node,
# # rviz2_node
# ])
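Since the launch file is a stub, the lifecycle node has to be driven externally. As a rough sketch (assuming the default node name aruco_pe and that the skill JSON was passed via the aruco_pe_cfg parameter), an interface node could trigger the transitions that "arucoConfig" maps to; the same transitions can be issued by hand with "ros2 lifecycle set /aruco_pe configure" and "ros2 lifecycle set /aruco_pe activate":

import rclpy
from rclpy.node import Node
from lifecycle_msgs.srv import ChangeState
from lifecycle_msgs.msg import Transition


def change_state(node: Node, transition_id: int) -> bool:
    # the lifecycle node exposes its transitions on <node_name>/change_state
    client = node.create_client(ChangeState, "/aruco_pe/change_state")
    client.wait_for_service()
    request = ChangeState.Request()
    request.transition.id = transition_id
    future = client.call_async(request)
    rclpy.spin_until_future_complete(node, future)
    return future.result().success


rclpy.init()
caller = rclpy.create_node("aruco_pe_caller")
change_state(caller, Transition.TRANSITION_CONFIGURE)  # corresponds to "arucoConfig"
change_state(caller, Transition.TRANSITION_ACTIVATE)
caller.destroy_node()
rclpy.shutdown()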

rbss_aruco_pe/package.xml Normal file
View file

@@ -0,0 +1,30 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>rbss_aruco_pe</name>
  <version>0.0.1</version>
  <description>Aruco Marker Tracking and Pose Estimation wrapper for ROS2, using RGB cameras</description>
  <maintainer email="shaniks77s@gmail.com">shalenikol</maintainer>
  <license>Apache License 2.0</license>

  <buildtool_depend>ament_cmake</buildtool_depend>
  <buildtool_depend>ament_cmake_python</buildtool_depend>

  <depend>rclpy</depend>
  <depend>cv_bridge</depend>
  <depend>std_msgs</depend>
  <depend>sensor_msgs</depend>
  <depend>geometry_msgs</depend>
  <depend>lifecycle_msgs</depend>
  <depend>rbs_skill_interfaces</depend>
  <depend>open3d</depend>
  <depend>opencv</depend>
  <depend>tf_transformations</depend>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

View file

@@ -0,0 +1,8 @@
<root BTCPP_format="4">
  <BehaviorTree ID="Main">
    <Sequence>
      <Action ID="RbsAction" do="ArucoPE" command="arucoConfig" sid="ArucoPE-example"></Action>
      <Action ID="RbsAction" do="ArucoPE" command="arucoStop" sid="ArucoPE-example"></Action>
    </Sequence>
  </BehaviorTree>
</root>

View file

@@ -0,0 +1,81 @@
{
  "skills": [
    {
      "sid": "ArucoPE-example",
      "SkillPackage": { "name": "Robossembler", "version": "1.0", "format": "1" },
      "Module": { "name": "ArucoPE", "node_name": "aruco_pe", "description": "Aruco Marker Tracking and Pose Estimation wrapper for ROS2, using RGB cameras" },
      "Launch": { "package": "rbss_aruco_pe", "executable": "aruco_node_lc.py" },
      "BTAction": [
        {
          "name": "arucoConfig",
          "type": "run",
          "param": [
            {
              "type": "topic",
              "dependency": {
                "type": "topic",
                "topicType": "sensor_msgs/msg/Image",
                "topicOut": "/rgbd_camera/image/image"
              }
            },
            {
              "type": "topic",
              "dependency": {
                "type": "topic",
                "topicType": "sensor_msgs/msg/Image",
                "topicOut": "/rgbd_camera/image/depth_image"
              }
            },
            {
              "type": "topic",
              "dependency": {
                "type": "topic",
                "topicType": "sensor_msgs/msg/CameraInfo",
                "topicOut": "/rgbd_camera/image/camera_info"
              }
            }
          ],
          "typeAction": "ACTION"
        },
        { "name": "arucoStop", "type": "stop", "param": [], "typeAction": "ACTION" }
      ],
      "topicsOut": [
        {
          "name": "/aruco/markers",
          "type": "rbs_skill_interfaces/msg/ArucoMarkers"
        },
        {
          "name": "/aruco/poses",
          "type": "geometry_msgs/msg/PoseArray"
        },
        {
          "name": "/aruco/image",
          "type": "sensor_msgs/msg/Image"
        }
      ],
      "Settings": {
        "output": {
          "params": [
            {
              "name": "is_image_mode",
              "value": "True"
            },
            {
              "name": "aruco_dictionary_id",
              "value": "DICT_6X6_50"
            },
            {
              "name": "marker_size",
              "value": "0.04"
            },
            {
              "name": "camera_frame",
              "value": ""
            }
          ]
        },
        "type": "formBuilder"
      }
    }
  ]
}
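The node itself receives this JSON as the string parameter aruco_pe_cfg (PARAM_SKILL_CFG in aruco_node_lc.py) and indexes keys such as BTAction, topicsOut and Settings directly, which suggests the interface node passes a single entry of the skills array. A rough sketch of the same lookups (skill.json is a hypothetical local path to this file):

import json

with open("skill.json") as f:  # hypothetical path to this config
    skill = json.load(f)["skills"][0]

# the same lookups the node performs in _Settings() and initialize_parameters()
settings = {p["name"]: p["value"] for p in skill["Settings"]["output"]["params"]}
marker_size = float(settings["marker_size"])
dictionary_id = settings["aruco_dictionary_id"]
outputs = {t["type"]: t["name"] for t in skill["topicsOut"]}
print(marker_size, dictionary_id, outputs)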

rbss_aruco_pe/rbss_aruco_pe/__init__.py Normal file (empty)
View file

rbss_aruco_pe/rbss_aruco_pe/pose_estimation.py Normal file
View file

@@ -0,0 +1,264 @@
#!/usr/bin/env python3
# Code taken from:
# https://github.com/AIRLab-POLIMI/ros2-aruco-pose-estimation
# Code taken and readapted from:
# https://github.com/GSNCodes/ArUCo-Markers-Pose-Estimation-Generation-Python/tree/main

# Python imports
import numpy as np
import cv2
import tf_transformations

# ROS2 imports
from rclpy.impl import rcutils_logger

# ROS2 message imports
from geometry_msgs.msg import Pose, PoseArray
from rbs_skill_interfaces.msg import ArucoMarkers

# utils import python code
from rbss_aruco_pe.utils import aruco_display


# signature for the new cv2 (>= 4.7.0) ArucoDetector API:
# def pose_estimation(rgb_frame: np.ndarray, depth_frame: np.ndarray,
#                     aruco_detector: cv2.aruco.ArucoDetector, marker_size: float,
#                     matrix_coefficients: np.ndarray, distortion_coefficients: np.ndarray,
#                     pose_array: PoseArray, markers: ArucoMarkers) -> tuple[np.ndarray, PoseArray, ArucoMarkers]:
def pose_estimation(rgb_frame: np.ndarray, depth_frame: np.ndarray, aruco_dict, marker_size: float,
                    matrix_coefficients: np.ndarray, distortion_coefficients: np.ndarray,
                    pose_array: PoseArray, markers: ArucoMarkers) -> tuple[np.ndarray, PoseArray, ArucoMarkers]:
    '''
    rgb_frame - Frame from the RGB camera stream
    depth_frame - Depth frame from the depth camera stream
    matrix_coefficients - Intrinsic matrix of the calibrated camera
    distortion_coefficients - Distortion coefficients associated with your camera
    pose_array - PoseArray message to be published
    markers - ArucoMarkers message to be published

    return:-
    frame - The frame with the axes drawn on it
    pose_array - PoseArray with computed poses of the markers
    markers - ArucoMarkers message containing the marker ids and poses
    '''
    # old cv2 API version
    parameters = cv2.aruco.DetectorParameters_create()
    corners, marker_ids, _ = cv2.aruco.detectMarkers(rgb_frame, aruco_dict, parameters=parameters)
    # new cv2 API version (>= 4.7.0):
    # corners, marker_ids, rejected = aruco_detector.detectMarkers(image=rgb_frame)

    frame_processed = rgb_frame
    logger = rcutils_logger.RcutilsLogger(name="aruco_node")

    # If markers are detected
    if len(corners) > 0:
        logger.debug(f"Detected {len(corners)} markers.")

        for i, marker_id in enumerate(marker_ids):
            # Estimate pose of each marker and return the values rvec and tvec
            # using the deprecated function:
            # rvec, tvec, markerPoints = cv2.aruco.estimatePoseSingleMarkers(corners=corners[i],
            #                                                                markerLength=marker_size,
            #                                                                cameraMatrix=matrix_coefficients,
            #                                                                distCoeffs=distortion_coefficients)
            # tvec = tvec[0]

            # alternative code version using solvePnP
            tvec, rvec, quat = my_estimatePoseSingleMarkers(corners=corners[i], marker_size=marker_size,
                                                            camera_matrix=matrix_coefficients,
                                                            distortion=distortion_coefficients)

            # show the detected markers bounding boxes
            frame_processed = aruco_display(corners=corners, ids=marker_ids,
                                            image=frame_processed)
            # draw frame axes
            frame_processed = cv2.drawFrameAxes(image=frame_processed, cameraMatrix=matrix_coefficients,
                                                distCoeffs=distortion_coefficients, rvec=rvec, tvec=tvec,
                                                length=0.05, thickness=3)

            if depth_frame is not None:
                # get the centroid of the pointcloud
                centroid = depth_to_pointcloud_centroid(depth_image=depth_frame,
                                                        intrinsic_matrix=matrix_coefficients,
                                                        corners=corners[i])
                # log comparison between depthcloud centroid and tvec estimated positions
                logger.info(f"depthcloud centroid = {centroid}")
                logger.info(f"tvec = {tvec[0]} {tvec[1]} {tvec[2]}")

            # compute the pose from the rvec and tvec arrays
            pose = Pose()
            if depth_frame is not None:
                # use the computed centroid from the depthcloud as estimated position
                pose.position.x = float(centroid[0])
                pose.position.y = float(centroid[1])
                pose.position.z = float(centroid[2])
            else:
                # use tvec from the aruco estimator as estimated position
                pose.position.x = float(tvec[0])
                pose.position.y = float(tvec[1])
                pose.position.z = float(tvec[2])
            pose.orientation.x = quat[0]
            pose.orientation.y = quat[1]
            pose.orientation.z = quat[2]
            pose.orientation.w = quat[3]

            # add the pose and marker id to the pose_array and markers messages
            pose_array.poses.append(pose)
            markers.poses.append(pose)
            markers.marker_ids.append(marker_id[0])

    return frame_processed, pose_array, markers
def my_estimatePoseSingleMarkers(corners, marker_size, camera_matrix, distortion) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    '''
    Estimate rvec and tvec for a single marker whose corners were detected by:
    corners, ids, rejectedImgPoints = detector.detectMarkers(image)

    corners - array of detected corners of one marker in the image
    marker_size - size of the detected markers in meters
    camera_matrix - camera intrinsic matrix
    distortion - camera distortion coefficients

    RETURN (tvec, rvec, quaternion) - a replacement for the deprecated cv2.aruco.estimatePoseSingleMarkers()
    '''
    # 3D corner coordinates of the marker in its own frame:
    # top-left, top-right, bottom-right, bottom-left (the order required by SOLVEPNP_IPPE_SQUARE)
    marker_points = np.array([[-marker_size / 2.0, marker_size / 2.0, 0],
                              [marker_size / 2.0, marker_size / 2.0, 0],
                              [marker_size / 2.0, -marker_size / 2.0, 0],
                              [-marker_size / 2.0, -marker_size / 2.0, 0]], dtype=np.float32)

    # solvePnP returns the rotation and translation vectors
    retval, rvec, tvec = cv2.solvePnP(objectPoints=marker_points, imagePoints=corners,
                                      cameraMatrix=camera_matrix, distCoeffs=distortion,
                                      flags=cv2.SOLVEPNP_IPPE_SQUARE)
    rvec = rvec.reshape(3, 1)
    tvec = tvec.reshape(3, 1)

    # convert the rotation vector to a rotation matrix, then to a quaternion
    rot, jacobian = cv2.Rodrigues(rvec)
    rot_matrix = np.eye(4, dtype=np.float32)
    rot_matrix[0:3, 0:3] = rot
    quaternion = tf_transformations.quaternion_from_matrix(rot_matrix)

    # normalize the quaternion
    norm_quat = np.linalg.norm(quaternion)
    quaternion = quaternion / norm_quat

    return tvec, rvec, quaternion
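As a quick plausibility check for the solvePnP path, the sketch below projects the marker corners with a known synthetic pose and verifies that the recovered translation matches; the camera matrix, pose and tolerance are arbitrary choices for the test, not values from this package:

import cv2
import numpy as np
from rbss_aruco_pe.pose_estimation import my_estimatePoseSingleMarkers

# arbitrary synthetic camera and ground-truth pose for the check
K = np.array([[600.0, 0.0, 320.0], [0.0, 600.0, 240.0], [0.0, 0.0, 1.0]])
dist = np.zeros(5)
rvec_true = np.array([[0.1], [-0.2], [0.05]])
tvec_true = np.array([[0.03], [-0.01], [0.5]])

marker_size = 0.04
obj = np.array([[-marker_size / 2, marker_size / 2, 0],
                [marker_size / 2, marker_size / 2, 0],
                [marker_size / 2, -marker_size / 2, 0],
                [-marker_size / 2, -marker_size / 2, 0]], dtype=np.float32)

# project the marker corners into the synthetic image, shaped like detectMarkers output
img_pts, _ = cv2.projectPoints(obj, rvec_true, tvec_true, K, dist)
corners = img_pts.reshape(1, 4, 2)

tvec, rvec, quat = my_estimatePoseSingleMarkers(corners, marker_size, K, dist)
assert np.allclose(tvec, tvec_true, atol=1e-4)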
def depth_to_pointcloud_centroid(depth_image: np.ndarray, intrinsic_matrix: np.ndarray,
                                 corners: np.ndarray) -> np.ndarray:
    """
    This function takes a depth image and the corners of a quadrilateral as input,
    and returns the centroid of the corresponding pointcloud.

    Args:
        depth_image: A 2D numpy array representing the depth image.
        intrinsic_matrix: The 3x3 camera intrinsic matrix.
        corners: A list of 4 tuples, each representing the (x, y) coordinates of a corner.

    Returns:
        A tuple (x, y, z) representing the centroid of the segmented pointcloud.
    """
    # Get image parameters
    height, width = depth_image.shape

    # Check if all corners are within image bounds
    # corners has shape (1, 4, 2)
    corners_indices = np.array([(int(x), int(y)) for x, y in corners[0]])
    for x, y in corners_indices:
        if x < 0 or x >= width or y < 0 or y >= height:
            raise ValueError("One or more corners are outside the image bounds.")

    # bounding box of the polygon
    x_min = int(min(corners_indices[:, 0]))
    x_max = int(max(corners_indices[:, 0]))
    y_min = int(min(corners_indices[:, 1]))
    y_max = int(max(corners_indices[:, 1]))

    # collect the pixels inside the polygon defined by the corners,
    # searching only within the bounding box of the polygon
    points = []
    for x in range(x_min, x_max):
        for y in range(y_min, y_max):
            if is_pixel_in_polygon(pixel=(x, y), corners=corners_indices):
                # add point to the list of points
                points.append([x, y, depth_image[y, x]])
    # Convert points to numpy array
    points = np.array(points, dtype=np.uint16)

    # alternative: build the cloud with Open3D
    # depth_segmented = geometry.Image(points)
    # pinhole_matrix = camera.PinholeCameraIntrinsic(width=width, height=height,
    #                                                intrinsic_matrix=intrinsic_matrix)
    # pointcloud = geometry.PointCloud.create_from_depth_image(depth=depth_segmented, intrinsic=pinhole_matrix,
    #                                                          depth_scale=1000.0)

    # apply the pinhole model to each pixel, where
    # fx = intrinsic_matrix[0, 0], fy = intrinsic_matrix[1, 1],
    # cx = intrinsic_matrix[0, 2], cy = intrinsic_matrix[1, 2],
    # u = x, v = y, d = depth_image[y, x], depth_scale = 1000.0:
    #   z = d / depth_scale
    #   x = (u - cx) * z / fx
    #   y = (v - cy) * z / fy
    pointcloud = []
    for x, y, d in points:
        z = d / 1000.0
        x = (x - intrinsic_matrix[0, 2]) * z / intrinsic_matrix[0, 0]
        y = (y - intrinsic_matrix[1, 2]) * z / intrinsic_matrix[1, 1]
        pointcloud.append([x, y, z])

    # Calculate centroid from pointcloud (float dtype: the metric coordinates
    # are fractional and can be negative, so a uint16 cast would corrupt them)
    centroid = np.mean(np.array(pointcloud, dtype=np.float64), axis=0)
    return centroid
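A side note on the design: the per-pixel Python loops above are the slow path. The same back-projection can be vectorized with NumPy; a sketch under the same pinhole model and 1000.0 depth scale (the polygon mask is assumed given, e.g. built with cv2.fillPoly over the marker corners):

import numpy as np

def pointcloud_centroid_vectorized(depth_image: np.ndarray,
                                   intrinsic_matrix: np.ndarray,
                                   mask: np.ndarray) -> np.ndarray:
    # mask is a boolean array of the same shape as depth_image selecting
    # the pixels inside the marker polygon
    fx, fy = intrinsic_matrix[0, 0], intrinsic_matrix[1, 1]
    cx, cy = intrinsic_matrix[0, 2], intrinsic_matrix[1, 2]

    v, u = np.nonzero(mask)          # pixel rows (y) and columns (x)
    z = depth_image[v, u] / 1000.0   # depth in meters
    x = (u - cx) * z / fx
    y = (v - cy) * z / fy
    return np.stack([x, y, z], axis=1).mean(axis=0)

The mask itself can be produced with cv2.fillPoly(np.zeros(depth_image.shape, np.uint8), [corners.astype(np.int32)], 1).astype(bool), which also replaces the bounding-box scan.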
def is_pixel_in_polygon(pixel: tuple, corners: np.ndarray) -> bool:
    """
    This function takes a pixel and a list of corners as input, and returns whether the pixel is inside the
    polygon defined by the corners. It uses the ray casting algorithm: cast a ray from the pixel in the
    positive x-direction and count how many times it intersects the edges of the polygon. If the number of
    intersections is odd, the pixel is inside the polygon, otherwise it is outside. This works for both
    convex and concave polygons.

    Args:
        pixel: A tuple (x, y) representing the pixel coordinates.
        corners: A list of 4 tuples in a numpy array, each representing the (x, y) coordinates of a corner.

    Returns:
        A boolean indicating whether the pixel is inside the polygon.
    """
    # Initialize counter for number of intersections
    num_intersections = 0
    # Iterate over each edge of the polygon
    for i in range(len(corners)):
        x1, y1 = corners[i]
        x2, y2 = corners[(i + 1) % len(corners)]
        # Check if the pixel is on the same y-level as the edge
        if (y1 <= pixel[1] < y2) or (y2 <= pixel[1] < y1):
            # Calculate the x-coordinate of the intersection point
            x_intersection = (x2 - x1) * (pixel[1] - y1) / (y2 - y1) + x1
            # Check if the intersection point is to the right of the pixel
            if x_intersection > pixel[0]:
                num_intersections += 1
    # Return whether the number of intersections is odd
    return num_intersections % 2 == 1
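For reference, OpenCV ships an equivalent point-in-polygon test, so the hand-rolled ray casting could be replaced by a sketch like this (cv2.pointPolygonTest returns a positive, zero or negative value for inside, on-edge or outside when measureDist=False):

import cv2
import numpy as np

def is_pixel_in_polygon_cv(pixel: tuple, corners: np.ndarray) -> bool:
    # the contour must be an Nx1x2 float32 (or integer) array
    contour = corners.reshape(-1, 1, 2).astype(np.float32)
    return cv2.pointPolygonTest(contour, (float(pixel[0]), float(pixel[1])), False) >= 0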

rbss_aruco_pe/rbss_aruco_pe/utils.py Normal file
View file

@@ -0,0 +1,72 @@
#!/usr/bin/env python3
import cv2
from rclpy.impl import rcutils_logger

ARUCO_DICT = {
    "DICT_4X4_50": cv2.aruco.DICT_4X4_50,
    "DICT_4X4_100": cv2.aruco.DICT_4X4_100,
    "DICT_4X4_250": cv2.aruco.DICT_4X4_250,
    "DICT_4X4_1000": cv2.aruco.DICT_4X4_1000,
    "DICT_5X5_50": cv2.aruco.DICT_5X5_50,
    "DICT_5X5_100": cv2.aruco.DICT_5X5_100,
    "DICT_5X5_250": cv2.aruco.DICT_5X5_250,
    "DICT_5X5_1000": cv2.aruco.DICT_5X5_1000,
    "DICT_6X6_50": cv2.aruco.DICT_6X6_50,
    "DICT_6X6_100": cv2.aruco.DICT_6X6_100,
    "DICT_6X6_250": cv2.aruco.DICT_6X6_250,
    "DICT_6X6_1000": cv2.aruco.DICT_6X6_1000,
    "DICT_7X7_50": cv2.aruco.DICT_7X7_50,
    "DICT_7X7_100": cv2.aruco.DICT_7X7_100,
    "DICT_7X7_250": cv2.aruco.DICT_7X7_250,
    "DICT_7X7_1000": cv2.aruco.DICT_7X7_1000,
    "DICT_ARUCO_ORIGINAL": cv2.aruco.DICT_ARUCO_ORIGINAL,
    "DICT_APRILTAG_16h5": cv2.aruco.DICT_APRILTAG_16h5,
    "DICT_APRILTAG_25h9": cv2.aruco.DICT_APRILTAG_25h9,
    "DICT_APRILTAG_36h10": cv2.aruco.DICT_APRILTAG_36h10,
    "DICT_APRILTAG_36h11": cv2.aruco.DICT_APRILTAG_36h11
}
def aruco_display(corners, ids, image):
    '''
    This function draws the bounding box, center point and marker ID on the image
    for each of the detected markers.

    :param corners: corners of the detected marker regions
    :param ids: ArUco marker IDs
    :param image: input image frame
    :return: RGB image with the marker boxes and IDs drawn on it
    '''
    if len(corners) > 0:
        # flatten the ArUco IDs list
        ids = ids.flatten()
        # loop over the detected ArUCo corners
        for (markerCorner, markerID) in zip(corners, ids):
            # extract the marker corners (which are always returned in
            # top-left, top-right, bottom-right, and bottom-left order)
            box = markerCorner.reshape((4, 2))
            (topLeft, topRight, bottomRight, bottomLeft) = box
            # convert each of the (x, y)-coordinate pairs to integers
            topRight = (int(topRight[0]), int(topRight[1]))
            bottomRight = (int(bottomRight[0]), int(bottomRight[1]))
            bottomLeft = (int(bottomLeft[0]), int(bottomLeft[1]))
            topLeft = (int(topLeft[0]), int(topLeft[1]))
            # draw the bounding box of the marker
            cv2.line(image, topLeft, topRight, (0, 255, 0), 2)
            cv2.line(image, topRight, bottomRight, (0, 255, 0), 2)
            cv2.line(image, bottomRight, bottomLeft, (0, 255, 0), 2)
            cv2.line(image, bottomLeft, topLeft, (0, 255, 0), 2)
            # compute and draw the center (x, y)-coordinates of the ArUco marker
            cX = int((topLeft[0] + bottomRight[0]) / 2.0)
            cY = int((topLeft[1] + bottomRight[1]) / 2.0)
            cv2.circle(image, (cX, cY), 4, (0, 0, 255), -1)
            # draw the ArUco marker ID on the image
            cv2.putText(image, str(markerID), (topLeft[0], topLeft[1] - 10), cv2.FONT_HERSHEY_SIMPLEX,
                        0.5, (0, 255, 0), 2)
        logger = rcutils_logger.RcutilsLogger(name="aruco_node")
        logger.debug("[Inference] ArUco markers IDs: {}".format(ids))
    return image
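To generate test input for this dictionary table, something like the following should work with the older cv2.aruco API used throughout this package (newer OpenCV releases renamed drawMarker to generateImageMarker; the marker id and pixel size here are arbitrary):

import cv2
from rbss_aruco_pe.utils import ARUCO_DICT

# render a 200x200 px marker with id 0 from the package's default dictionary
dictionary = cv2.aruco.getPredefinedDictionary(ARUCO_DICT["DICT_6X6_50"])
marker = cv2.aruco.drawMarker(dictionary, 0, 200)
cv2.imwrite("marker_0.png", marker)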

rbss_aruco_pe/scripts/aruco_node_lc.py Normal file
View file

@@ -0,0 +1,369 @@
#!/usr/bin/env python3
"""
Code taken from:
https://github.com/AIRLab-POLIMI/ros2-aruco-pose-estimation
ROS2 wrapper code taken from:
https://github.com/JMU-ROBOTICS-VIVA/ros2_aruco/tree/main

This node locates Aruco AR markers in images and publishes their ids and poses.

Subscriptions:
    /camera/image/image (sensor_msgs.msg.Image)
    /camera/image/camera_info (sensor_msgs.msg.CameraInfo)

Published Topics:
    /aruco/poses (geometry_msgs.msg.PoseArray)
        Pose of all detected markers (suitable for rviz visualization)
    /aruco/markers (rbs_skill_interfaces.msg.ArucoMarkers)
        Provides an array of all poses along with the corresponding marker ids
    /aruco/image (sensor_msgs.msg.Image)
        Annotated image with marker locations and ids drawn on it

Parameters:
    marker_size - size of the markers in meters (default 0.04)
    aruco_dictionary_id - dictionary that was used to generate markers (default DICT_6X6_50)
    image_topic - image topic to subscribe to (default /camera/image/image)
    camera_info_topic - camera info topic to subscribe to (default /camera/image/camera_info)
    camera_frame - camera optical frame to use (default "camera_color_optical_frame")
    detected_markers_topic - topic to publish detected markers (default /aruco/markers)
    markers_visualization_topic - topic to publish markers visualization (default /aruco/poses)
    output_image_topic - topic to publish annotated image (default /aruco/image)

Version: 2025-04-08
"""
# ROS2 imports
import rclpy
# import rclpy.node
from rclpy.qos import qos_profile_sensor_data
from cv_bridge import CvBridge
import message_filters
from rclpy.lifecycle import Node
# from rclpy.lifecycle import Publisher
from rclpy.lifecycle import LifecycleState
from rclpy.lifecycle import TransitionCallbackReturn
# Python imports
import numpy as np
import cv2
import json
# Local imports for custom defined functions
from rbss_aruco_pe.utils import ARUCO_DICT
from rbss_aruco_pe.pose_estimation import pose_estimation
# ROS2 message imports
from sensor_msgs.msg import CameraInfo, Image
from geometry_msgs.msg import PoseArray
# from rcl_interfaces.msg import ParameterDescriptor, ParameterType
from rbs_skill_interfaces.msg import ArucoMarkers
NODE_NAME_DEFAULT = "aruco_pe"
PARAM_SKILL_CFG = "aruco_pe_cfg"
DEFAULT_ARUCO_DICTIONARY = "DICT_6X6_50"
DEFAULT_MARKER_SIZE = 0.04
# Input topics
INP_IMAGE = "/camera/image/image"
INP_DEPTH_IMAGE = "/camera/image/depth_image"
INP_CAMERA_INFO = "/camera/image/camera_info"
CAMERA_FRAME = "camera_color_optical_frame" # Camera link frame of reference
# Output topics
OUT_DEFAULT_DETECTED_MARKERS = "/aruco/markers"
OUT_TT_DETECTED_MARKERS = "rbs_skill_interfaces/msg/ArucoMarkers"
OUT_DEFAULT_MARKERS_VISUALISATION = "/aruco/poses"
OUT_TT_MARKERS_VISUALISATION = "geometry_msgs/msg/PoseArray"
OUT_DEFAULT_IMAGE = "/aruco/image"
OUT_TT_IMAGE = "sensor_msgs/msg/Image"
class ArucoLcNode(Node):
    """Aruco pose estimation lifecycle node."""

    def __init__(self, **kwargs):
        # default values
        self.dictionary_id_name = DEFAULT_ARUCO_DICTIONARY
        self.marker_size = DEFAULT_MARKER_SIZE  # Size of the markers in meters
        self.use_depth_input = False    # Use depth image for 3D pose estimation
        self.is_image_mode = False      # Publish Image with detected Aruco markers
        self.image_topic = INP_IMAGE
        self.info_topic = INP_CAMERA_INFO
        self.depth_image_topic = INP_DEPTH_IMAGE
        self.camera_frame = CAMERA_FRAME
        self.detected_markers_topic = OUT_DEFAULT_DETECTED_MARKERS
        self.markers_visualization_topic = OUT_DEFAULT_MARKERS_VISUALISATION
        self.output_image_topic = OUT_DEFAULT_IMAGE

        # for other nodes
        kwargs["allow_undeclared_parameters"] = True
        kwargs["automatically_declare_parameters_from_overrides"] = True
        super().__init__(NODE_NAME_DEFAULT, **kwargs)

        str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
        self.skill_cfg = json.loads(str_cfg)
        self.nodeName = NODE_NAME_DEFAULT
        self.initialize_parameters()

        # Make sure we have a valid dictionary id:
        try:
            dictionary_id = getattr(cv2.aruco, self.dictionary_id_name)
            # check if the dictionary_id is a valid dictionary inside ARUCO_DICT values
            if dictionary_id not in ARUCO_DICT.values():
                raise AttributeError
        except AttributeError:
            self.get_logger().error(f"bad aruco_dictionary_id: {self.dictionary_id_name}")
            options = "\n".join([s for s in ARUCO_DICT])
            self.get_logger().error(f"valid options: {options}")
            # __init__ must not return a value, so raise to make the failure visible
            raise ValueError(f"bad aruco_dictionary_id: {self.dictionary_id_name}")

        self.aruco_dictionary = cv2.aruco.getPredefinedDictionary(dictionary_id)
        self.bridge = CvBridge()

        # Set up publishers
        self.poses_pub = self.create_lifecycle_publisher(PoseArray, self.markers_visualization_topic, 5)
        self.markers_pub = self.create_lifecycle_publisher(ArucoMarkers, self.detected_markers_topic, 5)
        self.image_pub = self.create_lifecycle_publisher(Image, self.output_image_topic, 5)

        self.clear_param()

    def clear_param(self):
        # Set up fields for camera parameters
        self.info_msg = None
        self.intrinsic_mat = None
        self.distortion = None
    def initialize_parameters(self):
        """
        Read parameters from the skill config (via the interface node)
        """
        self._Settings()
        for prop in self.skill_cfg["topicsOut"]:
            ty = prop["type"]
            if ty == OUT_TT_DETECTED_MARKERS:
                self.detected_markers_topic = prop["name"]
            elif ty == OUT_TT_MARKERS_VISUALISATION:
                self.markers_visualization_topic = prop["name"]
            elif ty == OUT_TT_IMAGE:
                self.output_image_topic = prop["name"]

    def _Settings(self):
        # Initialization of service settings
        for prop in self.skill_cfg["Settings"]["output"]["params"]:
            nam = prop["name"]
            val = prop["value"]
            if nam == "aruco_dictionary_id":
                # __init__ validates self.dictionary_id_name, so the configured
                # value has to land in that attribute
                self.dictionary_id_name = val
            elif nam == "marker_size":
                self.marker_size = float(val)
            elif nam == "camera_frame":  # TODO: move into dependencies for "arucoConfig"
                self.camera_frame = val
            elif nam == "is_image_mode":
                self.is_image_mode = (val == "True")
    def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
        """
        Configure the node, after a configuring transition is requested.
        """
        json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
        self.skill_cfg = json.loads(json_param)
        for comm in self.skill_cfg["BTAction"]:
            for par in comm["param"]:
                dependency = par["dependency"]
                if par["type"] == "topic":
                    topic_type = dependency["topicType"]
                    topic_name = dependency["topicOut"]
                    if "CameraInfo" in topic_type:
                        self.info_topic = topic_name
                    elif "Image" in topic_type:
                        if "depth" in topic_name:
                            self.depth_image_topic = topic_name
                        else:
                            self.image_topic = topic_name
                # elif par["type"] == "formBuilder":
                #     self.objName = dependency["output"]["object_name"]
                #     val = dependency["output"]["process"]["selectProcess"]["value"]

        # camera info topic for the camera calibration parameters
        self.info_sub = self.create_subscription(CameraInfo, self.info_topic, self.info_callback, qos_profile_sensor_data)

        self.get_logger().info("on_configure is called.")
        return TransitionCallbackReturn.SUCCESS
    def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn:
        # select the type of input to use for the pose estimation
        if self.use_depth_input:
            # use both rgb and depth image topics for the pose estimation
            # create a message filter to synchronize the image and depth image topics
            self.image_sub = message_filters.Subscriber(self, Image, self.image_topic, qos_profile=qos_profile_sensor_data)
            self.depth_image_sub = message_filters.Subscriber(self, Image, self.depth_image_topic, qos_profile=qos_profile_sensor_data)
            # create synchronizer between the 2 topics using message filters and approximate time policy
            # slop is the maximum time difference between messages that are considered synchronized
            self.synchronizer = message_filters.ApproximateTimeSynchronizer(
                [self.image_sub, self.depth_image_sub], queue_size=10, slop=0.05
            )
            self.synchronizer.registerCallback(self.rgb_depth_sync_callback)
        else:
            # rely only on the rgb image topic for the pose estimation
            # create a subscription to the image topic
            self.image_sub = self.create_subscription(Image, self.image_topic, self.image_callback, qos_profile_sensor_data)

        self.get_logger().info("on_activate is called")
        return super().on_activate(state)

    def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn:
        if self.use_depth_input:
            # TODO not tested
            self.destroy_subscription(self.image_sub)
            self.destroy_subscription(self.depth_image_sub)
        else:
            # Destroy the Image subscriber.
            self.destroy_subscription(self.image_sub)
        self.get_logger().info("on_deactivate is called")
        return super().on_deactivate(state)

    def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn:
        self.destroy_subscription(self.info_sub)
        self.clear_param()
        self.get_logger().info("on_cleanup is called")
        return TransitionCallbackReturn.SUCCESS

    def on_shutdown(self, state: LifecycleState) -> TransitionCallbackReturn:
        self.get_logger().info("on_shutdown is called")
        return TransitionCallbackReturn.SUCCESS
    def info_callback(self, info_msg):
        self.info_msg = info_msg
        # get the intrinsic matrix and distortion coefficients from the camera info
        self.intrinsic_mat = np.reshape(np.array(self.info_msg.k), (3, 3))
        self.distortion = np.array(self.info_msg.d)

        self.get_logger().info("Camera info received.")
        self.get_logger().info(f"Intrinsic matrix: {self.intrinsic_mat}")
        self.get_logger().info(f"Distortion coefficients: {self.distortion}")
        self.get_logger().info(f"Camera frame: {self.info_msg.width}x{self.info_msg.height}")

        # Assume that camera parameters will remain the same...
        self.destroy_subscription(self.info_sub)
    def image_callback(self, img_msg: Image):
        if self.info_msg is None:
            self.get_logger().warn("No camera info has been received!")
            return

        # convert the image message to cv2 format
        cv_image = self.bridge.imgmsg_to_cv2(img_msg, desired_encoding="rgb8")

        # create the ArucoMarkers and PoseArray messages
        markers = ArucoMarkers()
        pose_array = PoseArray()

        # Set the frame id and timestamp for the markers and pose array
        if self.camera_frame == "":
            markers.header.frame_id = self.info_msg.header.frame_id
            pose_array.header.frame_id = self.info_msg.header.frame_id
        else:
            markers.header.frame_id = self.camera_frame
            pose_array.header.frame_id = self.camera_frame
        markers.header.stamp = img_msg.header.stamp
        pose_array.header.stamp = img_msg.header.stamp

        """
        # OVERRIDE: use calibrated intrinsic matrix and distortion coefficients
        self.intrinsic_mat = np.reshape([615.95431, 0., 325.26983,
                                         0., 617.92586, 257.57722,
                                         0., 0., 1.], (3, 3))
        self.distortion = np.array([0.142588, -0.311967, 0.003950, -0.006346, 0.000000])
        """

        # call the pose estimation function
        frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=None,
                                                     aruco_dict=self.aruco_dictionary,
                                                     marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
                                                     distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
        # variant for the new cv2 ArucoDetector API:
        # frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=None,
        #                                              aruco_detector=self.aruco_detector,
        #                                              marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
        #                                              distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)

        # if some markers are detected
        if len(markers.marker_ids) > 0:
            # Publish the results with the poses and marker positions
            self.poses_pub.publish(pose_array)
            self.markers_pub.publish(markers)
            # publish the image frame with the computed marker positions drawn over the image
            self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "rgb8"))
    def rgb_depth_sync_callback(self, rgb_msg: Image, depth_msg: Image):
        # convert the image messages to cv2 format
        cv_depth_image = self.bridge.imgmsg_to_cv2(depth_msg, desired_encoding="16UC1")
        cv_image = self.bridge.imgmsg_to_cv2(rgb_msg, desired_encoding="rgb8")

        # create the ArucoMarkers and PoseArray messages
        markers = ArucoMarkers()
        pose_array = PoseArray()

        # Set the frame id and timestamp for the markers and pose array
        if self.camera_frame == "":
            markers.header.frame_id = self.info_msg.header.frame_id
            pose_array.header.frame_id = self.info_msg.header.frame_id
        else:
            markers.header.frame_id = self.camera_frame
            pose_array.header.frame_id = self.camera_frame
        markers.header.stamp = rgb_msg.header.stamp
        pose_array.header.stamp = rgb_msg.header.stamp

        # call the pose estimation function
        frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=cv_depth_image,
                                                     aruco_dict=self.aruco_dictionary,
                                                     marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
                                                     distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
        # variant for the new cv2 ArucoDetector API:
        # frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=cv_depth_image,
        #                                              aruco_detector=self.aruco_detector,
        #                                              marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
        #                                              distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)

        # if some markers are detected
        if len(markers.marker_ids) > 0:
            # Publish the results with the poses and marker positions
            self.poses_pub.publish(pose_array)
            self.markers_pub.publish(markers)
            # publish the image frame with the computed marker positions drawn over the image
            self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "rgb8"))
def main():
    # previous non-lifecycle variant:
    # rclpy.init()
    # node = ArucoNode()
    # rclpy.spin(node)
    # node.destroy_node()
    # rclpy.shutdown()
    rclpy.init()
    executor = rclpy.executors.SingleThreadedExecutor()
    # the constructor raises on a bad config, so no truthiness check is needed
    lc_node = ArucoLcNode()
    executor.add_node(lc_node)
    try:
        executor.spin()
    except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
        lc_node.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()
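Finally, a minimal consumer of the skill's output, assuming the default /aruco/markers topic from the config above; note that the publishers are lifecycle publishers, so nothing arrives until the node has been activated:

import rclpy
from rclpy.node import Node
from rbs_skill_interfaces.msg import ArucoMarkers


class MarkerListener(Node):
    def __init__(self):
        super().__init__("marker_listener")
        self.create_subscription(ArucoMarkers, "/aruco/markers", self.on_markers, 5)

    def on_markers(self, msg: ArucoMarkers):
        # marker_ids and poses are filled pairwise by pose_estimation()
        for marker_id, pose in zip(msg.marker_ids, msg.poses):
            p = pose.position
            self.get_logger().info(f"marker {marker_id}: ({p.x:.3f}, {p.y:.3f}, {p.z:.3f})")


rclpy.init()
node = MarkerListener()
rclpy.spin(node)
rclpy.shutdown()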