From b8dad502a1f8139c329ad19edefb52105f15ef95 Mon Sep 17 00:00:00 2001 From: shalenikol Date: Tue, 15 Oct 2024 11:14:55 +0300 Subject: [PATCH] add rbss_poseestimation --- rbss_poseestimation/CMakeLists.txt | 52 + .../config/pe_dope_config.yaml | 24 + .../launch/pose_estimation.launch.py | 17 + rbss_poseestimation/package.xml | 27 + rbss_poseestimation/rbs_package.json | 75 ++ .../rbss_poseestimation/__init__.py | 0 rbss_poseestimation/scripts/cuboid.py | 126 +++ .../scripts/cuboid_pnp_solver.py | 146 +++ rbss_poseestimation/scripts/detector.py | 901 ++++++++++++++++++ rbss_poseestimation/scripts/models.py | 196 ++++ rbss_poseestimation/scripts/pe_dope_lc.py | 366 +++++++ 11 files changed, 1930 insertions(+) create mode 100644 rbss_poseestimation/CMakeLists.txt create mode 100644 rbss_poseestimation/config/pe_dope_config.yaml create mode 100644 rbss_poseestimation/launch/pose_estimation.launch.py create mode 100644 rbss_poseestimation/package.xml create mode 100644 rbss_poseestimation/rbs_package.json create mode 100644 rbss_poseestimation/rbss_poseestimation/__init__.py create mode 100755 rbss_poseestimation/scripts/cuboid.py create mode 100755 rbss_poseestimation/scripts/cuboid_pnp_solver.py create mode 100755 rbss_poseestimation/scripts/detector.py create mode 100755 rbss_poseestimation/scripts/models.py create mode 100755 rbss_poseestimation/scripts/pe_dope_lc.py diff --git a/rbss_poseestimation/CMakeLists.txt b/rbss_poseestimation/CMakeLists.txt new file mode 100644 index 0000000..d5f35b6 --- /dev/null +++ b/rbss_poseestimation/CMakeLists.txt @@ -0,0 +1,52 @@ +cmake_minimum_required(VERSION 3.8) +project(rbss_poseestimation) + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +find_package(ament_cmake_python REQUIRED) +find_package(rclpy REQUIRED) +# find_package(rclcpp REQUIRED) +find_package(image_transport REQUIRED) +find_package(cv_bridge REQUIRED) +find_package(sensor_msgs REQUIRED) +find_package(std_msgs REQUIRED) +find_package(geometry_msgs REQUIRED) +find_package(rbs_skill_interfaces REQUIRED) +find_package(PCL 1.12 REQUIRED) +find_package(pcl_conversions REQUIRED) +find_package(pcl_ros REQUIRED) +find_package(tf2_ros REQUIRED) +find_package(tf2_msgs REQUIRED) +find_package(visualization_msgs REQUIRED) + +# Install Python modules +ament_python_install_package(${PROJECT_NAME}) + +# Install Python executables +install(PROGRAMS + scripts/pe_dope_lc.py + DESTINATION lib/${PROJECT_NAME} +) + +install( + DIRECTORY config launch + DESTINATION share/${PROJECT_NAME} +) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + # the following line skips the linter which checks for copyrights + # comment the line when a copyright and license is added to all source files + set(ament_cmake_copyright_FOUND TRUE) + # the following line skips cpplint (only works in a git repo) + # comment the line when this package is in a git repo and when + # a copyright and license is added to all source files + set(ament_cmake_cpplint_FOUND TRUE) + ament_lint_auto_find_test_dependencies() +endif() + +ament_package() diff --git a/rbss_poseestimation/config/pe_dope_config.yaml b/rbss_poseestimation/config/pe_dope_config.yaml new file mode 100644 index 0000000..439096a --- /dev/null +++ b/rbss_poseestimation/config/pe_dope_config.yaml @@ -0,0 +1,24 @@ +input_is_rectified: True # Whether the input image is rectified (strongly suggested!) 
+downscale_height: 480 # if the input image is larger than this, scale it down to this pixel height + +# # Cuboid dimension in cm x,y,z +# dimensions: { +# "model1": [13.7, 16.5, 20.2], +# "model2": [10.0, 10.0, 22.64], +# } + +# class_ids: { +# "model1": 1, +# "model2": 2, +# } + +# draw_colors: { +# "model1": [13, 255, 128], # green +# "model2": [255, 255, 255], # white +# } + +# Config params for DOPE +thresh_angle: 0.5 +thresh_map: 0.01 +sigma: 3 +thresh_points: 0.0 diff --git a/rbss_poseestimation/launch/pose_estimation.launch.py b/rbss_poseestimation/launch/pose_estimation.launch.py new file mode 100644 index 0000000..6175d16 --- /dev/null +++ b/rbss_poseestimation/launch/pose_estimation.launch.py @@ -0,0 +1,17 @@ +# from launch_ros.actions import Node +# from launch import LaunchDescription + +def generate_launch_description(): + print("The skill must be launched via interface node") + return [] + # declared_arguments = [] + + # rbss_pe = Node( + # package="rbss_poseestimation", + # executable="od_dope_lc.py", + # ) + + # nodes_to_start = [ + # rbss_pe, + # ] + # return LaunchDescription(declared_arguments + nodes_to_start) diff --git a/rbss_poseestimation/package.xml b/rbss_poseestimation/package.xml new file mode 100644 index 0000000..b661a9e --- /dev/null +++ b/rbss_poseestimation/package.xml @@ -0,0 +1,27 @@ + + + + rbss_poseestimation + 0.0.1 + The Pose Estimation skill + shalenikol + Apache License 2.0 + + ament_cmake + ament_cmake_python + + rclpy + + image_transport + cv_bridge + sensor_msgs + std_msgs + rbs_skill_interfaces + + ament_lint_auto + ament_lint_common + + + ament_cmake + + diff --git a/rbss_poseestimation/rbs_package.json b/rbss_poseestimation/rbs_package.json new file mode 100644 index 0000000..b6740a7 --- /dev/null +++ b/rbss_poseestimation/rbs_package.json @@ -0,0 +1,75 @@ +{ + "SkillPackage": { + "name": "Robossembler", + "version": "1.0", + "format": "1" + }, + "Module": { + "name": "PoseEstimation", + "description": "Pose Estimation skill with DOPE", + "node_name": "lc_dope" + }, + "Launch": { + "package": "rbss_poseestimation", + "executable": "pe_dope_lc.py" + }, + "BTAction": [ + { + "name": "peConfigure", + "type": "run", + "param": [ + { + "type": "weights", + "dependency": { + "object_name": "knight", + "weights_file": "/home/shalenikol/0_rbs/w_knight.pth", + "dimensions": [ + 0.03, + 0.026, + 0.065 + ] + } + } + ] + }, + { + "name": "peStop", + "type": "stop", + "param": [], + "result": [] + } + ], + "Interface": { + "Input": [ + { + "name": "cameraLink", + "type": "CAMERA" + }, + { + "name": "object_name", + "type": "MODEL" + } + ], + "Output": [ + { + "name": "object_detection", + "type": "BoundBox", + "description": "Topic for publisher" + } + ] + }, + "Settings": [ + { + "name": "cameraLink", + "value": "rgbd_camera" + }, + { + "name": "publishDelay", + "value": 0.5 + }, + { + "name": "is_image_mode", + "value": true + } + ] +} \ No newline at end of file diff --git a/rbss_poseestimation/rbss_poseestimation/__init__.py b/rbss_poseestimation/rbss_poseestimation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rbss_poseestimation/scripts/cuboid.py b/rbss_poseestimation/scripts/cuboid.py new file mode 100755 index 0000000..469df1f --- /dev/null +++ b/rbss_poseestimation/scripts/cuboid.py @@ -0,0 +1,126 @@ +# Copyright (c) 2018 NVIDIA Corporation. All rights reserved. +# This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. 
+# https://creativecommons.org/licenses/by-nc-sa/4.0/legalcode + +from enum import IntEnum, unique +import numpy as np +import cv2 + +# Related to the object's local coordinate system +# @unique +class CuboidVertexType(IntEnum): + FrontTopRight = 0 + FrontTopLeft = 1 + FrontBottomLeft = 2 + FrontBottomRight = 3 + RearTopRight = 4 + RearTopLeft = 5 + RearBottomLeft = 6 + RearBottomRight = 7 + Center = 8 + TotalCornerVertexCount = 8 # Corner vertexes doesn't include the center point + TotalVertexCount = 9 + + +# List of the vertex indexes in each line edges of the cuboid +CuboidLineIndexes = [ + # Front face + [CuboidVertexType.FrontTopLeft, CuboidVertexType.FrontTopRight], + [CuboidVertexType.FrontTopRight, CuboidVertexType.FrontBottomRight], + [CuboidVertexType.FrontBottomRight, CuboidVertexType.FrontBottomLeft], + [CuboidVertexType.FrontBottomLeft, CuboidVertexType.FrontTopLeft], + # Back face + [CuboidVertexType.RearTopLeft, CuboidVertexType.RearTopRight], + [CuboidVertexType.RearTopRight, CuboidVertexType.RearBottomRight], + [CuboidVertexType.RearBottomRight, CuboidVertexType.RearBottomLeft], + [CuboidVertexType.RearBottomLeft, CuboidVertexType.RearTopLeft], + # Left face + [CuboidVertexType.FrontBottomLeft, CuboidVertexType.RearBottomLeft], + [CuboidVertexType.FrontTopLeft, CuboidVertexType.RearTopLeft], + # Right face + [CuboidVertexType.FrontBottomRight, CuboidVertexType.RearBottomRight], + [CuboidVertexType.FrontTopRight, CuboidVertexType.RearTopRight], +] + + +# ========================= Cuboid3d ========================= +class Cuboid3d: + """This class contains a 3D cuboid.""" + + # Create a box with a certain size + def __init__( + self, + size3d=[1.0, 1.0, 1.0], + center_location=[0, 0, 0], + coord_system=None, + parent_object=None, + ): + + # NOTE: This local coordinate system is similar + # to the intrinsic transform matrix of a 3d object + self.center_location = center_location + self.coord_system = coord_system + self.size3d = size3d + self._vertices = [0, 0, 0] * CuboidVertexType.TotalVertexCount + + self.generate_vertexes() + + def get_vertex(self, vertex_type): + """Returns the location of a vertex. 
+ + Args: + vertex_type: enum of type CuboidVertexType + + Returns: + Numpy array(3) - Location of the vertex type in the cuboid + """ + return self._vertices[vertex_type] + + def get_vertices(self): + return self._vertices + + def generate_vertexes(self): + width, height, depth = self.size3d + + # By default just use the normal OpenCV coordinate system + if self.coord_system is None: + cx, cy, cz = self.center_location + # X axis point to the right + right = cx + width / 2.0 + left = cx - width / 2.0 + # Y axis point downward + top = cy - height / 2.0 + bottom = cy + height / 2.0 + # Z axis point forward + front = cz + depth / 2.0 + rear = cz - depth / 2.0 + + # List of 8 vertices of the box + self._vertices = [ + [right, top, front], # Front Top Right + [left, top, front], # Front Top Left + [left, bottom, front], # Front Bottom Left + [right, bottom, front], # Front Bottom Right + [right, top, rear], # Rear Top Right + [left, top, rear], # Rear Top Left + [left, bottom, rear], # Rear Bottom Left + [right, bottom, rear], # Rear Bottom Right + self.center_location, # Center + ] + else: + sx, sy, sz = self.size3d + forward = np.array(self.coord_system.forward, dtype=float) * sy * 0.5 + up = np.array(self.coord_system.up, dtype=float) * sz * 0.5 + right = np.array(self.coord_system.right, dtype=float) * sx * 0.5 + center = np.array(self.center_location, dtype=float) + self._vertices = [ + center + forward + up + right, # Front Top Right + center + forward + up - right, # Front Top Left + center + forward - up - right, # Front Bottom Left + center + forward - up + right, # Front Bottom Right + center - forward + up + right, # Rear Top Right + center - forward + up - right, # Rear Top Left + center - forward - up - right, # Rear Bottom Left + center - forward - up + right, # Rear Bottom Right + self.center_location, # Center + ] diff --git a/rbss_poseestimation/scripts/cuboid_pnp_solver.py b/rbss_poseestimation/scripts/cuboid_pnp_solver.py new file mode 100755 index 0000000..bcb113f --- /dev/null +++ b/rbss_poseestimation/scripts/cuboid_pnp_solver.py @@ -0,0 +1,146 @@ +# Copyright (c) 2018 NVIDIA Corporation. All rights reserved. +# This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. +# https://creativecommons.org/licenses/by-nc-sa/4.0/legalcode + +import cv2 +import numpy as np +from cuboid import CuboidVertexType +from pyrr import Quaternion + + +class CuboidPNPSolver(object): + """ + This class is used to find the 6-DoF pose of a cuboid given its projected vertices. + + Runs perspective-n-point (PNP) algorithm. 
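+    solve_pnp skips vertices whose 2D location is None and needs at least four of the
+    nine keypoints (eight cuboid corners plus the centroid) to recover a pose.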
+ """ + + # Class variables + cv2version = cv2.__version__.split(".") + cv2majorversion = int(cv2version[0]) + + def __init__( + self, + object_name="", + camera_intrinsic_matrix=None, + cuboid3d=None, + dist_coeffs=np.zeros((4, 1)), + ): + self.object_name = object_name + if not camera_intrinsic_matrix is None: + self._camera_intrinsic_matrix = camera_intrinsic_matrix + else: + self._camera_intrinsic_matrix = np.array([[0, 0, 0], [0, 0, 0], [0, 0, 0]]) + self._cuboid3d = cuboid3d + + self._dist_coeffs = dist_coeffs + + def set_camera_intrinsic_matrix(self, new_intrinsic_matrix): + """Sets the camera intrinsic matrix""" + self._camera_intrinsic_matrix = new_intrinsic_matrix + + def set_dist_coeffs(self, dist_coeffs): + """Sets the camera intrinsic matrix""" + self._dist_coeffs = dist_coeffs + + def solve_pnp(self, cuboid2d_points, pnp_algorithm=None): + """ + Detects the rotation and traslation + of a cuboid object from its vertexes' + 2D location in the image + """ + + # Fallback to default PNP algorithm base on OpenCV version + if pnp_algorithm is None: + if CuboidPNPSolver.cv2majorversion == 2: + pnp_algorithm = cv2.CV_ITERATIVE + elif CuboidPNPSolver.cv2majorversion == 3: + pnp_algorithm = cv2.SOLVEPNP_ITERATIVE + + if pnp_algorithm is None: + pnp_algorithm = cv2.SOLVEPNP_EPNP + + location = None + quaternion = None + projected_points = cuboid2d_points + + cuboid3d_points = np.array(self._cuboid3d.get_vertices()) + obj_2d_points = [] + obj_3d_points = [] + + for i in range(CuboidVertexType.TotalVertexCount): + check_point_2d = cuboid2d_points[i] + # Ignore invalid points + if check_point_2d is None: + continue + obj_2d_points.append(check_point_2d) + obj_3d_points.append(cuboid3d_points[i]) + + obj_2d_points = np.array(obj_2d_points, dtype=float) + obj_3d_points = np.array(obj_3d_points, dtype=float) + + valid_point_count = len(obj_2d_points) + + # Can only do PNP if we have more than 3 valid points + is_points_valid = valid_point_count >= 4 + + if is_points_valid: + + ret, rvec, tvec = cv2.solvePnP( + obj_3d_points, + obj_2d_points, + self._camera_intrinsic_matrix, + self._dist_coeffs, + flags=pnp_algorithm, + ) + + if ret: + location = list(x[0] for x in tvec) + quaternion = self.convert_rvec_to_quaternion(rvec) + + projected_points, _ = cv2.projectPoints( + cuboid3d_points, + rvec, + tvec, + self._camera_intrinsic_matrix, + self._dist_coeffs, + ) + projected_points = np.squeeze(projected_points) + + # If the location.Z is negative or object is behind the camera then flip both location and rotation + x, y, z = location + if z < 0: + # Get the opposite location + location = [-x, -y, -z] + + # Change the rotation by 180 degree + rotate_angle = np.pi + rotate_quaternion = Quaternion.from_axis_rotation( + location, rotate_angle + ) + quaternion = rotate_quaternion.cross(quaternion) + + return location, quaternion, projected_points + + def convert_rvec_to_quaternion(self, rvec): + """Convert rvec (which is log quaternion) to quaternion""" + theta = np.sqrt( + rvec[0] * rvec[0] + rvec[1] * rvec[1] + rvec[2] * rvec[2] + ) # in radians + raxis = [rvec[0] / theta, rvec[1] / theta, rvec[2] / theta] + + # pyrr's Quaternion (order is XYZW), https://pyrr.readthedocs.io/en/latest/oo_api_quaternion.html + return Quaternion.from_axis_rotation(raxis, theta) + + def project_points(self, rvec, tvec): + """Project points from model onto image using rotation, translation""" + output_points, tmp = cv2.projectPoints( + self.__object_vertex_coordinates, + rvec, + tvec, + self.__camera_intrinsic_matrix, + 
self.__dist_coeffs, + ) + + output_points = np.squeeze(output_points) + return output_points diff --git a/rbss_poseestimation/scripts/detector.py b/rbss_poseestimation/scripts/detector.py new file mode 100755 index 0000000..b26a7f3 --- /dev/null +++ b/rbss_poseestimation/scripts/detector.py @@ -0,0 +1,901 @@ +# Copyright (c) 2018 NVIDIA Corporation. All rights reserved. +# This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. +# https://creativecommons.org/licenses/by-nc-sa/4.0/legalcode + +""" +Contains the following classes: + - ModelData - High level information encapsulation + - ObjectDetector - Greedy algorithm to build cuboids from belief maps +""" +# 14.06.2024 @shalenikol find_object_poses: remove "cuboid2d" + +import time + +import sys +from os import path + +import numpy as np + +import torch +import torch.nn as nn +import torchvision.transforms as transforms +from torch.autograd import Variable +import torchvision.models as models + +from scipy.ndimage.filters import gaussian_filter +from scipy import optimize + +import sys + +sys.path.append("../") +from models import * + +# Import the definition of the neural network model and cuboids +from cuboid_pnp_solver import * + +# global transform for image input +transform = transforms.Compose( + [ + # transforms.Scale(IMAGE_SIZE), + # transforms.CenterCrop((imagesize,imagesize)), + transforms.ToTensor(), + # transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)), + transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)), + ] +) + + +# ================================ Models ================================ + + +class DopeNetwork(nn.Module): + def __init__( + self, + numBeliefMap=9, + numAffinity=16, + stop_at_stage=6, # number of stages to process (if less than total number of stages) + ): + super(DopeNetwork, self).__init__() + + self.stop_at_stage = stop_at_stage + + vgg_full = models.vgg19(pretrained=False).features + self.vgg = nn.Sequential() + for i_layer in range(24): + self.vgg.add_module(str(i_layer), vgg_full[i_layer]) + + # Add some layers + i_layer = 23 + self.vgg.add_module( + str(i_layer), nn.Conv2d(512, 256, kernel_size=3, stride=1, padding=1) + ) + self.vgg.add_module(str(i_layer + 1), nn.ReLU(inplace=True)) + self.vgg.add_module( + str(i_layer + 2), nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1) + ) + self.vgg.add_module(str(i_layer + 3), nn.ReLU(inplace=True)) + + # print('---Belief------------------------------------------------') + # _2 are the belief map stages + self.m1_2 = DopeNetwork.create_stage(128, numBeliefMap, True) + self.m2_2 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numBeliefMap, False + ) + self.m3_2 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numBeliefMap, False + ) + self.m4_2 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numBeliefMap, False + ) + self.m5_2 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numBeliefMap, False + ) + self.m6_2 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numBeliefMap, False + ) + + # print('---Affinity----------------------------------------------') + # _1 are the affinity map stages + self.m1_1 = DopeNetwork.create_stage(128, numAffinity, True) + self.m2_1 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numAffinity, False + ) + self.m3_1 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numAffinity, False + ) + self.m4_1 = DopeNetwork.create_stage( + 128 + numBeliefMap + 
numAffinity, numAffinity, False + ) + self.m5_1 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numAffinity, False + ) + self.m6_1 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numAffinity, False + ) + + def forward(self, x): + """Runs inference on the neural network""" + + out1 = self.vgg(x) + + out1_2 = self.m1_2(out1) + out1_1 = self.m1_1(out1) + + if self.stop_at_stage == 1: + return [out1_2], [out1_1] + + out2 = torch.cat([out1_2, out1_1, out1], 1) + out2_2 = self.m2_2(out2) + out2_1 = self.m2_1(out2) + + if self.stop_at_stage == 2: + return [out1_2, out2_2], [out1_1, out2_1] + + out3 = torch.cat([out2_2, out2_1, out1], 1) + out3_2 = self.m3_2(out3) + out3_1 = self.m3_1(out3) + + if self.stop_at_stage == 3: + return [out1_2, out2_2, out3_2], [out1_1, out2_1, out3_1] + + out4 = torch.cat([out3_2, out3_1, out1], 1) + out4_2 = self.m4_2(out4) + out4_1 = self.m4_1(out4) + + if self.stop_at_stage == 4: + return [out1_2, out2_2, out3_2, out4_2], [out1_1, out2_1, out3_1, out4_1] + + out5 = torch.cat([out4_2, out4_1, out1], 1) + out5_2 = self.m5_2(out5) + out5_1 = self.m5_1(out5) + + if self.stop_at_stage == 5: + return [out1_2, out2_2, out3_2, out4_2, out5_2], [ + out1_1, + out2_1, + out3_1, + out4_1, + out5_1, + ] + + out6 = torch.cat([out5_2, out5_1, out1], 1) + out6_2 = self.m6_2(out6) + out6_1 = self.m6_1(out6) + + return [out1_2, out2_2, out3_2, out4_2, out5_2, out6_2], [ + out1_1, + out2_1, + out3_1, + out4_1, + out5_1, + out6_1, + ] + + @staticmethod + def create_stage(in_channels, out_channels, first=False): + """Create the neural network layers for a single stage.""" + + model = nn.Sequential() + mid_channels = 128 + if first: + padding = 1 + kernel = 3 + count = 6 + final_channels = 512 + else: + padding = 3 + kernel = 7 + count = 10 + final_channels = mid_channels + + # First convolution + model.add_module( + "0", + nn.Conv2d( + in_channels, mid_channels, kernel_size=kernel, stride=1, padding=padding + ), + ) + + # Middle convolutions + i = 1 + while i < count - 1: + model.add_module(str(i), nn.ReLU(inplace=True)) + i += 1 + model.add_module( + str(i), + nn.Conv2d( + mid_channels, + mid_channels, + kernel_size=kernel, + stride=1, + padding=padding, + ), + ) + i += 1 + + # Penultimate convolution + model.add_module(str(i), nn.ReLU(inplace=True)) + i += 1 + model.add_module( + str(i), nn.Conv2d(mid_channels, final_channels, kernel_size=1, stride=1) + ) + i += 1 + + # Last convolution + model.add_module(str(i), nn.ReLU(inplace=True)) + i += 1 + model.add_module( + str(i), nn.Conv2d(final_channels, out_channels, kernel_size=1, stride=1) + ) + i += 1 + + return model + + +class ModelData(object): + """This class contains methods for loading the neural network""" + + def __init__(self, name="", net_path="", gpu_id=0, architecture="dope"): + self.name = name + self.net_path = net_path # Path to trained network model + self.net = None # Trained network + self.gpu_id = gpu_id + self.architecture = architecture + + def get_net(self): + """Returns network""" + if not self.net: + self.load_net_model() + return self.net + + def load_net_model(self): + """Loads network model from disk""" + if not self.net and path.exists(self.net_path): + self.net = self.load_net_model_path(self.net_path) + if not path.exists(self.net_path): + print("ERROR: Unable to find model weights: '{}'".format(self.net_path)) + exit(0) + + def load_net_model_path(self, path): + """Loads network model from disk with given path""" + model_loading_start_time = time.time() + print("Loading 
DOPE model '{}'...".format(path)) + net = DopeNetwork() + + net = torch.nn.DataParallel(net, [0]).cuda() + net.load_state_dict(torch.load(path)) + net.eval() + print( + " Model loaded in {:.2f} seconds.".format( + time.time() - model_loading_start_time + ) + ) + return net + + def __str__(self): + """Converts to string""" + return "{}: {}".format(self.name, self.net_path) + + +# ================================ ObjectDetector ================================ +class ObjectDetector(object): + """This class contains methods for object detection""" + + @staticmethod + def gaussian(height, center_x, center_y, width_x, width_y): + """Returns a gaussian function with the given parameters""" + width_x = float(width_x) + width_y = float(width_y) + return lambda x, y: height * np.exp( + -(((center_x - x) / width_x) ** 2 + ((center_y - y) / width_y) ** 2) / 2 + ) + + @staticmethod + def moments(data): + """Returns (height, x, y, width_x, width_y) + the gaussian parameters of a 2D distribution by calculating its + moments""" + total = data.sum() + X, Y = np.indices(data.shape) + x = (X * data).sum() / total + y = (Y * data).sum() / total + col = data[:, int(y)] + width_x = np.sqrt( + np.abs((np.arange(col.size) - y) ** 2 * col).sum() / col.sum() + ) + row = data[int(x), :] + width_y = np.sqrt( + np.abs((np.arange(row.size) - x) ** 2 * row).sum() / row.sum() + ) + height = data.max() + return height, x, y, width_x, width_y + + @staticmethod + def fitgaussian(data): + """Returns (height, x, y, width_x, width_y) + the gaussian parameters of a 2D distribution found by a fit""" + params = ObjectDetector.moments(data) + errorfunction = lambda p: np.ravel( + ObjectDetector.gaussian(*p)(*np.indices(data.shape)) - data + ) + p, success = optimize.leastsq(errorfunction, params) + return p + + @staticmethod + def make_grid( + tensor, + nrow=8, + padding=2, + normalize=False, + range_=None, + scale_each=False, + pad_value=0, + ): + """Make a grid of images. + Args: + tensor (Tensor or list): 4D mini-batch Tensor of shape (B x C x H x W) + or a list of images all of the same size. + nrow (int, optional): Number of images displayed in each row of the grid. + The Final grid size is (B / nrow, nrow). Default is 8. + padding (int, optional): amount of padding. Default is 2. + normalize (bool, optional): If True, shift the image to the range (0, 1), + by subtracting the minimum and dividing by the maximum pixel value. + range (tuple, optional): tuple (min, max) where min and max are numbers, + then these numbers are used to normalize the image. By default, min and max + are computed from the tensor. + scale_each (bool, optional): If True, scale each image in the batch of + images separately rather than the (min, max) over all images. + pad_value (float, optional): Value for the padded pixels. 
+ Example: + See this notebook `here `_ + """ + import math + + if not ( + torch.is_tensor(tensor) + or (isinstance(tensor, list) and all(torch.is_tensor(t) for t in tensor)) + ): + raise TypeError( + "tensor or list of tensors expected, got {}".format(type(tensor)) + ) + + # if list of tensors, convert to a 4D mini-batch Tensor + if isinstance(tensor, list): + tensor = torch.stack(tensor, dim=0) + + if tensor.dim() == 2: # single image H x W + tensor = tensor.view(1, tensor.size(0), tensor.size(1)) + if tensor.dim() == 3: # single image + if tensor.size(0) == 1: # if single-channel, convert to 3-channel + tensor = torch.cat((tensor, tensor, tensor), 0) + tensor = tensor.view(1, tensor.size(0), tensor.size(1), tensor.size(2)) + + if tensor.dim() == 4 and tensor.size(1) == 1: # single-channel images + tensor = torch.cat((tensor, tensor, tensor), 1) + + if normalize is True: + tensor = tensor.clone() # avoid modifying tensor in-place + if range_ is not None: + assert isinstance( + range_, tuple + ), "range has to be a tuple (min, max) if specified. min and max are numbers" + + def norm_ip(img, min, max): + img.clamp_(min=min, max=max) + img.add_(-min).div_(max - min + 1e-5) + + def norm_range(t, range_): + if range_ is not None: + norm_ip(t, range_[0], range_[1]) + else: + norm_ip(t, float(t.min()), float(t.max())) + + if scale_each is True: + for t in tensor: # loop over mini-batch dimension + norm_range(t, range) + else: + norm_range(tensor, range) + + if tensor.size(0) == 1: + return tensor.squeeze() + + # make the mini-batch of images into a grid + nmaps = tensor.size(0) + xmaps = min(nrow, nmaps) + ymaps = int(math.ceil(float(nmaps) / xmaps)) + height, width = int(tensor.size(2) + padding), int(tensor.size(3) + padding) + grid = tensor.new(3, height * ymaps + padding, width * xmaps + padding).fill_( + pad_value + ) + k = 0 + for y in range(ymaps): + for x in range(xmaps): + if k >= nmaps: + break + grid.narrow(1, y * height + padding, height - padding).narrow( + 2, x * width + padding, width - padding + ).copy_(tensor[k]) + k = k + 1 + return grid + + @staticmethod + def get_image_grid(tensor, filename, nrow=3, padding=2, mean=None, std=None): + """ + Saves a given Tensor into an image file. + If given a mini-batch tensor, will save the tensor as a grid of images. 
+ """ + from PIL import Image + + # tensor = tensor.cpu() + grid = ObjectDetector.make_grid(tensor, nrow=nrow, padding=10, pad_value=1) + if not mean is None: + # ndarr = grid.mul(std).add(mean).mul(255).byte().transpose(0,2).transpose(0,1).numpy() + ndarr = ( + grid.mul(std) + .add(mean) + .mul(255) + .byte() + .transpose(0, 2) + .transpose(0, 1) + .numpy() + ) + else: + ndarr = ( + grid.mul(0.5) + .add(0.5) + .mul(255) + .byte() + .transpose(0, 2) + .transpose(0, 1) + .numpy() + ) + im = Image.fromarray(ndarr) + # im.save(filename) + return im + + @staticmethod + def detect_object_in_image( + net_model, pnp_solver, in_img, config, grid_belief_debug=False, norm_belief=True + ): + """Detect objects in a image using a specific trained network model + Returns the poses of the objects and the belief maps + """ + + if in_img is None: + return [] + + # print("detect_object_in_image - image shape: {}".format(in_img.shape)) + + # Run network inference + image_tensor = transform(in_img) + image_torch = Variable(image_tensor).cuda().unsqueeze(0) + out, seg = net_model( + image_torch + ) # run inference using the network (calls 'forward' method) + vertex2 = out[-1][0] + aff = seg[-1][0] + + # Find objects from network output + detected_objects = ObjectDetector.find_object_poses( + vertex2, aff, pnp_solver, config + ) + + if not grid_belief_debug: + + return detected_objects, None + else: + # Run the belief maps debug display on the beliefmaps + + upsampling = nn.UpsamplingNearest2d(scale_factor=8) + tensor = vertex2 + belief_imgs = [] + in_img = torch.tensor(in_img).float() / 255.0 + in_img *= 0.7 + + for j in range(tensor.size()[0]): + belief = tensor[j].clone() + if norm_belief: + belief -= float(torch.min(belief)[0].data.cpu().numpy()) + belief /= float(torch.max(belief)[0].data.cpu().numpy()) + + belief = ( + upsampling(belief.unsqueeze(0).unsqueeze(0)) + .squeeze() + .squeeze() + .data + ) + belief = torch.clamp(belief, 0, 1).cpu() + belief = torch.cat( + [ + belief.unsqueeze(0) + in_img[:, :, 0], + belief.unsqueeze(0) + in_img[:, :, 1], + belief.unsqueeze(0) + in_img[:, :, 2], + ] + ).unsqueeze(0) + belief = torch.clamp(belief, 0, 1) + + # belief_imgs.append(belief.data.squeeze().cpu().numpy().transpose(1,2,0)) + belief_imgs.append(belief.data.squeeze().numpy()) + + # Create the image grid + belief_imgs = torch.tensor(np.array(belief_imgs)) + + im_belief = ObjectDetector.get_image_grid(belief_imgs, None, mean=0, std=1) + + return detected_objects, im_belief + + @staticmethod + def find_object_poses( + vertex2, + aff, + pnp_solver, + config, + run_sampling=False, + num_sample=100, + scale_factor=8, + ): + """Detect objects given network output""" + + # run_sampling = True + + # Detect objects from belief maps and affinities + objects, all_peaks = ObjectDetector.find_objects( + vertex2, + aff, + config, + run_sampling=run_sampling, + num_sample=num_sample, + scale_factor=scale_factor, + ) + detected_objects = [] + obj_name = pnp_solver.object_name + + print(all_peaks) + + # print("find_object_poses: found {} objects ================".format(len(objects))) + for obj in objects: + # Run PNP + points = obj[1] + [(obj[0][0] * scale_factor, obj[0][1] * scale_factor)] + # print(points) + # cuboid2d = np.copy(points) + location, quaternion, projected_points = pnp_solver.solve_pnp(points) + + # run multiple sample + if run_sampling: + lx, ly, lz = [], [], [] + qx, qy, qz, qw = [], [], [], [] + + for i_sample in range(num_sample): + sample = [] + for i_point in range(len(obj[-1])): + if not 
obj[-1][i_point][i_sample] is None: + sample.append( + ( + obj[-1][i_point][i_sample][0] * scale_factor, + obj[-1][i_point][i_sample][1] * scale_factor, + ) + ) + else: + sample.append(None) + # final_cuboids.append(sample) + pnp_sample = pnp_solver.solve_pnp(sample) + + try: + lx.append(pnp_sample[0][0]) + ly.append(pnp_sample[0][1]) + lz.append(pnp_sample[0][2]) + + qx.append(pnp_sample[1][0]) + qy.append(pnp_sample[1][1]) + qz.append(pnp_sample[1][2]) + qw.append(pnp_sample[1][3]) + except: + pass + # TODO + # RUN quaternion as well for the std and avg. + + try: + print("----") + print("location:") + print(location[0], location[1], location[2]) + print(np.mean(lx), np.mean(ly), np.mean(lz)) + print(np.std(lx), np.std(ly), np.std(lz)) + print("quaternion:") + print(quaternion[0], quaternion[1], quaternion[2], quaternion[3]) + print(np.mean(qx), np.mean(qy), np.mean(qz), np.mean(qw)) + print(np.std(qx), np.std(qy), np.std(qz), np.std(qw)) + + except: + pass + if not location is None: + detected_objects.append( + { + "name": obj_name, + "location": location, + "quaternion": quaternion, + # "cuboid2d": cuboid2d, + "projected_points": projected_points, + "confidence": obj[-1], + "raw_points": points, + } + ) + + # print("find_object_poses: points = ", type(points), points) + # print("find_object_poses: locn = ", location, "quat =", quaternion) + # print("find_object_poses: projected_points = ", type(projected_points), projected_points) + + return detected_objects + + @staticmethod + def find_objects( + vertex2, + aff, + config, + numvertex=8, + run_sampling=False, + num_sample=100, + scale_factor=8, + ): + """Detects objects given network belief maps and affinities, using heuristic method""" + + all_peaks = [] + all_samples = [] + + peak_counter = 0 + for j in range(vertex2.size()[0]): + belief = vertex2[j].clone() + map_ori = belief.cpu().data.numpy() + + map = gaussian_filter(belief.cpu().data.numpy(), sigma=config.sigma) + p = 1 + map_left = np.zeros(map.shape) + map_left[p:, :] = map[:-p, :] + map_right = np.zeros(map.shape) + map_right[:-p, :] = map[p:, :] + map_up = np.zeros(map.shape) + map_up[:, p:] = map[:, :-p] + map_down = np.zeros(map.shape) + map_down[:, :-p] = map[:, p:] + + peaks_binary = np.logical_and.reduce( + ( + map >= map_left, + map >= map_right, + map >= map_up, + map >= map_down, + map > config.thresh_map, + ) + ) + peaks = zip(np.nonzero(peaks_binary)[1], np.nonzero(peaks_binary)[0]) + + # Computing the weigthed average for localizing the peaks + peaks = list(peaks) + win = 11 + ran = win // 2 + peaks_avg = [] + point_sample_list = [] + for p_value in range(len(peaks)): + p = peaks[p_value] + weights = np.zeros((win, win)) + i_values = np.zeros((win, win)) + j_values = np.zeros((win, win)) + for i in range(-ran, ran + 1): + for j in range(-ran, ran + 1): + if ( + p[1] + i < 0 + or p[1] + i >= map_ori.shape[0] + or p[0] + j < 0 + or p[0] + j >= map_ori.shape[1] + ): + continue + + i_values[j + ran, i + ran] = p[1] + i + j_values[j + ran, i + ran] = p[0] + j + + weights[j + ran, i + ran] = map_ori[p[1] + i, p[0] + j] + # if the weights are all zeros + # then add the none continuous points + OFFSET_DUE_TO_UPSAMPLING = 0.4395 + + # Sample the points using the gaussian + if run_sampling: + data = weights + params = ObjectDetector.fitgaussian(data) + fit = ObjectDetector.gaussian(*params) + _, mu_x, mu_y, std_x, std_y = params + points_sample = np.random.multivariate_normal( + np.array( + [ + p[1] + mu_x + OFFSET_DUE_TO_UPSAMPLING, + p[0] - mu_y + OFFSET_DUE_TO_UPSAMPLING, 
+ ] + ), + # np.array([[std_x*std_x,0],[0,std_y*std_y]]), size=num_sample) + np.array([[std_x, 0], [0, std_y]]), + size=num_sample, + ) + point_sample_list.append(points_sample) + + try: + peaks_avg.append( + ( + np.average(j_values, weights=weights) + + OFFSET_DUE_TO_UPSAMPLING, + np.average(i_values, weights=weights) + + OFFSET_DUE_TO_UPSAMPLING, + ) + ) + except: + peaks_avg.append( + ( + p[0] + OFFSET_DUE_TO_UPSAMPLING, + p[1] + OFFSET_DUE_TO_UPSAMPLING, + ) + ) + + # Note: Python3 doesn't support len for zip object + peaks_len = min( + len(np.nonzero(peaks_binary)[1]), len(np.nonzero(peaks_binary)[0]) + ) + + peaks_with_score = [ + peaks_avg[x_] + (map_ori[peaks[x_][1], peaks[x_][0]],) + for x_ in range(len(peaks)) + ] + + id = range(peak_counter, peak_counter + peaks_len) + + peaks_with_score_and_id = [ + peaks_with_score[i] + (id[i],) for i in range(len(id)) + ] + + all_peaks.append(peaks_with_score_and_id) + all_samples.append(point_sample_list) + peak_counter += peaks_len + + objects = [] + + if aff is None: + # Assume there is only one object + points = [None for i in range(numvertex)] + for i_peak, peaks in enumerate(all_peaks): + # print (peaks) + for peak in peaks: + if peak[2] > config.threshold: + points[i_peak] = (peak[0], peak[1]) + + return points + + # Check object centroid and build the objects if the centroid is found + for nb_object in range(len(all_peaks[-1])): + if all_peaks[-1][nb_object][2] > config.thresh_points: + objects.append( + [ + [ + all_peaks[-1][nb_object][:2][0], + all_peaks[-1][nb_object][:2][1], + ], + [None for i in range(numvertex)], + [None for i in range(numvertex)], + all_peaks[-1][nb_object][2], + [ + [None for j in range(num_sample)] + for i in range(numvertex + 1) + ], + ] + ) + + # Check if the object was added before + if run_sampling and nb_object < len(objects): + # add the samples to the object centroids + objects[nb_object][4][-1] = all_samples[-1][nb_object] + + # Working with an output that only has belief maps + if aff is None: + if len(objects) > 0 and len(all_peaks) > 0 and len(all_peaks[0]) > 0: + for i_points in range(8): + if ( + len(all_peaks[i_points]) > 0 + and all_peaks[i_points][0][2] > config.threshold + ): + objects[0][1][i_points] = ( + all_peaks[i_points][0][0], + all_peaks[i_points][0][1], + ) + else: + # For all points found + for i_lists in range(len(all_peaks[:-1])): + lists = all_peaks[i_lists] + + # Candidate refers to point that needs to be match with a centroid object + for i_candidate, candidate in enumerate(lists): + if candidate[2] < config.thresh_points: + continue + + i_best = -1 + best_dist = 10000 + best_angle = 100 + + # Find the points that links to that centroid. + for i_obj in range(len(objects)): + center = [objects[i_obj][0][0], objects[i_obj][0][1]] + + # integer is used to look into the affinity map, + # but the float version is used to run + point_int = [int(candidate[0]), int(candidate[1])] + point = [candidate[0], candidate[1]] + + # look at the distance to the vector field. 
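+                        # The affinity channel pair (2*i_lists, 2*i_lists + 1) is expected to
+                        # hold a unit vector pointing from this vertex candidate toward the
+                        # object centroid; it is compared below with the actual
+                        # candidate-to-centroid direction.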
+ v_aff = ( + np.array( + [ + aff[ + i_lists * 2, point_int[1], point_int[0] + ].data.item(), + aff[ + i_lists * 2 + 1, point_int[1], point_int[0] + ].data.item(), + ] + ) + * 10 + ) + + # normalize the vector + xvec = v_aff[0] + yvec = v_aff[1] + + norms = np.sqrt(xvec * xvec + yvec * yvec) + + xvec /= norms + yvec /= norms + + v_aff = np.concatenate([[xvec], [yvec]]) + + v_center = np.array(center) - np.array(point) + xvec = v_center[0] + yvec = v_center[1] + + norms = np.sqrt(xvec * xvec + yvec * yvec) + + xvec /= norms + yvec /= norms + + v_center = np.concatenate([[xvec], [yvec]]) + + # vector affinity + dist_angle = np.linalg.norm(v_center - v_aff) + + # distance between vertexes + dist_point = np.linalg.norm(np.array(point) - np.array(center)) + + if ( + dist_angle < config.thresh_angle + and best_dist > 1000 + or dist_angle < config.thresh_angle + and best_dist > dist_point + ): + i_best = i_obj + best_angle = dist_angle + best_dist = dist_point + + if i_best == -1: + continue + + if ( + objects[i_best][1][i_lists] is None + or best_angle < config.thresh_angle + and best_dist < objects[i_best][2][i_lists][1] + ): + # set the points + objects[i_best][1][i_lists] = ( + (candidate[0]) * scale_factor, + (candidate[1]) * scale_factor, + ) + # set information about the points: angle and distance + objects[i_best][2][i_lists] = (best_angle, best_dist) + # add the sample points + if run_sampling: + objects[i_best][4][i_lists] = all_samples[i_lists][ + i_candidate + ] + return objects, all_peaks diff --git a/rbss_poseestimation/scripts/models.py b/rbss_poseestimation/scripts/models.py new file mode 100755 index 0000000..0c89004 --- /dev/null +++ b/rbss_poseestimation/scripts/models.py @@ -0,0 +1,196 @@ +""" +NVIDIA from jtremblay@gmail.com +""" + +# Networks +import torch +import torch +import torch.nn as nn +import torch.nn.parallel +import torch.utils.data +import torchvision.models as models + + +class DopeNetwork(nn.Module): + def __init__( + self, + pretrained=False, + numBeliefMap=9, + numAffinity=16, + stop_at_stage=6, # number of stages to process (if less than total number of stages) + ): + super(DopeNetwork, self).__init__() + + self.stop_at_stage = stop_at_stage + + vgg_full = models.vgg19(pretrained=False).features + self.vgg = nn.Sequential() + for i_layer in range(24): + self.vgg.add_module(str(i_layer), vgg_full[i_layer]) + + # Add some layers + i_layer = 23 + self.vgg.add_module( + str(i_layer), nn.Conv2d(512, 256, kernel_size=3, stride=1, padding=1) + ) + self.vgg.add_module(str(i_layer + 1), nn.ReLU(inplace=True)) + self.vgg.add_module( + str(i_layer + 2), nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1) + ) + self.vgg.add_module(str(i_layer + 3), nn.ReLU(inplace=True)) + + # print('---Belief------------------------------------------------') + # _2 are the belief map stages + self.m1_2 = DopeNetwork.create_stage(128, numBeliefMap, True) + self.m2_2 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numBeliefMap, False + ) + self.m3_2 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numBeliefMap, False + ) + self.m4_2 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numBeliefMap, False + ) + self.m5_2 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numBeliefMap, False + ) + self.m6_2 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numBeliefMap, False + ) + + # print('---Affinity----------------------------------------------') + # _1 are the affinity map stages + self.m1_1 = 
DopeNetwork.create_stage(128, numAffinity, True) + self.m2_1 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numAffinity, False + ) + self.m3_1 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numAffinity, False + ) + self.m4_1 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numAffinity, False + ) + self.m5_1 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numAffinity, False + ) + self.m6_1 = DopeNetwork.create_stage( + 128 + numBeliefMap + numAffinity, numAffinity, False + ) + + def forward(self, x): + """Runs inference on the neural network""" + + out1 = self.vgg(x) + + out1_2 = self.m1_2(out1) + out1_1 = self.m1_1(out1) + + if self.stop_at_stage == 1: + return [out1_2], [out1_1] + + out2 = torch.cat([out1_2, out1_1, out1], 1) + out2_2 = self.m2_2(out2) + out2_1 = self.m2_1(out2) + + if self.stop_at_stage == 2: + return [out1_2, out2_2], [out1_1, out2_1] + + out3 = torch.cat([out2_2, out2_1, out1], 1) + out3_2 = self.m3_2(out3) + out3_1 = self.m3_1(out3) + + if self.stop_at_stage == 3: + return [out1_2, out2_2, out3_2], [out1_1, out2_1, out3_1] + + out4 = torch.cat([out3_2, out3_1, out1], 1) + out4_2 = self.m4_2(out4) + out4_1 = self.m4_1(out4) + + if self.stop_at_stage == 4: + return [out1_2, out2_2, out3_2, out4_2], [out1_1, out2_1, out3_1, out4_1] + + out5 = torch.cat([out4_2, out4_1, out1], 1) + out5_2 = self.m5_2(out5) + out5_1 = self.m5_1(out5) + + if self.stop_at_stage == 5: + return [out1_2, out2_2, out3_2, out4_2, out5_2], [ + out1_1, + out2_1, + out3_1, + out4_1, + out5_1, + ] + + out6 = torch.cat([out5_2, out5_1, out1], 1) + out6_2 = self.m6_2(out6) + out6_1 = self.m6_1(out6) + + return [out1_2, out2_2, out3_2, out4_2, out5_2, out6_2], [ + out1_1, + out2_1, + out3_1, + out4_1, + out5_1, + out6_1, + ] + + @staticmethod + def create_stage(in_channels, out_channels, first=False): + """Create the neural network layers for a single stage.""" + + model = nn.Sequential() + mid_channels = 128 + if first: + padding = 1 + kernel = 3 + count = 6 + final_channels = 512 + else: + padding = 3 + kernel = 7 + count = 10 + final_channels = mid_channels + + # First convolution + model.add_module( + "0", + nn.Conv2d( + in_channels, mid_channels, kernel_size=kernel, stride=1, padding=padding + ), + ) + + # Middle convolutions + i = 1 + while i < count - 1: + model.add_module(str(i), nn.ReLU(inplace=True)) + i += 1 + model.add_module( + str(i), + nn.Conv2d( + mid_channels, + mid_channels, + kernel_size=kernel, + stride=1, + padding=padding, + ), + ) + i += 1 + + # Penultimate convolution + model.add_module(str(i), nn.ReLU(inplace=True)) + i += 1 + model.add_module( + str(i), nn.Conv2d(mid_channels, final_channels, kernel_size=1, stride=1) + ) + i += 1 + + # Last convolution + model.add_module(str(i), nn.ReLU(inplace=True)) + i += 1 + model.add_module( + str(i), nn.Conv2d(final_channels, out_channels, kernel_size=1, stride=1) + ) + i += 1 + + return model diff --git a/rbss_poseestimation/scripts/pe_dope_lc.py b/rbss_poseestimation/scripts/pe_dope_lc.py new file mode 100755 index 0000000..46e0d3c --- /dev/null +++ b/rbss_poseestimation/scripts/pe_dope_lc.py @@ -0,0 +1,366 @@ +#!/usr/bin/env python3 +""" + pose_estimation_lifecycle_node_with_DOPE + ROS 2 program for 6D Pose Estimation + + Source run: +python inference.py --weights ../output/weights_2996 --data ../sample_dataset100 --object fork --exts jpg \ + --config config/config_pose_fork.yaml --camera config/camera_info_640x480.yaml + + @shalenikol release 0.4 +""" +import os +import 
json +import yaml + +import rclpy +from rclpy.lifecycle import Node +from rclpy.lifecycle import State +from rclpy.lifecycle import TransitionCallbackReturn + +from ament_index_python.packages import get_package_share_directory +from sensor_msgs.msg import Image, CameraInfo +from geometry_msgs.msg import Pose + +from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images +import cv2 # OpenCV library + +FILE_DOPE_CONFIG = "pe_dope_config.yaml" +# FILE_TEMP_IMAGE = "image_rgb.png" +CAMERA_LINK_DEFAULT = "rgbd_camera" +NODE_NAME_DEFAULT = "lc_dope" # this name must match the name in the description (["Module"]["node_name"]) +PARAM_SKILL_CFG = "lc_dope_cfg" + + +def get_transfer_path_() -> str: + return os.path.join(get_package_share_directory("rbs_perception"), "config") + +class PE_DOPE(Node): + """Pose estimation lifecycle node with DOPE.""" + def __init__(self, **kwargs): + """Construct the node.""" + # for other nodes + kwargs["allow_undeclared_parameters"] = True + kwargs["automatically_declare_parameters_from_overrides"] = True + super().__init__(NODE_NAME_DEFAULT, **kwargs) + + str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value + self.skill_cfg = json.loads(str_cfg) + self.nodeName = NODE_NAME_DEFAULT # self.skill_cfg["Module"]["node_name"] + # out_par = self.skill_cfg["Interface"]["Output"][0] + # self.topicSrv = self.nodeName + "/" + out_par["name"] + self.topicSrv = self.skill_cfg["topicsOut"][0]["name"] + + # Used to convert between ROS and OpenCV images + self.br = CvBridge() + self.dope_cfg = self._load_config_DOPE() + + self._cam_pose = Pose() + + self._is_camerainfo = False + self.topicImage = "" + self.topicCameraInfo = "" + self._set_camera_topic(CAMERA_LINK_DEFAULT) + self._sub = None + self._sub_info = None + self._pub = None + self._image_cnt = 0 + self._K = [] + + def _load_config_DOPE(self): + p = os.path.join(get_transfer_path_(), FILE_DOPE_CONFIG) + with open(p, "r") as f: + y = yaml.load(f, Loader=yaml.FullLoader) + return y + + def _set_camera_topic(self, camera_link: str): + """ Service for camera name topics """ + self.topicImage = "/" + camera_link + "/image" + self.topicCameraInfo = "/" + camera_link +"/camera_info" + + def listener_camera_info(self, data): + """ CameraInfo callback function. """ + if self._is_camerainfo: # don’t read camera info again + return + + self._res = [data.height, data.width] + k_ = data.k + self._K = [ + [k_[0], k_[1], k_[2]], + [k_[3], k_[4], k_[5]], + [k_[6], k_[7], k_[8]] + ] + # set the indicator for receiving the camera info + self._is_camerainfo = True + + def on_configure(self, state: State) -> TransitionCallbackReturn: + json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value + jdata = json.loads(json_param) + dependency = {} + for comm in jdata["BTAction"]: + for par in comm["param"]: + if par["type"] == "weights": + dependency = par["dependency"] + elif par["type"] == "topic": + t_dep = par["dependency"] + if "Image" in t_dep["topicType"]: + self.topicImage = t_dep["topicOut"] + else: + self.topicCameraInfo = t_dep["topicOut"] + assert dependency, "no weights dependency" + assert self.topicImage, "no input topic dependency" + + # Create the subscribers. + self._sub_info = self.create_subscription(CameraInfo, self.topicCameraInfo, self.listener_camera_info, 2) + # Create the publisher. 
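+        # create_lifecycle_publisher ties the publisher to the node's lifecycle state;
+        # image_callback checks _pub.is_activated, so poses are only published after on_activate().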
+ self._pub = self.create_lifecycle_publisher(Pose, self.topicSrv, 1) + + # Load model weights + w = dependency["weights_file"] + if not os.path.isfile(w): + self.get_logger().warning(f"No weights found <{w}>") + return TransitionCallbackReturn.FAILURE + + obj = dependency["object_name"] + dim = dependency["dimensions"] + self.dope_node = Dope(self.dope_cfg, w, obj, dim) + + self.get_logger().info(f"configure is success (with '{obj}')") + return TransitionCallbackReturn.SUCCESS + + def on_activate(self, state: State) -> TransitionCallbackReturn: + self.get_logger().info("on_activate is called") + # Create the Image subscriber. + self._sub = self.create_subscription(Image, self.topicImage, self.image_callback, 3) + return super().on_activate(state) + + def on_deactivate(self, state: State) -> TransitionCallbackReturn: + self.get_logger().info("on_deactivate is called") + # Destroy the Image subscriber. + self.destroy_subscription(self._sub) + return super().on_deactivate(state) + + def on_cleanup(self, state: State) -> TransitionCallbackReturn: + self.destroy_publisher(self._pub) + self.destroy_subscription(self._sub_info) + + self._is_camerainfo = False + self._pub = None + + self.get_logger().info("on_cleanup is called") + return TransitionCallbackReturn.SUCCESS + + def on_shutdown(self, state: State) -> TransitionCallbackReturn: + self.get_logger().info("on_shutdown is called") + return TransitionCallbackReturn.SUCCESS + + # def test_im_proc(self): + # im = "im_tst.jpg" + # if not os.path.isfile(im): + # print(f"File not found '{im}'") + # return + # frame = cv2.imread(im) + # frame = frame[..., ::-1].copy() + # self._K = [[585.756089952257, 0.0, 319.5], [0.0, 585.756089952257, 239.5], [0.0, 0.0, 1.0]] + # # call the inference node + # p = self.dope_node.image_processing(img=frame, camera_info=self._K) + # print(f"pose = {p}") + + def image_callback(self, data): + """ Image Callback function. 
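+        Converts the ROS image to an OpenCV array, reverses the channel order for DOPE,
+        runs inference and publishes the first detected pose while the publisher is active.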
""" + if not self._is_camerainfo: + self.get_logger().warning("No data from CameraInfo") + return + # Convert ROS Image message to OpenCV image + current_frame = self.br.imgmsg_to_cv2(data) + + # # Save image + # frame_im = FILE_TEMP_IMAGE # str(self.objPath / "image_rgb.png") + # cv2.imwrite(frame_im, current_frame) + self._image_cnt += 1 + + self.get_logger().info(f"dope: begin {self._image_cnt}") + current_frame = current_frame[..., ::-1].copy() + pose = self.dope_node.image_processing(img=current_frame, camera_info=self._K) + self.get_logger().info(f"dope: end {self._image_cnt}") + if self._pub is not None and self._pub.is_activated: + # publish pose estimation result + self._pub.publish(pose) + +from detector import ModelData, ObjectDetector +from cuboid_pnp_solver import CuboidPNPSolver +from cuboid import Cuboid3d +import numpy as np + +class Dope(object): + """ROS node that listens to image topic, runs DOPE, and publishes DOPE results""" + + def __init__( + self, + config, # config yaml loaded eg dict + weight, # path to weight + class_name, + dim: list # dimensions of model 'class_name' + ): + self.input_is_rectified = config["input_is_rectified"] + self.downscale_height = config["downscale_height"] + + self.config_detect = lambda: None + self.config_detect.mask_edges = 1 + self.config_detect.mask_faces = 1 + self.config_detect.vertex = 1 + self.config_detect.threshold = 0.5 + self.config_detect.softmax = 1000 + self.config_detect.thresh_angle = config["thresh_angle"] + self.config_detect.thresh_map = config["thresh_map"] + self.config_detect.sigma = config["sigma"] + self.config_detect.thresh_points = config["thresh_points"] + + # load network model, create PNP solver + self.model = ModelData(name=class_name, net_path=weight) + + # TODO warn on load_net_model(): + # Loading DOPE model '/home/shalenikol/robossembler_ws/fork_e47.pth'... + # /home/shalenikol/.local/lib/python3.10/site-packages/torchvision/models/_utils.py:208: + # UserWarning: The parameter 'pretrained' is deprecated since 0.13 and may be removed in the future, please use 'weights' instead. + + # warnings.warn( + # /home/shalenikol/.local/lib/python3.10/site-packages/torchvision/models/_utils.py:223: + # UserWarning: Arguments other than a weight enum or `None` for 'weights' are deprecated since 0.13 and may be removed in the future. + # The current behavior is equivalent to passing `weights=None`. + # warnings.warn(msg) + self.model.load_net_model() + # print("Model Loaded") + + # try: + # self.draw_color = tuple(config["draw_colors"][class_name]) + # except: + self.draw_color = (0, 255, 0) + + # TODO load dim from config + # dim = [13.7, 16.5, 20.2] # config["dimensions"][class_name] + self.dimension = tuple(dim) + self.class_id = 1 #config["class_ids"][class_name] + + self.pnp_solver = CuboidPNPSolver(class_name, cuboid3d=Cuboid3d(dim)) + self.class_name = class_name + # print("Ctrl-C to stop") + + def image_processing( + self, + img, + camera_info + # img_name, # this is the name of the img file to save, it needs the .png at the end + # output_folder, # folder where to put the output + # weight + ) -> Pose: + # !!! 
Allways self.input_is_rectified = True + camera_matrix = np.matrix(camera_info, dtype="float64").copy() + dist_coeffs = np.zeros((4, 1)) + # Update camera matrix and distortion coefficients + # if self.input_is_rectified: + # P = np.matrix( + # camera_info["projection_matrix"]["data"], dtype="float64" + # ).copy() + # P.resize((3, 4)) + # camera_matrix = P[:, :3] + # dist_coeffs = np.zeros((4, 1)) + # else: + # # TODO ??? + # camera_matrix = np.matrix(camera_info.K, dtype="float64") + # camera_matrix.resize((3, 3)) + # dist_coeffs = np.matrix(camera_info.D, dtype="float64") + # dist_coeffs.resize((len(camera_info.D), 1)) + + # Downscale image if necessary + height, width, _ = img.shape + scaling_factor = float(self.downscale_height) / height + if scaling_factor < 1.0: + camera_matrix[:2] *= scaling_factor + img = cv2.resize(img, (int(scaling_factor * width), int(scaling_factor * height))) + + self.pnp_solver.set_camera_intrinsic_matrix(camera_matrix) + self.pnp_solver.set_dist_coeffs(dist_coeffs) + + # # Copy and draw image + # img_copy = img.copy() + # im = Image.fromarray(img_copy) + # draw = Draw(im) + + # # dictionary for the final output + # dict_out = {"camera_data": {}, "objects": []} + + # Detect object + results, _ = ObjectDetector.detect_object_in_image( + self.model.net, self.pnp_solver, img, self.config_detect + ) + + # Publish pose #and overlay cube on image + p = Pose() + for _, result in enumerate(results): + if result["location"] is None: + continue + + l = result["location"] + q = result["quaternion"] + p.position.x = l[0] + p.position.y = l[1] + p.position.z = l[2] + p.orientation.x = q[0] + p.orientation.y = q[1] + p.orientation.z = q[2] + p.orientation.w = q[3] + break # !!! only considering the first option for now + return p + # # save the json files + # with open(f"tmp_result{i}.json", "w") as fp: + # json.dump(result, fp, indent=2) + + # dict_out["objects"].append( + # { + # "class": self.class_name, + # "location": np.array(loc).tolist(), + # "quaternion_xyzw": np.array(ori).tolist(), + # "projected_cuboid": np.array(result["projected_points"]).tolist(), + # } + # ) + # # Draw the cube + # if None not in result["projected_points"]: + # points2d = [] + # for pair in result["projected_points"]: + # points2d.append(tuple(pair)) + # draw.draw_cube(points2d, self.draw_color) + + # # create directory to save image if it does not exist + # img_name_base = img_name.split("/")[-1] + # output_path = os.path.join( + # output_folder, + # weight.split("/")[-1].replace(".pth", ""), + # *img_name.split("/")[:-1], + # ) + # if not os.path.isdir(output_path): + # os.makedirs(output_path, exist_ok=True) + + # im.save(os.path.join(output_path, img_name_base)) + + # json_path = os.path.join( + # output_path, ".".join(img_name_base.split(".")[:-1]) + ".json" + # ) + # # save the json files + # with open(json_path, "w") as fp: + # json.dump(dict_out, fp, indent=2) + +def main(): + rclpy.init() + + executor = rclpy.executors.SingleThreadedExecutor() + # executor = rclpy.executors.MultiThreadedExecutor() + lc_node = PE_DOPE() + executor.add_node(lc_node) + try: + executor.spin() + except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): + lc_node.destroy_node() + +if __name__ == '__main__': + main() \ No newline at end of file
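
For reference, a minimal launch sketch for running the skill node on its own (the launch file in this patch only prints that the skill must be launched via the interface node). It assumes the JSON string passed through the "lc_dope_cfg" parameter carries the fields the node reads in __init__ and on_configure ("topicsOut" plus the "BTAction" weights dependency), and that pe_dope_config.yaml is available where _load_config_DOPE() looks for it (the rbs_perception share/config directory). The output topic name and weights path below are illustrative placeholders.

# Sketch only: drive pe_dope_lc.py directly with a hand-built skill config.
# The dictionary mirrors rbs_package.json; "topicsOut" is what __init__ reads,
# and the weights path is a placeholder.
import json

from launch import LaunchDescription
from launch_ros.actions import LifecycleNode

skill_cfg = {
    "topicsOut": [
        {"name": "lc_dope/object_detection", "type": "geometry_msgs/msg/Pose"}
    ],
    "BTAction": [
        {
            "name": "peConfigure",
            "type": "run",
            "param": [
                {
                    "type": "weights",
                    "dependency": {
                        "object_name": "knight",
                        "weights_file": "/path/to/w_knight.pth",  # placeholder
                        "dimensions": [0.03, 0.026, 0.065],
                    },
                }
            ],
        }
    ],
}


def generate_launch_description():
    return LaunchDescription([
        LifecycleNode(
            package="rbss_poseestimation",
            executable="pe_dope_lc.py",
            name="lc_dope",
            namespace="",
            parameters=[{"lc_dope_cfg": json.dumps(skill_cfg)}],
        )
    ])

The node still has to be stepped through its lifecycle (e.g. "ros2 lifecycle set /lc_dope configure" followed by "ros2 lifecycle set /lc_dope activate") before poses appear on the output topic.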