#!/usr/bin/env python3 import rclpy from rclpy.node import Node from std_msgs.msg import Float64MultiArray from sensor_msgs.msg import JointState from utils import RobotClient import time from main import config, HOST, PORT import threading class RobotControllerNode(Node): def __init__(self): super().__init__("robot_controller_node") self.pub = self.create_publisher( Float64MultiArray, "/forward_position_controller/commands", 10 ) self.js_sub = self.create_subscription( JointState, "/joint_states", self.js_callback, 10 ) self.last_js = JointState() self.robot_client = RobotClient(HOST, PORT) self.get_logger().info("Robot client connected") self.waiting_for_gripper = False self.robot_client.set_end_max_line_velc(0.6) self.robot_client.set_end_max_line_acc(0.8) self.gripper_joint_name = "left_outer_knuckle_joint" self.sequence_step = 0 self.timer = self.create_timer(1.0, self.run_sequence) def js_callback(self, msg: JointState): self.last_js = msg def send_gripper_command( self, target_pose: float, tolerance: float = 0.01, timeout: float = 3.0 ) -> bool: msg = Float64MultiArray() msg.data = [target_pose] self.pub.publish(msg) self.get_logger().info(f"Sent gripper pose: {target_pose}") start_time = time.time() while rclpy.ok(): # rclpy.spin_once(self, timeout_sec=0.1) joint_name_list = list(self.last_js.name) if self.last_js.name else [] joint_position_list = ( list(self.last_js.position) if self.last_js.position else [] ) if self.gripper_joint_name in joint_name_list: index = joint_name_list.index(self.gripper_joint_name) if index < len(joint_position_list): current_pose = joint_position_list[index] if abs(current_pose - target_pose) < tolerance: self.get_logger().info( f"Gripper reached pose: {current_pose:.3f}" ) return True else: self.get_logger().warn("Gripper joint index out of bounds") else: self.get_logger().warn(f"Joint '{self.gripper_joint_name}' not found") return False if time.time() - start_time > timeout: self.get_logger().warn("Timeout waiting for gripper") return False def handle_gripper_step( self, target_pose: float, timeout: float = 3.0, tolerance: float = 0.01 ): try: success = self.send_gripper_command(target_pose, tolerance, timeout) if success: self.get_logger().info("Gripper moved successfully") self.waiting_for_gripper = False else: self.get_logger().warn("Gripper movement failed or timed out.") except Exception as e: self.get_logger().error(f"Exception in gripper thread: {str(e)}") def run_sequence(self): print(f"STEP #{self.sequence_step}") if self.sequence_step == 0: self.robot_client.set_joint_maxacc(1) self.robot_client.set_joint_maxvelc(1) self.robot_client.set_end_max_line_velc(1) self.robot_client.set_end_max_line_acc(1) self.get_logger().info("Moving to pre-grasp") self.robot_client.move_to_pose_joint(config["pre_grasp_position"]) self.sequence_step += 1 elif self.sequence_step == 1: if not self.waiting_for_gripper: self.waiting_for_gripper = True threading.Thread( target=self.handle_gripper_step, args=(0.45, 10.0, 0.1), daemon=True, ).start() self.sequence_step += 1 elif self.sequence_step == 2: self.get_logger().info("Moving to grasp") self.robot_client.move_to_pose_cart(config["grasp_position"]) self.sequence_step += 1 elif self.sequence_step == 3: if not self.waiting_for_gripper: self.waiting_for_gripper = True threading.Thread( target=self.handle_gripper_step, args=(0.6, 10.0, 0.1), daemon=True, ).start() self.sequence_step += 1 elif self.sequence_step == 4: self.get_logger().info("Moving to post-grasp") self.robot_client.move_to_pose_cart(config["post_grasp_position"]) self.sequence_step += 1 elif self.sequence_step == 5: self.get_logger().info("Moving to pre_insert_position") self.robot_client.move_to_pose_cart(config["pre_insert_position"]) self.sequence_step += 1 elif self.sequence_step == 6: self.get_logger().info("Moving to insert_position") self.robot_client.move_to_pose_cart(config["insert_position"]) self.sequence_step += 1 elif self.sequence_step == 7: self.get_logger().info("Moving to post_insert_position") self.robot_client.move_to_pose_cart(config["post_insert_position"]) self.sequence_step += 1 elif self.sequence_step == 8: # if not self.waiting_for_gripper: # self.waiting_for_gripper = True # threading.Thread( # target=self.handle_gripper_step, args=(0.56, 10.0), daemon=True # ).start() self.sequence_step += 1 elif self.sequence_step == 9: self.get_logger().info("Moving away") if not self.waiting_for_gripper: self.waiting_for_gripper = True threading.Thread( target=self.handle_gripper_step, args=(0.56, 10.0), daemon=True ).start() self.robot_client.move_to_pose_cart(config["final_moveaway"]) self.sequence_step += 1 elif self.sequence_step == 10: self.get_logger().info("Moving to init position") self.robot_client.move_to_pose_joint(config["init"]) self.sequence_step += 1 elif self.sequence_step >= 11: self.get_logger().info("Sequence complete") self.robot_client.close() self.destroy_timer(self.timer) def main(args=None): rclpy.init(args=args) try: node = RobotControllerNode() executor = rclpy.executors.MultiThreadedExecutor() executor.add_node(node) try: executor.spin() finally: node.destroy_node() except Exception as e: print(f"Fatal error: {str(e)}") finally: rclpy.shutdown() if __name__ == "__main__": main()