ADD: skill - pose estimation for Aruco markers
This commit is contained in:
parent
0798720a4c
commit
0bfa17c30b
12 changed files with 1067 additions and 0 deletions
|
@ -28,6 +28,7 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
|||
"msg/ActionFeedbackStatusConstants.msg"
|
||||
"msg/ActionResultStatusConstants.msg"
|
||||
"msg/BoundBox.msg"
|
||||
"msg/ArucoMarkers.msg"
|
||||
"srv/DetectObject.srv"
|
||||
"srv/BtInit.srv"
|
||||
"srv/AssembleState.srv"
|
||||
|
|
5
rbs_skill_interfaces/msg/ArucoMarkers.msg
Normal file
5
rbs_skill_interfaces/msg/ArucoMarkers.msg
Normal file
|
@ -0,0 +1,5 @@
|
|||
|
||||
std_msgs/Header header
|
||||
|
||||
int64[] marker_ids
|
||||
geometry_msgs/Pose[] poses
|
51
rbss_aruco_pe/CMakeLists.txt
Normal file
51
rbss_aruco_pe/CMakeLists.txt
Normal file
|
@ -0,0 +1,51 @@
|
|||
cmake_minimum_required(VERSION 3.8)
|
||||
project(rbss_aruco_pe)
|
||||
|
||||
# Default to C++17
|
||||
if(NOT CMAKE_CXX_STANDARD)
|
||||
set(CMAKE_CXX_STANDARD 17)
|
||||
endif()
|
||||
|
||||
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
|
||||
add_compile_options(-Wall -Wextra -Wpedantic)
|
||||
endif()
|
||||
|
||||
# Find dependencies
|
||||
find_package(ament_cmake REQUIRED)
|
||||
find_package(ament_cmake_python REQUIRED)
|
||||
find_package(rclpy REQUIRED)
|
||||
#find_package(open3d REQUIRED)
|
||||
#find_package(numpy REQUIRED)
|
||||
#find_package(cv2 REQUIRED)
|
||||
#find_package(cv_bridge REQUIRED)
|
||||
#find_package(tf_transformations REQUIRED)
|
||||
find_package(rbs_skill_interfaces REQUIRED)
|
||||
|
||||
# Install Python modules
|
||||
ament_python_install_package(${PROJECT_NAME})
|
||||
|
||||
# Install Python executables
|
||||
install(PROGRAMS
|
||||
scripts/aruco_node_lc.py
|
||||
DESTINATION lib/${PROJECT_NAME}
|
||||
)
|
||||
|
||||
# Install directories
|
||||
install(DIRECTORY
|
||||
launch config
|
||||
DESTINATION share/${PROJECT_NAME}
|
||||
)
|
||||
|
||||
if(BUILD_TESTING)
|
||||
find_package(ament_lint_auto REQUIRED)
|
||||
# the following line skips the linter which checks for copyrights
|
||||
# comment the line when a copyright and license is added to all source files
|
||||
set(ament_cmake_copyright_FOUND TRUE)
|
||||
# the following line skips cpplint (only works in a git repo)
|
||||
# comment the line when this package is in a git repo and when
|
||||
# a copyright and license is added to all source files
|
||||
set(ament_cmake_cpplint_FOUND TRUE)
|
||||
ament_lint_auto_find_test_dependencies()
|
||||
endif()
|
||||
|
||||
ament_package()
|
15
rbss_aruco_pe/config/aruco_parameters.yaml
Normal file
15
rbss_aruco_pe/config/aruco_parameters.yaml
Normal file
|
@ -0,0 +1,15 @@
|
|||
/aruco_node:
|
||||
ros__parameters:
|
||||
# Aruco detection parameters
|
||||
marker_size: 0.04 # Size of the markers in meters
|
||||
aruco_dictionary_id: DICT_6X6_50 # Aruco dictionary type
|
||||
# Input topics
|
||||
image_topic: /rgbd_camera/image/image # Input image topic
|
||||
use_depth_input: false # Use depth image for 3D pose estimation
|
||||
depth_image_topic: /camera/aligned_depth_to_color/image_raw # Input depth image topic
|
||||
camera_info_topic: /rgbd_camera/image/camera_info # Input camera info topic with camera intrinsic parameters
|
||||
camera_frame: camera_color_optical_frame # Camera link frame of reference
|
||||
# Output topics
|
||||
detected_markers_topic: /aruco/markers # Output topic with detected markers (aruco poses + ids)
|
||||
markers_visualization_topic: /aruco/poses # Output topic with visualization of detected markers as pose array
|
||||
output_image_topic: /aruco/image # Output topic with visualization of detected markers drawn on image
|
171
rbss_aruco_pe/launch/rbss_aruco_pe.launch.py
Normal file
171
rbss_aruco_pe/launch/rbss_aruco_pe.launch.py
Normal file
|
@ -0,0 +1,171 @@
|
|||
def generate_launch_description():
|
||||
print("The skill must be launched via interface node")
|
||||
return []
|
||||
|
||||
# # ROS2 imports
|
||||
# from launch.substitutions import PathJoinSubstitution, LaunchConfiguration
|
||||
# # from launch.actions import IncludeLaunchDescription
|
||||
# # from launch.launch_description_sources import PythonLaunchDescriptionSource
|
||||
# from launch import LaunchDescription
|
||||
# from launch_ros.actions import Node
|
||||
# from launch.actions import DeclareLaunchArgument
|
||||
# # from launch_ros.substitutions import FindPackageShare
|
||||
# # from launch.conditions import IfCondition, UnlessCondition
|
||||
|
||||
# import os
|
||||
# from ament_index_python.packages import get_package_share_directory
|
||||
# import yaml
|
||||
|
||||
|
||||
# def generate_launch_description():
|
||||
|
||||
# aruco_params_file = os.path.join(
|
||||
# get_package_share_directory('rbss_aruco_pe'),
|
||||
# 'config',
|
||||
# 'aruco_parameters.yaml'
|
||||
# )
|
||||
|
||||
# with open(aruco_params_file, 'r') as file:
|
||||
# config = yaml.safe_load(file)
|
||||
|
||||
# config = config["/aruco_node"]["ros__parameters"]
|
||||
|
||||
# # declare configuration parameters
|
||||
# marker_size_arg = DeclareLaunchArgument(
|
||||
# name='marker_size',
|
||||
# default_value=str(config['marker_size']),
|
||||
# description='Size of the aruco marker in meters',
|
||||
# )
|
||||
|
||||
# aruco_dictionary_id_arg = DeclareLaunchArgument(
|
||||
# name='aruco_dictionary_id',
|
||||
# default_value=config['aruco_dictionary_id'],
|
||||
# description='ID of the aruco dictionary to use',
|
||||
# )
|
||||
|
||||
# image_topic_arg = DeclareLaunchArgument(
|
||||
# name='image_topic',
|
||||
# default_value=config['image_topic'],
|
||||
# description='Name of the image RGB topic to subscribe to',
|
||||
# )
|
||||
|
||||
# use_depth_input_arg = DeclareLaunchArgument(
|
||||
# name='use_depth_input',
|
||||
# default_value=str(config['use_depth_input']),
|
||||
# description='Use depth input for pose estimation',
|
||||
# choices=['true', 'false', 'True', 'False']
|
||||
# )
|
||||
|
||||
# depth_image_topic_arg = DeclareLaunchArgument(
|
||||
# name='depth_image_topic',
|
||||
# default_value=config['depth_image_topic'],
|
||||
# description='Name of the depth image topic to subscribe to',
|
||||
# )
|
||||
|
||||
# camera_info_topic_arg = DeclareLaunchArgument(
|
||||
# name='camera_info_topic',
|
||||
# default_value=config['camera_info_topic'],
|
||||
# description='Name of the camera info topic to subscribe to',
|
||||
# )
|
||||
|
||||
# camera_frame_arg = DeclareLaunchArgument(
|
||||
# name='camera_frame',
|
||||
# default_value=config['camera_frame'],
|
||||
# description='Name of the camera frame where the estimated pose will be',
|
||||
# )
|
||||
|
||||
# detected_markers_topic_arg = DeclareLaunchArgument(
|
||||
# name='detected_markers_topic',
|
||||
# default_value=config['detected_markers_topic'],
|
||||
# description='Name of the topic to publish the detected markers messages',
|
||||
# )
|
||||
|
||||
# markers_visualization_topic_arg = DeclareLaunchArgument(
|
||||
# name='markers_visualization_topic',
|
||||
# default_value=config['markers_visualization_topic'],
|
||||
# description='Name of the topic to publish the pose array for visualization of the markers',
|
||||
# )
|
||||
|
||||
# output_image_topic_arg = DeclareLaunchArgument(
|
||||
# name='output_image_topic',
|
||||
# default_value=config['output_image_topic'],
|
||||
# description='Name of the topic to publish the image with the detected markers',
|
||||
# )
|
||||
|
||||
# aruco_node = Node(
|
||||
# package='rbss_aruco_pe',
|
||||
# executable='aruco_node.py',
|
||||
# parameters=[{
|
||||
# "marker_size": LaunchConfiguration('marker_size'),
|
||||
# "aruco_dictionary_id": LaunchConfiguration('aruco_dictionary_id'),
|
||||
# "image_topic": LaunchConfiguration('image_topic'),
|
||||
# "use_depth_input": LaunchConfiguration('use_depth_input'),
|
||||
# "depth_image_topic": LaunchConfiguration('depth_image_topic'),
|
||||
# "camera_info_topic": LaunchConfiguration('camera_info_topic'),
|
||||
# "camera_frame": LaunchConfiguration('camera_frame'),
|
||||
# "detected_markers_topic": LaunchConfiguration('detected_markers_topic'),
|
||||
# "markers_visualization_topic": LaunchConfiguration('markers_visualization_topic'),
|
||||
# "output_image_topic": LaunchConfiguration('output_image_topic'),
|
||||
# }],
|
||||
# output='screen',
|
||||
# emulate_tty=True
|
||||
# )
|
||||
|
||||
# # # launch realsense camera node
|
||||
# # cam_feed_launch_file = PathJoinSubstitution(
|
||||
# # [FindPackageShare("realsense2_camera"), "launch", "rs_launch.py"]
|
||||
# # )
|
||||
|
||||
# # camera_feed_depth_node = IncludeLaunchDescription(
|
||||
# # PythonLaunchDescriptionSource(cam_feed_launch_file),
|
||||
# # launch_arguments={
|
||||
# # "pointcloud.enable": "true",
|
||||
# # "enable_rgbd": "true",
|
||||
# # "enable_sync": "true",
|
||||
# # "align_depth.enable": "true",
|
||||
# # "enable_color": "true",
|
||||
# # "enable_depth": "true",
|
||||
# # }.items(),
|
||||
# # condition=IfCondition(LaunchConfiguration('use_depth_input'))
|
||||
# # )
|
||||
|
||||
# # camera_feed_node = IncludeLaunchDescription(
|
||||
# # PythonLaunchDescriptionSource(cam_feed_launch_file),
|
||||
# # launch_arguments={
|
||||
# # "pointcloud.enable": "true",
|
||||
# # "enable_color": "true",
|
||||
# # }.items(),
|
||||
# # condition=UnlessCondition(LaunchConfiguration('use_depth_input'))
|
||||
# # )
|
||||
|
||||
# # rviz_file = PathJoinSubstitution([
|
||||
# # FindPackageShare('rbss_aruco_pe'),
|
||||
# # 'rviz',
|
||||
# # 'cam_detect.rviz'
|
||||
# # ])
|
||||
|
||||
# # rviz2_node = Node(
|
||||
# # package='rviz2',
|
||||
# # executable='rviz2',
|
||||
# # arguments=['-d', rviz_file]
|
||||
# # )
|
||||
|
||||
# return LaunchDescription([
|
||||
# # Arguments
|
||||
# marker_size_arg,
|
||||
# aruco_dictionary_id_arg,
|
||||
# image_topic_arg,
|
||||
# use_depth_input_arg,
|
||||
# depth_image_topic_arg,
|
||||
# camera_info_topic_arg,
|
||||
# camera_frame_arg,
|
||||
# detected_markers_topic_arg,
|
||||
# markers_visualization_topic_arg,
|
||||
# output_image_topic_arg,
|
||||
|
||||
# # Nodes
|
||||
# aruco_node,
|
||||
# # camera_feed_depth_node,
|
||||
# # camera_feed_node,
|
||||
# # rviz2_node
|
||||
# ])
|
30
rbss_aruco_pe/package.xml
Normal file
30
rbss_aruco_pe/package.xml
Normal file
|
@ -0,0 +1,30 @@
|
|||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>rbss_aruco_pe</name>
|
||||
<version>0.0.1</version>
|
||||
<description>Aruco Marker Tracking and Pose Estimation wrapper for ROS2, using RGB cameras</description>
|
||||
<maintainer email="shaniks77s@gmail.com">shalenikol</maintainer>
|
||||
<license>Apache License 2.0</license>
|
||||
|
||||
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||
<buildtool_depend>ament_cmake_python</buildtool_depend>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>cv_bridge</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>lifecycle_msgs</depend>
|
||||
<depend>rbs_skill_interfaces</depend>
|
||||
<depend>open3d</depend>
|
||||
<depend>opencv</depend>
|
||||
<depend>tf_transformations</depend>
|
||||
|
||||
<test_depend>ament_lint_auto</test_depend>
|
||||
<test_depend>ament_lint_common</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_cmake</build_type>
|
||||
</export>
|
||||
</package>
|
8
rbss_aruco_pe/rbs_bt_example.xml
Normal file
8
rbss_aruco_pe/rbs_bt_example.xml
Normal file
|
@ -0,0 +1,8 @@
|
|||
<root BTCPP_format="4">
|
||||
<BehaviorTree ID="Main">
|
||||
<Sequence>
|
||||
<Action ID="RbsAction" do="ArucoPE" command="arucoConfig" sid="ArucoPE-example"></Action>
|
||||
<Action ID="RbsAction" do="ArucoPE" command="arucoStop" sid="ArucoPE-example"></Action>
|
||||
</Sequence>
|
||||
</BehaviorTree>
|
||||
</root>
|
81
rbss_aruco_pe/rbs_package.json
Normal file
81
rbss_aruco_pe/rbs_package.json
Normal file
|
@ -0,0 +1,81 @@
|
|||
{
|
||||
"skills": [
|
||||
{
|
||||
"sid": "ArucoPE-example",
|
||||
"SkillPackage": { "name": "Robossembler", "version": "1.0", "format": "1" },
|
||||
"Module": { "name": "ArucoPE", "node_name": "aruco_pe", "description": "Aruco Marker Tracking and Pose Estimation wrapper for ROS2, using RGB cameras" },
|
||||
"Launch": { "package": "rbss_aruco_pe", "executable": "aruco_node_lc.py" },
|
||||
"BTAction": [
|
||||
{
|
||||
"name": "arucoConfig",
|
||||
"type": "run",
|
||||
"param": [
|
||||
{
|
||||
"type": "topic",
|
||||
"dependency": {
|
||||
"type": "topic",
|
||||
"topicType": "sensor_msgs/msg/Image",
|
||||
"topicOut": "/rgbd_camera/image/image"
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "topic",
|
||||
"dependency": {
|
||||
"type": "topic",
|
||||
"topicType": "sensor_msgs/msg/Image",
|
||||
"topicOut": "/rgbd_camera/image/depth_image"
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "topic",
|
||||
"dependency": {
|
||||
"type": "topic",
|
||||
"topicType": "sensor_msgs/msg/CameraInfo",
|
||||
"topicOut": "/rgbd_camera/image/camera_info"
|
||||
}
|
||||
}
|
||||
],
|
||||
"typeAction": "ACTION"
|
||||
},
|
||||
{ "name": "arucoStop", "type": "stop", "param": [], "typeAction": "ACTION" }
|
||||
],
|
||||
"topicsOut": [
|
||||
{
|
||||
"name": "/aruco/markers",
|
||||
"type": "rbs_skill_interfaces/msg/ArucoMarkers"
|
||||
},
|
||||
{
|
||||
"name": "/aruco/poses",
|
||||
"type": "geometry_msgs/msg/PoseArray"
|
||||
},
|
||||
{
|
||||
"name": "/aruco/image",
|
||||
"type": "sensor_msgs/msg/Image"
|
||||
}
|
||||
],
|
||||
"Settings": {
|
||||
"output": {
|
||||
"params": [
|
||||
{
|
||||
"name": "is_image_mode",
|
||||
"value": "True"
|
||||
},
|
||||
{
|
||||
"name": "aruco_dictionary_id",
|
||||
"value": "DICT_6X6_50"
|
||||
},
|
||||
{
|
||||
"name": "marker_size",
|
||||
"value": "0.04"
|
||||
},
|
||||
{
|
||||
"name": "camera_frame",
|
||||
"value": ""
|
||||
}
|
||||
]
|
||||
},
|
||||
"type": "formBuilder"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
0
rbss_aruco_pe/rbss_aruco_pe/__init__.py
Normal file
0
rbss_aruco_pe/rbss_aruco_pe/__init__.py
Normal file
264
rbss_aruco_pe/rbss_aruco_pe/pose_estimation.py
Executable file
264
rbss_aruco_pe/rbss_aruco_pe/pose_estimation.py
Executable file
|
@ -0,0 +1,264 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
# Code taken from:
|
||||
# https://github.com/AIRLab-POLIMI/ros2-aruco-pose-estimation
|
||||
|
||||
# Code taken and readapted from:
|
||||
# https://github.com/GSNCodes/ArUCo-Markers-Pose-Estimation-Generation-Python/tree/main
|
||||
|
||||
# Python imports
|
||||
import numpy as np
|
||||
import cv2
|
||||
import tf_transformations
|
||||
|
||||
# ROS2 imports
|
||||
from rclpy.impl import rcutils_logger
|
||||
|
||||
# ROS2 message imports
|
||||
from geometry_msgs.msg import Pose, PoseArray
|
||||
from rbs_skill_interfaces.msg import ArucoMarkers
|
||||
|
||||
# utils import python code
|
||||
from rbss_aruco_pe.utils import aruco_display
|
||||
|
||||
|
||||
# def pose_estimation(rgb_frame: np.array, depth_frame: np.array, aruco_detector: cv2.aruco.ArucoDetector, marker_size: float,
|
||||
# matrix_coefficients: np.array, distortion_coefficients: np.array,
|
||||
# pose_array: PoseArray, markers: ArucoMarkers) -> list[np.array, PoseArray, ArucoMarkers]:
|
||||
def pose_estimation(rgb_frame: np.array, depth_frame: np.array, aruco_dict, marker_size: float,
|
||||
matrix_coefficients: np.array, distortion_coefficients: np.array,
|
||||
pose_array: PoseArray, markers: ArucoMarkers) -> list[np.array, PoseArray, ArucoMarkers]:
|
||||
'''
|
||||
rgb_frame - Frame from the RGB camera stream
|
||||
depth_frame - Depth frame from the depth camera stream
|
||||
matrix_coefficients - Intrinsic matrix of the calibrated camera
|
||||
distortion_coefficients - Distortion coefficients associated with your camera
|
||||
pose_array - PoseArray message to be published
|
||||
markers - ArucoMarkers message to be published
|
||||
|
||||
return:-
|
||||
frame - The frame with the axis drawn on it
|
||||
pose_array - PoseArray with computed poses of the markers
|
||||
markers - ArucoMarkers message containing markers id number and pose
|
||||
'''
|
||||
|
||||
# old code version
|
||||
parameters = cv2.aruco.DetectorParameters_create()
|
||||
corners, marker_ids, _ = cv2.aruco.detectMarkers(rgb_frame, aruco_dict, parameters=parameters)
|
||||
|
||||
# new code version of cv2 (4.7.0)
|
||||
# corners, marker_ids, rejected = aruco_detector.detectMarkers(image=rgb_frame)
|
||||
|
||||
frame_processed = rgb_frame
|
||||
logger = rcutils_logger.RcutilsLogger(name="aruco_node")
|
||||
|
||||
# If markers are detected
|
||||
if len(corners) > 0:
|
||||
|
||||
logger.debug(f"Detected {len(corners)} markers.")
|
||||
|
||||
for i, marker_id in enumerate(marker_ids):
|
||||
# Estimate pose of each marker and return the values rvec and tvec
|
||||
|
||||
# using deprecated function
|
||||
# rvec, tvec, markerPoints = cv2.aruco.estimatePoseSingleMarkers(corners=corners[i],
|
||||
# markerLength=marker_size,
|
||||
# cameraMatrix=matrix_coefficients,
|
||||
# distCoeffs=distortion_coefficients)
|
||||
# tvec = tvec[0]
|
||||
|
||||
# alternative code version using solvePnP
|
||||
tvec, rvec, quat = my_estimatePoseSingleMarkers(corners=corners[i], marker_size=marker_size,
|
||||
camera_matrix=matrix_coefficients,
|
||||
distortion=distortion_coefficients)
|
||||
|
||||
# show the detected markers bounding boxes
|
||||
frame_processed = aruco_display(corners=corners, ids=marker_ids,
|
||||
image=frame_processed)
|
||||
|
||||
# draw frame axes
|
||||
frame_processed = cv2.drawFrameAxes(image=frame_processed, cameraMatrix=matrix_coefficients,
|
||||
distCoeffs=distortion_coefficients, rvec=rvec, tvec=tvec,
|
||||
length=0.05, thickness=3)
|
||||
|
||||
if (depth_frame is not None):
|
||||
# get the centroid of the pointcloud
|
||||
centroid = depth_to_pointcloud_centroid(depth_image=depth_frame,
|
||||
intrinsic_matrix=matrix_coefficients,
|
||||
corners=corners[i])
|
||||
|
||||
# log comparison between depthcloud centroid and tvec estimated positions
|
||||
logger.info(f"depthcloud centroid = {centroid}")
|
||||
logger.info(f"tvec = {tvec[0]} {tvec[1]} {tvec[2]}")
|
||||
|
||||
# compute pose from the rvec and tvec arrays
|
||||
if (depth_frame is not None):
|
||||
# use computed centroid from depthcloud as estimated pose
|
||||
pose = Pose()
|
||||
pose.position.x = float(centroid[0])
|
||||
pose.position.y = float(centroid[1])
|
||||
pose.position.z = float(centroid[2])
|
||||
else:
|
||||
# use tvec from aruco estimator as estimated pose
|
||||
pose = Pose()
|
||||
pose.position.x = float(tvec[0])
|
||||
pose.position.y = float(tvec[1])
|
||||
pose.position.z = float(tvec[2])
|
||||
|
||||
pose.orientation.x = quat[0]
|
||||
pose.orientation.y = quat[1]
|
||||
pose.orientation.z = quat[2]
|
||||
pose.orientation.w = quat[3]
|
||||
|
||||
# add the pose and marker id to the pose_array and markers messages
|
||||
pose_array.poses.append(pose)
|
||||
markers.poses.append(pose)
|
||||
markers.marker_ids.append(marker_id[0])
|
||||
|
||||
return frame_processed, pose_array, markers
|
||||
|
||||
|
||||
def my_estimatePoseSingleMarkers(corners, marker_size, camera_matrix, distortion) -> tuple[np.array, np.array, np.array]:
|
||||
'''
|
||||
This will estimate the rvec and tvec for each of the marker corners detected by:
|
||||
corners, ids, rejectedImgPoints = detector.detectMarkers(image)
|
||||
|
||||
corners - is an array of detected corners for each detected marker in the image
|
||||
marker_size - is the size of the detected markers in meters
|
||||
mtx - is the camera intrinsic matrix
|
||||
distortion - is the camera distortion matrix
|
||||
RETURN list of rvecs, tvecs, and trash (so that it corresponds to the old estimatePoseSingleMarkers())
|
||||
'''
|
||||
marker_points = np.array([[-marker_size / 2.0, marker_size / 2.0, 0],
|
||||
[marker_size / 2.0, marker_size / 2.0, 0],
|
||||
[marker_size / 2.0, -marker_size / 2.0, 0],
|
||||
[-marker_size / 2.0, -marker_size / 2.0, 0]], dtype=np.float32)
|
||||
|
||||
# solvePnP returns the rotation and translation vectors
|
||||
retval, rvec, tvec = cv2.solvePnP(objectPoints=marker_points, imagePoints=corners,
|
||||
cameraMatrix=camera_matrix, distCoeffs=distortion, flags=cv2.SOLVEPNP_IPPE_SQUARE)
|
||||
rvec = rvec.reshape(3, 1)
|
||||
tvec = tvec.reshape(3, 1)
|
||||
|
||||
rot, jacobian = cv2.Rodrigues(rvec)
|
||||
rot_matrix = np.eye(4, dtype=np.float32)
|
||||
rot_matrix[0:3, 0:3] = rot
|
||||
|
||||
# convert rotation matrix to quaternion
|
||||
quaternion = tf_transformations.quaternion_from_matrix(rot_matrix)
|
||||
norm_quat = np.linalg.norm(quaternion)
|
||||
quaternion = quaternion / norm_quat
|
||||
|
||||
return tvec, rvec, quaternion
|
||||
|
||||
|
||||
def depth_to_pointcloud_centroid(depth_image: np.array, intrinsic_matrix: np.array,
|
||||
corners: np.array) -> np.array:
|
||||
"""
|
||||
This function takes a depth image and the corners of a quadrilateral as input,
|
||||
and returns the centroid of the corresponding pointcloud.
|
||||
|
||||
Args:
|
||||
depth_image: A 2D numpy array representing the depth image.
|
||||
corners: A list of 4 tuples, each representing the (x, y) coordinates of a corner.
|
||||
|
||||
Returns:
|
||||
A tuple (x, y, z) representing the centroid of the segmented pointcloud.
|
||||
"""
|
||||
|
||||
# Get image parameters
|
||||
height, width = depth_image.shape
|
||||
|
||||
|
||||
# Check if all corners are within image bounds
|
||||
# corners has shape (1, 4, 2)
|
||||
corners_indices = np.array([(int(x), int(y)) for x, y in corners[0]])
|
||||
|
||||
for x, y in corners_indices:
|
||||
if x < 0 or x >= width or y < 0 or y >= height:
|
||||
raise ValueError("One or more corners are outside the image bounds.")
|
||||
|
||||
# bounding box of the polygon
|
||||
x_min = int(min(corners_indices[:, 0]))
|
||||
x_max = int(max(corners_indices[:, 0]))
|
||||
y_min = int(min(corners_indices[:, 1]))
|
||||
y_max = int(max(corners_indices[:, 1]))
|
||||
|
||||
# create array of pixels inside the polygon defined by the corners
|
||||
# search for pixels inside the squared bounding box of the polygon
|
||||
points = []
|
||||
for x in range(x_min, x_max):
|
||||
for y in range(y_min, y_max):
|
||||
if is_pixel_in_polygon(pixel=(x, y), corners=corners_indices):
|
||||
# add point to the list of points
|
||||
points.append([x, y, depth_image[y, x]])
|
||||
|
||||
# Convert points to numpy array
|
||||
points = np.array(points, dtype=np.uint16)
|
||||
|
||||
# convert to open3d image
|
||||
#depth_segmented = geometry.Image(points)
|
||||
# create pinhole camera model
|
||||
#pinhole_matrix = camera.PinholeCameraIntrinsic(width=width, height=height,
|
||||
# intrinsic_matrix=intrinsic_matrix)
|
||||
# Convert points to Open3D pointcloud
|
||||
#pointcloud = geometry.PointCloud.create_from_depth_image(depth=depth_segmented, intrinsic=pinhole_matrix,
|
||||
# depth_scale=1000.0)
|
||||
|
||||
# apply formulas to pointcloud, where
|
||||
# fx = intrinsic_matrix[0, 0], fy = intrinsic_matrix[1, 1]
|
||||
# cx = intrinsic_matrix[0, 2], cy = intrinsic_matrix[1, 2],
|
||||
# u = x, v = y, d = depth_image[y, x], depth_scale = 1000.0,
|
||||
# z = d / depth_scale
|
||||
# x = (u - cx) * z / fx
|
||||
# y = (v - cy) * z / fy
|
||||
|
||||
# create pointcloud
|
||||
pointcloud = []
|
||||
for x, y, d in points:
|
||||
z = d / 1000.0
|
||||
x = (x - intrinsic_matrix[0, 2]) * z / intrinsic_matrix[0, 0]
|
||||
y = (y - intrinsic_matrix[1, 2]) * z / intrinsic_matrix[1, 1]
|
||||
pointcloud.append([x, y, z])
|
||||
|
||||
# Calculate centroid from pointcloud
|
||||
centroid = np.mean(np.array(pointcloud, dtype=np.uint16), axis=0)
|
||||
|
||||
return centroid
|
||||
|
||||
|
||||
def is_pixel_in_polygon(pixel: tuple, corners: np.array) -> bool:
|
||||
"""
|
||||
This function takes a pixel and a list of corners as input, and returns whether the pixel is inside the polygon
|
||||
defined by the corners. This function uses the ray casting algorithm to determine if the pixel is inside the polygon.
|
||||
This algorithm works by casting a ray from the pixel in the positive x-direction, and counting the number of times
|
||||
the ray intersects with the edges of the polygon. If the number of intersections is odd, the pixel is inside the
|
||||
polygon, otherwise it is outside. This algorithm works for both convex and concave polygons.
|
||||
|
||||
Args:
|
||||
pixel: A tuple (x, y) representing the pixel coordinates.
|
||||
corners: A list of 4 tuples in a numpy array, each representing the (x, y) coordinates of a corner.
|
||||
|
||||
Returns:
|
||||
A boolean indicating whether the pixel is inside the polygon.
|
||||
"""
|
||||
|
||||
# Initialize counter for number of intersections
|
||||
num_intersections = 0
|
||||
|
||||
# Iterate over each edge of the polygon
|
||||
for i in range(len(corners)):
|
||||
x1, y1 = corners[i]
|
||||
x2, y2 = corners[(i + 1) % len(corners)]
|
||||
|
||||
# Check if the pixel is on the same y-level as the edge
|
||||
if (y1 <= pixel[1] < y2) or (y2 <= pixel[1] < y1):
|
||||
# Calculate the x-coordinate of the intersection point
|
||||
x_intersection = (x2 - x1) * (pixel[1] - y1) / (y2 - y1) + x1
|
||||
|
||||
# Check if the intersection point is to the right of the pixel
|
||||
if x_intersection > pixel[0]:
|
||||
num_intersections += 1
|
||||
|
||||
# Return whether the number of intersections is odd
|
||||
return num_intersections % 2 == 1
|
72
rbss_aruco_pe/rbss_aruco_pe/utils.py
Executable file
72
rbss_aruco_pe/rbss_aruco_pe/utils.py
Executable file
|
@ -0,0 +1,72 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import cv2
|
||||
from rclpy.impl import rcutils_logger
|
||||
|
||||
ARUCO_DICT = {
|
||||
"DICT_4X4_50": cv2.aruco.DICT_4X4_50,
|
||||
"DICT_4X4_100": cv2.aruco.DICT_4X4_100,
|
||||
"DICT_4X4_250": cv2.aruco.DICT_4X4_250,
|
||||
"DICT_4X4_1000": cv2.aruco.DICT_4X4_1000,
|
||||
"DICT_5X5_50": cv2.aruco.DICT_5X5_50,
|
||||
"DICT_5X5_100": cv2.aruco.DICT_5X5_100,
|
||||
"DICT_5X5_250": cv2.aruco.DICT_5X5_250,
|
||||
"DICT_5X5_1000": cv2.aruco.DICT_5X5_1000,
|
||||
"DICT_6X6_50": cv2.aruco.DICT_6X6_50,
|
||||
"DICT_6X6_100": cv2.aruco.DICT_6X6_100,
|
||||
"DICT_6X6_250": cv2.aruco.DICT_6X6_250,
|
||||
"DICT_6X6_1000": cv2.aruco.DICT_6X6_1000,
|
||||
"DICT_7X7_50": cv2.aruco.DICT_7X7_50,
|
||||
"DICT_7X7_100": cv2.aruco.DICT_7X7_100,
|
||||
"DICT_7X7_250": cv2.aruco.DICT_7X7_250,
|
||||
"DICT_7X7_1000": cv2.aruco.DICT_7X7_1000,
|
||||
"DICT_ARUCO_ORIGINAL": cv2.aruco.DICT_ARUCO_ORIGINAL,
|
||||
"DICT_APRILTAG_16h5": cv2.aruco.DICT_APRILTAG_16h5,
|
||||
"DICT_APRILTAG_25h9": cv2.aruco.DICT_APRILTAG_25h9,
|
||||
"DICT_APRILTAG_36h10": cv2.aruco.DICT_APRILTAG_36h10,
|
||||
"DICT_APRILTAG_36h11": cv2.aruco.DICT_APRILTAG_36h11
|
||||
}
|
||||
|
||||
|
||||
def aruco_display(corners, ids, image):
|
||||
'''
|
||||
This function draws the axes on the image for each of the detected markers,
|
||||
given the rotation and translation vectors of the input camera image frame.
|
||||
:param corners: corners of the detected marker regions
|
||||
:param ids: ArUco marker IDs
|
||||
:param image: input image frame
|
||||
:return: RGB image with the axes drawn on it
|
||||
'''
|
||||
if len(corners) > 0:
|
||||
# flatten the ArUco IDs list
|
||||
ids = ids.flatten()
|
||||
# loop over the detected ArUCo corners
|
||||
for (markerCorner, markerID) in zip(corners, ids):
|
||||
# extract the marker corners (which are always returned in
|
||||
# top-left, top-right, bottom-right, and bottom-left order)
|
||||
corners = markerCorner.reshape((4, 2))
|
||||
(topLeft, topRight, bottomRight, bottomLeft) = corners
|
||||
# convert each of the (x, y)-coordinate pairs to integers
|
||||
topRight = (int(topRight[0]), int(topRight[1]))
|
||||
bottomRight = (int(bottomRight[0]), int(bottomRight[1]))
|
||||
bottomLeft = (int(bottomLeft[0]), int(bottomLeft[1]))
|
||||
topLeft = (int(topLeft[0]), int(topLeft[1]))
|
||||
|
||||
cv2.line(image, topLeft, topRight, (0, 255, 0), 2)
|
||||
cv2.line(image, topRight, bottomRight, (0, 255, 0), 2)
|
||||
cv2.line(image, bottomRight, bottomLeft, (0, 255, 0), 2)
|
||||
cv2.line(image, bottomLeft, topLeft, (0, 255, 0), 2)
|
||||
# compute and draw the center (x, y)-coordinates of the ArUco
|
||||
# marker
|
||||
cX = int((topLeft[0] + bottomRight[0]) / 2.0)
|
||||
cY = int((topLeft[1] + bottomRight[1]) / 2.0)
|
||||
cv2.circle(image, (cX, cY), 4, (0, 0, 255), -1)
|
||||
# draw the ArUco marker ID on the image
|
||||
cv2.putText(image, str(markerID), (topLeft[0], topLeft[1] - 10), cv2.FONT_HERSHEY_SIMPLEX,
|
||||
0.5, (0, 255, 0), 2)
|
||||
|
||||
logger = rcutils_logger.RcutilsLogger(name="aruco_node")
|
||||
logger.debug("[Inference] ArUco markers IDs: {}".format(ids))
|
||||
|
||||
return image
|
||||
|
369
rbss_aruco_pe/scripts/aruco_node_lc.py
Executable file
369
rbss_aruco_pe/scripts/aruco_node_lc.py
Executable file
|
@ -0,0 +1,369 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Code taken from:
|
||||
https://github.com/AIRLab-POLIMI/ros2-aruco-pose-estimation
|
||||
ROS2 wrapper code taken from:
|
||||
https://github.com/JMU-ROBOTICS-VIVA/ros2_aruco/tree/main
|
||||
|
||||
This node locates Aruco AR markers in images and publishes their ids and poses.
|
||||
|
||||
Subscriptions:
|
||||
/camera/image/image (sensor_msgs.msg.Image)
|
||||
/camera/image/camera_info (sensor_msgs.msg.CameraInfo)
|
||||
|
||||
Published Topics:
|
||||
/aruco/poses (geometry_msgs.msg.PoseArray)
|
||||
Pose of all detected markers (suitable for rviz visualization)
|
||||
|
||||
/aruco/markers (aruco_interfaces.msg.ArucoMarkers)
|
||||
Provides an array of all poses along with the corresponding
|
||||
marker ids.
|
||||
|
||||
/aruco/image (sensor_msgs.msg.Image)
|
||||
Annotated image with marker locations and ids, with markers drawn on it
|
||||
|
||||
Parameters:
|
||||
marker_size - size of the markers in meters (default 0.04)
|
||||
aruco_dictionary_id - dictionary that was used to generate markers (default DICT_6X6_50)
|
||||
image_topic - image topic to subscribe to (default /camera/color/image_raw)
|
||||
camera_info_topic - camera info topic to subscribe to (default /camera/camera_info)
|
||||
camera_frame - camera optical frame to use (default "camera_depth_optical_frame")
|
||||
detected_markers_topic - topic to publish detected markers (default /aruco/markers)
|
||||
markers_visualization_topic - topic to publish markers visualization (default /aruco/poses)
|
||||
output_image_topic - topic to publish annotated image (default /aruco/image)
|
||||
|
||||
Version: 2025-04-08
|
||||
"""
|
||||
|
||||
# ROS2 imports
|
||||
import rclpy
|
||||
# import rclpy.node
|
||||
from rclpy.qos import qos_profile_sensor_data
|
||||
from cv_bridge import CvBridge
|
||||
import message_filters
|
||||
from rclpy.lifecycle import Node
|
||||
# from rclpy.lifecycle import Publisher
|
||||
from rclpy.lifecycle import LifecycleState
|
||||
from rclpy.lifecycle import TransitionCallbackReturn
|
||||
|
||||
# Python imports
|
||||
import numpy as np
|
||||
import cv2
|
||||
import json
|
||||
|
||||
# Local imports for custom defined functions
|
||||
from rbss_aruco_pe.utils import ARUCO_DICT
|
||||
from rbss_aruco_pe.pose_estimation import pose_estimation
|
||||
|
||||
# ROS2 message imports
|
||||
from sensor_msgs.msg import CameraInfo, Image
|
||||
from geometry_msgs.msg import PoseArray
|
||||
# from rcl_interfaces.msg import ParameterDescriptor, ParameterType
|
||||
from rbs_skill_interfaces.msg import ArucoMarkers
|
||||
|
||||
NODE_NAME_DEFAULT = "aruco_pe"
|
||||
PARAM_SKILL_CFG = "aruco_pe_cfg"
|
||||
|
||||
DEFAULT_ARUCO_DICTIONARY = "DICT_6X6_50"
|
||||
DEFAULT_MARKER_SIZE = 0.04
|
||||
# Input topics
|
||||
INP_IMAGE = "/camera/image/image"
|
||||
INP_DEPTH_IMAGE = "/camera/image/depth_image"
|
||||
INP_CAMERA_INFO = "/camera/image/camera_info"
|
||||
CAMERA_FRAME = "camera_color_optical_frame" # Camera link frame of reference
|
||||
# Output topics
|
||||
OUT_DEFAULT_DETECTED_MARKERS = "/aruco/markers"
|
||||
OUT_TT_DETECTED_MARKERS = "rbs_skill_interfaces/msg/ArucoMarkers"
|
||||
OUT_DEFAULT_MARKERS_VISUALISATION = "/aruco/poses"
|
||||
OUT_TT_MARKERS_VISUALISATION = "geometry_msgs/msg/PoseArray"
|
||||
OUT_DEFAULT_IMAGE = "/aruco/image"
|
||||
OUT_TT_IMAGE = "sensor_msgs/msg/Image"
|
||||
|
||||
class ArucoLcNode(Node):
|
||||
"""Our lifecycle node."""
|
||||
def __init__(self, **kwargs):
|
||||
# default value
|
||||
self.dictionary_id_name = DEFAULT_ARUCO_DICTIONARY
|
||||
self.marker_size = DEFAULT_MARKER_SIZE # Size of the markers in meters
|
||||
self.use_depth_input = False # Use depth image for 3D pose estimation
|
||||
self.is_image_mode = False # Publish Image with detected Aruco markers
|
||||
|
||||
self.image_topic = INP_IMAGE
|
||||
self.info_topic = INP_CAMERA_INFO
|
||||
self.depth_image_topic = INP_DEPTH_IMAGE
|
||||
self.camera_frame = CAMERA_FRAME
|
||||
|
||||
self.detected_markers_topic = OUT_DEFAULT_DETECTED_MARKERS
|
||||
self.markers_visualization_topic = OUT_DEFAULT_MARKERS_VISUALISATION
|
||||
self.output_image_topic = OUT_DEFAULT_IMAGE
|
||||
|
||||
# for other nodes
|
||||
kwargs["allow_undeclared_parameters"] = True
|
||||
kwargs["automatically_declare_parameters_from_overrides"] = True
|
||||
super().__init__(NODE_NAME_DEFAULT, **kwargs)
|
||||
str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
|
||||
|
||||
self.skill_cfg = json.loads(str_cfg)
|
||||
self.nodeName = NODE_NAME_DEFAULT
|
||||
|
||||
self.initialize_parameters()
|
||||
|
||||
# Make sure we have a valid dictionary id:
|
||||
try:
|
||||
dictionary_id = cv2.aruco.__getattribute__(self.dictionary_id_name)
|
||||
# check if the dictionary_id is a valid dictionary inside ARUCO_DICT values
|
||||
if dictionary_id not in ARUCO_DICT.values():
|
||||
raise AttributeError
|
||||
except AttributeError:
|
||||
self.get_logger().error(f"bad aruco_dictionary_id: {self.dictionary_id_name}")
|
||||
options = "\n".join([s for s in ARUCO_DICT])
|
||||
self.get_logger().error(f"valid options: {options}")
|
||||
return False
|
||||
|
||||
self.aruco_dictionary = cv2.aruco.getPredefinedDictionary(dictionary_id)
|
||||
|
||||
self.bridge = CvBridge()
|
||||
|
||||
# Set up publishers
|
||||
self.poses_pub = self.create_lifecycle_publisher(PoseArray, self.markers_visualization_topic, 5)
|
||||
self.markers_pub = self.create_lifecycle_publisher(ArucoMarkers, self.detected_markers_topic, 5)
|
||||
self.image_pub = self.create_lifecycle_publisher(Image, self.output_image_topic, 5)
|
||||
|
||||
self.clear_param()
|
||||
|
||||
def clear_param(self):
|
||||
# Set up fields for camera parameters
|
||||
self.info_msg = None
|
||||
self.intrinsic_mat = None
|
||||
self.distortion = None
|
||||
|
||||
def initialize_parameters(self):
|
||||
"""
|
||||
Read parameters from skill config (via interface node)
|
||||
"""
|
||||
self._Settings()
|
||||
for prop in self.skill_cfg["topicsOut"]:
|
||||
ty = prop["type"]
|
||||
if ty == OUT_TT_DETECTED_MARKERS:
|
||||
self.detected_markers_topic = prop["name"]
|
||||
elif ty == OUT_TT_MARKERS_VISUALISATION:
|
||||
self.markers_visualization_topic = prop["name"]
|
||||
elif ty == OUT_TT_IMAGE:
|
||||
self.output_image_topic = prop["name"]
|
||||
|
||||
def _Settings(self):
|
||||
# Initialization service settings
|
||||
for prop in self.skill_cfg["Settings"]["output"]["params"]:
|
||||
nam = prop["name"]
|
||||
val = prop["value"]
|
||||
if nam == "aruco_dictionary_id":
|
||||
self.aruco_dictionary_id = val
|
||||
elif nam == "marker_size":
|
||||
self.marker_size = float(val)
|
||||
elif nam == "camera_frame": # TODO перенести в dependencies для "arucoConfig"
|
||||
self.camera_frame = val
|
||||
elif nam == "is_image_mode":
|
||||
self.is_image_mode = (val == "True")
|
||||
|
||||
def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
|
||||
"""
|
||||
Configure the node, after a configuring transition is requested.
|
||||
"""
|
||||
json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
|
||||
self.skill_cfg = json.loads(json_param)
|
||||
for comm in self.skill_cfg["BTAction"]:
|
||||
for par in comm["param"]:
|
||||
dependency = par["dependency"]
|
||||
if par["type"] == "topic":
|
||||
topic_type = dependency["topicType"]
|
||||
topic_name = dependency["topicOut"]
|
||||
if "CameraInfo" in topic_type:
|
||||
self.info_topic = topic_name
|
||||
elif "Image" in topic_type:
|
||||
if "depth" in topic_name:
|
||||
self.depth_image_topic = topic_name
|
||||
else:
|
||||
self.image_topic = topic_name
|
||||
# elif par["type"] == "formBuilder":
|
||||
# self.objName = dependency["output"]["object_name"]
|
||||
# val = dependency["output"]["process"]["selectProcess"]["value"]
|
||||
|
||||
# camera info topic for the camera calibration parameters
|
||||
self.info_sub = self.create_subscription(CameraInfo, self.info_topic, self.info_callback, qos_profile_sensor_data)
|
||||
|
||||
self.get_logger().info("on_configure is called.")
|
||||
return TransitionCallbackReturn.SUCCESS
|
||||
|
||||
def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn:
|
||||
# select the type of input to use for the pose estimation
|
||||
if self.use_depth_input:
|
||||
# use both rgb and depth image topics for the pose estimation
|
||||
|
||||
# create a message filter to synchronize the image and depth image topics
|
||||
self.image_sub = message_filters.Subscriber(self, Image, self.image_topic, qos_profile=qos_profile_sensor_data)
|
||||
self.depth_image_sub = message_filters.Subscriber(self, Image, self.depth_image_topic, qos_profile=qos_profile_sensor_data)
|
||||
|
||||
# create synchronizer between the 2 topics using message filters and approximate time policy
|
||||
# slop is the maximum time difference between messages that are considered synchronized
|
||||
self.synchronizer = message_filters.ApproximateTimeSynchronizer(
|
||||
[self.image_sub, self.depth_image_sub], queue_size=10, slop=0.05
|
||||
)
|
||||
self.synchronizer.registerCallback(self.rgb_depth_sync_callback)
|
||||
else:
|
||||
# rely only on the rgb image topic for the pose estimation
|
||||
|
||||
# create a subscription to the image topic
|
||||
self.image_sub = self.create_subscription(Image, self.image_topic, self.image_callback, qos_profile_sensor_data)
|
||||
|
||||
self.get_logger().info("on_activate is called")
|
||||
return super().on_activate(state)
|
||||
|
||||
def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn:
|
||||
if self.use_depth_input:
|
||||
# TODO not tested
|
||||
self.destroy_subscription(self.image_sub)
|
||||
self.destroy_subscription(self.depth_image_sub)
|
||||
else:
|
||||
# Destroy the Image subscriber.
|
||||
self.destroy_subscription(self.image_sub)
|
||||
|
||||
self.get_logger().info("on_deactivate is called")
|
||||
return super().on_deactivate(state)
|
||||
|
||||
def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn:
|
||||
self.destroy_subscription(self.info_sub)
|
||||
|
||||
self.clear_param()
|
||||
|
||||
self.get_logger().info("on_cleanup is called")
|
||||
return TransitionCallbackReturn.SUCCESS
|
||||
|
||||
def on_shutdown(self, state: LifecycleState) -> TransitionCallbackReturn:
|
||||
self.get_logger().info("on_shutdown is called")
|
||||
return TransitionCallbackReturn.SUCCESS
|
||||
|
||||
def info_callback(self, info_msg):
|
||||
self.info_msg = info_msg
|
||||
# get the intrinsic matrix and distortion coefficients from the camera info
|
||||
self.intrinsic_mat = np.reshape(np.array(self.info_msg.k), (3, 3))
|
||||
self.distortion = np.array(self.info_msg.d)
|
||||
|
||||
self.get_logger().info("Camera info received.")
|
||||
self.get_logger().info(f"Intrinsic matrix: {self.intrinsic_mat}")
|
||||
self.get_logger().info(f"Distortion coefficients: {self.distortion}")
|
||||
self.get_logger().info(f"Camera frame: {self.info_msg.width}x{self.info_msg.height}")
|
||||
|
||||
# Assume that camera parameters will remain the same...
|
||||
self.destroy_subscription(self.info_sub)
|
||||
|
||||
def image_callback(self, img_msg: Image):
|
||||
if self.info_msg is None:
|
||||
self.get_logger().warn("No camera info has been received!")
|
||||
return
|
||||
|
||||
# convert the image messages to cv2 format
|
||||
cv_image = self.bridge.imgmsg_to_cv2(img_msg, desired_encoding="rgb8")
|
||||
|
||||
# create the ArucoMarkers and PoseArray messages
|
||||
markers = ArucoMarkers()
|
||||
pose_array = PoseArray()
|
||||
|
||||
# Set the frame id and timestamp for the markers and pose array
|
||||
if self.camera_frame == "":
|
||||
markers.header.frame_id = self.info_msg.header.frame_id
|
||||
pose_array.header.frame_id = self.info_msg.header.frame_id
|
||||
else:
|
||||
markers.header.frame_id = self.camera_frame
|
||||
pose_array.header.frame_id = self.camera_frame
|
||||
|
||||
markers.header.stamp = img_msg.header.stamp
|
||||
pose_array.header.stamp = img_msg.header.stamp
|
||||
|
||||
"""
|
||||
# OVERRIDE: use calibrated intrinsic matrix and distortion coefficients
|
||||
self.intrinsic_mat = np.reshape([615.95431, 0., 325.26983,
|
||||
0., 617.92586, 257.57722,
|
||||
0., 0., 1.], (3, 3))
|
||||
self.distortion = np.array([0.142588, -0.311967, 0.003950, -0.006346, 0.000000])
|
||||
"""
|
||||
|
||||
# call the pose estimation function
|
||||
frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=None,
|
||||
aruco_dict=self.aruco_dictionary,
|
||||
marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
|
||||
distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
|
||||
# frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=None,
|
||||
# aruco_detector=self.aruco_detector,
|
||||
# marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
|
||||
# distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
|
||||
|
||||
# if some markers are detected
|
||||
if len(markers.marker_ids) > 0:
|
||||
# Publish the results with the poses and markes positions
|
||||
self.poses_pub.publish(pose_array)
|
||||
self.markers_pub.publish(markers)
|
||||
|
||||
# publish the image frame with computed markers positions over the image
|
||||
self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "rgb8"))
|
||||
|
||||
def rgb_depth_sync_callback(self, rgb_msg: Image, depth_msg: Image):
|
||||
|
||||
# convert the image messages to cv2 format
|
||||
cv_depth_image = self.bridge.imgmsg_to_cv2(depth_msg, desired_encoding="16UC1")
|
||||
cv_image = self.bridge.imgmsg_to_cv2(rgb_msg, desired_encoding="rgb8")
|
||||
|
||||
# create the ArucoMarkers and PoseArray messages
|
||||
markers = ArucoMarkers()
|
||||
pose_array = PoseArray()
|
||||
|
||||
# Set the frame id and timestamp for the markers and pose array
|
||||
if self.camera_frame == "":
|
||||
markers.header.frame_id = self.info_msg.header.frame_id
|
||||
pose_array.header.frame_id = self.info_msg.header.frame_id
|
||||
else:
|
||||
markers.header.frame_id = self.camera_frame
|
||||
pose_array.header.frame_id = self.camera_frame
|
||||
|
||||
markers.header.stamp = rgb_msg.header.stamp
|
||||
pose_array.header.stamp = rgb_msg.header.stamp
|
||||
|
||||
# call the pose estimation function
|
||||
frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=cv_depth_image,
|
||||
aruco_dict=self.aruco_dictionary,
|
||||
marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
|
||||
distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
|
||||
# frame, pose_array, markers = pose_estimation(rgb_frame=cv_image, depth_frame=cv_depth_image,
|
||||
# aruco_detector=self.aruco_detector,
|
||||
# marker_size=self.marker_size, matrix_coefficients=self.intrinsic_mat,
|
||||
# distortion_coefficients=self.distortion, pose_array=pose_array, markers=markers)
|
||||
|
||||
# if some markers are detected
|
||||
if len(markers.marker_ids) > 0:
|
||||
# Publish the results with the poses and markes positions
|
||||
self.poses_pub.publish(pose_array)
|
||||
self.markers_pub.publish(markers)
|
||||
|
||||
# publish the image frame with computed markers positions over the image
|
||||
self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "rgb8"))
|
||||
|
||||
def main():
|
||||
# rclpy.init()
|
||||
# node = ArucoNode()
|
||||
# rclpy.spin(node)
|
||||
|
||||
# node.destroy_node()
|
||||
# rclpy.shutdown()
|
||||
rclpy.init()
|
||||
|
||||
executor = rclpy.executors.SingleThreadedExecutor()
|
||||
lc_node = ArucoLcNode()
|
||||
if lc_node:
|
||||
executor.add_node(lc_node)
|
||||
try:
|
||||
executor.spin()
|
||||
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
|
||||
lc_node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Add table
Add a link
Reference in a new issue