From 16ec90cabeb6bd81a4ef5d5bef65f7a3288b5d77 Mon Sep 17 00:00:00 2001
From: shalenikol
Date: Tue, 15 Oct 2024 12:46:17 +0300
Subject: [PATCH] delete pe_dope from rbs_perception

---
 rbs_perception/CMakeLists.txt               |   7 +-
 rbs_perception/scripts/cuboid.py            | 126 ---
 rbs_perception/scripts/cuboid_pnp_solver.py | 146 ----
 rbs_perception/scripts/detector.py          | 901 --------------------
 rbs_perception/scripts/models.py            | 196 -----
 rbs_perception/scripts/pe_dope_lc.py        | 410 ---------
 6 files changed, 1 insertion(+), 1785 deletions(-)
 delete mode 100755 rbs_perception/scripts/cuboid.py
 delete mode 100755 rbs_perception/scripts/cuboid_pnp_solver.py
 delete mode 100755 rbs_perception/scripts/detector.py
 delete mode 100755 rbs_perception/scripts/models.py
 delete mode 100755 rbs_perception/scripts/pe_dope_lc.py

diff --git a/rbs_perception/CMakeLists.txt b/rbs_perception/CMakeLists.txt
index 8f23f41..b8ec830 100644
--- a/rbs_perception/CMakeLists.txt
+++ b/rbs_perception/CMakeLists.txt
@@ -37,7 +37,7 @@ install(PROGRAMS
   scripts/grasp_marker_publish.py
   scripts/pose_estimation.py
   scripts/pose_estimation_lifecycle.py
-  scripts/pe_dope_lc.py
+  # scripts/pe_dope_lc.py
   scripts/rbs_interface.py
   DESTINATION lib/${PROJECT_NAME}
 )
@@ -67,11 +67,6 @@ install(
   DESTINATION share/${PROJECT_NAME}
 )
 
-install(
-  DIRECTORY config
-  DESTINATION share/${PROJECT_NAME}
-)
-
 install(TARGETS pc_filter
   DESTINATION lib/${PROJECT_NAME}

diff --git a/rbs_perception/scripts/cuboid.py b/rbs_perception/scripts/cuboid.py
deleted file mode 100755
index 469df1f..0000000
--- a/rbs_perception/scripts/cuboid.py
+++ /dev/null
@@ -1,126 +0,0 @@
-# Copyright (c) 2018 NVIDIA Corporation. All rights reserved.
-# This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
-# https://creativecommons.org/licenses/by-nc-sa/4.0/legalcode - -from enum import IntEnum, unique -import numpy as np -import cv2 - -# Related to the object's local coordinate system -# @unique -class CuboidVertexType(IntEnum): - FrontTopRight = 0 - FrontTopLeft = 1 - FrontBottomLeft = 2 - FrontBottomRight = 3 - RearTopRight = 4 - RearTopLeft = 5 - RearBottomLeft = 6 - RearBottomRight = 7 - Center = 8 - TotalCornerVertexCount = 8 # Corner vertexes doesn't include the center point - TotalVertexCount = 9 - - -# List of the vertex indexes in each line edges of the cuboid -CuboidLineIndexes = [ - # Front face - [CuboidVertexType.FrontTopLeft, CuboidVertexType.FrontTopRight], - [CuboidVertexType.FrontTopRight, CuboidVertexType.FrontBottomRight], - [CuboidVertexType.FrontBottomRight, CuboidVertexType.FrontBottomLeft], - [CuboidVertexType.FrontBottomLeft, CuboidVertexType.FrontTopLeft], - # Back face - [CuboidVertexType.RearTopLeft, CuboidVertexType.RearTopRight], - [CuboidVertexType.RearTopRight, CuboidVertexType.RearBottomRight], - [CuboidVertexType.RearBottomRight, CuboidVertexType.RearBottomLeft], - [CuboidVertexType.RearBottomLeft, CuboidVertexType.RearTopLeft], - # Left face - [CuboidVertexType.FrontBottomLeft, CuboidVertexType.RearBottomLeft], - [CuboidVertexType.FrontTopLeft, CuboidVertexType.RearTopLeft], - # Right face - [CuboidVertexType.FrontBottomRight, CuboidVertexType.RearBottomRight], - [CuboidVertexType.FrontTopRight, CuboidVertexType.RearTopRight], -] - - -# ========================= Cuboid3d ========================= -class Cuboid3d: - """This class contains a 3D cuboid.""" - - # Create a box with a certain size - def __init__( - self, - size3d=[1.0, 1.0, 1.0], - center_location=[0, 0, 0], - coord_system=None, - parent_object=None, - ): - - # NOTE: This local coordinate system is similar - # to the intrinsic transform matrix of a 3d object - self.center_location = center_location - self.coord_system = coord_system - self.size3d = size3d - self._vertices = [0, 0, 0] * CuboidVertexType.TotalVertexCount - - self.generate_vertexes() - - def get_vertex(self, vertex_type): - """Returns the location of a vertex. 
- - Args: - vertex_type: enum of type CuboidVertexType - - Returns: - Numpy array(3) - Location of the vertex type in the cuboid - """ - return self._vertices[vertex_type] - - def get_vertices(self): - return self._vertices - - def generate_vertexes(self): - width, height, depth = self.size3d - - # By default just use the normal OpenCV coordinate system - if self.coord_system is None: - cx, cy, cz = self.center_location - # X axis point to the right - right = cx + width / 2.0 - left = cx - width / 2.0 - # Y axis point downward - top = cy - height / 2.0 - bottom = cy + height / 2.0 - # Z axis point forward - front = cz + depth / 2.0 - rear = cz - depth / 2.0 - - # List of 8 vertices of the box - self._vertices = [ - [right, top, front], # Front Top Right - [left, top, front], # Front Top Left - [left, bottom, front], # Front Bottom Left - [right, bottom, front], # Front Bottom Right - [right, top, rear], # Rear Top Right - [left, top, rear], # Rear Top Left - [left, bottom, rear], # Rear Bottom Left - [right, bottom, rear], # Rear Bottom Right - self.center_location, # Center - ] - else: - sx, sy, sz = self.size3d - forward = np.array(self.coord_system.forward, dtype=float) * sy * 0.5 - up = np.array(self.coord_system.up, dtype=float) * sz * 0.5 - right = np.array(self.coord_system.right, dtype=float) * sx * 0.5 - center = np.array(self.center_location, dtype=float) - self._vertices = [ - center + forward + up + right, # Front Top Right - center + forward + up - right, # Front Top Left - center + forward - up - right, # Front Bottom Left - center + forward - up + right, # Front Bottom Right - center - forward + up + right, # Rear Top Right - center - forward + up - right, # Rear Top Left - center - forward - up - right, # Rear Bottom Left - center - forward - up + right, # Rear Bottom Right - self.center_location, # Center - ] diff --git a/rbs_perception/scripts/cuboid_pnp_solver.py b/rbs_perception/scripts/cuboid_pnp_solver.py deleted file mode 100755 index bcb113f..0000000 --- a/rbs_perception/scripts/cuboid_pnp_solver.py +++ /dev/null @@ -1,146 +0,0 @@ -# Copyright (c) 2018 NVIDIA Corporation. All rights reserved. -# This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. -# https://creativecommons.org/licenses/by-nc-sa/4.0/legalcode - -import cv2 -import numpy as np -from cuboid import CuboidVertexType -from pyrr import Quaternion - - -class CuboidPNPSolver(object): - """ - This class is used to find the 6-DoF pose of a cuboid given its projected vertices. - - Runs perspective-n-point (PNP) algorithm. 
- """ - - # Class variables - cv2version = cv2.__version__.split(".") - cv2majorversion = int(cv2version[0]) - - def __init__( - self, - object_name="", - camera_intrinsic_matrix=None, - cuboid3d=None, - dist_coeffs=np.zeros((4, 1)), - ): - self.object_name = object_name - if not camera_intrinsic_matrix is None: - self._camera_intrinsic_matrix = camera_intrinsic_matrix - else: - self._camera_intrinsic_matrix = np.array([[0, 0, 0], [0, 0, 0], [0, 0, 0]]) - self._cuboid3d = cuboid3d - - self._dist_coeffs = dist_coeffs - - def set_camera_intrinsic_matrix(self, new_intrinsic_matrix): - """Sets the camera intrinsic matrix""" - self._camera_intrinsic_matrix = new_intrinsic_matrix - - def set_dist_coeffs(self, dist_coeffs): - """Sets the camera intrinsic matrix""" - self._dist_coeffs = dist_coeffs - - def solve_pnp(self, cuboid2d_points, pnp_algorithm=None): - """ - Detects the rotation and traslation - of a cuboid object from its vertexes' - 2D location in the image - """ - - # Fallback to default PNP algorithm base on OpenCV version - if pnp_algorithm is None: - if CuboidPNPSolver.cv2majorversion == 2: - pnp_algorithm = cv2.CV_ITERATIVE - elif CuboidPNPSolver.cv2majorversion == 3: - pnp_algorithm = cv2.SOLVEPNP_ITERATIVE - - if pnp_algorithm is None: - pnp_algorithm = cv2.SOLVEPNP_EPNP - - location = None - quaternion = None - projected_points = cuboid2d_points - - cuboid3d_points = np.array(self._cuboid3d.get_vertices()) - obj_2d_points = [] - obj_3d_points = [] - - for i in range(CuboidVertexType.TotalVertexCount): - check_point_2d = cuboid2d_points[i] - # Ignore invalid points - if check_point_2d is None: - continue - obj_2d_points.append(check_point_2d) - obj_3d_points.append(cuboid3d_points[i]) - - obj_2d_points = np.array(obj_2d_points, dtype=float) - obj_3d_points = np.array(obj_3d_points, dtype=float) - - valid_point_count = len(obj_2d_points) - - # Can only do PNP if we have more than 3 valid points - is_points_valid = valid_point_count >= 4 - - if is_points_valid: - - ret, rvec, tvec = cv2.solvePnP( - obj_3d_points, - obj_2d_points, - self._camera_intrinsic_matrix, - self._dist_coeffs, - flags=pnp_algorithm, - ) - - if ret: - location = list(x[0] for x in tvec) - quaternion = self.convert_rvec_to_quaternion(rvec) - - projected_points, _ = cv2.projectPoints( - cuboid3d_points, - rvec, - tvec, - self._camera_intrinsic_matrix, - self._dist_coeffs, - ) - projected_points = np.squeeze(projected_points) - - # If the location.Z is negative or object is behind the camera then flip both location and rotation - x, y, z = location - if z < 0: - # Get the opposite location - location = [-x, -y, -z] - - # Change the rotation by 180 degree - rotate_angle = np.pi - rotate_quaternion = Quaternion.from_axis_rotation( - location, rotate_angle - ) - quaternion = rotate_quaternion.cross(quaternion) - - return location, quaternion, projected_points - - def convert_rvec_to_quaternion(self, rvec): - """Convert rvec (which is log quaternion) to quaternion""" - theta = np.sqrt( - rvec[0] * rvec[0] + rvec[1] * rvec[1] + rvec[2] * rvec[2] - ) # in radians - raxis = [rvec[0] / theta, rvec[1] / theta, rvec[2] / theta] - - # pyrr's Quaternion (order is XYZW), https://pyrr.readthedocs.io/en/latest/oo_api_quaternion.html - return Quaternion.from_axis_rotation(raxis, theta) - - def project_points(self, rvec, tvec): - """Project points from model onto image using rotation, translation""" - output_points, tmp = cv2.projectPoints( - self.__object_vertex_coordinates, - rvec, - tvec, - self.__camera_intrinsic_matrix, - 
self.__dist_coeffs, - ) - - output_points = np.squeeze(output_points) - return output_points diff --git a/rbs_perception/scripts/detector.py b/rbs_perception/scripts/detector.py deleted file mode 100755 index b26a7f3..0000000 --- a/rbs_perception/scripts/detector.py +++ /dev/null @@ -1,901 +0,0 @@ -# Copyright (c) 2018 NVIDIA Corporation. All rights reserved. -# This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. -# https://creativecommons.org/licenses/by-nc-sa/4.0/legalcode - -""" -Contains the following classes: - - ModelData - High level information encapsulation - - ObjectDetector - Greedy algorithm to build cuboids from belief maps -""" -# 14.06.2024 @shalenikol find_object_poses: remove "cuboid2d" - -import time - -import sys -from os import path - -import numpy as np - -import torch -import torch.nn as nn -import torchvision.transforms as transforms -from torch.autograd import Variable -import torchvision.models as models - -from scipy.ndimage.filters import gaussian_filter -from scipy import optimize - -import sys - -sys.path.append("../") -from models import * - -# Import the definition of the neural network model and cuboids -from cuboid_pnp_solver import * - -# global transform for image input -transform = transforms.Compose( - [ - # transforms.Scale(IMAGE_SIZE), - # transforms.CenterCrop((imagesize,imagesize)), - transforms.ToTensor(), - # transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)), - transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)), - ] -) - - -# ================================ Models ================================ - - -class DopeNetwork(nn.Module): - def __init__( - self, - numBeliefMap=9, - numAffinity=16, - stop_at_stage=6, # number of stages to process (if less than total number of stages) - ): - super(DopeNetwork, self).__init__() - - self.stop_at_stage = stop_at_stage - - vgg_full = models.vgg19(pretrained=False).features - self.vgg = nn.Sequential() - for i_layer in range(24): - self.vgg.add_module(str(i_layer), vgg_full[i_layer]) - - # Add some layers - i_layer = 23 - self.vgg.add_module( - str(i_layer), nn.Conv2d(512, 256, kernel_size=3, stride=1, padding=1) - ) - self.vgg.add_module(str(i_layer + 1), nn.ReLU(inplace=True)) - self.vgg.add_module( - str(i_layer + 2), nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1) - ) - self.vgg.add_module(str(i_layer + 3), nn.ReLU(inplace=True)) - - # print('---Belief------------------------------------------------') - # _2 are the belief map stages - self.m1_2 = DopeNetwork.create_stage(128, numBeliefMap, True) - self.m2_2 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numBeliefMap, False - ) - self.m3_2 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numBeliefMap, False - ) - self.m4_2 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numBeliefMap, False - ) - self.m5_2 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numBeliefMap, False - ) - self.m6_2 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numBeliefMap, False - ) - - # print('---Affinity----------------------------------------------') - # _1 are the affinity map stages - self.m1_1 = DopeNetwork.create_stage(128, numAffinity, True) - self.m2_1 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numAffinity, False - ) - self.m3_1 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numAffinity, False - ) - self.m4_1 = DopeNetwork.create_stage( - 128 + numBeliefMap + 
numAffinity, numAffinity, False - ) - self.m5_1 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numAffinity, False - ) - self.m6_1 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numAffinity, False - ) - - def forward(self, x): - """Runs inference on the neural network""" - - out1 = self.vgg(x) - - out1_2 = self.m1_2(out1) - out1_1 = self.m1_1(out1) - - if self.stop_at_stage == 1: - return [out1_2], [out1_1] - - out2 = torch.cat([out1_2, out1_1, out1], 1) - out2_2 = self.m2_2(out2) - out2_1 = self.m2_1(out2) - - if self.stop_at_stage == 2: - return [out1_2, out2_2], [out1_1, out2_1] - - out3 = torch.cat([out2_2, out2_1, out1], 1) - out3_2 = self.m3_2(out3) - out3_1 = self.m3_1(out3) - - if self.stop_at_stage == 3: - return [out1_2, out2_2, out3_2], [out1_1, out2_1, out3_1] - - out4 = torch.cat([out3_2, out3_1, out1], 1) - out4_2 = self.m4_2(out4) - out4_1 = self.m4_1(out4) - - if self.stop_at_stage == 4: - return [out1_2, out2_2, out3_2, out4_2], [out1_1, out2_1, out3_1, out4_1] - - out5 = torch.cat([out4_2, out4_1, out1], 1) - out5_2 = self.m5_2(out5) - out5_1 = self.m5_1(out5) - - if self.stop_at_stage == 5: - return [out1_2, out2_2, out3_2, out4_2, out5_2], [ - out1_1, - out2_1, - out3_1, - out4_1, - out5_1, - ] - - out6 = torch.cat([out5_2, out5_1, out1], 1) - out6_2 = self.m6_2(out6) - out6_1 = self.m6_1(out6) - - return [out1_2, out2_2, out3_2, out4_2, out5_2, out6_2], [ - out1_1, - out2_1, - out3_1, - out4_1, - out5_1, - out6_1, - ] - - @staticmethod - def create_stage(in_channels, out_channels, first=False): - """Create the neural network layers for a single stage.""" - - model = nn.Sequential() - mid_channels = 128 - if first: - padding = 1 - kernel = 3 - count = 6 - final_channels = 512 - else: - padding = 3 - kernel = 7 - count = 10 - final_channels = mid_channels - - # First convolution - model.add_module( - "0", - nn.Conv2d( - in_channels, mid_channels, kernel_size=kernel, stride=1, padding=padding - ), - ) - - # Middle convolutions - i = 1 - while i < count - 1: - model.add_module(str(i), nn.ReLU(inplace=True)) - i += 1 - model.add_module( - str(i), - nn.Conv2d( - mid_channels, - mid_channels, - kernel_size=kernel, - stride=1, - padding=padding, - ), - ) - i += 1 - - # Penultimate convolution - model.add_module(str(i), nn.ReLU(inplace=True)) - i += 1 - model.add_module( - str(i), nn.Conv2d(mid_channels, final_channels, kernel_size=1, stride=1) - ) - i += 1 - - # Last convolution - model.add_module(str(i), nn.ReLU(inplace=True)) - i += 1 - model.add_module( - str(i), nn.Conv2d(final_channels, out_channels, kernel_size=1, stride=1) - ) - i += 1 - - return model - - -class ModelData(object): - """This class contains methods for loading the neural network""" - - def __init__(self, name="", net_path="", gpu_id=0, architecture="dope"): - self.name = name - self.net_path = net_path # Path to trained network model - self.net = None # Trained network - self.gpu_id = gpu_id - self.architecture = architecture - - def get_net(self): - """Returns network""" - if not self.net: - self.load_net_model() - return self.net - - def load_net_model(self): - """Loads network model from disk""" - if not self.net and path.exists(self.net_path): - self.net = self.load_net_model_path(self.net_path) - if not path.exists(self.net_path): - print("ERROR: Unable to find model weights: '{}'".format(self.net_path)) - exit(0) - - def load_net_model_path(self, path): - """Loads network model from disk with given path""" - model_loading_start_time = time.time() - print("Loading 
DOPE model '{}'...".format(path)) - net = DopeNetwork() - - net = torch.nn.DataParallel(net, [0]).cuda() - net.load_state_dict(torch.load(path)) - net.eval() - print( - " Model loaded in {:.2f} seconds.".format( - time.time() - model_loading_start_time - ) - ) - return net - - def __str__(self): - """Converts to string""" - return "{}: {}".format(self.name, self.net_path) - - -# ================================ ObjectDetector ================================ -class ObjectDetector(object): - """This class contains methods for object detection""" - - @staticmethod - def gaussian(height, center_x, center_y, width_x, width_y): - """Returns a gaussian function with the given parameters""" - width_x = float(width_x) - width_y = float(width_y) - return lambda x, y: height * np.exp( - -(((center_x - x) / width_x) ** 2 + ((center_y - y) / width_y) ** 2) / 2 - ) - - @staticmethod - def moments(data): - """Returns (height, x, y, width_x, width_y) - the gaussian parameters of a 2D distribution by calculating its - moments""" - total = data.sum() - X, Y = np.indices(data.shape) - x = (X * data).sum() / total - y = (Y * data).sum() / total - col = data[:, int(y)] - width_x = np.sqrt( - np.abs((np.arange(col.size) - y) ** 2 * col).sum() / col.sum() - ) - row = data[int(x), :] - width_y = np.sqrt( - np.abs((np.arange(row.size) - x) ** 2 * row).sum() / row.sum() - ) - height = data.max() - return height, x, y, width_x, width_y - - @staticmethod - def fitgaussian(data): - """Returns (height, x, y, width_x, width_y) - the gaussian parameters of a 2D distribution found by a fit""" - params = ObjectDetector.moments(data) - errorfunction = lambda p: np.ravel( - ObjectDetector.gaussian(*p)(*np.indices(data.shape)) - data - ) - p, success = optimize.leastsq(errorfunction, params) - return p - - @staticmethod - def make_grid( - tensor, - nrow=8, - padding=2, - normalize=False, - range_=None, - scale_each=False, - pad_value=0, - ): - """Make a grid of images. - Args: - tensor (Tensor or list): 4D mini-batch Tensor of shape (B x C x H x W) - or a list of images all of the same size. - nrow (int, optional): Number of images displayed in each row of the grid. - The Final grid size is (B / nrow, nrow). Default is 8. - padding (int, optional): amount of padding. Default is 2. - normalize (bool, optional): If True, shift the image to the range (0, 1), - by subtracting the minimum and dividing by the maximum pixel value. - range (tuple, optional): tuple (min, max) where min and max are numbers, - then these numbers are used to normalize the image. By default, min and max - are computed from the tensor. - scale_each (bool, optional): If True, scale each image in the batch of - images separately rather than the (min, max) over all images. - pad_value (float, optional): Value for the padded pixels. 
- Example: - See this notebook `here `_ - """ - import math - - if not ( - torch.is_tensor(tensor) - or (isinstance(tensor, list) and all(torch.is_tensor(t) for t in tensor)) - ): - raise TypeError( - "tensor or list of tensors expected, got {}".format(type(tensor)) - ) - - # if list of tensors, convert to a 4D mini-batch Tensor - if isinstance(tensor, list): - tensor = torch.stack(tensor, dim=0) - - if tensor.dim() == 2: # single image H x W - tensor = tensor.view(1, tensor.size(0), tensor.size(1)) - if tensor.dim() == 3: # single image - if tensor.size(0) == 1: # if single-channel, convert to 3-channel - tensor = torch.cat((tensor, tensor, tensor), 0) - tensor = tensor.view(1, tensor.size(0), tensor.size(1), tensor.size(2)) - - if tensor.dim() == 4 and tensor.size(1) == 1: # single-channel images - tensor = torch.cat((tensor, tensor, tensor), 1) - - if normalize is True: - tensor = tensor.clone() # avoid modifying tensor in-place - if range_ is not None: - assert isinstance( - range_, tuple - ), "range has to be a tuple (min, max) if specified. min and max are numbers" - - def norm_ip(img, min, max): - img.clamp_(min=min, max=max) - img.add_(-min).div_(max - min + 1e-5) - - def norm_range(t, range_): - if range_ is not None: - norm_ip(t, range_[0], range_[1]) - else: - norm_ip(t, float(t.min()), float(t.max())) - - if scale_each is True: - for t in tensor: # loop over mini-batch dimension - norm_range(t, range) - else: - norm_range(tensor, range) - - if tensor.size(0) == 1: - return tensor.squeeze() - - # make the mini-batch of images into a grid - nmaps = tensor.size(0) - xmaps = min(nrow, nmaps) - ymaps = int(math.ceil(float(nmaps) / xmaps)) - height, width = int(tensor.size(2) + padding), int(tensor.size(3) + padding) - grid = tensor.new(3, height * ymaps + padding, width * xmaps + padding).fill_( - pad_value - ) - k = 0 - for y in range(ymaps): - for x in range(xmaps): - if k >= nmaps: - break - grid.narrow(1, y * height + padding, height - padding).narrow( - 2, x * width + padding, width - padding - ).copy_(tensor[k]) - k = k + 1 - return grid - - @staticmethod - def get_image_grid(tensor, filename, nrow=3, padding=2, mean=None, std=None): - """ - Saves a given Tensor into an image file. - If given a mini-batch tensor, will save the tensor as a grid of images. 
- """ - from PIL import Image - - # tensor = tensor.cpu() - grid = ObjectDetector.make_grid(tensor, nrow=nrow, padding=10, pad_value=1) - if not mean is None: - # ndarr = grid.mul(std).add(mean).mul(255).byte().transpose(0,2).transpose(0,1).numpy() - ndarr = ( - grid.mul(std) - .add(mean) - .mul(255) - .byte() - .transpose(0, 2) - .transpose(0, 1) - .numpy() - ) - else: - ndarr = ( - grid.mul(0.5) - .add(0.5) - .mul(255) - .byte() - .transpose(0, 2) - .transpose(0, 1) - .numpy() - ) - im = Image.fromarray(ndarr) - # im.save(filename) - return im - - @staticmethod - def detect_object_in_image( - net_model, pnp_solver, in_img, config, grid_belief_debug=False, norm_belief=True - ): - """Detect objects in a image using a specific trained network model - Returns the poses of the objects and the belief maps - """ - - if in_img is None: - return [] - - # print("detect_object_in_image - image shape: {}".format(in_img.shape)) - - # Run network inference - image_tensor = transform(in_img) - image_torch = Variable(image_tensor).cuda().unsqueeze(0) - out, seg = net_model( - image_torch - ) # run inference using the network (calls 'forward' method) - vertex2 = out[-1][0] - aff = seg[-1][0] - - # Find objects from network output - detected_objects = ObjectDetector.find_object_poses( - vertex2, aff, pnp_solver, config - ) - - if not grid_belief_debug: - - return detected_objects, None - else: - # Run the belief maps debug display on the beliefmaps - - upsampling = nn.UpsamplingNearest2d(scale_factor=8) - tensor = vertex2 - belief_imgs = [] - in_img = torch.tensor(in_img).float() / 255.0 - in_img *= 0.7 - - for j in range(tensor.size()[0]): - belief = tensor[j].clone() - if norm_belief: - belief -= float(torch.min(belief)[0].data.cpu().numpy()) - belief /= float(torch.max(belief)[0].data.cpu().numpy()) - - belief = ( - upsampling(belief.unsqueeze(0).unsqueeze(0)) - .squeeze() - .squeeze() - .data - ) - belief = torch.clamp(belief, 0, 1).cpu() - belief = torch.cat( - [ - belief.unsqueeze(0) + in_img[:, :, 0], - belief.unsqueeze(0) + in_img[:, :, 1], - belief.unsqueeze(0) + in_img[:, :, 2], - ] - ).unsqueeze(0) - belief = torch.clamp(belief, 0, 1) - - # belief_imgs.append(belief.data.squeeze().cpu().numpy().transpose(1,2,0)) - belief_imgs.append(belief.data.squeeze().numpy()) - - # Create the image grid - belief_imgs = torch.tensor(np.array(belief_imgs)) - - im_belief = ObjectDetector.get_image_grid(belief_imgs, None, mean=0, std=1) - - return detected_objects, im_belief - - @staticmethod - def find_object_poses( - vertex2, - aff, - pnp_solver, - config, - run_sampling=False, - num_sample=100, - scale_factor=8, - ): - """Detect objects given network output""" - - # run_sampling = True - - # Detect objects from belief maps and affinities - objects, all_peaks = ObjectDetector.find_objects( - vertex2, - aff, - config, - run_sampling=run_sampling, - num_sample=num_sample, - scale_factor=scale_factor, - ) - detected_objects = [] - obj_name = pnp_solver.object_name - - print(all_peaks) - - # print("find_object_poses: found {} objects ================".format(len(objects))) - for obj in objects: - # Run PNP - points = obj[1] + [(obj[0][0] * scale_factor, obj[0][1] * scale_factor)] - # print(points) - # cuboid2d = np.copy(points) - location, quaternion, projected_points = pnp_solver.solve_pnp(points) - - # run multiple sample - if run_sampling: - lx, ly, lz = [], [], [] - qx, qy, qz, qw = [], [], [], [] - - for i_sample in range(num_sample): - sample = [] - for i_point in range(len(obj[-1])): - if not 
obj[-1][i_point][i_sample] is None: - sample.append( - ( - obj[-1][i_point][i_sample][0] * scale_factor, - obj[-1][i_point][i_sample][1] * scale_factor, - ) - ) - else: - sample.append(None) - # final_cuboids.append(sample) - pnp_sample = pnp_solver.solve_pnp(sample) - - try: - lx.append(pnp_sample[0][0]) - ly.append(pnp_sample[0][1]) - lz.append(pnp_sample[0][2]) - - qx.append(pnp_sample[1][0]) - qy.append(pnp_sample[1][1]) - qz.append(pnp_sample[1][2]) - qw.append(pnp_sample[1][3]) - except: - pass - # TODO - # RUN quaternion as well for the std and avg. - - try: - print("----") - print("location:") - print(location[0], location[1], location[2]) - print(np.mean(lx), np.mean(ly), np.mean(lz)) - print(np.std(lx), np.std(ly), np.std(lz)) - print("quaternion:") - print(quaternion[0], quaternion[1], quaternion[2], quaternion[3]) - print(np.mean(qx), np.mean(qy), np.mean(qz), np.mean(qw)) - print(np.std(qx), np.std(qy), np.std(qz), np.std(qw)) - - except: - pass - if not location is None: - detected_objects.append( - { - "name": obj_name, - "location": location, - "quaternion": quaternion, - # "cuboid2d": cuboid2d, - "projected_points": projected_points, - "confidence": obj[-1], - "raw_points": points, - } - ) - - # print("find_object_poses: points = ", type(points), points) - # print("find_object_poses: locn = ", location, "quat =", quaternion) - # print("find_object_poses: projected_points = ", type(projected_points), projected_points) - - return detected_objects - - @staticmethod - def find_objects( - vertex2, - aff, - config, - numvertex=8, - run_sampling=False, - num_sample=100, - scale_factor=8, - ): - """Detects objects given network belief maps and affinities, using heuristic method""" - - all_peaks = [] - all_samples = [] - - peak_counter = 0 - for j in range(vertex2.size()[0]): - belief = vertex2[j].clone() - map_ori = belief.cpu().data.numpy() - - map = gaussian_filter(belief.cpu().data.numpy(), sigma=config.sigma) - p = 1 - map_left = np.zeros(map.shape) - map_left[p:, :] = map[:-p, :] - map_right = np.zeros(map.shape) - map_right[:-p, :] = map[p:, :] - map_up = np.zeros(map.shape) - map_up[:, p:] = map[:, :-p] - map_down = np.zeros(map.shape) - map_down[:, :-p] = map[:, p:] - - peaks_binary = np.logical_and.reduce( - ( - map >= map_left, - map >= map_right, - map >= map_up, - map >= map_down, - map > config.thresh_map, - ) - ) - peaks = zip(np.nonzero(peaks_binary)[1], np.nonzero(peaks_binary)[0]) - - # Computing the weigthed average for localizing the peaks - peaks = list(peaks) - win = 11 - ran = win // 2 - peaks_avg = [] - point_sample_list = [] - for p_value in range(len(peaks)): - p = peaks[p_value] - weights = np.zeros((win, win)) - i_values = np.zeros((win, win)) - j_values = np.zeros((win, win)) - for i in range(-ran, ran + 1): - for j in range(-ran, ran + 1): - if ( - p[1] + i < 0 - or p[1] + i >= map_ori.shape[0] - or p[0] + j < 0 - or p[0] + j >= map_ori.shape[1] - ): - continue - - i_values[j + ran, i + ran] = p[1] + i - j_values[j + ran, i + ran] = p[0] + j - - weights[j + ran, i + ran] = map_ori[p[1] + i, p[0] + j] - # if the weights are all zeros - # then add the none continuous points - OFFSET_DUE_TO_UPSAMPLING = 0.4395 - - # Sample the points using the gaussian - if run_sampling: - data = weights - params = ObjectDetector.fitgaussian(data) - fit = ObjectDetector.gaussian(*params) - _, mu_x, mu_y, std_x, std_y = params - points_sample = np.random.multivariate_normal( - np.array( - [ - p[1] + mu_x + OFFSET_DUE_TO_UPSAMPLING, - p[0] - mu_y + OFFSET_DUE_TO_UPSAMPLING, 
- ] - ), - # np.array([[std_x*std_x,0],[0,std_y*std_y]]), size=num_sample) - np.array([[std_x, 0], [0, std_y]]), - size=num_sample, - ) - point_sample_list.append(points_sample) - - try: - peaks_avg.append( - ( - np.average(j_values, weights=weights) - + OFFSET_DUE_TO_UPSAMPLING, - np.average(i_values, weights=weights) - + OFFSET_DUE_TO_UPSAMPLING, - ) - ) - except: - peaks_avg.append( - ( - p[0] + OFFSET_DUE_TO_UPSAMPLING, - p[1] + OFFSET_DUE_TO_UPSAMPLING, - ) - ) - - # Note: Python3 doesn't support len for zip object - peaks_len = min( - len(np.nonzero(peaks_binary)[1]), len(np.nonzero(peaks_binary)[0]) - ) - - peaks_with_score = [ - peaks_avg[x_] + (map_ori[peaks[x_][1], peaks[x_][0]],) - for x_ in range(len(peaks)) - ] - - id = range(peak_counter, peak_counter + peaks_len) - - peaks_with_score_and_id = [ - peaks_with_score[i] + (id[i],) for i in range(len(id)) - ] - - all_peaks.append(peaks_with_score_and_id) - all_samples.append(point_sample_list) - peak_counter += peaks_len - - objects = [] - - if aff is None: - # Assume there is only one object - points = [None for i in range(numvertex)] - for i_peak, peaks in enumerate(all_peaks): - # print (peaks) - for peak in peaks: - if peak[2] > config.threshold: - points[i_peak] = (peak[0], peak[1]) - - return points - - # Check object centroid and build the objects if the centroid is found - for nb_object in range(len(all_peaks[-1])): - if all_peaks[-1][nb_object][2] > config.thresh_points: - objects.append( - [ - [ - all_peaks[-1][nb_object][:2][0], - all_peaks[-1][nb_object][:2][1], - ], - [None for i in range(numvertex)], - [None for i in range(numvertex)], - all_peaks[-1][nb_object][2], - [ - [None for j in range(num_sample)] - for i in range(numvertex + 1) - ], - ] - ) - - # Check if the object was added before - if run_sampling and nb_object < len(objects): - # add the samples to the object centroids - objects[nb_object][4][-1] = all_samples[-1][nb_object] - - # Working with an output that only has belief maps - if aff is None: - if len(objects) > 0 and len(all_peaks) > 0 and len(all_peaks[0]) > 0: - for i_points in range(8): - if ( - len(all_peaks[i_points]) > 0 - and all_peaks[i_points][0][2] > config.threshold - ): - objects[0][1][i_points] = ( - all_peaks[i_points][0][0], - all_peaks[i_points][0][1], - ) - else: - # For all points found - for i_lists in range(len(all_peaks[:-1])): - lists = all_peaks[i_lists] - - # Candidate refers to point that needs to be match with a centroid object - for i_candidate, candidate in enumerate(lists): - if candidate[2] < config.thresh_points: - continue - - i_best = -1 - best_dist = 10000 - best_angle = 100 - - # Find the points that links to that centroid. - for i_obj in range(len(objects)): - center = [objects[i_obj][0][0], objects[i_obj][0][1]] - - # integer is used to look into the affinity map, - # but the float version is used to run - point_int = [int(candidate[0]), int(candidate[1])] - point = [candidate[0], candidate[1]] - - # look at the distance to the vector field. 
- v_aff = ( - np.array( - [ - aff[ - i_lists * 2, point_int[1], point_int[0] - ].data.item(), - aff[ - i_lists * 2 + 1, point_int[1], point_int[0] - ].data.item(), - ] - ) - * 10 - ) - - # normalize the vector - xvec = v_aff[0] - yvec = v_aff[1] - - norms = np.sqrt(xvec * xvec + yvec * yvec) - - xvec /= norms - yvec /= norms - - v_aff = np.concatenate([[xvec], [yvec]]) - - v_center = np.array(center) - np.array(point) - xvec = v_center[0] - yvec = v_center[1] - - norms = np.sqrt(xvec * xvec + yvec * yvec) - - xvec /= norms - yvec /= norms - - v_center = np.concatenate([[xvec], [yvec]]) - - # vector affinity - dist_angle = np.linalg.norm(v_center - v_aff) - - # distance between vertexes - dist_point = np.linalg.norm(np.array(point) - np.array(center)) - - if ( - dist_angle < config.thresh_angle - and best_dist > 1000 - or dist_angle < config.thresh_angle - and best_dist > dist_point - ): - i_best = i_obj - best_angle = dist_angle - best_dist = dist_point - - if i_best == -1: - continue - - if ( - objects[i_best][1][i_lists] is None - or best_angle < config.thresh_angle - and best_dist < objects[i_best][2][i_lists][1] - ): - # set the points - objects[i_best][1][i_lists] = ( - (candidate[0]) * scale_factor, - (candidate[1]) * scale_factor, - ) - # set information about the points: angle and distance - objects[i_best][2][i_lists] = (best_angle, best_dist) - # add the sample points - if run_sampling: - objects[i_best][4][i_lists] = all_samples[i_lists][ - i_candidate - ] - return objects, all_peaks diff --git a/rbs_perception/scripts/models.py b/rbs_perception/scripts/models.py deleted file mode 100755 index 0c89004..0000000 --- a/rbs_perception/scripts/models.py +++ /dev/null @@ -1,196 +0,0 @@ -""" -NVIDIA from jtremblay@gmail.com -""" - -# Networks -import torch -import torch -import torch.nn as nn -import torch.nn.parallel -import torch.utils.data -import torchvision.models as models - - -class DopeNetwork(nn.Module): - def __init__( - self, - pretrained=False, - numBeliefMap=9, - numAffinity=16, - stop_at_stage=6, # number of stages to process (if less than total number of stages) - ): - super(DopeNetwork, self).__init__() - - self.stop_at_stage = stop_at_stage - - vgg_full = models.vgg19(pretrained=False).features - self.vgg = nn.Sequential() - for i_layer in range(24): - self.vgg.add_module(str(i_layer), vgg_full[i_layer]) - - # Add some layers - i_layer = 23 - self.vgg.add_module( - str(i_layer), nn.Conv2d(512, 256, kernel_size=3, stride=1, padding=1) - ) - self.vgg.add_module(str(i_layer + 1), nn.ReLU(inplace=True)) - self.vgg.add_module( - str(i_layer + 2), nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1) - ) - self.vgg.add_module(str(i_layer + 3), nn.ReLU(inplace=True)) - - # print('---Belief------------------------------------------------') - # _2 are the belief map stages - self.m1_2 = DopeNetwork.create_stage(128, numBeliefMap, True) - self.m2_2 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numBeliefMap, False - ) - self.m3_2 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numBeliefMap, False - ) - self.m4_2 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numBeliefMap, False - ) - self.m5_2 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numBeliefMap, False - ) - self.m6_2 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numBeliefMap, False - ) - - # print('---Affinity----------------------------------------------') - # _1 are the affinity map stages - self.m1_1 = DopeNetwork.create_stage(128, 
numAffinity, True) - self.m2_1 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numAffinity, False - ) - self.m3_1 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numAffinity, False - ) - self.m4_1 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numAffinity, False - ) - self.m5_1 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numAffinity, False - ) - self.m6_1 = DopeNetwork.create_stage( - 128 + numBeliefMap + numAffinity, numAffinity, False - ) - - def forward(self, x): - """Runs inference on the neural network""" - - out1 = self.vgg(x) - - out1_2 = self.m1_2(out1) - out1_1 = self.m1_1(out1) - - if self.stop_at_stage == 1: - return [out1_2], [out1_1] - - out2 = torch.cat([out1_2, out1_1, out1], 1) - out2_2 = self.m2_2(out2) - out2_1 = self.m2_1(out2) - - if self.stop_at_stage == 2: - return [out1_2, out2_2], [out1_1, out2_1] - - out3 = torch.cat([out2_2, out2_1, out1], 1) - out3_2 = self.m3_2(out3) - out3_1 = self.m3_1(out3) - - if self.stop_at_stage == 3: - return [out1_2, out2_2, out3_2], [out1_1, out2_1, out3_1] - - out4 = torch.cat([out3_2, out3_1, out1], 1) - out4_2 = self.m4_2(out4) - out4_1 = self.m4_1(out4) - - if self.stop_at_stage == 4: - return [out1_2, out2_2, out3_2, out4_2], [out1_1, out2_1, out3_1, out4_1] - - out5 = torch.cat([out4_2, out4_1, out1], 1) - out5_2 = self.m5_2(out5) - out5_1 = self.m5_1(out5) - - if self.stop_at_stage == 5: - return [out1_2, out2_2, out3_2, out4_2, out5_2], [ - out1_1, - out2_1, - out3_1, - out4_1, - out5_1, - ] - - out6 = torch.cat([out5_2, out5_1, out1], 1) - out6_2 = self.m6_2(out6) - out6_1 = self.m6_1(out6) - - return [out1_2, out2_2, out3_2, out4_2, out5_2, out6_2], [ - out1_1, - out2_1, - out3_1, - out4_1, - out5_1, - out6_1, - ] - - @staticmethod - def create_stage(in_channels, out_channels, first=False): - """Create the neural network layers for a single stage.""" - - model = nn.Sequential() - mid_channels = 128 - if first: - padding = 1 - kernel = 3 - count = 6 - final_channels = 512 - else: - padding = 3 - kernel = 7 - count = 10 - final_channels = mid_channels - - # First convolution - model.add_module( - "0", - nn.Conv2d( - in_channels, mid_channels, kernel_size=kernel, stride=1, padding=padding - ), - ) - - # Middle convolutions - i = 1 - while i < count - 1: - model.add_module(str(i), nn.ReLU(inplace=True)) - i += 1 - model.add_module( - str(i), - nn.Conv2d( - mid_channels, - mid_channels, - kernel_size=kernel, - stride=1, - padding=padding, - ), - ) - i += 1 - - # Penultimate convolution - model.add_module(str(i), nn.ReLU(inplace=True)) - i += 1 - model.add_module( - str(i), nn.Conv2d(mid_channels, final_channels, kernel_size=1, stride=1) - ) - i += 1 - - # Last convolution - model.add_module(str(i), nn.ReLU(inplace=True)) - i += 1 - model.add_module( - str(i), nn.Conv2d(final_channels, out_channels, kernel_size=1, stride=1) - ) - i += 1 - - return model diff --git a/rbs_perception/scripts/pe_dope_lc.py b/rbs_perception/scripts/pe_dope_lc.py deleted file mode 100755 index 42dcb57..0000000 --- a/rbs_perception/scripts/pe_dope_lc.py +++ /dev/null @@ -1,410 +0,0 @@ -#!/usr/bin/env python3 -""" - pose_estimation_lifecycle_node_with_DOPE - ROS 2 program for 6D Pose Estimation - - Source run: -python inference.py --weights ../output/weights_2996 --data ../sample_dataset100 --object fork --exts jpg \ - --config config/config_pose_fork.yaml --camera config/camera_info_640x480.yaml - - @shalenikol release 0.3 -""" -import os -import json -import yaml - -import rclpy -from 
rclpy.lifecycle import Node -from rclpy.lifecycle import State -from rclpy.lifecycle import TransitionCallbackReturn - -from ament_index_python.packages import get_package_share_directory -from sensor_msgs.msg import Image, CameraInfo -from geometry_msgs.msg import Pose -# from tf2_ros import TransformException -from tf2_ros.buffer import Buffer - -from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images -import cv2 # OpenCV library - -FILE_DOPE_CONFIG = "pe_dope_config.yaml" -# FILE_TEMP_IMAGE = "image_rgb.png" -CAMERA_LINK_DEFAULT = "outer_rgbd_camera" -NODE_NAME_DEFAULT = "lc_dope" # the name doesn't matter in this node (defined in Launch) -PARAM_SKILL_CFG = "lc_dope_cfg" -# PARAM_SUFFIX = "_cfg" -# node_name = self.cfg_data["Launch"]["name"] -# par_name = node_name + PARAM_SUFFIX - - -def get_transfer_path_() -> str: - return os.path.join(get_package_share_directory("rbs_perception"), "config") - -class PE_DOPE(Node): - """Pose estimation lifecycle node with DOPE.""" - def __init__(self, **kwargs): - """Construct the node.""" - # for other nodes - kwargs["allow_undeclared_parameters"] = True - kwargs["automatically_declare_parameters_from_overrides"] = True - super().__init__(NODE_NAME_DEFAULT, **kwargs) - - str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value - self.skill_cfg = json.loads(str_cfg) - self.nodeName = self.skill_cfg["Launch"]["name"] - out_par = self.skill_cfg["Interface"]["Output"][0] - self.topicSrv = self.nodeName + "/" + out_par["name"] - - # Used to convert between ROS and OpenCV images - self.br = CvBridge() - self.dope_cfg = self._load_config_DOPE() - - self._cam_pose = Pose() - self.tf_buffer = Buffer() - - self._is_camerainfo = False - self.topicImage = "" - self.topicCameraInfo = "" - # self.camera_link = "" - self._set_camera_topic(CAMERA_LINK_DEFAULT) - self._sub = None - self._sub_info = None - self._pub = None - self._image_cnt = 0 - self._K = [] - - def _load_config_DOPE(self): - p = os.path.join(get_transfer_path_(), FILE_DOPE_CONFIG) - with open(p, "r") as f: - y = yaml.load(f, Loader=yaml.FullLoader) - return y - - def _set_camera_topic(self, camera_link: str): - """ Service for camera name topics """ - self.topicImage = "/" + camera_link + "/image" - self.topicCameraInfo = "/" + camera_link +"/camera_info" - # self.camera_link = camera_link - - def listener_camera_info(self, data): - """ CameraInfo callback function. """ - if self._is_camerainfo: # don’t read camera info again - return - - self._res = [data.height, data.width] - k_ = data.k - self._K = [ - [k_[0], k_[1], k_[2]], - [k_[3], k_[4], k_[5]], - [k_[6], k_[7], k_[8]] - ] - # set the indicator for receiving the camera info - self._is_camerainfo = True - - def on_configure(self, state: State) -> TransitionCallbackReturn: - """ - Configure the node, after a configuring transition is requested. - - return: The state machine either invokes a transition to the "inactive" state or stays - in "unconfigured" depending on the return value. - TransitionCallbackReturn.SUCCESS transitions to "inactive". - TransitionCallbackReturn.FAILURE transitions to "unconfigured". 
- TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing" - """ - json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value - jdata = json.loads(json_param) - dependency = {} - for comm in jdata["BTAction"]: - for par in comm["param"]: - if par["type"] == "weights": - dependency = par["dependency"] - assert dependency, "no dependency" - - for par in jdata["Settings"]: - if par["name"] == "cameraLink": - self._set_camera_topic(par["value"]) - - # Create the subscribers. - self._sub_info = self.create_subscription(CameraInfo, self.topicCameraInfo, self.listener_camera_info, 2) - # Create the publisher. - self._pub = self.create_lifecycle_publisher(Pose, self.topicSrv, 1) - - # Load model weights - w = dependency["weights_file"] - if not os.path.isfile(w): - self.get_logger().warning(f"No weights found <{w}>") - return TransitionCallbackReturn.FAILURE - - obj = dependency["object_name"] - dim = dependency["dimensions"] - self.dope_node = Dope(self.dope_cfg, w, obj, dim) - - self.get_logger().info(f"configure is success (with '{obj}')") - return TransitionCallbackReturn.SUCCESS - - def on_activate(self, state: State) -> TransitionCallbackReturn: - self.get_logger().info("on_activate is called") - # Create the Image subscriber. - self._sub = self.create_subscription(Image, self.topicImage, self.image_callback, 3) - - # # !!! test - # self._timer = self.create_timer(5, self.test_im_proc) - - return super().on_activate(state) - - def on_deactivate(self, state: State) -> TransitionCallbackReturn: - self.get_logger().info("on_deactivate is called") - - # # !!! test - # self.destroy_timer(self._timer) - - # Destroy the Image subscriber. - self.destroy_subscription(self._sub) - return super().on_deactivate(state) - - def on_cleanup(self, state: State) -> TransitionCallbackReturn: - # очистим параметры - # node_param = rclpy.parameter.Parameter("mesh_path", rclpy.Parameter.Type.STRING, "") - # all_node_param = [node_param] - # self.set_parameters(all_node_param) - - self._is_camerainfo = False - - self.destroy_publisher(self._pub) - self.destroy_subscription(self._sub_info) - - self.get_logger().info("on_cleanup is called") - return TransitionCallbackReturn.SUCCESS - - def on_shutdown(self, state: State) -> TransitionCallbackReturn: - self.get_logger().info("on_shutdown is called") - return TransitionCallbackReturn.SUCCESS - - # def test_im_proc(self): - # im = "im_tst.jpg" - # if not os.path.isfile(im): - # print(f"File not found '{im}'") - # return - # frame = cv2.imread(im) - # frame = frame[..., ::-1].copy() - # self._K = [[585.756089952257, 0.0, 319.5], [0.0, 585.756089952257, 239.5], [0.0, 0.0, 1.0]] - # # call the inference node - # p = self.dope_node.image_processing(img=frame, camera_info=self._K) - # print(f"pose = {p}") - - def image_callback(self, data): - """ Image Callback function. 
""" - if not self._is_camerainfo: - self.get_logger().warning("No data from CameraInfo") - return - # # get camera pose - # camera_name = self.camera_link #self.topicImage.split('/')[1] - # try: - # t = self.tf_buffer.lookup_transform("world", camera_name, rclpy.time.Time()) - # except TransformException as ex: - # self.get_logger().info(f"Could not transform {camera_name} to world: {ex}") - # return - # self._cam_pose.position.x = t.transform.translation.x - # self._cam_pose.position.y = t.transform.translation.y - # self._cam_pose.position.z = t.transform.translation.z - # self._cam_pose.orientation.w = t.transform.rotation.w - # self._cam_pose.orientation.x = t.transform.rotation.x - # self._cam_pose.orientation.y = t.transform.rotation.y - # self._cam_pose.orientation.z = t.transform.rotation.z - - # Convert ROS Image message to OpenCV image - current_frame = self.br.imgmsg_to_cv2(data) - - # # Save image - # frame_im = FILE_TEMP_IMAGE # str(self.objPath / "image_rgb.png") - # cv2.imwrite(frame_im, current_frame) - self._image_cnt += 1 - - self.get_logger().info(f"dope: begin {self._image_cnt}") - current_frame = current_frame[..., ::-1].copy() - pose = self.dope_node.image_processing(img=current_frame, camera_info=self._K) - self.get_logger().info(f"dope: end {self._image_cnt}") - if self._pub is not None and self._pub.is_activated: - # publish pose estimation result - self._pub.publish(pose) - # if self.tf2_send_pose: - # self.tf_obj_pose(t,q) #(self._pose) - -from detector import ModelData, ObjectDetector -from cuboid_pnp_solver import CuboidPNPSolver -from cuboid import Cuboid3d -import numpy as np -# robossembler_ws/src/robossembler-ros2/rbs_perception/scripts/utils.py -# from utils import Draw - -class Dope(object): - """ROS node that listens to image topic, runs DOPE, and publishes DOPE results""" - - def __init__( - self, - config, # config yaml loaded eg dict - weight, # path to weight - class_name, - dim: list # dimensions of model 'class_name' - ): - self.input_is_rectified = config["input_is_rectified"] - self.downscale_height = config["downscale_height"] - - self.config_detect = lambda: None - self.config_detect.mask_edges = 1 - self.config_detect.mask_faces = 1 - self.config_detect.vertex = 1 - self.config_detect.threshold = 0.5 - self.config_detect.softmax = 1000 - self.config_detect.thresh_angle = config["thresh_angle"] - self.config_detect.thresh_map = config["thresh_map"] - self.config_detect.sigma = config["sigma"] - self.config_detect.thresh_points = config["thresh_points"] - - # load network model, create PNP solver - self.model = ModelData(name=class_name, net_path=weight) - - # TODO warn on load_net_model(): - # Loading DOPE model '/home/shalenikol/robossembler_ws/fork_e47.pth'... - # /home/shalenikol/.local/lib/python3.10/site-packages/torchvision/models/_utils.py:208: - # UserWarning: The parameter 'pretrained' is deprecated since 0.13 and may be removed in the future, please use 'weights' instead. - - # warnings.warn( - # /home/shalenikol/.local/lib/python3.10/site-packages/torchvision/models/_utils.py:223: - # UserWarning: Arguments other than a weight enum or `None` for 'weights' are deprecated since 0.13 and may be removed in the future. - # The current behavior is equivalent to passing `weights=None`. 
- # warnings.warn(msg) - self.model.load_net_model() - # print("Model Loaded") - - # try: - # self.draw_color = tuple(config["draw_colors"][class_name]) - # except: - self.draw_color = (0, 255, 0) - - # TODO load dim from config - # dim = [13.7, 16.5, 20.2] # config["dimensions"][class_name] - self.dimension = tuple(dim) - self.class_id = 1 #config["class_ids"][class_name] - - self.pnp_solver = CuboidPNPSolver(class_name, cuboid3d=Cuboid3d(dim)) - self.class_name = class_name - # print("Ctrl-C to stop") - - def image_processing( - self, - img, - camera_info - # img_name, # this is the name of the img file to save, it needs the .png at the end - # output_folder, # folder where to put the output - # weight - ) -> Pose: - # !!! Allways self.input_is_rectified = True - camera_matrix = np.matrix(camera_info, dtype="float64").copy() - dist_coeffs = np.zeros((4, 1)) - # Update camera matrix and distortion coefficients - # if self.input_is_rectified: - # P = np.matrix( - # camera_info["projection_matrix"]["data"], dtype="float64" - # ).copy() - # P.resize((3, 4)) - # camera_matrix = P[:, :3] - # dist_coeffs = np.zeros((4, 1)) - # else: - # # TODO ??? - # camera_matrix = np.matrix(camera_info.K, dtype="float64") - # camera_matrix.resize((3, 3)) - # dist_coeffs = np.matrix(camera_info.D, dtype="float64") - # dist_coeffs.resize((len(camera_info.D), 1)) - - # Downscale image if necessary - height, width, _ = img.shape - scaling_factor = float(self.downscale_height) / height - if scaling_factor < 1.0: - camera_matrix[:2] *= scaling_factor - img = cv2.resize(img, (int(scaling_factor * width), int(scaling_factor * height))) - - self.pnp_solver.set_camera_intrinsic_matrix(camera_matrix) - self.pnp_solver.set_dist_coeffs(dist_coeffs) - - # # Copy and draw image - # img_copy = img.copy() - # im = Image.fromarray(img_copy) - # draw = Draw(im) - - # # dictionary for the final output - # dict_out = {"camera_data": {}, "objects": []} - - # Detect object - results, _ = ObjectDetector.detect_object_in_image( - self.model.net, self.pnp_solver, img, self.config_detect - ) - - # Publish pose #and overlay cube on image - p = Pose() - for _, result in enumerate(results): - if result["location"] is None: - continue - - l = result["location"] - q = result["quaternion"] - p.position.x = l[0] - p.position.y = l[1] - p.position.z = l[2] - p.orientation.x = q[0] - p.orientation.y = q[1] - p.orientation.z = q[2] - p.orientation.w = q[3] - break # !!! 
only considering the first option for now - return p - # # save the json files - # with open(f"tmp_result{i}.json", "w") as fp: - # json.dump(result, fp, indent=2) - - # dict_out["objects"].append( - # { - # "class": self.class_name, - # "location": np.array(loc).tolist(), - # "quaternion_xyzw": np.array(ori).tolist(), - # "projected_cuboid": np.array(result["projected_points"]).tolist(), - # } - # ) - # # Draw the cube - # if None not in result["projected_points"]: - # points2d = [] - # for pair in result["projected_points"]: - # points2d.append(tuple(pair)) - # draw.draw_cube(points2d, self.draw_color) - - # # create directory to save image if it does not exist - # img_name_base = img_name.split("/")[-1] - # output_path = os.path.join( - # output_folder, - # weight.split("/")[-1].replace(".pth", ""), - # *img_name.split("/")[:-1], - # ) - # if not os.path.isdir(output_path): - # os.makedirs(output_path, exist_ok=True) - - # im.save(os.path.join(output_path, img_name_base)) - - # json_path = os.path.join( - # output_path, ".".join(img_name_base.split(".")[:-1]) + ".json" - # ) - # # save the json files - # with open(json_path, "w") as fp: - # json.dump(dict_out, fp, indent=2) - -def main(): - rclpy.init() - - executor = rclpy.executors.SingleThreadedExecutor() - # executor = rclpy.executors.MultiThreadedExecutor() - lc_node = PE_DOPE() - executor.add_node(lc_node) - try: - executor.spin() - except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): - lc_node.destroy_node() - -if __name__ == '__main__': - main() \ No newline at end of file
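
For reference, the pose-recovery core of the removed cuboid_pnp_solver.py reduces to a single cv2.solvePnP call over whichever of the cuboid's nine keypoints (eight corners plus the center) were detected. A minimal standalone sketch, assuming OpenCV and NumPy; the name solve_cuboid_pnp is illustrative, not part of the removed API:

import cv2
import numpy as np

def solve_cuboid_pnp(points_2d, points_3d, camera_matrix,
                     dist_coeffs=np.zeros((4, 1))):
    # Entries of points_2d may be None for keypoints the network missed;
    # drop them together with their 3D counterparts, as the removed
    # solve_pnp() did.
    pairs = [(p2, p3) for p2, p3 in zip(points_2d, points_3d) if p2 is not None]
    if len(pairs) < 4:  # solvePnP needs at least 4 correspondences
        return None
    image_points = np.array([p2 for p2, _ in pairs], dtype=float)
    object_points = np.array([p3 for _, p3 in pairs], dtype=float)
    ok, rvec, tvec = cv2.solvePnP(object_points, image_points,
                                  camera_matrix, dist_coeffs,
                                  flags=cv2.SOLVEPNP_ITERATIVE)
    return (tvec.ravel(), rvec.ravel()) if ok else None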
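
The removed solver then converted OpenCV's Rodrigues rotation vector to a quaternion via pyrr's Quaternion.from_axis_rotation. A NumPy-only equivalent sketch, with a guard for the near-zero rotation angle that the original divided through without checking:

import numpy as np

def rvec_to_quaternion_xyzw(rvec):
    # Axis-angle rotation vector -> quaternion in (x, y, z, w) order.
    rvec = np.asarray(rvec, dtype=float).ravel()
    theta = np.linalg.norm(rvec)  # rotation angle in radians
    if theta < 1e-12:             # identity rotation
        return np.array([0.0, 0.0, 0.0, 1.0])
    axis = rvec / theta
    return np.concatenate([axis * np.sin(theta / 2.0),
                           [np.cos(theta / 2.0)]])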
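
detector.py located keypoints as local maxima of Gaussian-smoothed belief maps, comparing each map against four one-pixel-shifted copies of itself. A condensed sketch of that search (note that scipy.ndimage.filters, which the removed code imported from, is deprecated in favor of scipy.ndimage):

import numpy as np
from scipy.ndimage import gaussian_filter

def find_belief_peaks(belief_map, sigma=3.0, threshold=0.1):
    # Return (x, y) pixel coordinates of local maxima above threshold.
    smoothed = gaussian_filter(belief_map, sigma=sigma)
    up = np.zeros_like(smoothed)
    up[1:, :] = smoothed[:-1, :]
    down = np.zeros_like(smoothed)
    down[:-1, :] = smoothed[1:, :]
    left = np.zeros_like(smoothed)
    left[:, 1:] = smoothed[:, :-1]
    right = np.zeros_like(smoothed)
    right[:, :-1] = smoothed[:, 1:]
    peaks = np.logical_and.reduce((smoothed >= up, smoothed >= down,
                                   smoothed >= left, smoothed >= right,
                                   smoothed > threshold))
    ys, xs = np.nonzero(peaks)
    return list(zip(xs, ys))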
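
Finally, pe_dope_lc.py followed the standard rclpy managed-lifecycle pattern: the CameraInfo subscription and the Pose publisher were created in on_configure, the Image subscription only in on_activate, and everything was torn down in the mirror callbacks. A bare skeleton of that pattern, with illustrative node and log messages; the main() mirrors the removed node's executor setup:

import rclpy
from rclpy.lifecycle import Node, State, TransitionCallbackReturn

class MinimalLifecycleNode(Node):
    """Sketch of the lifecycle structure the removed PE_DOPE node used."""

    def __init__(self):
        super().__init__("minimal_lc_node")
        self._sub = None

    def on_configure(self, state: State) -> TransitionCallbackReturn:
        # one-shot setup: parameters, model weights, publishers
        self.get_logger().info("configured")
        return TransitionCallbackReturn.SUCCESS

    def on_activate(self, state: State) -> TransitionCallbackReturn:
        # start consuming data only while active
        self.get_logger().info("activated")
        return super().on_activate(state)

    def on_deactivate(self, state: State) -> TransitionCallbackReturn:
        self.get_logger().info("deactivated")
        return super().on_deactivate(state)

def main():
    rclpy.init()
    executor = rclpy.executors.SingleThreadedExecutor()
    node = MinimalLifecycleNode()
    executor.add_node(node)
    try:
        executor.spin()
    except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
        node.destroy_node()

if __name__ == "__main__":
    main()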