#!/usr/bin/env python3 import time import gym_gz_models import gym_gz from scenario import gazebo as scenario_gazebo from scenario import core as scenario_core import rclpy from rclpy.node import Node from scipy.spatial.transform import Rotation as R import numpy as np from geometry_msgs.msg import PoseStamped from rclpy.executors import MultiThreadedExecutor from rbs_skill_servers import CartesianControllerPublisher, TakePose from rclpy.action import ActionClient from control_msgs.action import GripperCommand class Spawner(Node): def __init__(self): super().__init__("spawner") self.gazebo = scenario_gazebo.GazeboSimulator(step_size=0.001, rtf=1.0, steps_per_run=1) self.cartesian_pose = self.create_publisher( PoseStamped, "/" + "arm0" + "/cartesian_motion_controller/target_frame", 10) self.current_pose_sub = self.create_subscription(PoseStamped, "/arm0/cartesian_motion_controller/current_pose", self.callback, 10) self._action_client = ActionClient(self, GripperCommand, "/" + "arm0" + "/gripper_controller/gripper_cmd") timer_period = 0.001 # seconds self.timer = self.create_timer(timer_period, self.timer_callback) self.ano_timer = self.create_timer(timer_period, self.another_timer) scenario_gazebo.set_verbosity(scenario_gazebo.Verbosity_info) self.gazebo.insert_world_from_sdf( "/home/bill-finger/rbs_ws/install/rbs_simulation/share/rbs_simulation/worlds/asm2.sdf") self.gazebo.initialize() self.world = self.gazebo.get_world() self.current_pose: PoseStamped = PoseStamped() self.init_sim() self.cube = self.world.get_model("cube") self.stage = 0 self.gripper_open = False def callback(self, msg: PoseStamped): self.current_pose = msg def timer_callback(self): self.gazebo.run() def send_goal(self, goal: float): goal_msg = GripperCommand.Goal() goal_msg._command.position = goal goal_msg._command.max_effort = 1.0 self._action_client.wait_for_server() self.gripper_open = not self.gripper_open self._send_goal_future = self._action_client.send_goal_async(goal_msg) self._send_goal_future.add_done_callback(self.goal_response_callback) def goal_response_callback(self, future): goal_handle = future.result() if not goal_handle.accepted: self.get_logger().info('Goal rejected :(') return self.get_logger().info('Goal accepted :)') self._get_result_future = goal_handle.get_result_async() self._get_result_future.add_done_callback(self.get_result_callback) def get_result_callback(self, future): result = future.result().result self.get_logger().info('Result: {0}'.format(result.position)) def another_timer(self): position_over_cube = np.array(self.cube.base_position()) + np.array([0, 0, 0.2]) position_cube = np.array(self.cube.base_position()) + np.array([0, 0, 0.03]) quat_xyzw = R.from_euler(seq="y", angles=180, degrees=True).as_quat() if self.stage == 0: if self.distance_to_target(position_over_cube, quat_xyzw) > 0.01: self.cartesian_pose.publish(self.get_pose(position_over_cube, quat_xyzw)) if self.distance_to_target(position_over_cube, quat_xyzw) < 0.01: self.stage += 1 if self.stage == 1: if self.distance_to_target(position_cube, quat_xyzw) > 0.01: if not self.gripper_open: self.send_goal(0.064) # rclpy.spin_until_future_complete(self, future) self.cartesian_pose.publish(self.get_pose(position_cube, quat_xyzw)) if self.distance_to_target(position_cube, quat_xyzw) < 0.01: self.stage += 1 def distance_to_target(self, position, orientation): target_pose = self.get_pose(position, orientation) current_position = np.array([ self.current_pose.pose.position.x, self.current_pose.pose.position.y, self.current_pose.pose.position.z ]) target_position = np.array([ target_pose.pose.position.x, target_pose.pose.position.y, target_pose.pose.position.z ]) distance = np.linalg.norm(current_position - target_position) return distance def init_sim(self): # Create the simulator self.gazebo.gui() self.gazebo.run(paused=True) self.world.to_gazebo().set_gravity((0, 0, -9.8)) self.world.insert_model("/home/bill-finger/rbs_ws/current.urdf") self.gazebo.run(paused=True) for model_name in self.world.model_names(): model = self.world.get_model(model_name) print(f"Model: {model_name}") print(f" Base link: {model.base_frame()}") print("LINKS") for name in model.link_names(): position = model.get_link(name).position() orientation_wxyz = np.asarray(model.get_link(name).orientation()) orientation = R.from_quat(orientation_wxyz[[1, 2, 3, 0]]).as_euler("xyz") print(f" {name}:", (*position, *tuple(orientation))) print("JOINTS") for name in model.joint_names(): print(f"{name}") uri = lambda org, name: f"https://fuel.gazebosim.org/{org}/models/{name}" # Download the cube SDF file cube_sdf = scenario_gazebo.get_model_file_from_fuel( uri=uri(org="openrobotics", name="wood cube 5cm"), use_cache=False ) # Sample a random position random_position = np.random.uniform(low=[-0.2, -0.2, 0.0], high=[-0.3, 0.2, 0.0]) # Get a unique name model_name = gym_gz.utils.scenario.get_unique_model_name( world=self.world, model_name="cube" ) # Insert the model assert self.world.insert_model( cube_sdf, scenario_core.Pose(random_position, [1.0, 0, 0, 0]), model_name ) model = self.world.get_model("rbs_arm") self.cube = self.world.get_model("cube") ok_reset_pos = model.to_gazebo().reset_joint_positions( [0.0, -0.240, -3.142, 1.090, 0, 1.617, 0.0, 0.0, 0.0], [name for name in model.joint_names() if "_joint" in name] ) if not ok_reset_pos: raise RuntimeError("Failed to reset the robot state") def get_pose(self, position, orientation) -> PoseStamped: msg = PoseStamped() msg.header.stamp = self.get_clock().now().to_msg() msg.header.frame_id = "base_link" msg.pose.position.x = position[0] msg.pose.position.y = position[1] msg.pose.position.z = position[2] msg.pose.orientation.x = orientation[0] msg.pose.orientation.y = orientation[1] msg.pose.orientation.z = orientation[2] msg.pose.orientation.w = orientation[3] return msg def main(args=None): rclpy.init(args=args) executor = MultiThreadedExecutor() my_node = Spawner() executor.add_node(my_node) executor.spin() my_node.gazebo.close() my_node.destroy_node() rclpy.shutdown() if __name__ == "__main__": main()