From 1c4778e667898494f1ad376fa9667df4ee982564 Mon Sep 17 00:00:00 2001 From: shalenikol Date: Tue, 8 Apr 2025 10:41:00 +0300 Subject: [PATCH] add rbss_aruco_pe skill package --- rbss_aruco_pe/CMakeLists.txt | 51 +++ rbss_aruco_pe/config/aruco_parameters.yaml | 15 + rbss_aruco_pe/launch/rbss_aruco_pe.launch.py | 171 ++++++++ rbss_aruco_pe/package.xml | 30 ++ rbss_aruco_pe/rbs_bt_example.xml | 8 + rbss_aruco_pe/rbs_package.json | 81 ++++ rbss_aruco_pe/rbss_aruco_pe/__init__.py | 0 .../rbss_aruco_pe/pose_estimation.py | 264 +++++++++++++ rbss_aruco_pe/rbss_aruco_pe/utils.py | 72 ++++ rbss_aruco_pe/scripts/aruco_node_lc.py | 369 ++++++++++++++++++ 10 files changed, 1061 insertions(+) create mode 100644 rbss_aruco_pe/CMakeLists.txt create mode 100644 rbss_aruco_pe/config/aruco_parameters.yaml create mode 100644 rbss_aruco_pe/launch/rbss_aruco_pe.launch.py create mode 100644 rbss_aruco_pe/package.xml create mode 100644 rbss_aruco_pe/rbs_bt_example.xml create mode 100644 rbss_aruco_pe/rbs_package.json create mode 100644 rbss_aruco_pe/rbss_aruco_pe/__init__.py create mode 100755 rbss_aruco_pe/rbss_aruco_pe/pose_estimation.py create mode 100755 rbss_aruco_pe/rbss_aruco_pe/utils.py create mode 100755 rbss_aruco_pe/scripts/aruco_node_lc.py diff --git a/rbss_aruco_pe/CMakeLists.txt b/rbss_aruco_pe/CMakeLists.txt new file mode 100644 index 0000000..baf23e3 --- /dev/null +++ b/rbss_aruco_pe/CMakeLists.txt @@ -0,0 +1,51 @@ +cmake_minimum_required(VERSION 3.8) +project(rbss_aruco_pe) + +# Default to C++17 +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 17) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# Find dependencies +find_package(ament_cmake REQUIRED) +find_package(ament_cmake_python REQUIRED) +find_package(rclpy REQUIRED) +#find_package(open3d REQUIRED) +#find_package(numpy REQUIRED) +#find_package(cv2 REQUIRED) +#find_package(cv_bridge REQUIRED) +#find_package(tf_transformations REQUIRED) +find_package(rbs_skill_interfaces REQUIRED) + +# Install Python modules +ament_python_install_package(${PROJECT_NAME}) + +# Install Python executables +install(PROGRAMS + scripts/aruco_node_lc.py + DESTINATION lib/${PROJECT_NAME} +) + +# Install directories +install(DIRECTORY + launch config + DESTINATION share/${PROJECT_NAME} +) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + # the following line skips the linter which checks for copyrights + # comment the line when a copyright and license is added to all source files + set(ament_cmake_copyright_FOUND TRUE) + # the following line skips cpplint (only works in a git repo) + # comment the line when this package is in a git repo and when + # a copyright and license is added to all source files + set(ament_cmake_cpplint_FOUND TRUE) + ament_lint_auto_find_test_dependencies() +endif() + +ament_package() \ No newline at end of file diff --git a/rbss_aruco_pe/config/aruco_parameters.yaml b/rbss_aruco_pe/config/aruco_parameters.yaml new file mode 100644 index 0000000..c49c494 --- /dev/null +++ b/rbss_aruco_pe/config/aruco_parameters.yaml @@ -0,0 +1,15 @@ +/aruco_node: + ros__parameters: + # Aruco detection parameters + marker_size: 0.04 # Size of the markers in meters + aruco_dictionary_id: DICT_6X6_50 # Aruco dictionary type + # Input topics + image_topic: /rgbd_camera/image/image # Input image topic + use_depth_input: false # Use depth image for 3D pose estimation + depth_image_topic: /camera/aligned_depth_to_color/image_raw # Input depth image topic + 
camera_info_topic: /rgbd_camera/image/camera_info # Input camera info topic with camera intrinsic parameters
+    camera_frame: camera_color_optical_frame # Camera link frame of reference
+    # Output topics
+    detected_markers_topic: /aruco/markers # Output topic with detected markers (aruco poses + ids)
+    markers_visualization_topic: /aruco/poses # Output topic with visualization of detected markers as pose array
+    output_image_topic: /aruco/image # Output topic with visualization of detected markers drawn on image
diff --git a/rbss_aruco_pe/launch/rbss_aruco_pe.launch.py b/rbss_aruco_pe/launch/rbss_aruco_pe.launch.py
new file mode 100644
index 0000000..43d3d29
--- /dev/null
+++ b/rbss_aruco_pe/launch/rbss_aruco_pe.launch.py
@@ -0,0 +1,171 @@
+from launch import LaunchDescription
+
+
+def generate_launch_description():
+    # This launch file is a deliberate stub: the skill is started via the
+    # interface node. It must still return a LaunchDescription (returning a
+    # plain list makes `ros2 launch` fail).
+    print("The skill must be launched via the interface node")
+    return LaunchDescription([])
+
+# # ROS2 imports
+# from launch.substitutions import PathJoinSubstitution, LaunchConfiguration
+# # from launch.actions import IncludeLaunchDescription
+# # from launch.launch_description_sources import PythonLaunchDescriptionSource
+# from launch import LaunchDescription
+# from launch_ros.actions import Node
+# from launch.actions import DeclareLaunchArgument
+# # from launch_ros.substitutions import FindPackageShare
+# # from launch.conditions import IfCondition, UnlessCondition
+
+# import os
+# from ament_index_python.packages import get_package_share_directory
+# import yaml
+
+
+# def generate_launch_description():
+
+#     aruco_params_file = os.path.join(
+#         get_package_share_directory('rbss_aruco_pe'),
+#         'config',
+#         'aruco_parameters.yaml'
+#     )
+
+#     with open(aruco_params_file, 'r') as file:
+#         config = yaml.safe_load(file)
+
+#     config = config["/aruco_node"]["ros__parameters"]
+
+#     # declare configuration parameters
+#     marker_size_arg = DeclareLaunchArgument(
+#         name='marker_size',
+#         default_value=str(config['marker_size']),
+#         description='Size of the aruco marker in meters',
+#     )
+
+#     aruco_dictionary_id_arg = DeclareLaunchArgument(
+#         name='aruco_dictionary_id',
+#         default_value=config['aruco_dictionary_id'],
+#         description='ID of the aruco dictionary to use',
+#     )
+
+#     image_topic_arg = DeclareLaunchArgument(
+#         name='image_topic',
+#         default_value=config['image_topic'],
+#         description='Name of the image RGB topic to subscribe to',
+#     )
+
+#     use_depth_input_arg = DeclareLaunchArgument(
+#         name='use_depth_input',
+#         default_value=str(config['use_depth_input']),
+#         description='Use depth input for pose estimation',
+#         choices=['true', 'false', 'True', 'False']
+#     )
+
+#     depth_image_topic_arg = DeclareLaunchArgument(
+#         name='depth_image_topic',
+#         default_value=config['depth_image_topic'],
+#         description='Name of the depth image topic to subscribe to',
+#     )
+
+#     camera_info_topic_arg = DeclareLaunchArgument(
+#         name='camera_info_topic',
+#         default_value=config['camera_info_topic'],
+#         description='Name of the camera info topic to subscribe to',
+#     )
+
+#     camera_frame_arg = DeclareLaunchArgument(
+#         name='camera_frame',
+#         default_value=config['camera_frame'],
+#         description='Name of the camera frame where the estimated pose will be',
+#     )
+
+#     detected_markers_topic_arg = DeclareLaunchArgument(
+#         name='detected_markers_topic',
+#         default_value=config['detected_markers_topic'],
+#         description='Name of the topic to publish the detected markers messages',
+#     )
+
+#     markers_visualization_topic_arg = DeclareLaunchArgument(
+#         name='markers_visualization_topic',
+#         default_value=config['markers_visualization_topic'],
+#         description='Name of the topic to publish the pose array for visualization of the markers',
+#     )
+
+#     output_image_topic_arg = DeclareLaunchArgument(
+#         name='output_image_topic',
+#         default_value=config['output_image_topic'],
+#         description='Name of the topic to publish the image with the detected markers',
+#     )
+
+#     aruco_node = Node(
+#         package='rbss_aruco_pe',
+#         executable='aruco_node_lc.py',
+#         parameters=[{
+#             "marker_size": LaunchConfiguration('marker_size'),
+#             "aruco_dictionary_id": LaunchConfiguration('aruco_dictionary_id'),
+#             "image_topic": LaunchConfiguration('image_topic'),
+#             "use_depth_input": LaunchConfiguration('use_depth_input'),
+#             "depth_image_topic": LaunchConfiguration('depth_image_topic'),
+#             "camera_info_topic": LaunchConfiguration('camera_info_topic'),
+#             "camera_frame": LaunchConfiguration('camera_frame'),
+#             "detected_markers_topic": LaunchConfiguration('detected_markers_topic'),
+#             "markers_visualization_topic": LaunchConfiguration('markers_visualization_topic'),
+#             "output_image_topic": LaunchConfiguration('output_image_topic'),
+#         }],
+#         output='screen',
+#         emulate_tty=True
+#     )
+
+#     # # launch realsense camera node
+#     # cam_feed_launch_file = PathJoinSubstitution(
+#     #     [FindPackageShare("realsense2_camera"), "launch", "rs_launch.py"]
+#     # )
+
+#     # camera_feed_depth_node = IncludeLaunchDescription(
+#     #     PythonLaunchDescriptionSource(cam_feed_launch_file),
+#     #     launch_arguments={
+#     #         "pointcloud.enable": "true",
+#     #         "enable_rgbd": "true",
+#     #         "enable_sync": "true",
+#     #         "align_depth.enable": "true",
+#     #         "enable_color": "true",
+#     #         "enable_depth": "true",
+#     #     }.items(),
+#     #     condition=IfCondition(LaunchConfiguration('use_depth_input'))
+#     # )
+
+#     # camera_feed_node = IncludeLaunchDescription(
+#     #     PythonLaunchDescriptionSource(cam_feed_launch_file),
+#     #     launch_arguments={
+#     #         "pointcloud.enable": "true",
+#     #         "enable_color": "true",
+#     #     }.items(),
+#     #     condition=UnlessCondition(LaunchConfiguration('use_depth_input'))
+#     # )
+
+#     # rviz_file = PathJoinSubstitution([
+#     #     FindPackageShare('rbss_aruco_pe'),
+#     #     'rviz',
+#     #     'cam_detect.rviz'
+#     # ])
+
+#     # rviz2_node = Node(
+#     #     package='rviz2',
+#     #     executable='rviz2',
+#     #     arguments=['-d', rviz_file]
+#     # )
+
+#     return LaunchDescription([
+#         # Arguments
+#         marker_size_arg,
+#         aruco_dictionary_id_arg,
+#         image_topic_arg,
+#         use_depth_input_arg,
+#         depth_image_topic_arg,
+#         camera_info_topic_arg,
+#         camera_frame_arg,
+#         detected_markers_topic_arg,
+#         markers_visualization_topic_arg,
+#         output_image_topic_arg,
+
+#         # Nodes
+#         aruco_node,
+#         # camera_feed_depth_node,
+#         # camera_feed_node,
+#         # rviz2_node
+#     ])
diff --git a/rbss_aruco_pe/package.xml b/rbss_aruco_pe/package.xml
new file mode 100644
index 0000000..b70b077
--- /dev/null
+++ b/rbss_aruco_pe/package.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
+<package format="3">
+  <name>rbss_aruco_pe</name>
+  <version>0.0.1</version>
+  <description>Aruco Marker Tracking and Pose Estimation wrapper for ROS2, using RGB cameras</description>
+  <maintainer>shalenikol</maintainer>
+  <license>Apache License 2.0</license>
+
+  <buildtool_depend>ament_cmake</buildtool_depend>
+  <buildtool_depend>ament_cmake_python</buildtool_depend>
+
+  <depend>rclpy</depend>
+  <depend>cv_bridge</depend>
+  <depend>std_msgs</depend>
+  <depend>sensor_msgs</depend>
+  <depend>geometry_msgs</depend>
+  <depend>lifecycle_msgs</depend>
+  <depend>rbs_skill_interfaces</depend>
+  <depend>open3d</depend>
+  <depend>opencv</depend>
+  <depend>tf_transformations</depend>
+
+  <test_depend>ament_lint_auto</test_depend>
+  <test_depend>ament_lint_common</test_depend>
+
+  <export>
+    <build_type>ament_cmake</build_type>
+  </export>
+</package>
diff --git a/rbss_aruco_pe/rbs_bt_example.xml b/rbss_aruco_pe/rbs_bt_example.xml
new file mode 100644
index 0000000..ff82aef
--- /dev/null
+++ b/rbss_aruco_pe/rbs_bt_example.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/rbss_aruco_pe/rbs_package.json b/rbss_aruco_pe/rbs_package.json
new file mode 100644
index 0000000..f8cf87c
--- /dev/null
+++ b/rbss_aruco_pe/rbs_package.json
@@ -0,0 +1,81 @@
+{
+  "skills": [
+    {
+      "sid": "ArucoPE-example",
+      "SkillPackage": { "name": "Robossembler", "version": "1.0", "format": "1" },
+      "Module": { "name": "ArucoPE", "node_name": "aruco_pe", "description": "Aruco Marker Tracking and Pose Estimation wrapper for ROS2, using RGB cameras" },
+      "Launch": { "package": "rbss_aruco_pe", "executable": "aruco_node_lc.py" },
+      "BTAction": [
+        {
+          "name": "arucoConfig",
+          "type": "run",
+          "param": [
+            {
+              "type": "topic",
+              "dependency": {
+                "type": "topic",
+                "topicType": "sensor_msgs/msg/Image",
+                "topicOut": "/rgbd_camera/image/image"
+              }
+            },
+            {
+              "type": "topic",
+              "dependency": {
+                "type": "topic",
+                "topicType": "sensor_msgs/msg/Image",
+                "topicOut": "/rgbd_camera/image/depth_image"
+              }
+            },
+            {
+              "type": "topic",
+              "dependency": {
+                "type": "topic",
+                "topicType": "sensor_msgs/msg/CameraInfo",
+                "topicOut": "/rgbd_camera/image/camera_info"
+              }
+            }
+          ],
+          "typeAction": "ACTION"
+        },
+        { "name": "arucoStop", "type": "stop", "param": [], "typeAction": "ACTION" }
+      ],
+      "topicsOut": [
+        {
+          "name": "/aruco/markers",
+          "type": "rbs_skill_interfaces/msg/ArucoMarkers"
+        },
+        {
+          "name": "/aruco/poses",
+          "type": "geometry_msgs/msg/PoseArray"
+        },
+        {
+          "name": "/aruco/image",
+          "type": "sensor_msgs/msg/Image"
+        }
+      ],
+      "Settings": {
+        "output": {
+          "params": [
+            {
+              "name": "is_image_mode",
+              "value": "True"
+            },
+            {
+              "name": "aruco_dictionary_id",
+              "value": "DICT_6X6_50"
+            },
+            {
+              "name": "marker_size",
+              "value": "0.04"
+            },
+            {
+              "name": "camera_frame",
+              "value": ""
+            }
+          ]
+        },
+        "type": "formBuilder"
+      }
+    }
+  ]
+}
diff --git a/rbss_aruco_pe/rbss_aruco_pe/__init__.py b/rbss_aruco_pe/rbss_aruco_pe/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/rbss_aruco_pe/rbss_aruco_pe/pose_estimation.py b/rbss_aruco_pe/rbss_aruco_pe/pose_estimation.py
new file mode 100755
index 0000000..11e00e5
--- /dev/null
+++ b/rbss_aruco_pe/rbss_aruco_pe/pose_estimation.py
@@ -0,0 +1,264 @@
+#!/usr/bin/env python3
+
+# Code taken from:
+# https://github.com/AIRLab-POLIMI/ros2-aruco-pose-estimation
+
+# Code taken and adapted from:
+# https://github.com/GSNCodes/ArUCo-Markers-Pose-Estimation-Generation-Python/tree/main
+
+# Python imports
+import numpy as np
+import cv2
+import tf_transformations
+
+# ROS2 imports
+from rclpy.impl import rcutils_logger
+
+# ROS2 message imports
+from geometry_msgs.msg import Pose, PoseArray
+from rbs_skill_interfaces.msg import ArucoMarkers
+
+# utils import python code
+from rbss_aruco_pe.utils import aruco_display
+
+
+# signature for the new cv2 (>= 4.7.0) ArUco API:
+# def pose_estimation(rgb_frame: np.ndarray, depth_frame: np.ndarray, aruco_detector: cv2.aruco.ArucoDetector, marker_size: float,
+#                     matrix_coefficients: np.ndarray, distortion_coefficients: np.ndarray,
+#                     pose_array: PoseArray, markers: ArucoMarkers) -> tuple[np.ndarray, PoseArray, ArucoMarkers]:
+def pose_estimation(rgb_frame: np.ndarray, depth_frame: np.ndarray, aruco_dict, marker_size: float,
+                    matrix_coefficients: np.ndarray, distortion_coefficients: np.ndarray,
+                    pose_array: PoseArray, markers: ArucoMarkers) -> tuple[np.ndarray, PoseArray, ArucoMarkers]:
+    '''
+    rgb_frame - Frame from the RGB camera stream
+    depth_frame - Depth frame from the depth camera stream (or None)
+    aruco_dict - ArUco dictionary used for marker detection
+    marker_size - Size of the markers in meters
+    matrix_coefficients - Intrinsic matrix of the calibrated camera
+    distortion_coefficients - Distortion coefficients associated with your camera
+    pose_array - PoseArray message to be published
+    markers - ArucoMarkers message to be published
+
+    return:-
+    frame - The frame with the axis drawn on it
+    pose_array - PoseArray with computed poses of the markers
+    markers - ArucoMarkers message containing markers id number and pose
+    '''
+
+    # old code version
+    parameters = cv2.aruco.DetectorParameters_create()
+    corners, marker_ids, _ = cv2.aruco.detectMarkers(rgb_frame, aruco_dict, parameters=parameters)
+
+    # new code version of cv2 (4.7.0)
+    # corners, marker_ids, rejected = aruco_detector.detectMarkers(image=rgb_frame)
+
+    frame_processed = rgb_frame
+    logger = rcutils_logger.RcutilsLogger(name="aruco_node")
+
+    # If markers are detected
+    if len(corners) > 0:
+
+        logger.debug(f"Detected {len(corners)} markers.")
+
+        for i, marker_id in enumerate(marker_ids):
+            # Estimate pose of each marker and return the values rvec and tvec
+
+            # using deprecated function
+            # rvec, tvec, markerPoints = cv2.aruco.estimatePoseSingleMarkers(corners=corners[i],
+            #                                                                markerLength=marker_size,
+            #                                                                cameraMatrix=matrix_coefficients,
+            #                                                                distCoeffs=distortion_coefficients)
+            # tvec = tvec[0]
+
+            # alternative code version using solvePnP
+            tvec, rvec, quat = my_estimatePoseSingleMarkers(corners=corners[i], marker_size=marker_size,
+                                                            camera_matrix=matrix_coefficients,
+                                                            distortion=distortion_coefficients)
+
+            # show the detected markers bounding boxes
+            frame_processed = aruco_display(corners=corners, ids=marker_ids,
+                                            image=frame_processed)
+
+            # draw frame axes
+            frame_processed = cv2.drawFrameAxes(image=frame_processed, cameraMatrix=matrix_coefficients,
+                                                distCoeffs=distortion_coefficients, rvec=rvec, tvec=tvec,
+                                                length=0.05, thickness=3)
+
+            if (depth_frame is not None):
+                # get the centroid of the pointcloud
+                centroid = depth_to_pointcloud_centroid(depth_image=depth_frame,
+                                                        intrinsic_matrix=matrix_coefficients,
+                                                        corners=corners[i])
+
+                # log comparison between depthcloud centroid and tvec estimated positions
+                logger.info(f"depthcloud centroid = {centroid}")
+                logger.info(f"tvec = {tvec[0]} {tvec[1]} {tvec[2]}")
+
+            # compute pose from the rvec and tvec arrays
+            if (depth_frame is not None):
+                # use computed centroid from depthcloud as estimated pose
+                pose = Pose()
+                pose.position.x = float(centroid[0])
+                pose.position.y = float(centroid[1])
+                pose.position.z = float(centroid[2])
+            else:
+                # use tvec from aruco estimator as estimated pose
+                pose = Pose()
+                pose.position.x = float(tvec[0])
+                pose.position.y = float(tvec[1])
+                pose.position.z = float(tvec[2])
+
+            pose.orientation.x = quat[0]
+            pose.orientation.y = quat[1]
+            pose.orientation.z = quat[2]
+            pose.orientation.w = quat[3]
+
+            # add the pose and marker id to the pose_array and markers messages
+            pose_array.poses.append(pose)
+            markers.poses.append(pose)
+            markers.marker_ids.append(marker_id[0])
+
+    return frame_processed, pose_array, markers
+
+
+def my_estimatePoseSingleMarkers(corners, marker_size, camera_matrix, distortion) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
+    '''
+    This will estimate the rvec and tvec for each of the marker corners detected by:
+        corners, ids, rejectedImgPoints = detector.detectMarkers(image)
+
+    corners - is an array of detected corners for each detected marker in the image
+    marker_size - is the size of the detected markers in meters
+    camera_matrix - is the camera intrinsic matrix
+    distortion - is the camera distortion matrix
+    RETURN tvec, rvec and the unit quaternion of the marker orientation
+    (unlike the old estimatePoseSingleMarkers(), which returned rvecs, tvecs and objPoints)
+    '''
+    marker_points = np.array([[-marker_size / 2.0, marker_size / 2.0, 0],
+                              [marker_size / 2.0, marker_size / 2.0, 0],
+                              [marker_size / 2.0, -marker_size / 2.0, 0],
+                              [-marker_size / 2.0, -marker_size / 2.0, 0]], dtype=np.float32)
+
+    # solvePnP returns the rotation and translation vectors
+    retval, rvec, tvec = cv2.solvePnP(objectPoints=marker_points, imagePoints=corners,
+                                      cameraMatrix=camera_matrix, distCoeffs=distortion, flags=cv2.SOLVEPNP_IPPE_SQUARE)
+    rvec = rvec.reshape(3, 1)
+    tvec = tvec.reshape(3, 1)
+
+    rot, jacobian = cv2.Rodrigues(rvec)
+    rot_matrix = np.eye(4, dtype=np.float32)
+    rot_matrix[0:3, 0:3] = rot
+
+    # convert rotation matrix to quaternion
+    quaternion = tf_transformations.quaternion_from_matrix(rot_matrix)
+    norm_quat = np.linalg.norm(quaternion)
+    quaternion = quaternion / norm_quat
+
+    return tvec, rvec, quaternion
+
+
+def depth_to_pointcloud_centroid(depth_image: np.ndarray, intrinsic_matrix: np.ndarray,
+                                 corners: np.ndarray) -> np.ndarray:
+    """
+    This function takes a depth image and the corners of a quadrilateral as input,
+    and returns the centroid of the corresponding pointcloud.
+
+    Args:
+        depth_image: A 2D numpy array representing the depth image.
+        intrinsic_matrix: The 3x3 camera intrinsic matrix.
+        corners: A list of 4 tuples, each representing the (x, y) coordinates of a corner.
+
+    Returns:
+        A tuple (x, y, z) representing the centroid of the segmented pointcloud.
+    """
+
+    # Get image parameters
+    height, width = depth_image.shape
+
+    # Check if all corners are within image bounds
+    # corners has shape (1, 4, 2)
+    corners_indices = np.array([(int(x), int(y)) for x, y in corners[0]])
+
+    for x, y in corners_indices:
+        if x < 0 or x >= width or y < 0 or y >= height:
+            raise ValueError("One or more corners are outside the image bounds.")
+
+    # bounding box of the polygon
+    x_min = int(min(corners_indices[:, 0]))
+    x_max = int(max(corners_indices[:, 0]))
+    y_min = int(min(corners_indices[:, 1]))
+    y_max = int(max(corners_indices[:, 1]))
+
+    # create array of pixels inside the polygon defined by the corners
+    # search for pixels inside the squared bounding box of the polygon
+    points = []
+    for x in range(x_min, x_max):
+        for y in range(y_min, y_max):
+            if is_pixel_in_polygon(pixel=(x, y), corners=corners_indices):
+                # add point to the list of points
+                points.append([x, y, depth_image[y, x]])
+
+    # Convert points to numpy array (pixel coordinates and raw depth are unsigned integers)
+    points = np.array(points, dtype=np.uint16)
+
+    # convert to open3d image
+    #depth_segmented = geometry.Image(points)
+    # create pinhole camera model
+    #pinhole_matrix = camera.PinholeCameraIntrinsic(width=width, height=height,
+    #                                               intrinsic_matrix=intrinsic_matrix)
+    # Convert points to Open3D pointcloud
+    #pointcloud = geometry.PointCloud.create_from_depth_image(depth=depth_segmented, intrinsic=pinhole_matrix,
+    #                                                         depth_scale=1000.0)
+
+    # apply formulas to pointcloud, where
+    # fx = intrinsic_matrix[0, 0], fy = intrinsic_matrix[1, 1]
+    # cx = intrinsic_matrix[0, 2], cy = intrinsic_matrix[1, 2],
+    # u = x, v = y, d = depth_image[y, x], depth_scale = 1000.0,
+    # z = d / depth_scale
+    # x = (u - cx) * z / fx
+    # y = (v - cy) * z / fy
+
+    # create pointcloud
+    pointcloud = []
+    for x, y, d in points:
+        z = d / 1000.0
+        x = (x - intrinsic_matrix[0, 2]) * z / intrinsic_matrix[0, 0]
+        y = (y - intrinsic_matrix[1, 2]) * z / intrinsic_matrix[1, 1]
+        pointcloud.append([x, y, z])
+
+    # Calculate centroid from pointcloud
+    # (float dtype here: the metric coordinates are fractional and can be negative,
+    # so a uint16 array would corrupt them)
+    centroid = np.mean(np.array(pointcloud, dtype=np.float64), axis=0)
+
+    return centroid
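+
+
+# Illustrative alternative (not used by the node, and not part of the original
+# package): the same bounding-box back-projection as depth_to_pointcloud_centroid
+# above, vectorized with NumPy and using cv2.fillConvexPoly for the polygon mask
+# instead of the per-pixel ray-casting test below. A sketch only.
+def depth_to_pointcloud_centroid_vectorized(depth_image: np.ndarray,
+                                            intrinsic_matrix: np.ndarray,
+                                            corners: np.ndarray) -> np.ndarray:
+    fx, fy = intrinsic_matrix[0, 0], intrinsic_matrix[1, 1]
+    cx, cy = intrinsic_matrix[0, 2], intrinsic_matrix[1, 2]
+    # rasterize the marker quadrilateral into a binary mask
+    mask = np.zeros(depth_image.shape, dtype=np.uint8)
+    cv2.fillConvexPoly(mask, corners[0].astype(np.int32), 1)
+    v, u = np.nonzero(mask)
+    z = depth_image[v, u].astype(np.float64) / 1000.0  # same depth_scale = 1000.0 as above
+    x = (u - cx) * z / fx
+    y = (v - cy) * z / fy
+    return np.stack([x, y, z], axis=1).mean(axis=0)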
+
+
+def is_pixel_in_polygon(pixel: tuple, corners: np.ndarray) -> bool:
+    """
+    This function takes a pixel and a list of corners as input, and returns whether
+    the pixel is inside the polygon defined by the corners.
+    It uses the ray casting algorithm: cast a ray from the pixel in the positive
+    x-direction and count how many times the ray intersects the edges of the polygon.
+    If the number of intersections is odd, the pixel is inside the polygon, otherwise
+    it is outside. This works for both convex and concave polygons.
+
+    Args:
+        pixel: A tuple (x, y) representing the pixel coordinates.
+        corners: A list of 4 tuples in a numpy array, each representing the (x, y) coordinates of a corner.
+
+    Returns:
+        A boolean indicating whether the pixel is inside the polygon.
+    """
+
+    # Initialize counter for number of intersections
+    num_intersections = 0
+
+    # Iterate over each edge of the polygon
+    for i in range(len(corners)):
+        x1, y1 = corners[i]
+        x2, y2 = corners[(i + 1) % len(corners)]
+
+        # Check if the pixel is on the same y-level as the edge
+        if (y1 <= pixel[1] < y2) or (y2 <= pixel[1] < y1):
+            # Calculate the x-coordinate of the intersection point
+            x_intersection = (x2 - x1) * (pixel[1] - y1) / (y2 - y1) + x1
+
+            # Check if the intersection point is to the right of the pixel
+            if x_intersection > pixel[0]:
+                num_intersections += 1
+
+    # Return whether the number of intersections is odd
+    return num_intersections % 2 == 1
diff --git a/rbss_aruco_pe/rbss_aruco_pe/utils.py b/rbss_aruco_pe/rbss_aruco_pe/utils.py
new file mode 100755
index 0000000..281d940
--- /dev/null
+++ b/rbss_aruco_pe/rbss_aruco_pe/utils.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python3
+
+import cv2
+from rclpy.impl import rcutils_logger
+
+ARUCO_DICT = {
+    "DICT_4X4_50": cv2.aruco.DICT_4X4_50,
+    "DICT_4X4_100": cv2.aruco.DICT_4X4_100,
+    "DICT_4X4_250": cv2.aruco.DICT_4X4_250,
+    "DICT_4X4_1000": cv2.aruco.DICT_4X4_1000,
+    "DICT_5X5_50": cv2.aruco.DICT_5X5_50,
+    "DICT_5X5_100": cv2.aruco.DICT_5X5_100,
+    "DICT_5X5_250": cv2.aruco.DICT_5X5_250,
+    "DICT_5X5_1000": cv2.aruco.DICT_5X5_1000,
+    "DICT_6X6_50": cv2.aruco.DICT_6X6_50,
+    "DICT_6X6_100": cv2.aruco.DICT_6X6_100,
+    "DICT_6X6_250": cv2.aruco.DICT_6X6_250,
+    "DICT_6X6_1000": cv2.aruco.DICT_6X6_1000,
+    "DICT_7X7_50": cv2.aruco.DICT_7X7_50,
+    "DICT_7X7_100": cv2.aruco.DICT_7X7_100,
+    "DICT_7X7_250": cv2.aruco.DICT_7X7_250,
+    "DICT_7X7_1000": cv2.aruco.DICT_7X7_1000,
+    "DICT_ARUCO_ORIGINAL": cv2.aruco.DICT_ARUCO_ORIGINAL,
+    "DICT_APRILTAG_16h5": cv2.aruco.DICT_APRILTAG_16h5,
+    "DICT_APRILTAG_25h9": cv2.aruco.DICT_APRILTAG_25h9,
+    "DICT_APRILTAG_36h10": cv2.aruco.DICT_APRILTAG_36h10,
+    "DICT_APRILTAG_36h11": cv2.aruco.DICT_APRILTAG_36h11
+}
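+
+
+# Example (illustrative, not part of the original module): how ARUCO_DICT is
+# meant to be used to turn a configured dictionary name into a cv2.aruco
+# dictionary object; the node performs the same lookup inline in its __init__.
+def resolve_aruco_dictionary(name: str):
+    if name not in ARUCO_DICT:
+        raise ValueError(f"unknown ArUco dictionary id: {name}")
+    return cv2.aruco.getPredefinedDictionary(ARUCO_DICT[name])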
+
+
+def aruco_display(corners, ids, image):
+    '''
+    This function draws the bounding boxes, centers and IDs of the detected
+    markers on the input image frame.
+    :param corners: corners of the detected marker regions
+    :param ids: ArUco marker IDs
+    :param image: input image frame
+    :return: RGB image with the marker annotations drawn on it
+    '''
+    if len(corners) > 0:
+        # flatten the ArUco IDs list
+        ids = ids.flatten()
+        # loop over the detected ArUCo corners
+        for (markerCorner, markerID) in zip(corners, ids):
+            # extract the marker corners (which are always returned in
+            # top-left, top-right, bottom-right, and bottom-left order)
+            marker_corners = markerCorner.reshape((4, 2))
+            (topLeft, topRight, bottomRight, bottomLeft) = marker_corners
+            # convert each of the (x, y)-coordinate pairs to integers
+            topRight = (int(topRight[0]), int(topRight[1]))
+            bottomRight = (int(bottomRight[0]), int(bottomRight[1]))
+            bottomLeft = (int(bottomLeft[0]), int(bottomLeft[1]))
+            topLeft = (int(topLeft[0]), int(topLeft[1]))
+
+            cv2.line(image, topLeft, topRight, (0, 255, 0), 2)
+            cv2.line(image, topRight, bottomRight, (0, 255, 0), 2)
+            cv2.line(image, bottomRight, bottomLeft, (0, 255, 0), 2)
+            cv2.line(image, bottomLeft, topLeft, (0, 255, 0), 2)
+            # compute and draw the center (x, y)-coordinates of the ArUco
+            # marker
+            cX = int((topLeft[0] + bottomRight[0]) / 2.0)
+            cY = int((topLeft[1] + bottomRight[1]) / 2.0)
+            cv2.circle(image, (cX, cY), 4, (0, 0, 255), -1)
+            # draw the ArUco marker ID on the image
+            cv2.putText(image, str(markerID), (topLeft[0], topLeft[1] - 10), cv2.FONT_HERSHEY_SIMPLEX,
+                        0.5, (0, 255, 0), 2)
+
+        logger = rcutils_logger.RcutilsLogger(name="aruco_node")
+        logger.debug("[Inference] ArUco markers IDs: {}".format(ids))
+
+    return image
diff --git a/rbss_aruco_pe/scripts/aruco_node_lc.py b/rbss_aruco_pe/scripts/aruco_node_lc.py
new file mode 100755
index 0000000..a5088a6
--- /dev/null
+++ b/rbss_aruco_pe/scripts/aruco_node_lc.py
@@ -0,0 +1,369 @@
+#!/usr/bin/env python3
+"""
+Code taken from:
+https://github.com/AIRLab-POLIMI/ros2-aruco-pose-estimation
+ROS2 wrapper code taken from:
+https://github.com/JMU-ROBOTICS-VIVA/ros2_aruco/tree/main
+
+This node locates Aruco AR markers in images and publishes their ids and poses.
+
+Subscriptions:
+    /camera/image/image (sensor_msgs.msg.Image)
+    /camera/image/camera_info (sensor_msgs.msg.CameraInfo)
+
+Published Topics:
+    /aruco/poses (geometry_msgs.msg.PoseArray)
+       Pose of all detected markers (suitable for rviz visualization)
+
+    /aruco/markers (rbs_skill_interfaces.msg.ArucoMarkers)
+       Provides an array of all poses along with the corresponding
+       marker ids.
+
+    /aruco/image (sensor_msgs.msg.Image)
+       Annotated image with the detected markers and their ids drawn on it
+
+Parameters:
+    marker_size - size of the markers in meters (default 0.04)
+    aruco_dictionary_id - dictionary that was used to generate markers (default DICT_6X6_50)
+    image_topic - image topic to subscribe to (default /camera/image/image)
+    use_depth_input - use the depth image for 3D pose estimation (default False)
+    depth_image_topic - depth image topic to subscribe to (default /camera/image/depth_image)
+    camera_info_topic - camera info topic to subscribe to (default /camera/image/camera_info)
+    camera_frame - camera optical frame to use (default "camera_color_optical_frame")
+    detected_markers_topic - topic to publish detected markers (default /aruco/markers)
+    markers_visualization_topic - topic to publish markers visualization (default /aruco/poses)
+    output_image_topic - topic to publish annotated image (default /aruco/image)
+
+Version: 2025-04-08
+"""
+
+# ROS2 imports
+import rclpy
+# import rclpy.node
+from rclpy.qos import qos_profile_sensor_data
+from cv_bridge import CvBridge
+import message_filters
+from rclpy.lifecycle import Node
+# from rclpy.lifecycle import Publisher
+from rclpy.lifecycle import LifecycleState
+from rclpy.lifecycle import TransitionCallbackReturn
+
+# Python imports
+import numpy as np
+import cv2
+import json
+
+# Local imports for custom defined functions
+from rbss_aruco_pe.utils import ARUCO_DICT
+from rbss_aruco_pe.pose_estimation import pose_estimation
+
+# ROS2 message imports
+from sensor_msgs.msg import CameraInfo, Image
+from geometry_msgs.msg import PoseArray
+# from rcl_interfaces.msg import ParameterDescriptor, ParameterType
+from rbs_skill_interfaces.msg import ArucoMarkers
+
+NODE_NAME_DEFAULT = "aruco_pe"
+PARAM_SKILL_CFG = "aruco_pe_cfg"
+
+DEFAULT_ARUCO_DICTIONARY = "DICT_6X6_50"
+DEFAULT_MARKER_SIZE = 0.04
+# Input topics
+INP_IMAGE = "/camera/image/image"
+INP_DEPTH_IMAGE = "/camera/image/depth_image"
+INP_CAMERA_INFO = "/camera/image/camera_info"
+CAMERA_FRAME = "camera_color_optical_frame" # Camera link frame of reference
+# Output topics
+OUT_DEFAULT_DETECTED_MARKERS = "/aruco/markers"
+OUT_TT_DETECTED_MARKERS = "rbs_skill_interfaces/msg/ArucoMarkers"
+OUT_DEFAULT_MARKERS_VISUALISATION = "/aruco/poses"
+OUT_TT_MARKERS_VISUALISATION = "geometry_msgs/msg/PoseArray"
+OUT_DEFAULT_IMAGE = "/aruco/image"
+OUT_TT_IMAGE = "sensor_msgs/msg/Image"
+
+
+class ArucoLcNode(Node):
+    """Our lifecycle node."""
+    def __init__(self, **kwargs):
+        # default values
+        self.dictionary_id_name = DEFAULT_ARUCO_DICTIONARY
+        self.marker_size = DEFAULT_MARKER_SIZE # Size of the markers in meters
+        self.use_depth_input = False # Use depth image for 3D pose estimation
+        self.is_image_mode = False # Publish Image with detected Aruco markers
+
+        self.image_topic = INP_IMAGE
+        self.info_topic = INP_CAMERA_INFO
+        self.depth_image_topic = INP_DEPTH_IMAGE
+        self.camera_frame = CAMERA_FRAME
+
+        self.detected_markers_topic = OUT_DEFAULT_DETECTED_MARKERS
+        self.markers_visualization_topic = OUT_DEFAULT_MARKERS_VISUALISATION
+        self.output_image_topic = OUT_DEFAULT_IMAGE
+
+        # for other nodes
+        kwargs["allow_undeclared_parameters"] = True
+        kwargs["automatically_declare_parameters_from_overrides"] = True
+        super().__init__(NODE_NAME_DEFAULT, **kwargs)
+        str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
+
+        self.skill_cfg = json.loads(str_cfg)
+        self.nodeName = NODE_NAME_DEFAULT
+
+        self.initialize_parameters()
+
+        # Make sure we have a valid dictionary id:
+        try:
+            dictionary_id = getattr(cv2.aruco, self.dictionary_id_name)
+            # check if the dictionary_id is a valid dictionary inside ARUCO_DICT values
+            if dictionary_id not in ARUCO_DICT.values():
+                raise AttributeError
+        except AttributeError:
+            self.get_logger().error(f"bad aruco_dictionary_id: {self.dictionary_id_name}")
+            options = "\n".join([s for s in ARUCO_DICT])
+            self.get_logger().error(f"valid options: {options}")
+            # __init__ must not return a value (the old `return False` here
+            # raised a confusing TypeError), so abort construction explicitly
+            raise ValueError(f"bad aruco_dictionary_id: {self.dictionary_id_name}")
+
+        self.aruco_dictionary = cv2.aruco.getPredefinedDictionary(dictionary_id)
+
+        self.bridge = CvBridge()
+
+        # Set up publishers
+        self.poses_pub = self.create_lifecycle_publisher(PoseArray, self.markers_visualization_topic, 5)
+        self.markers_pub = self.create_lifecycle_publisher(ArucoMarkers, self.detected_markers_topic, 5)
+        self.image_pub = self.create_lifecycle_publisher(Image, self.output_image_topic, 5)
+
+        self.clear_param()
+
+    def clear_param(self):
+        # Set up fields for camera parameters
+        self.info_msg = None
+        self.intrinsic_mat = None
+        self.distortion = None
+
+    def initialize_parameters(self):
+        """
+        Read parameters from the skill config (via interface node)
+        """
+        self._Settings()
+        for prop in self.skill_cfg["topicsOut"]:
+            ty = prop["type"]
+            if ty == OUT_TT_DETECTED_MARKERS:
+                self.detected_markers_topic = prop["name"]
+            elif ty == OUT_TT_MARKERS_VISUALISATION:
+                self.markers_visualization_topic = prop["name"]
+            elif ty == OUT_TT_IMAGE:
+                self.output_image_topic = prop["name"]
+
+    def _Settings(self):
+        # Initialization service settings
+        for prop in self.skill_cfg["Settings"]["output"]["params"]:
+            nam = prop["name"]
+            val = prop["value"]
+            if nam == "aruco_dictionary_id":
+                # assign the attribute __init__ actually reads
+                # (a separate aruco_dictionary_id attribute would be ignored)
+                self.dictionary_id_name = val
+            elif nam == "marker_size":
+                self.marker_size = float(val)
+            elif nam == "camera_frame": # TODO move into the dependencies of "arucoConfig"
+                self.camera_frame = val
+            elif nam == "is_image_mode":
+                self.is_image_mode = (val == "True")
+
+    def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
+        """
+        Configure the node, after a configuring transition is requested.
+        """
+        json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
+        self.skill_cfg = json.loads(json_param)
+        for comm in self.skill_cfg["BTAction"]:
+            for par in comm["param"]:
+                dependency = par["dependency"]
+                if par["type"] == "topic":
+                    topic_type = dependency["topicType"]
+                    topic_name = dependency["topicOut"]
+                    if "CameraInfo" in topic_type:
+                        self.info_topic = topic_name
+                    elif "Image" in topic_type:
+                        if "depth" in topic_name:
+                            self.depth_image_topic = topic_name
+                        else:
+                            self.image_topic = topic_name
+                # elif par["type"] == "formBuilder":
+                #     self.objName = dependency["output"]["object_name"]
+                #     val = dependency["output"]["process"]["selectProcess"]["value"]
+
+        # camera info topic for the camera calibration parameters
+        self.info_sub = self.create_subscription(CameraInfo, self.info_topic, self.info_callback, qos_profile_sensor_data)
+
+        self.get_logger().info("on_configure is called.")
+        return TransitionCallbackReturn.SUCCESS
+
+    def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn:
+        # select the type of input to use for the pose estimation
+        if self.use_depth_input:
+            # use both rgb and depth image topics for the pose estimation
+
+            # create a message filter to synchronize the image and depth image topics
+            self.image_sub = message_filters.Subscriber(self, Image, self.image_topic, qos_profile=qos_profile_sensor_data)
+            self.depth_image_sub = message_filters.Subscriber(self, Image, self.depth_image_topic, qos_profile=qos_profile_sensor_data)
+
+            # create synchronizer between the 2 topics using message filters and approximate time policy
+            # slop is the maximum time difference between messages that are considered synchronized
+            self.synchronizer = message_filters.ApproximateTimeSynchronizer(
+                [self.image_sub, self.depth_image_sub], queue_size=10, slop=0.05
+            )
+            self.synchronizer.registerCallback(self.rgb_depth_sync_callback)
+        else:
+            # rely only on the rgb image topic for the pose estimation
+
+            # create a subscription to the image topic
+            self.image_sub = self.create_subscription(Image, self.image_topic, self.image_callback, qos_profile_sensor_data)
+
+        self.get_logger().info("on_activate is called")
+        return super().on_activate(state)
+
+    def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn:
+        if self.use_depth_input:
+            # TODO not tested
+            self.destroy_subscription(self.image_sub)
+            self.destroy_subscription(self.depth_image_sub)
+        else:
+            # Destroy the Image subscriber.
+            self.destroy_subscription(self.image_sub)
+
+        self.get_logger().info("on_deactivate is called")
+        return super().on_deactivate(state)
+
+    def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn:
+        self.destroy_subscription(self.info_sub)
+
+        self.clear_param()
+
+        self.get_logger().info("on_cleanup is called")
+        return TransitionCallbackReturn.SUCCESS
+
+    def on_shutdown(self, state: LifecycleState) -> TransitionCallbackReturn:
+        self.get_logger().info("on_shutdown is called")
+        return TransitionCallbackReturn.SUCCESS
+
+    def info_callback(self, info_msg):
+        self.info_msg = info_msg
+        # get the intrinsic matrix and distortion coefficients from the camera info
+        self.intrinsic_mat = np.reshape(np.array(self.info_msg.k), (3, 3))
+        self.distortion = np.array(self.info_msg.d)
+
+        self.get_logger().info("Camera info received.")
+        self.get_logger().info(f"Intrinsic matrix: {self.intrinsic_mat}")
+        self.get_logger().info(f"Distortion coefficients: {self.distortion}")
+        self.get_logger().info(f"Image size: {self.info_msg.width}x{self.info_msg.height}")
+
+        # Assume that camera parameters will remain the same...
+        self.destroy_subscription(self.info_sub)
+
+    def image_callback(self, img_msg: Image):
+        if self.info_msg is None:
+            self.get_logger().warn("No camera info has been received!")
+            return
+
+        # convert the image message to cv2 format
+        cv_image = self.bridge.imgmsg_to_cv2(img_msg, desired_encoding="rgb8")
+
+        # create the ArucoMarkers and PoseArray messages
+        markers = ArucoMarkers()
+        pose_array = PoseArray()
+
+        # Set the frame id and timestamp for the markers and pose array
+        if self.camera_frame == "":
+            markers.header.frame_id = self.info_msg.header.frame_id
+            pose_array.header.frame_id = self.info_msg.header.frame_id
+        else:
+            markers.header.frame_id = self.camera_frame
+            pose_array.header.frame_id = self.camera_frame
+
+        markers.header.stamp = img_msg.header.stamp
+        pose_array.header.stamp = img_msg.header.stamp
+
+        """
+        # OVERRIDE: use calibrated intrinsic matrix and distortion coefficients
+        self.intrinsic_mat = np.reshape([615.95431, 0., 325.26983,
+                                         0., 617.92586, 257.57722,
+                                         0., 0., 1.], (3, 3))
+        self.distortion = np.array([0.142588, -0.311967, 0.003950, -0.006346, 0.000000])
+        """
+
+        # call the pose estimation function
+        frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=None,
+                                                     aruco_dict=self.aruco_dictionary,
+                                                     marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
+                                                     distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
+        # frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=None,
+        #                                              aruco_detector=self.aruco_detector,
+        #                                              marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
+        #                                              distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
+
+        # if some markers are detected
+        if len(markers.marker_ids) > 0:
+            # Publish the results with the poses and marker positions
+            self.poses_pub.publish(pose_array)
+            self.markers_pub.publish(markers)
+
+        # publish the annotated image only when image mode is enabled
+        if self.is_image_mode:
+            self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "rgb8"))
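+
+    # Illustrative helper (a sketch, not wired in; the method name is ours, not
+    # the author's): image_callback above and rgb_depth_sync_callback below
+    # duplicate the same message set-up, which could be factored out like this.
+    def _init_output_msgs(self, stamped_msg):
+        markers = ArucoMarkers()
+        pose_array = PoseArray()
+        # fall back to the camera_info frame when no camera_frame is configured
+        frame_id = self.info_msg.header.frame_id if self.camera_frame == "" else self.camera_frame
+        markers.header.frame_id = frame_id
+        pose_array.header.frame_id = frame_id
+        markers.header.stamp = stamped_msg.header.stamp
+        pose_array.header.stamp = stamped_msg.header.stamp
+        return markers, pose_array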
+
+    def rgb_depth_sync_callback(self, rgb_msg: Image, depth_msg: Image):
+        if self.info_msg is None:
+            # same guard as in image_callback: the intrinsics are required below
+            self.get_logger().warn("No camera info has been received!")
+            return
+
+        # convert the image messages to cv2 format
+        cv_depth_image = self.bridge.imgmsg_to_cv2(depth_msg, desired_encoding="16UC1")
+        cv_image = self.bridge.imgmsg_to_cv2(rgb_msg, desired_encoding="rgb8")
+
+        # create the ArucoMarkers and PoseArray messages
+        markers = ArucoMarkers()
+        pose_array = PoseArray()
+
+        # Set the frame id and timestamp for the markers and pose array
+        if self.camera_frame == "":
+            markers.header.frame_id = self.info_msg.header.frame_id
+            pose_array.header.frame_id = self.info_msg.header.frame_id
+        else:
+            markers.header.frame_id = self.camera_frame
+            pose_array.header.frame_id = self.camera_frame
+
+        markers.header.stamp = rgb_msg.header.stamp
+        pose_array.header.stamp = rgb_msg.header.stamp
+
+        # call the pose estimation function
+        frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=cv_depth_image,
+                                                     aruco_dict=self.aruco_dictionary,
+                                                     marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
+                                                     distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
+        # frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=cv_depth_image,
+        #                                              aruco_detector=self.aruco_detector,
+        #                                              marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
+        #                                              distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
+
+        # if some markers are detected
+        if len(markers.marker_ids) > 0:
+            # Publish the results with the poses and marker positions
+            self.poses_pub.publish(pose_array)
+            self.markers_pub.publish(markers)
+
+        # publish the annotated image only when image mode is enabled
+        if self.is_image_mode:
+            self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "rgb8"))
+
+
+def main():
+    # rclpy.init()
+    # node = ArucoNode()
+    # rclpy.spin(node)
+
+    # node.destroy_node()
+    # rclpy.shutdown()
+    rclpy.init()
+
+    executor = rclpy.executors.SingleThreadedExecutor()
+    lc_node = ArucoLcNode()
+    executor.add_node(lc_node)
+    try:
+        executor.spin()
+    except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
+        pass
+    finally:
+        # clean up in all exit paths; try_shutdown() is safe if rclpy
+        # has already been shut down externally
+        lc_node.destroy_node()
+        rclpy.try_shutdown()
+
+
+if __name__ == "__main__":
+    main()
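
For reference, a minimal sketch of driving the node through its lifecycle
transitions once it is running (assumptions: the default node name "aruco_pe",
and the node started with the aruco_pe_cfg parameter set to one skill entry
from rbs_package.json; none of this is part of the patch itself):

    import rclpy
    from rclpy.node import Node
    from lifecycle_msgs.srv import ChangeState
    from lifecycle_msgs.msg import Transition

    rclpy.init()
    node = Node("aruco_pe_driver")
    client = node.create_client(ChangeState, "/aruco_pe/change_state")
    client.wait_for_service()
    # configure, then activate: the node subscribes to camera_info on configure
    # and to the image topic(s) on activate
    for transition_id in (Transition.TRANSITION_CONFIGURE, Transition.TRANSITION_ACTIVATE):
        request = ChangeState.Request()
        request.transition.id = transition_id
        future = client.call_async(request)
        rclpy.spin_until_future_complete(node, future)
    node.destroy_node()
    rclpy.shutdown()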