aubo-old-sdk-test/node.py
2025-05-23 18:45:53 +03:00

117 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32MultiArray
from sensor_msgs.msg import JointState
from utils import RobotClient
import time
from main import config, HOST, PORT
class RobotControllerNode(Node):
def __init__(self):
super().__init__("robot_controller_node")
self.pub = self.create_publisher(
Float32MultiArray, "/forward_position_controller/commands", 10
)
self.js_sub = self.create_subscription(
JointState, "/joint_states", self.js_callback, 10
)
self.last_js = JointState()
self.robot_client = RobotClient(HOST, PORT)
self.get_logger().info("Robot client connected")
# Настройка скорости и ускорения
self.robot_client.set_end_max_line_velc(0.6)
self.robot_client.set_end_max_line_acc(0.8)
self.gripper_joint_name = "gripper_joint" # Укажите правильное имя joint'а
self.sequence_step = 0
# Запуск основного сценария через таймер
self.timer = self.create_timer(
1.0, self.run_sequence
) # Задержка 1 сек для инициализации
def js_callback(self, msg: JointState):
self.last_js = msg
def send_gripper_pose(
self, target_pose: float, tolerance: float = 0.01, timeout: float = 3.0
):
msg = Float32MultiArray()
msg.data = [target_pose]
self.pub.publish(msg)
self.get_logger().info(f"Sent gripper pose: {target_pose}")
start_time = time.time()
while rclpy.ok():
if not self.last_js.name:
rclpy.spin_once(self, timeout_sec=0.1)
continue
try:
index = self.last_js.name.index(self.gripper_joint_name)
current_pose = self.last_js.position[index]
if abs(current_pose - target_pose) < tolerance:
self.get_logger().info(f"Gripper reached pose: {current_pose:.3f}")
return True
except ValueError:
self.get_logger().warn(
"Gripper joint name not found in joint_states", once=True
)
if time.time() - start_time > timeout:
self.get_logger().warn("Timeout waiting for gripper to reach position")
return False
rclpy.spin_once(self, timeout_sec=0.1)
return False
def run_sequence(self):
if self.sequence_step == 0:
self.get_logger().info("Moving to pre-grasp")
self.robot_client.move_to_pose_joint(config["pre_grasp_position"])
self.sequence_step += 1
elif self.sequence_step == 1:
self.get_logger().info("Moving to grasp")
self.robot_client.move_to_pose_cart(config["grasp_position"])
self.sequence_step += 1
elif self.sequence_step == 2:
self.get_logger().info("Closing gripper")
if self.send_gripper_pose(1.0):
self.sequence_step += 1
time.sleep(1.0)
# elif self.sequence_step == 3:
# self.get_logger().info("Moving to post-grasp")
# self.robot_client.move_to_pose_cart(config["post_grasp_position"])
# self.sequence_step += 1
#
# ... и так далее для остальных шагов
elif self.sequence_step >= 3: # Измените на последний шаг + 1
self.get_logger().info("Sequence complete")
self.robot_client.close()
self.destroy_timer(self.timer)
def main(args=None):
rclpy.init(args=args)
node = RobotControllerNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.robot_client.close()
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()