485 lines
No EOL
18 KiB
Python
Executable file
485 lines
No EOL
18 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
pose_estimation_lifecycle_node
|
|
ROS 2 program for 6D Pose Estimation
|
|
|
|
@shalenikol release 0.3
|
|
"""
|
|
|
|
from typing import Optional
|
|
import os
|
|
import subprocess
|
|
import shutil
|
|
import json
|
|
import tempfile
|
|
from pathlib import Path
|
|
import transforms3d as t3d
|
|
|
|
import rclpy
|
|
from rclpy.lifecycle import Node
|
|
from rclpy.lifecycle import Publisher
|
|
from rclpy.lifecycle import State
|
|
from rclpy.lifecycle import TransitionCallbackReturn
|
|
from rclpy.timer import Timer
|
|
|
|
from ament_index_python.packages import get_package_share_directory
|
|
from sensor_msgs.msg import Image, CameraInfo
|
|
from geometry_msgs.msg import Pose, TransformStamped
|
|
from tf2_ros import TransformBroadcaster, TransformException
|
|
from tf2_ros.buffer import Buffer
|
|
from tf2_ros.transform_listener import TransformListener
|
|
|
|
from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images
|
|
import cv2 # OpenCV library
|
|
|
|
class PoseEstimator(Node):
|
|
"""Our lifecycle node."""
|
|
|
|
def _InitService(self):
|
|
# Initialization service data
|
|
p = os.path.join(get_package_share_directory("rbs_perception"), "config", "pose_estimation_config.json")
|
|
# load config
|
|
with open(p, "r") as f:
|
|
y = json.load(f)
|
|
|
|
for name, val in y.items():
|
|
if name == "nodeName":
|
|
self.nodeName = val
|
|
elif name == "cameraLink":
|
|
self.camera_link = val
|
|
elif name == "topicImage":
|
|
self.topicImage = val
|
|
elif name == "topicCameraInfo":
|
|
self.topicCameraInfo = val
|
|
elif name == "topicDepth":
|
|
self.topicDepth = val
|
|
elif name == "publishDelay":
|
|
self.publishDelay = val
|
|
elif name == "topicSrv":
|
|
self.topicSrv = val
|
|
elif name == "tf2_send_pose":
|
|
self.tf2_send_pose = val
|
|
elif name == "mesh_scale":
|
|
self.mesh_scale = val
|
|
|
|
def __init__(self, node_name, **kwargs):
|
|
"""Construct the node."""
|
|
self._count: int = 0
|
|
self._pub: Optional[Publisher] = None
|
|
self._timer: Optional[Timer] = None
|
|
self._image_cnt: int = 0
|
|
self._sub = None
|
|
self._sub_info = None
|
|
self._sub_depth = None
|
|
self._is_camerainfo = False
|
|
self._K = [[0., 0., 0.], [0., 0., 0.], [0., 0., 0.]]
|
|
self._res = [0, 0]
|
|
self._pose = [[1., 0., 0., 0.], [0., 0., 0.]] # pose in format "TWO" (megapose)
|
|
self.tf2_send_pose = 0
|
|
self.mesh_scale = 1.0
|
|
self.quat_rotate_xyz = [0.5, 0.5, -0.5, -0.5]
|
|
self.megapose_model = None
|
|
self.darknet_path = ""
|
|
|
|
self.nodeName = node_name
|
|
self.camera_link = "outer_rgbd_camera"
|
|
self.topicImage = "/outer_rgb_camera/image"
|
|
self.topicCameraInfo = "/outer_rgb_camera/camera_info"
|
|
self.topicDepth = "/outer_rgbd_camera/depth_image"
|
|
self.publishDelay = 2.0
|
|
self.topicSrv = self.nodeName + "/detect6Dpose"
|
|
self._InitService()
|
|
self.camera_pose = Pose()
|
|
|
|
self.tmpdir = tempfile.gettempdir()
|
|
self.mytemppath = Path(self.tmpdir) / "rbs_per"
|
|
self.mytemppath.mkdir(exist_ok=True)
|
|
|
|
# for other nodes
|
|
kwargs["allow_undeclared_parameters"] = True
|
|
kwargs["automatically_declare_parameters_from_overrides"] = True
|
|
super().__init__(self.nodeName, **kwargs)
|
|
self.declare_parameter("mesh_path", rclpy.Parameter.Type.STRING)
|
|
|
|
self.tf_buffer = Buffer()
|
|
self.tf_listener = TransformListener(self.tf_buffer, self)
|
|
# Initialize the transform broadcaster
|
|
self.tf_broadcaster = TransformBroadcaster(self)
|
|
# Used to convert between ROS and OpenCV images
|
|
self.br = CvBridge()
|
|
|
|
self.objName = ""
|
|
self.objMeshFile = ""
|
|
self.objPath = ""
|
|
|
|
def publish(self):
|
|
"""Publish a new message when enabled."""
|
|
self._count += 1
|
|
|
|
if self._pub is not None and self._pub.is_activated:
|
|
# опубликуем результат оценки позы # publish pose estimation result
|
|
q = self._pose[0]
|
|
t = self._pose[1]
|
|
# transform from View space in World space
|
|
# rotate
|
|
#q_mir = t3d.quaternions.qmult(self.quat_rotate_xyz, q)
|
|
# translate (xyz = +z-x-y)
|
|
#t_mir = [t[2], -t[0], -t[1]]
|
|
|
|
#cam_p = self.camera_pose.position
|
|
#cam_Q = self.camera_pose.orientation
|
|
# q_rot = [msgQ.w, msgQ.x, msgQ.y, msgQ.z]
|
|
# q = t3d.quaternions.qinverse(q) #q[0] = -q[0]
|
|
# q_mir = t3d.quaternions.qmult(q_rot,q)
|
|
|
|
#q_mir = q
|
|
|
|
p = Pose()
|
|
p.position.x = t[0]
|
|
p.position.y = t[1]
|
|
p.position.z = t[2]
|
|
p.orientation.w = q[0]
|
|
p.orientation.x = q[1]
|
|
p.orientation.y = q[2]
|
|
p.orientation.z = q[3]
|
|
# p.position.x = t_mir[0] + cam_p.x
|
|
# p.position.y = t_mir[1] + cam_p.y
|
|
# p.position.z = t_mir[2] + cam_p.z
|
|
# p.orientation.w = q_mir[0]
|
|
# p.orientation.x = q_mir[1]
|
|
# p.orientation.y = q_mir[2]
|
|
# p.orientation.z = q_mir[3]
|
|
self._pub.publish(p)
|
|
|
|
if self.tf2_send_pose:
|
|
self.tf_obj_pose(t,q) #(self._pose)
|
|
|
|
def tf_obj_pose(self, tr, q):
|
|
"""
|
|
Передача позиции объекта в tf2
|
|
"""
|
|
t = TransformStamped()
|
|
# assign pose to corresponding tf variables
|
|
t.header.stamp = self.get_clock().now().to_msg()
|
|
t.header.frame_id = self.camera_link #'world'
|
|
t.child_frame_id = self.objName + "_pp"
|
|
# coordinates
|
|
#tr = pose[1]
|
|
t.transform.translation.x = tr[0] #.x
|
|
t.transform.translation.y = tr[1] #.y
|
|
t.transform.translation.z = tr[2] #.z
|
|
# rotation
|
|
#q = pose[0]
|
|
t.transform.rotation.w = q[0]
|
|
t.transform.rotation.x = q[1]
|
|
t.transform.rotation.y = q[2]
|
|
t.transform.rotation.z = q[3]
|
|
# Send the transformation
|
|
self.tf_broadcaster.sendTransform(t)
|
|
|
|
def on_configure(self, state: State) -> TransitionCallbackReturn:
|
|
"""
|
|
Configure the node, after a configuring transition is requested.
|
|
|
|
:return: The state machine either invokes a transition to the "inactive" state or stays
|
|
in "unconfigured" depending on the return value.
|
|
TransitionCallbackReturn.SUCCESS transitions to "inactive".
|
|
TransitionCallbackReturn.FAILURE transitions to "unconfigured".
|
|
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
|
|
"""
|
|
gtpose = None
|
|
str_param = self.get_parameter("mesh_path").get_parameter_value().string_value
|
|
if str_param[0] == "!": # filename json
|
|
json_file = str_param[1:]
|
|
if not os.path.isfile(json_file):
|
|
self.get_logger().info("not JSon '"+ json_file +"'")
|
|
return TransitionCallbackReturn.FAILURE
|
|
str_param = Path(json_file).read_text()
|
|
try:
|
|
y = json.loads(str_param)
|
|
except json.JSONDecodeError as e:
|
|
self.get_logger().info(f"JSon error: {e}")
|
|
return TransitionCallbackReturn.FAILURE
|
|
if "mesh_path" not in y:
|
|
self.get_logger().info("JSon 'mesh_path' not set")
|
|
return TransitionCallbackReturn.FAILURE
|
|
mesh_path = y["mesh_path"]
|
|
if "gtpose" in y:
|
|
gtpose = y["gtpose"]
|
|
if "darknet_path" in y:
|
|
self.darknet_path = y["darknet_path"]
|
|
else:
|
|
mesh_path = str_param
|
|
|
|
if not os.path.isfile(mesh_path):
|
|
self.get_logger().info("Parameter 'mesh_path' not set")
|
|
return TransitionCallbackReturn.FAILURE
|
|
|
|
data = os.path.basename(mesh_path)
|
|
self.objName = os.path.splitext(data)[0]
|
|
self.objMeshFile = mesh_path
|
|
self.objPath = self.mytemppath / "examples"
|
|
self.objPath.mkdir(exist_ok=True)
|
|
self.objPath /= self.objName
|
|
self.objPath.mkdir(exist_ok=True)
|
|
tPath = self.objPath / "inputs"
|
|
tPath.mkdir(exist_ok=True)
|
|
tPath = self.objPath / "meshes"
|
|
tPath.mkdir(exist_ok=True)
|
|
tPath /= self.objName
|
|
tPath.mkdir(exist_ok=True)
|
|
shutil.copyfile(self.objMeshFile, str(tPath / (self.objName+".ply")))
|
|
|
|
# Create the subscribers.
|
|
self._sub_info = self.create_subscription(CameraInfo, self.topicCameraInfo, self.listener_camera_info, 2)
|
|
self._sub_depth = self.create_subscription(Image, self.topicDepth, self.listener_depth, 3)
|
|
|
|
# Create the publisher.
|
|
self._pub = self.create_lifecycle_publisher(Pose, self.topicSrv, 10)
|
|
self._timer = self.create_timer(self.publishDelay, self.publish)
|
|
|
|
if gtpose == None:
|
|
# Preload Megapose model
|
|
from megapose.scripts.run_inference_on_example import ModelPreload
|
|
self.megapose_model = ModelPreload(self.objPath,"megapose-1.0-RGB-multi-hypothesis")
|
|
else:
|
|
self._pose = [t3d.euler.euler2quat(gtpose[3], gtpose[4], gtpose[5]), gtpose[:3]]
|
|
|
|
self.get_logger().info("on_configure() is called.")
|
|
return TransitionCallbackReturn.SUCCESS
|
|
|
|
def on_activate(self, state: State) -> TransitionCallbackReturn:
|
|
# Log
|
|
self.get_logger().info('on_activate() is called.')
|
|
# Create the main subscriber.
|
|
self._sub = self.create_subscription(Image, self.topicImage, self.listener_callback, 3)
|
|
return super().on_activate(state)
|
|
|
|
def on_deactivate(self, state: State) -> TransitionCallbackReturn:
|
|
# Log
|
|
self.get_logger().info('on_deactivate() is called.')
|
|
# Destroy the main subscriber.
|
|
self.destroy_subscription(self._sub)
|
|
return super().on_deactivate(state)
|
|
|
|
def on_cleanup(self, state: State) -> TransitionCallbackReturn:
|
|
"""
|
|
Cleanup the node.
|
|
|
|
:return: The state machine either invokes a transition to the "unconfigured" state or stays
|
|
in "inactive" depending on the return value.
|
|
TransitionCallbackReturn.SUCCESS transitions to "unconfigured".
|
|
TransitionCallbackReturn.FAILURE transitions to "inactive".
|
|
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
|
|
"""
|
|
# очистим параметры
|
|
node_param = rclpy.parameter.Parameter("mesh_path", rclpy.Parameter.Type.STRING, "")
|
|
all_node_param = [node_param]
|
|
self.set_parameters(all_node_param)
|
|
|
|
self._is_camerainfo = False
|
|
|
|
self.destroy_timer(self._timer)
|
|
self.destroy_publisher(self._pub)
|
|
self.destroy_subscription(self._sub)
|
|
self.destroy_subscription(self._sub_info)
|
|
self.destroy_subscription(self._sub_depth)
|
|
|
|
self.get_logger().info('on_cleanup() is called.')
|
|
return TransitionCallbackReturn.SUCCESS
|
|
|
|
def on_shutdown(self, state: State) -> TransitionCallbackReturn:
|
|
"""
|
|
Shutdown the node.
|
|
|
|
:return: The state machine either invokes a transition to the "finalized" state or stays
|
|
in the current state depending on the return value.
|
|
TransitionCallbackReturn.SUCCESS transitions to "unconfigured".
|
|
TransitionCallbackReturn.FAILURE transitions to "inactive".
|
|
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
|
|
"""
|
|
self.destroy_timer(self._timer)
|
|
self.destroy_publisher(self._pub)
|
|
self.destroy_subscription(self._sub)
|
|
self.destroy_subscription(self._sub_info)
|
|
self.destroy_subscription(self._sub_depth)
|
|
|
|
self.get_logger().info('on_shutdown() is called.')
|
|
return TransitionCallbackReturn.SUCCESS
|
|
|
|
def listener_camera_info(self, data):
|
|
"""
|
|
CameraInfo callback function.
|
|
"""
|
|
if self._is_camerainfo: # повторно инфо камеры не читаем
|
|
return
|
|
|
|
self._res = [data.height, data.width]
|
|
k_ = data.k
|
|
# self._K = [
|
|
# [k_[0], k_[1], k_[2]],
|
|
# [k_[3], k_[4], k_[5]],
|
|
# [k_[6], k_[7], k_[8]]
|
|
# ]
|
|
self._K = [
|
|
[k_[0]*2.0, k_[1], data.width / 2.0], # k_[2]], #
|
|
[k_[3], k_[4]*2.0, data.height / 2.0], # k_[5]], #
|
|
[k_[6], k_[7], k_[8]] #self.mesh_scale]
|
|
]
|
|
|
|
# *** for test
|
|
# tPath = self.objPath / "inputs"
|
|
# output_fn = tPath / "object_data.json"
|
|
# output_json_dict = {
|
|
# "label": self.objName,
|
|
# "bbox_modal": [168,173,260,252] # 0.2 0 0 -> [288,170,392,253]
|
|
# }
|
|
# data = []
|
|
# data.append(output_json_dict)
|
|
# output_fn.write_text(json.dumps(data))
|
|
# ***
|
|
|
|
#{"K": [[25.0, 0.0, 8.65], [0.0, 25.0, 6.5], [0.0, 0.0, 1.0]], "resolution": [480, 640]}
|
|
output_fn = self.objPath / "camera_data.json"
|
|
output_json_dict = {
|
|
"K": self._K,
|
|
"resolution": self._res
|
|
}
|
|
data = []
|
|
data.append(output_json_dict)
|
|
output_fn.write_text(json.dumps(output_json_dict))
|
|
|
|
# установим признак получения инфо камеры
|
|
self._is_camerainfo = True
|
|
|
|
def listener_depth(self, data):
|
|
"""
|
|
Depth image callback function.
|
|
"""
|
|
# Convert ROS Image message to OpenCV image
|
|
current_frame = self.br.imgmsg_to_cv2(data)
|
|
|
|
# Save depth image for Megapose
|
|
cv2.imwrite(str(self.objPath / "image_depth.png"), current_frame)
|
|
|
|
def load_result(self, example_dir: Path, json_name = "object_data.json"):
|
|
f = example_dir / "outputs" / json_name
|
|
if os.path.isfile(f):
|
|
data = f.read_text()
|
|
else:
|
|
data = "No result file: '" + str(f) + "'"
|
|
return data
|
|
|
|
def rel2bbox(self, rel_coord):
|
|
bb_w = rel_coord["width"]
|
|
bb_h = rel_coord["height"]
|
|
x = int((rel_coord["center_x"] - bb_w/2.) * self._res[1])
|
|
y = int((rel_coord["center_y"] - bb_h/2.) * self._res[0])
|
|
w = int(bb_w * self._res[1])
|
|
h = int(bb_h * self._res[0])
|
|
return [x,y,w,h]
|
|
|
|
def yolo2megapose(self, res_json: str, path_to: Path) -> bool:
|
|
str_param = Path(res_json).read_text()
|
|
y = json.loads(str_param)[0]
|
|
conf = 0.5 # threshold of detection
|
|
found_coord = None
|
|
for detections in y["objects"]:
|
|
if detections["name"] == self.objName:
|
|
c_conf = detections["confidence"]
|
|
if c_conf > conf:
|
|
conf = c_conf
|
|
found_coord = detections["relative_coordinates"]
|
|
|
|
if found_coord:
|
|
bbox = self.rel2bbox(found_coord)
|
|
else:
|
|
bbox = [2, 2, self._res[1]-4, self._res[0]-4]
|
|
|
|
#tPath = path_to / "inputs"
|
|
output_fn = path_to / "inputs/object_data.json"
|
|
output_json_dict = {
|
|
"label": self.objName,
|
|
"bbox_modal": bbox
|
|
}
|
|
data = []
|
|
data.append(output_json_dict)
|
|
output_fn.write_text(json.dumps(data))
|
|
return bool(found_coord)
|
|
|
|
def listener_callback(self, data):
|
|
"""
|
|
Image Callback function.
|
|
"""
|
|
if not self._is_camerainfo:
|
|
self.get_logger().warning("No data from CameraInfo")
|
|
return
|
|
# get camera pose
|
|
camera_name = self.camera_link #self.topicImage.split('/')[1]
|
|
try:
|
|
t = self.tf_buffer.lookup_transform("world", camera_name, rclpy.time.Time())
|
|
except TransformException as ex:
|
|
self.get_logger().info(f'Could not transform {camera_name} to world: {ex}')
|
|
return
|
|
self.camera_pose.position.x = t.transform.translation.x
|
|
self.camera_pose.position.y = t.transform.translation.y
|
|
self.camera_pose.position.z = t.transform.translation.z
|
|
self.camera_pose.orientation.w = t.transform.rotation.w
|
|
self.camera_pose.orientation.x = t.transform.rotation.x
|
|
self.camera_pose.orientation.y = t.transform.rotation.y
|
|
self.camera_pose.orientation.z = t.transform.rotation.z
|
|
|
|
# self._pose[0] = [t.transform.rotation.w, t.transform.rotation.x, t.transform.rotation.y, t.transform.rotation.z]
|
|
# self._pose[1] = [t.transform.translation.x, t.transform.translation.y, t.transform.translation.z]
|
|
|
|
# Convert ROS Image message to OpenCV image
|
|
current_frame = self.br.imgmsg_to_cv2(data)
|
|
|
|
# Save image for Megapose
|
|
frame_im = str(self.objPath / "image_rgb.png")
|
|
cv2.imwrite(frame_im, current_frame)
|
|
self._image_cnt += 1
|
|
|
|
detected = False
|
|
darknet_bin = os.path.join(self.darknet_path, "darknet")
|
|
if os.path.isfile(darknet_bin):
|
|
# object detection (YoloV4 - darknet)
|
|
self.get_logger().info(f"darknet: begin {self._image_cnt}")
|
|
result_json = str(self.objPath / "res.json")
|
|
|
|
subprocess.run([darknet_bin, "detector", "test",
|
|
"yolov4_objs2.data",
|
|
"yolov4_objs2.cfg",
|
|
"yolov4.weights",
|
|
"-dont_show", "-ext_output", # -thresh 0.5
|
|
"-out", result_json, frame_im],
|
|
cwd = self.darknet_path #, stdout = subprocess.PIPE
|
|
)
|
|
detected = self.yolo2megapose(result_json, self.objPath)
|
|
self.get_logger().info(f"darknet: end {self._image_cnt}")
|
|
|
|
if detected and self.megapose_model:
|
|
# 6D pose estimation
|
|
self.get_logger().info(f"megapose: begin {self._image_cnt} {self.objPath}")
|
|
from megapose.scripts.run_inference_on_example import run_inference_rbs
|
|
run_inference_rbs(self.megapose_model)
|
|
|
|
data = self.load_result(self.objPath)
|
|
if data[0] == "[":
|
|
y = json.loads(data)[0]
|
|
self._pose = y["TWO"]
|
|
self.get_logger().info(f"megapose: end {self._image_cnt}")
|
|
|
|
def main():
|
|
rclpy.init()
|
|
|
|
executor = rclpy.executors.SingleThreadedExecutor()
|
|
lc_node = PoseEstimator("lc_pose_estimator")
|
|
executor.add_node(lc_node)
|
|
try:
|
|
executor.spin()
|
|
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
|
|
lc_node.destroy_node()
|
|
|
|
if __name__ == '__main__':
|
|
main() |