aubo-old-sdk-test/node.py

151 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64MultiArray
from sensor_msgs.msg import JointState
from utils import RobotClient
import time
from main import config, HOST, PORT
import asyncio
class RobotControllerNode(Node):
def __init__(self):
super().__init__("robot_controller_node")
self.pub = self.create_publisher(
Float64MultiArray, "/forward_position_controller/commands", 10
)
self.js_sub = self.create_subscription(
JointState, "/joint_states", self.js_callback, 10
)
self.last_js = JointState()
self.robot_client = RobotClient(HOST, PORT)
self.get_logger().info("Robot client connected")
# Настройка скорости и ускорения
self.robot_client.set_end_max_line_velc(0.6)
self.robot_client.set_end_max_line_acc(0.8)
self.gripper_joint_name = "left_outer_knuckle_joint" # Укажите правильное имя joint'а
self.sequence_step = 0
# Запуск основного сценария через таймер
self.timer = self.create_timer(
1.0, self.run_sequence
) # Задержка 1 сек для инициализации
def js_callback(self, msg: JointState):
self.last_js = msg
def send_gripper_command(self, target_pose: float, tolerance: float = 0.01, timeout: float = 3.0):
# Создаем и запускаем асинхронную задачу
return asyncio.get_event_loop().run_until_complete(
self.send_gripper_command_async(target_pose, tolerance, timeout)
)
async def send_gripper_command_async(self, target_pose: float, tolerance: float = 0.01, timeout: float = 3.0):
msg = Float64MultiArray()
msg.data = [target_pose]
self.pub.publish(msg)
self.get_logger().info(f"Sent gripper pose: {target_pose}")
# Ждём данных от joint_states
start_time = time.time()
while rclpy.ok():
rclpy.spin_once(self, timeout_sec=0.1)
joint_name_list = list(self.last_js.name) if self.last_js.name else []
joint_position_list = list(self.last_js.position) if self.last_js.position else []
if self.gripper_joint_name in joint_name_list:
index = joint_name_list.index(self.gripper_joint_name)
if index < len(joint_position_list):
current_pose = joint_position_list[index]
if abs(current_pose - target_pose) < tolerance:
self.get_logger().info(f"Gripper reached pose: {current_pose:.3f}")
return True
else:
self.get_logger().warn("Gripper joint index out of bounds in position list")
else:
self.get_logger().warn(f"Gripper joint '{self.gripper_joint_name}' not found in joint_states")
return False
if time.time() - start_time > timeout:
self.get_logger().warn("Timeout waiting for gripper to reach position")
return False
def run_sequence(self):
print(f"STEP #{self.sequence_step}")
if self.sequence_step == 0:
self.get_logger().info("Moving to pre-grasp")
self.robot_client.move_to_pose_joint(config["pre_grasp_position"])
self.sequence_step += 1
elif self.sequence_step == 1:
self.get_logger().info("Moving to grasp")
self.robot_client.move_to_pose_cart(config["grasp_position"])
self.sequence_step += 1
elif self.sequence_step == 2:
if self.send_gripper_command(target_pose=0.582, timeout=10.0, tolerance=0.1):
self.get_logger().info("Gripper closed")
self.sequence_step += 1
elif self.sequence_step == 3:
self.get_logger().info("Moving to post-grasp")
self.robot_client.move_to_pose_cart(config["post_grasp_position"])
self.sequence_step += 1
elif self.sequence_step == 4:
self.get_logger().info("Moving to pre_insert_position")
self.robot_client.move_to_pose_cart(config["pre_insert_position"])
self.sequence_step += 1
elif self.sequence_step == 5:
self.get_logger().info("Moving to insert_position")
self.robot_client.move_to_pose_joint(config["insert_position"])
self.sequence_step += 1
elif self.sequence_step == 6:
self.get_logger().info("Moving to post_insert_position")
self.robot_client.move_to_pose_joint(config["post_insert_position"])
self.sequence_step += 1
elif self.sequence_step == 7:
if self.send_gripper_command(target_pose=0.54, timeout=10.0):
self.get_logger().info("Gripper closed")
self.sequence_step += 1
#
# ... и так далее для остальных шагов
elif self.sequence_step >= 8: # Измените на последний шаг + 1
self.get_logger().info("Sequence complete")
self.robot_client.close()
self.destroy_timer(self.timer)
def main(args=None):
rclpy.init(args=args)
try:
node = RobotControllerNode()
executor = rclpy.executors.MultiThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
finally:
node.destroy_node()
except Exception as e:
print(f"Fatal error: {str(e)}")
finally:
rclpy.shutdown()
if __name__ == "__main__":
main()