diff --git a/rbs_skill_interfaces/CMakeLists.txt b/rbs_skill_interfaces/CMakeLists.txt
index cb29df5..4eb9943 100644
--- a/rbs_skill_interfaces/CMakeLists.txt
+++ b/rbs_skill_interfaces/CMakeLists.txt
@@ -28,6 +28,7 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"msg/ActionFeedbackStatusConstants.msg"
"msg/ActionResultStatusConstants.msg"
"msg/BoundBox.msg"
+ "msg/ArucoMarkers.msg"
"srv/DetectObject.srv"
"srv/BtInit.srv"
"srv/AssembleState.srv"
diff --git a/rbs_skill_interfaces/msg/ArucoMarkers.msg b/rbs_skill_interfaces/msg/ArucoMarkers.msg
new file mode 100644
index 0000000..184003d
--- /dev/null
+++ b/rbs_skill_interfaces/msg/ArucoMarkers.msg
@@ -0,0 +1,5 @@
+# Detected ArUco markers: marker_ids[i] corresponds to poses[i], expressed in header.frame_id
+std_msgs/Header header
+
+int64[] marker_ids
+geometry_msgs/Pose[] poses
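+
+# Example consumer (illustrative sketch, assuming the interfaces package is built and sourced):
+#   from rbs_skill_interfaces.msg import ArucoMarkers
+#   node.create_subscription(ArucoMarkers, "/aruco/markers", callback, 10)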
diff --git a/rbss_aruco_pe/CMakeLists.txt b/rbss_aruco_pe/CMakeLists.txt
new file mode 100644
index 0000000..baf23e3
--- /dev/null
+++ b/rbss_aruco_pe/CMakeLists.txt
@@ -0,0 +1,51 @@
+cmake_minimum_required(VERSION 3.8)
+project(rbss_aruco_pe)
+
+# Default to C++17
+if(NOT CMAKE_CXX_STANDARD)
+ set(CMAKE_CXX_STANDARD 17)
+endif()
+
+if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
+ add_compile_options(-Wall -Wextra -Wpedantic)
+endif()
+
+# Find dependencies
+find_package(ament_cmake REQUIRED)
+find_package(ament_cmake_python REQUIRED)
+find_package(rclpy REQUIRED)
+# Python dependencies (open3d, numpy, cv2, cv_bridge, tf_transformations)
+# are declared in package.xml and resolved at runtime, not via find_package()
+find_package(rbs_skill_interfaces REQUIRED)
+
+# Install Python modules
+ament_python_install_package(${PROJECT_NAME})
+
+# Install Python executables
+install(PROGRAMS
+ scripts/aruco_node_lc.py
+ DESTINATION lib/${PROJECT_NAME}
+)
+
+# Install directories
+install(DIRECTORY
+ launch config
+ DESTINATION share/${PROJECT_NAME}
+)
+
+if(BUILD_TESTING)
+ find_package(ament_lint_auto REQUIRED)
+ # the following line skips the linter which checks for copyrights
+ # comment the line when a copyright and license is added to all source files
+ set(ament_cmake_copyright_FOUND TRUE)
+ # the following line skips cpplint (only works in a git repo)
+ # comment the line when this package is in a git repo and when
+ # a copyright and license is added to all source files
+ set(ament_cmake_cpplint_FOUND TRUE)
+ ament_lint_auto_find_test_dependencies()
+endif()
+
+ament_package()
\ No newline at end of file
diff --git a/rbss_aruco_pe/config/aruco_parameters.yaml b/rbss_aruco_pe/config/aruco_parameters.yaml
new file mode 100644
index 0000000..c49c494
--- /dev/null
+++ b/rbss_aruco_pe/config/aruco_parameters.yaml
@@ -0,0 +1,15 @@
+/aruco_node:
+ ros__parameters:
+ # Aruco detection parameters
+ marker_size: 0.04 # Size of the markers in meters
+ aruco_dictionary_id: DICT_6X6_50 # Aruco dictionary type
+ # Input topics
+ image_topic: /rgbd_camera/image/image # Input image topic
+ use_depth_input: false # Use depth image for 3D pose estimation
+ depth_image_topic: /camera/aligned_depth_to_color/image_raw # Input depth image topic
+ camera_info_topic: /rgbd_camera/image/camera_info # Input camera info topic with camera intrinsic parameters
+ camera_frame: camera_color_optical_frame # Camera link frame of reference
+ # Output topics
+ detected_markers_topic: /aruco/markers # Output topic with detected markers (aruco poses + ids)
+ markers_visualization_topic: /aruco/poses # Output topic with visualization of detected markers as pose array
+ output_image_topic: /aruco/image # Output topic with visualization of detected markers drawn on image
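+    # Note: these defaults serve the standalone launch flow only; when started through
+    # the skill interface node, parameters come from the rbs_package.json Settings instead.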
diff --git a/rbss_aruco_pe/launch/rbss_aruco_pe.launch.py b/rbss_aruco_pe/launch/rbss_aruco_pe.launch.py
new file mode 100644
index 0000000..43d3d29
--- /dev/null
+++ b/rbss_aruco_pe/launch/rbss_aruco_pe.launch.py
@@ -0,0 +1,171 @@
+from launch import LaunchDescription
+
+
+def generate_launch_description():
+    print("The skill must be launched via the interface node")
+    return LaunchDescription([])
+
+# # ROS2 imports
+# from launch.substitutions import PathJoinSubstitution, LaunchConfiguration
+# # from launch.actions import IncludeLaunchDescription
+# # from launch.launch_description_sources import PythonLaunchDescriptionSource
+# from launch import LaunchDescription
+# from launch_ros.actions import Node
+# from launch.actions import DeclareLaunchArgument
+# # from launch_ros.substitutions import FindPackageShare
+# # from launch.conditions import IfCondition, UnlessCondition
+
+# import os
+# from ament_index_python.packages import get_package_share_directory
+# import yaml
+
+
+# def generate_launch_description():
+
+# aruco_params_file = os.path.join(
+# get_package_share_directory('rbss_aruco_pe'),
+# 'config',
+# 'aruco_parameters.yaml'
+# )
+
+# with open(aruco_params_file, 'r') as file:
+# config = yaml.safe_load(file)
+
+# config = config["/aruco_node"]["ros__parameters"]
+
+# # declare configuration parameters
+# marker_size_arg = DeclareLaunchArgument(
+# name='marker_size',
+# default_value=str(config['marker_size']),
+# description='Size of the aruco marker in meters',
+# )
+
+# aruco_dictionary_id_arg = DeclareLaunchArgument(
+# name='aruco_dictionary_id',
+# default_value=config['aruco_dictionary_id'],
+# description='ID of the aruco dictionary to use',
+# )
+
+# image_topic_arg = DeclareLaunchArgument(
+# name='image_topic',
+# default_value=config['image_topic'],
+# description='Name of the image RGB topic to subscribe to',
+# )
+
+# use_depth_input_arg = DeclareLaunchArgument(
+# name='use_depth_input',
+# default_value=str(config['use_depth_input']),
+# description='Use depth input for pose estimation',
+# choices=['true', 'false', 'True', 'False']
+# )
+
+# depth_image_topic_arg = DeclareLaunchArgument(
+# name='depth_image_topic',
+# default_value=config['depth_image_topic'],
+# description='Name of the depth image topic to subscribe to',
+# )
+
+# camera_info_topic_arg = DeclareLaunchArgument(
+# name='camera_info_topic',
+# default_value=config['camera_info_topic'],
+# description='Name of the camera info topic to subscribe to',
+# )
+
+# camera_frame_arg = DeclareLaunchArgument(
+# name='camera_frame',
+# default_value=config['camera_frame'],
+# description='Name of the camera frame where the estimated pose will be',
+# )
+
+# detected_markers_topic_arg = DeclareLaunchArgument(
+# name='detected_markers_topic',
+# default_value=config['detected_markers_topic'],
+# description='Name of the topic to publish the detected markers messages',
+# )
+
+# markers_visualization_topic_arg = DeclareLaunchArgument(
+# name='markers_visualization_topic',
+# default_value=config['markers_visualization_topic'],
+# description='Name of the topic to publish the pose array for visualization of the markers',
+# )
+
+# output_image_topic_arg = DeclareLaunchArgument(
+# name='output_image_topic',
+# default_value=config['output_image_topic'],
+# description='Name of the topic to publish the image with the detected markers',
+# )
+
+# aruco_node = Node(
+# package='rbss_aruco_pe',
+# executable='aruco_node.py',
+# parameters=[{
+# "marker_size": LaunchConfiguration('marker_size'),
+# "aruco_dictionary_id": LaunchConfiguration('aruco_dictionary_id'),
+# "image_topic": LaunchConfiguration('image_topic'),
+# "use_depth_input": LaunchConfiguration('use_depth_input'),
+# "depth_image_topic": LaunchConfiguration('depth_image_topic'),
+# "camera_info_topic": LaunchConfiguration('camera_info_topic'),
+# "camera_frame": LaunchConfiguration('camera_frame'),
+# "detected_markers_topic": LaunchConfiguration('detected_markers_topic'),
+# "markers_visualization_topic": LaunchConfiguration('markers_visualization_topic'),
+# "output_image_topic": LaunchConfiguration('output_image_topic'),
+# }],
+# output='screen',
+# emulate_tty=True
+# )
+
+# # # launch realsense camera node
+# # cam_feed_launch_file = PathJoinSubstitution(
+# # [FindPackageShare("realsense2_camera"), "launch", "rs_launch.py"]
+# # )
+
+# # camera_feed_depth_node = IncludeLaunchDescription(
+# # PythonLaunchDescriptionSource(cam_feed_launch_file),
+# # launch_arguments={
+# # "pointcloud.enable": "true",
+# # "enable_rgbd": "true",
+# # "enable_sync": "true",
+# # "align_depth.enable": "true",
+# # "enable_color": "true",
+# # "enable_depth": "true",
+# # }.items(),
+# # condition=IfCondition(LaunchConfiguration('use_depth_input'))
+# # )
+
+# # camera_feed_node = IncludeLaunchDescription(
+# # PythonLaunchDescriptionSource(cam_feed_launch_file),
+# # launch_arguments={
+# # "pointcloud.enable": "true",
+# # "enable_color": "true",
+# # }.items(),
+# # condition=UnlessCondition(LaunchConfiguration('use_depth_input'))
+# # )
+
+# # rviz_file = PathJoinSubstitution([
+# # FindPackageShare('rbss_aruco_pe'),
+# # 'rviz',
+# # 'cam_detect.rviz'
+# # ])
+
+# # rviz2_node = Node(
+# # package='rviz2',
+# # executable='rviz2',
+# # arguments=['-d', rviz_file]
+# # )
+
+# return LaunchDescription([
+# # Arguments
+# marker_size_arg,
+# aruco_dictionary_id_arg,
+# image_topic_arg,
+# use_depth_input_arg,
+# depth_image_topic_arg,
+# camera_info_topic_arg,
+# camera_frame_arg,
+# detected_markers_topic_arg,
+# markers_visualization_topic_arg,
+# output_image_topic_arg,
+
+# # Nodes
+# aruco_node,
+# # camera_feed_depth_node,
+# # camera_feed_node,
+# # rviz2_node
+# ])
diff --git a/rbss_aruco_pe/package.xml b/rbss_aruco_pe/package.xml
new file mode 100644
index 0000000..b70b077
--- /dev/null
+++ b/rbss_aruco_pe/package.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
+<package format="3">
+  <name>rbss_aruco_pe</name>
+  <version>0.0.1</version>
+  <description>Aruco Marker Tracking and Pose Estimation wrapper for ROS2, using RGB cameras</description>
+  <maintainer>shalenikol</maintainer>
+  <license>Apache License 2.0</license>
+
+  <buildtool_depend>ament_cmake</buildtool_depend>
+  <buildtool_depend>ament_cmake_python</buildtool_depend>
+
+  <exec_depend>rclpy</exec_depend>
+  <exec_depend>cv_bridge</exec_depend>
+  <exec_depend>std_msgs</exec_depend>
+  <exec_depend>sensor_msgs</exec_depend>
+  <exec_depend>geometry_msgs</exec_depend>
+  <exec_depend>lifecycle_msgs</exec_depend>
+  <exec_depend>rbs_skill_interfaces</exec_depend>
+  <exec_depend>open3d</exec_depend>
+  <exec_depend>opencv</exec_depend>
+  <exec_depend>tf_transformations</exec_depend>
+
+  <test_depend>ament_lint_auto</test_depend>
+  <test_depend>ament_lint_common</test_depend>
+
+  <export>
+    <build_type>ament_cmake</build_type>
+  </export>
+</package>
diff --git a/rbss_aruco_pe/rbs_bt_example.xml b/rbss_aruco_pe/rbs_bt_example.xml
new file mode 100644
index 0000000..ff82aef
--- /dev/null
+++ b/rbss_aruco_pe/rbs_bt_example.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/rbss_aruco_pe/rbs_package.json b/rbss_aruco_pe/rbs_package.json
new file mode 100644
index 0000000..f8cf87c
--- /dev/null
+++ b/rbss_aruco_pe/rbs_package.json
@@ -0,0 +1,81 @@
+{
+ "skills": [
+ {
+ "sid": "ArucoPE-example",
+ "SkillPackage": { "name": "Robossembler", "version": "1.0", "format": "1" },
+ "Module": { "name": "ArucoPE", "node_name": "aruco_pe", "description": "Aruco Marker Tracking and Pose Estimation wrapper for ROS2, using RGB cameras" },
+ "Launch": { "package": "rbss_aruco_pe", "executable": "aruco_node_lc.py" },
+ "BTAction": [
+ {
+ "name": "arucoConfig",
+ "type": "run",
+ "param": [
+ {
+ "type": "topic",
+ "dependency": {
+ "type": "topic",
+ "topicType": "sensor_msgs/msg/Image",
+ "topicOut": "/rgbd_camera/image/image"
+ }
+ },
+ {
+ "type": "topic",
+ "dependency": {
+ "type": "topic",
+ "topicType": "sensor_msgs/msg/Image",
+ "topicOut": "/rgbd_camera/image/depth_image"
+ }
+ },
+ {
+ "type": "topic",
+ "dependency": {
+ "type": "topic",
+ "topicType": "sensor_msgs/msg/CameraInfo",
+ "topicOut": "/rgbd_camera/image/camera_info"
+ }
+ }
+ ],
+ "typeAction": "ACTION"
+ },
+ { "name": "arucoStop", "type": "stop", "param": [], "typeAction": "ACTION" }
+ ],
+ "topicsOut": [
+ {
+ "name": "/aruco/markers",
+ "type": "rbs_skill_interfaces/msg/ArucoMarkers"
+ },
+ {
+ "name": "/aruco/poses",
+ "type": "geometry_msgs/msg/PoseArray"
+ },
+ {
+ "name": "/aruco/image",
+ "type": "sensor_msgs/msg/Image"
+ }
+ ],
+ "Settings": {
+ "output": {
+ "params": [
+ {
+ "name": "is_image_mode",
+ "value": "True"
+ },
+ {
+ "name": "aruco_dictionary_id",
+ "value": "DICT_6X6_50"
+ },
+ {
+ "name": "marker_size",
+ "value": "0.04"
+ },
+ {
+ "name": "camera_frame",
+ "value": ""
+ }
+ ]
+ },
+ "type": "formBuilder"
+ }
+ }
+ ]
+}
diff --git a/rbss_aruco_pe/rbss_aruco_pe/__init__.py b/rbss_aruco_pe/rbss_aruco_pe/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/rbss_aruco_pe/rbss_aruco_pe/pose_estimation.py b/rbss_aruco_pe/rbss_aruco_pe/pose_estimation.py
new file mode 100755
index 0000000..11e00e5
--- /dev/null
+++ b/rbss_aruco_pe/rbss_aruco_pe/pose_estimation.py
@@ -0,0 +1,264 @@
+#!/usr/bin/env python3
+
+# Code taken from:
+# https://github.com/AIRLab-POLIMI/ros2-aruco-pose-estimation
+
+# Code taken and readapted from:
+# https://github.com/GSNCodes/ArUCo-Markers-Pose-Estimation-Generation-Python/tree/main
+
+# Python imports
+import numpy as np
+import cv2
+import tf_transformations
+
+# ROS2 imports
+from rclpy.impl import rcutils_logger
+
+# ROS2 message imports
+from geometry_msgs.msg import Pose, PoseArray
+from rbs_skill_interfaces.msg import ArucoMarkers
+
+# utils import python code
+from rbss_aruco_pe.utils import aruco_display
+
+
+# def pose_estimation(rgb_frame: np.ndarray, depth_frame: np.ndarray, aruco_detector: cv2.aruco.ArucoDetector, marker_size: float,
+#                     matrix_coefficients: np.ndarray, distortion_coefficients: np.ndarray,
+#                     pose_array: PoseArray, markers: ArucoMarkers) -> tuple[np.ndarray, PoseArray, ArucoMarkers]:
+def pose_estimation(rgb_frame: np.ndarray, depth_frame: np.ndarray, aruco_dict, marker_size: float,
+                    matrix_coefficients: np.ndarray, distortion_coefficients: np.ndarray,
+                    pose_array: PoseArray, markers: ArucoMarkers) -> tuple[np.ndarray, PoseArray, ArucoMarkers]:
+ '''
+ rgb_frame - Frame from the RGB camera stream
+ depth_frame - Depth frame from the depth camera stream
+ matrix_coefficients - Intrinsic matrix of the calibrated camera
+ distortion_coefficients - Distortion coefficients associated with your camera
+ pose_array - PoseArray message to be published
+ markers - ArucoMarkers message to be published
+
+ return:-
+ frame - The frame with the axis drawn on it
+ pose_array - PoseArray with computed poses of the markers
+ markers - ArucoMarkers message containing markers id number and pose
+ '''
+
+    # legacy detection API (OpenCV < 4.7)
+ parameters = cv2.aruco.DetectorParameters_create()
+ corners, marker_ids, _ = cv2.aruco.detectMarkers(rgb_frame, aruco_dict, parameters=parameters)
+
+ # new code version of cv2 (4.7.0)
+ # corners, marker_ids, rejected = aruco_detector.detectMarkers(image=rgb_frame)
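+    # With OpenCV >= 4.7 the detector would instead be constructed once and reused,
+    # e.g. (illustrative sketch):
+    #   detector = cv2.aruco.ArucoDetector(aruco_dict, cv2.aruco.DetectorParameters())
+    #   corners, marker_ids, _ = detector.detectMarkers(rgb_frame)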
+
+ frame_processed = rgb_frame
+ logger = rcutils_logger.RcutilsLogger(name="aruco_node")
+
+ # If markers are detected
+ if len(corners) > 0:
+
+ logger.debug(f"Detected {len(corners)} markers.")
+
+ for i, marker_id in enumerate(marker_ids):
+ # Estimate pose of each marker and return the values rvec and tvec
+
+ # using deprecated function
+ # rvec, tvec, markerPoints = cv2.aruco.estimatePoseSingleMarkers(corners=corners[i],
+ # markerLength=marker_size,
+ # cameraMatrix=matrix_coefficients,
+ # distCoeffs=distortion_coefficients)
+ # tvec = tvec[0]
+
+ # alternative code version using solvePnP
+ tvec, rvec, quat = my_estimatePoseSingleMarkers(corners=corners[i], marker_size=marker_size,
+ camera_matrix=matrix_coefficients,
+ distortion=distortion_coefficients)
+
+ # show the detected markers bounding boxes
+ frame_processed = aruco_display(corners=corners, ids=marker_ids,
+ image=frame_processed)
+
+ # draw frame axes
+ frame_processed = cv2.drawFrameAxes(image=frame_processed, cameraMatrix=matrix_coefficients,
+ distCoeffs=distortion_coefficients, rvec=rvec, tvec=tvec,
+ length=0.05, thickness=3)
+
+ if (depth_frame is not None):
+ # get the centroid of the pointcloud
+ centroid = depth_to_pointcloud_centroid(depth_image=depth_frame,
+ intrinsic_matrix=matrix_coefficients,
+ corners=corners[i])
+
+ # log comparison between depthcloud centroid and tvec estimated positions
+ logger.info(f"depthcloud centroid = {centroid}")
+ logger.info(f"tvec = {tvec[0]} {tvec[1]} {tvec[2]}")
+
+ # compute pose from the rvec and tvec arrays
+ if (depth_frame is not None):
+ # use computed centroid from depthcloud as estimated pose
+ pose = Pose()
+ pose.position.x = float(centroid[0])
+ pose.position.y = float(centroid[1])
+ pose.position.z = float(centroid[2])
+ else:
+ # use tvec from aruco estimator as estimated pose
+ pose = Pose()
+ pose.position.x = float(tvec[0])
+ pose.position.y = float(tvec[1])
+ pose.position.z = float(tvec[2])
+
+ pose.orientation.x = quat[0]
+ pose.orientation.y = quat[1]
+ pose.orientation.z = quat[2]
+ pose.orientation.w = quat[3]
+
+ # add the pose and marker id to the pose_array and markers messages
+ pose_array.poses.append(pose)
+ markers.poses.append(pose)
+ markers.marker_ids.append(marker_id[0])
+
+ return frame_processed, pose_array, markers
+
+
+def my_estimatePoseSingleMarkers(corners, marker_size, camera_matrix, distortion) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
+    '''
+    This will estimate the rvec and tvec for a single marker's corners, as detected by:
+    corners, ids, rejectedImgPoints = detector.detectMarkers(image)
+
+    corners - array of detected corners for one marker in the image
+    marker_size - size of the detected markers in meters
+    camera_matrix - camera intrinsic matrix
+    distortion - camera distortion coefficients
+    RETURN tvec, rvec and the normalized quaternion of the marker pose
+    '''
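+    # SOLVEPNP_IPPE_SQUARE expects exactly these four coplanar object points,
+    # ordered top-left, top-right, bottom-right, bottom-left (matching detectMarkers output)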
+ marker_points = np.array([[-marker_size / 2.0, marker_size / 2.0, 0],
+ [marker_size / 2.0, marker_size / 2.0, 0],
+ [marker_size / 2.0, -marker_size / 2.0, 0],
+ [-marker_size / 2.0, -marker_size / 2.0, 0]], dtype=np.float32)
+
+ # solvePnP returns the rotation and translation vectors
+ retval, rvec, tvec = cv2.solvePnP(objectPoints=marker_points, imagePoints=corners,
+ cameraMatrix=camera_matrix, distCoeffs=distortion, flags=cv2.SOLVEPNP_IPPE_SQUARE)
+ rvec = rvec.reshape(3, 1)
+ tvec = tvec.reshape(3, 1)
+
+ rot, jacobian = cv2.Rodrigues(rvec)
+ rot_matrix = np.eye(4, dtype=np.float32)
+ rot_matrix[0:3, 0:3] = rot
+
+ # convert rotation matrix to quaternion
+ quaternion = tf_transformations.quaternion_from_matrix(rot_matrix)
+ norm_quat = np.linalg.norm(quaternion)
+ quaternion = quaternion / norm_quat
+
+ return tvec, rvec, quaternion
+
+
+def depth_to_pointcloud_centroid(depth_image: np.ndarray, intrinsic_matrix: np.ndarray,
+                                 corners: np.ndarray) -> np.ndarray:
+    """
+    This function takes a depth image, the camera intrinsic matrix and the corners of a
+    quadrilateral as input, and returns the centroid of the corresponding pointcloud.
+
+    Args:
+        depth_image: A 2D numpy array representing the depth image (depth in millimeters).
+        intrinsic_matrix: A 3x3 numpy array with the camera intrinsic parameters.
+        corners: A numpy array of shape (1, 4, 2) with the (x, y) coordinates of the corners.
+
+    Returns:
+        A numpy array (x, y, z) representing the centroid of the segmented pointcloud, in meters.
+    """
+
+    # Get image parameters
+    height, width = depth_image.shape
+
+ # Check if all corners are within image bounds
+ # corners has shape (1, 4, 2)
+ corners_indices = np.array([(int(x), int(y)) for x, y in corners[0]])
+
+ for x, y in corners_indices:
+ if x < 0 or x >= width or y < 0 or y >= height:
+ raise ValueError("One or more corners are outside the image bounds.")
+
+ # bounding box of the polygon
+ x_min = int(min(corners_indices[:, 0]))
+ x_max = int(max(corners_indices[:, 0]))
+ y_min = int(min(corners_indices[:, 1]))
+ y_max = int(max(corners_indices[:, 1]))
+
+ # create array of pixels inside the polygon defined by the corners
+ # search for pixels inside the squared bounding box of the polygon
+ points = []
+ for x in range(x_min, x_max):
+ for y in range(y_min, y_max):
+ if is_pixel_in_polygon(pixel=(x, y), corners=corners_indices):
+ # add point to the list of points
+ points.append([x, y, depth_image[y, x]])
+
+ # Convert points to numpy array
+ points = np.array(points, dtype=np.uint16)
+
+ # convert to open3d image
+ #depth_segmented = geometry.Image(points)
+ # create pinhole camera model
+ #pinhole_matrix = camera.PinholeCameraIntrinsic(width=width, height=height,
+ # intrinsic_matrix=intrinsic_matrix)
+ # Convert points to Open3D pointcloud
+ #pointcloud = geometry.PointCloud.create_from_depth_image(depth=depth_segmented, intrinsic=pinhole_matrix,
+ # depth_scale=1000.0)
+
+ # apply formulas to pointcloud, where
+ # fx = intrinsic_matrix[0, 0], fy = intrinsic_matrix[1, 1]
+ # cx = intrinsic_matrix[0, 2], cy = intrinsic_matrix[1, 2],
+ # u = x, v = y, d = depth_image[y, x], depth_scale = 1000.0,
+ # z = d / depth_scale
+ # x = (u - cx) * z / fx
+ # y = (v - cy) * z / fy
+
+ # create pointcloud
+ pointcloud = []
+ for x, y, d in points:
+ z = d / 1000.0
+ x = (x - intrinsic_matrix[0, 2]) * z / intrinsic_matrix[0, 0]
+ y = (y - intrinsic_matrix[1, 2]) * z / intrinsic_matrix[1, 1]
+ pointcloud.append([x, y, z])
+
+    # Calculate centroid from pointcloud (keep float precision: coordinates are in meters)
+    centroid = np.mean(np.array(pointcloud, dtype=np.float32), axis=0)
+
+ return centroid
+
+
+def is_pixel_in_polygon(pixel: tuple, corners: np.array) -> bool:
+ """
+ This function takes a pixel and a list of corners as input, and returns whether the pixel is inside the polygon
+ defined by the corners. This function uses the ray casting algorithm to determine if the pixel is inside the polygon.
+ This algorithm works by casting a ray from the pixel in the positive x-direction, and counting the number of times
+ the ray intersects with the edges of the polygon. If the number of intersections is odd, the pixel is inside the
+ polygon, otherwise it is outside. This algorithm works for both convex and concave polygons.
+
+ Args:
+ pixel: A tuple (x, y) representing the pixel coordinates.
+ corners: A list of 4 tuples in a numpy array, each representing the (x, y) coordinates of a corner.
+
+ Returns:
+ A boolean indicating whether the pixel is inside the polygon.
+ """
+
+ # Initialize counter for number of intersections
+ num_intersections = 0
+
+ # Iterate over each edge of the polygon
+ for i in range(len(corners)):
+ x1, y1 = corners[i]
+ x2, y2 = corners[(i + 1) % len(corners)]
+
+ # Check if the pixel is on the same y-level as the edge
+ if (y1 <= pixel[1] < y2) or (y2 <= pixel[1] < y1):
+ # Calculate the x-coordinate of the intersection point
+ x_intersection = (x2 - x1) * (pixel[1] - y1) / (y2 - y1) + x1
+
+ # Check if the intersection point is to the right of the pixel
+ if x_intersection > pixel[0]:
+ num_intersections += 1
+
+ # Return whether the number of intersections is odd
+ return num_intersections % 2 == 1
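+
+
+# Quick sanity check (illustrative, not part of the node):
+#   square = np.array([(0, 0), (10, 0), (10, 10), (0, 10)])
+#   is_pixel_in_polygon((5, 5), square)   # -> True
+#   is_pixel_in_polygon((15, 5), square)  # -> False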
diff --git a/rbss_aruco_pe/rbss_aruco_pe/utils.py b/rbss_aruco_pe/rbss_aruco_pe/utils.py
new file mode 100755
index 0000000..281d940
--- /dev/null
+++ b/rbss_aruco_pe/rbss_aruco_pe/utils.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python3
+
+import cv2
+from rclpy.impl import rcutils_logger
+
+ARUCO_DICT = {
+ "DICT_4X4_50": cv2.aruco.DICT_4X4_50,
+ "DICT_4X4_100": cv2.aruco.DICT_4X4_100,
+ "DICT_4X4_250": cv2.aruco.DICT_4X4_250,
+ "DICT_4X4_1000": cv2.aruco.DICT_4X4_1000,
+ "DICT_5X5_50": cv2.aruco.DICT_5X5_50,
+ "DICT_5X5_100": cv2.aruco.DICT_5X5_100,
+ "DICT_5X5_250": cv2.aruco.DICT_5X5_250,
+ "DICT_5X5_1000": cv2.aruco.DICT_5X5_1000,
+ "DICT_6X6_50": cv2.aruco.DICT_6X6_50,
+ "DICT_6X6_100": cv2.aruco.DICT_6X6_100,
+ "DICT_6X6_250": cv2.aruco.DICT_6X6_250,
+ "DICT_6X6_1000": cv2.aruco.DICT_6X6_1000,
+ "DICT_7X7_50": cv2.aruco.DICT_7X7_50,
+ "DICT_7X7_100": cv2.aruco.DICT_7X7_100,
+ "DICT_7X7_250": cv2.aruco.DICT_7X7_250,
+ "DICT_7X7_1000": cv2.aruco.DICT_7X7_1000,
+ "DICT_ARUCO_ORIGINAL": cv2.aruco.DICT_ARUCO_ORIGINAL,
+ "DICT_APRILTAG_16h5": cv2.aruco.DICT_APRILTAG_16h5,
+ "DICT_APRILTAG_25h9": cv2.aruco.DICT_APRILTAG_25h9,
+ "DICT_APRILTAG_36h10": cv2.aruco.DICT_APRILTAG_36h10,
+ "DICT_APRILTAG_36h11": cv2.aruco.DICT_APRILTAG_36h11
+}
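+# Usage example: dictionary = cv2.aruco.getPredefinedDictionary(ARUCO_DICT["DICT_6X6_50"])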
+
+
+def aruco_display(corners, ids, image):
+    '''
+    This function draws the bounding box, center point and ID label on the image
+    for each of the detected markers.
+    :param corners: corners of the detected marker regions
+    :param ids: ArUco marker IDs
+    :param image: input image frame
+    :return: RGB image with the marker annotations drawn on it
+    '''
+ if len(corners) > 0:
+ # flatten the ArUco IDs list
+ ids = ids.flatten()
+ # loop over the detected ArUCo corners
+ for (markerCorner, markerID) in zip(corners, ids):
+ # extract the marker corners (which are always returned in
+ # top-left, top-right, bottom-right, and bottom-left order)
+ corners = markerCorner.reshape((4, 2))
+ (topLeft, topRight, bottomRight, bottomLeft) = corners
+ # convert each of the (x, y)-coordinate pairs to integers
+ topRight = (int(topRight[0]), int(topRight[1]))
+ bottomRight = (int(bottomRight[0]), int(bottomRight[1]))
+ bottomLeft = (int(bottomLeft[0]), int(bottomLeft[1]))
+ topLeft = (int(topLeft[0]), int(topLeft[1]))
+
+ cv2.line(image, topLeft, topRight, (0, 255, 0), 2)
+ cv2.line(image, topRight, bottomRight, (0, 255, 0), 2)
+ cv2.line(image, bottomRight, bottomLeft, (0, 255, 0), 2)
+ cv2.line(image, bottomLeft, topLeft, (0, 255, 0), 2)
+ # compute and draw the center (x, y)-coordinates of the ArUco
+ # marker
+ cX = int((topLeft[0] + bottomRight[0]) / 2.0)
+ cY = int((topLeft[1] + bottomRight[1]) / 2.0)
+ cv2.circle(image, (cX, cY), 4, (0, 0, 255), -1)
+ # draw the ArUco marker ID on the image
+ cv2.putText(image, str(markerID), (topLeft[0], topLeft[1] - 10), cv2.FONT_HERSHEY_SIMPLEX,
+ 0.5, (0, 255, 0), 2)
+
+ logger = rcutils_logger.RcutilsLogger(name="aruco_node")
+ logger.debug("[Inference] ArUco markers IDs: {}".format(ids))
+
+ return image
+
diff --git a/rbss_aruco_pe/scripts/aruco_node_lc.py b/rbss_aruco_pe/scripts/aruco_node_lc.py
new file mode 100755
index 0000000..a5088a6
--- /dev/null
+++ b/rbss_aruco_pe/scripts/aruco_node_lc.py
@@ -0,0 +1,369 @@
+#!/usr/bin/env python3
+"""
+Code taken from:
+https://github.com/AIRLab-POLIMI/ros2-aruco-pose-estimation
+ROS2 wrapper code taken from:
+https://github.com/JMU-ROBOTICS-VIVA/ros2_aruco/tree/main
+
+This node locates Aruco AR markers in images and publishes their ids and poses.
+
+Subscriptions:
+ /camera/image/image (sensor_msgs.msg.Image)
+ /camera/image/camera_info (sensor_msgs.msg.CameraInfo)
+
+Published Topics:
+ /aruco/poses (geometry_msgs.msg.PoseArray)
+ Pose of all detected markers (suitable for rviz visualization)
+
+    /aruco/markers (rbs_skill_interfaces.msg.ArucoMarkers)
+ Provides an array of all poses along with the corresponding
+ marker ids.
+
+ /aruco/image (sensor_msgs.msg.Image)
+ Annotated image with marker locations and ids, with markers drawn on it
+
+Parameters:
+ marker_size - size of the markers in meters (default 0.04)
+ aruco_dictionary_id - dictionary that was used to generate markers (default DICT_6X6_50)
+    image_topic - image topic to subscribe to (default /camera/image/image)
+    camera_info_topic - camera info topic to subscribe to (default /camera/image/camera_info)
+    camera_frame - camera optical frame to use (default "camera_color_optical_frame")
+ detected_markers_topic - topic to publish detected markers (default /aruco/markers)
+ markers_visualization_topic - topic to publish markers visualization (default /aruco/poses)
+ output_image_topic - topic to publish annotated image (default /aruco/image)
+
+Version: 2025-04-08
+"""
+
+# ROS2 imports
+import rclpy
+# import rclpy.node
+from rclpy.qos import qos_profile_sensor_data
+from cv_bridge import CvBridge
+import message_filters
+from rclpy.lifecycle import Node
+# from rclpy.lifecycle import Publisher
+from rclpy.lifecycle import LifecycleState
+from rclpy.lifecycle import TransitionCallbackReturn
+
+# Python imports
+import numpy as np
+import cv2
+import json
+
+# Local imports for custom defined functions
+from rbss_aruco_pe.utils import ARUCO_DICT
+from rbss_aruco_pe.pose_estimation import pose_estimation
+
+# ROS2 message imports
+from sensor_msgs.msg import CameraInfo, Image
+from geometry_msgs.msg import PoseArray
+# from rcl_interfaces.msg import ParameterDescriptor, ParameterType
+from rbs_skill_interfaces.msg import ArucoMarkers
+
+NODE_NAME_DEFAULT = "aruco_pe"
+PARAM_SKILL_CFG = "aruco_pe_cfg"
+
+DEFAULT_ARUCO_DICTIONARY = "DICT_6X6_50"
+DEFAULT_MARKER_SIZE = 0.04
+# Input topics
+INP_IMAGE = "/camera/image/image"
+INP_DEPTH_IMAGE = "/camera/image/depth_image"
+INP_CAMERA_INFO = "/camera/image/camera_info"
+CAMERA_FRAME = "camera_color_optical_frame" # Camera link frame of reference
+# Output topics
+OUT_DEFAULT_DETECTED_MARKERS = "/aruco/markers"
+OUT_TT_DETECTED_MARKERS = "rbs_skill_interfaces/msg/ArucoMarkers"
+OUT_DEFAULT_MARKERS_VISUALISATION = "/aruco/poses"
+OUT_TT_MARKERS_VISUALISATION = "geometry_msgs/msg/PoseArray"
+OUT_DEFAULT_IMAGE = "/aruco/image"
+OUT_TT_IMAGE = "sensor_msgs/msg/Image"
+
+class ArucoLcNode(Node):
+ """Our lifecycle node."""
+ def __init__(self, **kwargs):
+ # default value
+ self.dictionary_id_name = DEFAULT_ARUCO_DICTIONARY
+ self.marker_size = DEFAULT_MARKER_SIZE # Size of the markers in meters
+ self.use_depth_input = False # Use depth image for 3D pose estimation
+ self.is_image_mode = False # Publish Image with detected Aruco markers
+
+ self.image_topic = INP_IMAGE
+ self.info_topic = INP_CAMERA_INFO
+ self.depth_image_topic = INP_DEPTH_IMAGE
+ self.camera_frame = CAMERA_FRAME
+
+ self.detected_markers_topic = OUT_DEFAULT_DETECTED_MARKERS
+ self.markers_visualization_topic = OUT_DEFAULT_MARKERS_VISUALISATION
+ self.output_image_topic = OUT_DEFAULT_IMAGE
+
+        # accept parameters supplied by the interface node without explicit declaration
+ kwargs["allow_undeclared_parameters"] = True
+ kwargs["automatically_declare_parameters_from_overrides"] = True
+ super().__init__(NODE_NAME_DEFAULT, **kwargs)
+ str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
+
+ self.skill_cfg = json.loads(str_cfg)
+ self.nodeName = NODE_NAME_DEFAULT
+
+ self.initialize_parameters()
+
+        # Make sure we have a valid dictionary id:
+        try:
+            dictionary_id = getattr(cv2.aruco, self.dictionary_id_name)
+            # check if the dictionary_id is a valid dictionary inside ARUCO_DICT values
+            if dictionary_id not in ARUCO_DICT.values():
+                raise AttributeError
+        except AttributeError:
+            self.get_logger().error(f"bad aruco_dictionary_id: {self.dictionary_id_name}")
+            options = "\n".join([s for s in ARUCO_DICT])
+            self.get_logger().error(f"valid options: {options}")
+            # __init__ must return None: abort construction instead of returning False
+            raise ValueError(f"bad aruco_dictionary_id: {self.dictionary_id_name}") from None
+
+ self.aruco_dictionary = cv2.aruco.getPredefinedDictionary(dictionary_id)
+
+ self.bridge = CvBridge()
+
+ # Set up publishers
+ self.poses_pub = self.create_lifecycle_publisher(PoseArray, self.markers_visualization_topic, 5)
+ self.markers_pub = self.create_lifecycle_publisher(ArucoMarkers, self.detected_markers_topic, 5)
+ self.image_pub = self.create_lifecycle_publisher(Image, self.output_image_topic, 5)
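+        # NOTE: lifecycle publishers deliver messages only after the node has been activated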
+
+ self.clear_param()
+
+ def clear_param(self):
+ # Set up fields for camera parameters
+ self.info_msg = None
+ self.intrinsic_mat = None
+ self.distortion = None
+
+ def initialize_parameters(self):
+ """
+ Read parameters from skill config (via interface node)
+ """
+ self._Settings()
+ for prop in self.skill_cfg["topicsOut"]:
+ ty = prop["type"]
+ if ty == OUT_TT_DETECTED_MARKERS:
+ self.detected_markers_topic = prop["name"]
+ elif ty == OUT_TT_MARKERS_VISUALISATION:
+ self.markers_visualization_topic = prop["name"]
+ elif ty == OUT_TT_IMAGE:
+ self.output_image_topic = prop["name"]
+
+ def _Settings(self):
+ # Initialization service settings
+ for prop in self.skill_cfg["Settings"]["output"]["params"]:
+ nam = prop["name"]
+ val = prop["value"]
+ if nam == "aruco_dictionary_id":
+ self.aruco_dictionary_id = val
+ elif nam == "marker_size":
+ self.marker_size = float(val)
+ elif nam == "camera_frame": # TODO перенести в dependencies для "arucoConfig"
+ self.camera_frame = val
+ elif nam == "is_image_mode":
+ self.is_image_mode = (val == "True")
+
+ def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
+ """
+ Configure the node, after a configuring transition is requested.
+ """
+ json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
+ self.skill_cfg = json.loads(json_param)
+        for comm in self.skill_cfg["BTAction"]:
+            for par in comm["param"]:
+                if par["type"] == "topic":
+                    dependency = par["dependency"]
+                    topic_type = dependency["topicType"]
+                    topic_name = dependency["topicOut"]
+ if "CameraInfo" in topic_type:
+ self.info_topic = topic_name
+ elif "Image" in topic_type:
+ if "depth" in topic_name:
+ self.depth_image_topic = topic_name
+ else:
+ self.image_topic = topic_name
+ # elif par["type"] == "formBuilder":
+ # self.objName = dependency["output"]["object_name"]
+ # val = dependency["output"]["process"]["selectProcess"]["value"]
+
+ # camera info topic for the camera calibration parameters
+ self.info_sub = self.create_subscription(CameraInfo, self.info_topic, self.info_callback, qos_profile_sensor_data)
+
+ self.get_logger().info("on_configure is called.")
+ return TransitionCallbackReturn.SUCCESS
+
+ def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn:
+ # select the type of input to use for the pose estimation
+ if self.use_depth_input:
+ # use both rgb and depth image topics for the pose estimation
+
+ # create a message filter to synchronize the image and depth image topics
+ self.image_sub = message_filters.Subscriber(self, Image, self.image_topic, qos_profile=qos_profile_sensor_data)
+ self.depth_image_sub = message_filters.Subscriber(self, Image, self.depth_image_topic, qos_profile=qos_profile_sensor_data)
+
+ # create synchronizer between the 2 topics using message filters and approximate time policy
+ # slop is the maximum time difference between messages that are considered synchronized
+ self.synchronizer = message_filters.ApproximateTimeSynchronizer(
+ [self.image_sub, self.depth_image_sub], queue_size=10, slop=0.05
+ )
+ self.synchronizer.registerCallback(self.rgb_depth_sync_callback)
+ else:
+ # rely only on the rgb image topic for the pose estimation
+
+ # create a subscription to the image topic
+ self.image_sub = self.create_subscription(Image, self.image_topic, self.image_callback, qos_profile_sensor_data)
+
+ self.get_logger().info("on_activate is called")
+ return super().on_activate(state)
+
+ def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn:
+ if self.use_depth_input:
+ # TODO not tested
+ self.destroy_subscription(self.image_sub)
+ self.destroy_subscription(self.depth_image_sub)
+ else:
+ # Destroy the Image subscriber.
+ self.destroy_subscription(self.image_sub)
+
+ self.get_logger().info("on_deactivate is called")
+ return super().on_deactivate(state)
+
+ def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn:
+ self.destroy_subscription(self.info_sub)
+
+ self.clear_param()
+
+ self.get_logger().info("on_cleanup is called")
+ return TransitionCallbackReturn.SUCCESS
+
+ def on_shutdown(self, state: LifecycleState) -> TransitionCallbackReturn:
+ self.get_logger().info("on_shutdown is called")
+ return TransitionCallbackReturn.SUCCESS
+
+ def info_callback(self, info_msg):
+ self.info_msg = info_msg
+ # get the intrinsic matrix and distortion coefficients from the camera info
+ self.intrinsic_mat = np.reshape(np.array(self.info_msg.k), (3, 3))
+ self.distortion = np.array(self.info_msg.d)
+
+ self.get_logger().info("Camera info received.")
+ self.get_logger().info(f"Intrinsic matrix: {self.intrinsic_mat}")
+ self.get_logger().info(f"Distortion coefficients: {self.distortion}")
+ self.get_logger().info(f"Camera frame: {self.info_msg.width}x{self.info_msg.height}")
+
+ # Assume that camera parameters will remain the same...
+ self.destroy_subscription(self.info_sub)
+
+ def image_callback(self, img_msg: Image):
+ if self.info_msg is None:
+ self.get_logger().warn("No camera info has been received!")
+ return
+
+ # convert the image messages to cv2 format
+ cv_image = self.bridge.imgmsg_to_cv2(img_msg, desired_encoding="rgb8")
+
+ # create the ArucoMarkers and PoseArray messages
+ markers = ArucoMarkers()
+ pose_array = PoseArray()
+
+ # Set the frame id and timestamp for the markers and pose array
+ if self.camera_frame == "":
+ markers.header.frame_id = self.info_msg.header.frame_id
+ pose_array.header.frame_id = self.info_msg.header.frame_id
+ else:
+ markers.header.frame_id = self.camera_frame
+ pose_array.header.frame_id = self.camera_frame
+
+ markers.header.stamp = img_msg.header.stamp
+ pose_array.header.stamp = img_msg.header.stamp
+
+ """
+ # OVERRIDE: use calibrated intrinsic matrix and distortion coefficients
+ self.intrinsic_mat = np.reshape([615.95431, 0., 325.26983,
+ 0., 617.92586, 257.57722,
+ 0., 0., 1.], (3, 3))
+ self.distortion = np.array([0.142588, -0.311967, 0.003950, -0.006346, 0.000000])
+ """
+
+ # call the pose estimation function
+ frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=None,
+ aruco_dict=self.aruco_dictionary,
+ marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
+ distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
+ # frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=None,
+ # aruco_detector=self.aruco_detector,
+ # marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
+ # distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
+
+ # if some markers are detected
+ if len(markers.marker_ids) > 0:
+            # Publish the results with the poses and marker positions
+ self.poses_pub.publish(pose_array)
+ self.markers_pub.publish(markers)
+
+ # publish the image frame with computed markers positions over the image
+ self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "rgb8"))
+
+    def rgb_depth_sync_callback(self, rgb_msg: Image, depth_msg: Image):
+        if self.info_msg is None:
+            self.get_logger().warn("No camera info has been received!")
+            return
+
+        # convert the image messages to cv2 format
+        cv_depth_image = self.bridge.imgmsg_to_cv2(depth_msg, desired_encoding="16UC1")
+        cv_image = self.bridge.imgmsg_to_cv2(rgb_msg, desired_encoding="rgb8")
+
+ # create the ArucoMarkers and PoseArray messages
+ markers = ArucoMarkers()
+ pose_array = PoseArray()
+
+ # Set the frame id and timestamp for the markers and pose array
+ if self.camera_frame == "":
+ markers.header.frame_id = self.info_msg.header.frame_id
+ pose_array.header.frame_id = self.info_msg.header.frame_id
+ else:
+ markers.header.frame_id = self.camera_frame
+ pose_array.header.frame_id = self.camera_frame
+
+ markers.header.stamp = rgb_msg.header.stamp
+ pose_array.header.stamp = rgb_msg.header.stamp
+
+ # call the pose estimation function
+ frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=cv_depth_image,
+ aruco_dict=self.aruco_dictionary,
+ marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
+ distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
+ # frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=cv_depth_image,
+ # aruco_detector=self.aruco_detector,
+ # marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
+ # distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
+
+ # if some markers are detected
+ if len(markers.marker_ids) > 0:
+            # Publish the results with the poses and marker positions
+ self.poses_pub.publish(pose_array)
+ self.markers_pub.publish(markers)
+
+ # publish the image frame with computed markers positions over the image
+ self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "rgb8"))
+
+def main():
+    rclpy.init()
+
+    executor = rclpy.executors.SingleThreadedExecutor()
+    lc_node = ArucoLcNode()
+    executor.add_node(lc_node)
+    try:
+        executor.spin()
+    except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
+        pass
+    finally:
+        lc_node.destroy_node()
+        rclpy.try_shutdown()
+
+if __name__ == "__main__":
+ main()