#!/usr/bin/env python3 import rclpy import asyncio from rclpy.node import Node from std_msgs.msg import Float32MultiArray from sensor_msgs.msg import JointState from utils import RobotClient import time from main import config, HOST, PORT class RobotControllerNode(Node): def __init__(self): super().__init__("robot_controller_node") self.pub = self.create_publisher( Float32MultiArray, "/forward_position_controller/commands", 10 ) self.js_sub = self.create_subscription( JointState, "/joint_states", self.js_callback, 10 ) self.last_js = JointState() self.robot_client = RobotClient(HOST, PORT) self.get_logger().info("Robot client connected") # Настройка скорости и ускорения self.robot_client.set_end_max_line_velc(0.6) self.robot_client.set_end_max_line_acc(0.8) self.gripper_joint_name = "gripper_joint" # Укажите правильное имя joint'а self.sequence_step = 0 # Запуск основного сценария через таймер self.timer = self.create_timer( 1.0, self.run_sequence ) # Задержка 1 сек для инициализации def js_callback(self, msg: JointState): self.last_js = msg async def send_gripper_command( self, target_pose: float, tolerance: float = 0.01, timeout: float = 3.0 ): if self.last_js is None or not self.last_js.name: self.get_logger().warn( "No joint states received yet", throttle_duration_sec=5 ) return False msg = Float32MultiArray() msg.data = [target_pose] self.pub.publish(msg) start_time = time.time() while (time.time() - start_time) < timeout and rclpy.ok(): try: idx = self.last_js.name.index(self.gripper_joint_name) current = self.last_js.position[idx] if abs(current - target_pose) < tolerance: return True except ValueError: self.get_logger().error(f"Joint {self.gripper_joint_name} not found!") return False await asyncio.sleep(0.1) # Неблокирующее ожидание return False def run_sequence(self): if self.sequence_step == 0: self.get_logger().info("Moving to pre-grasp") self.robot_client.move_to_pose_joint(config["pre_grasp_position"]) self.sequence_step += 1 elif self.sequence_step == 1: self.get_logger().info("Moving to grasp") self.robot_client.move_to_pose_cart(config["grasp_position"]) self.sequence_step += 1 elif self.sequence_step == 3: if self.send_gripper_command(target_pose=1.0, timeout=10.0): self.get_logger().info("Gripper closed") self.sequence_step += 1 # elif self.sequence_step == 3: # self.get_logger().info("Moving to post-grasp") # self.robot_client.move_to_pose_cart(config["post_grasp_position"]) # self.sequence_step += 1 # # ... и так далее для остальных шагов elif self.sequence_step >= 3: # Измените на последний шаг + 1 self.get_logger().info("Sequence complete") self.robot_client.close() self.destroy_timer(self.timer) def main(args=None): rclpy.init(args=args) try: node = RobotControllerNode() executor = rclpy.executors.MultiThreadedExecutor() executor.add_node(node) try: executor.spin() finally: node.destroy_node() except Exception as e: print(f"Fatal error: {str(e)}") finally: rclpy.shutdown() if __name__ == "__main__": main()