148 lines
4.9 KiB
Python
148 lines
4.9 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import rclpy
|
|
from rclpy.node import Node
|
|
from std_msgs.msg import Float64MultiArray
|
|
from sensor_msgs.msg import JointState
|
|
from utils import RobotClient
|
|
import time
|
|
from main import config, HOST, PORT
|
|
import threading
|
|
|
|
joystick_max = -0.9583490120438358
|
|
joystick_min = -1.8495918498053934
|
|
gripper_max = 0.93
|
|
gripper_min = 0.0
|
|
|
|
|
|
class RobotControllerNode(Node):
|
|
def __init__(self):
|
|
super().__init__("robot_controller_node")
|
|
|
|
self.pub = self.create_publisher(
|
|
Float64MultiArray, "/forward_position_controller/commands", 10
|
|
)
|
|
self.js_sub = self.create_subscription(
|
|
JointState, "/joint_states", self.js_callback, 10
|
|
)
|
|
self.gello_sub = self.create_subscription(
|
|
JointState, "/gello/joint_states", self.gello_callback, 10
|
|
)
|
|
self.last_js = JointState()
|
|
self.last_gello = JointState()
|
|
|
|
self.robot_client = RobotClient(HOST, PORT)
|
|
self.get_logger().info("Robot client connected")
|
|
self.waiting_for_gripper = False
|
|
|
|
self.robot_client.set_end_max_line_velc(0.6)
|
|
self.robot_client.set_end_max_line_acc(0.8)
|
|
self.gripper_joint_name = "left_outer_knuckle_joint"
|
|
self.sequence_step = 0
|
|
|
|
self.timer = self.create_timer(1.0, self.run_sequence)
|
|
|
|
def js_callback(self, msg: JointState):
|
|
self.last_js = msg
|
|
|
|
def gello_callback(self, msg: JointState):
|
|
self.last_gello = msg
|
|
|
|
def map_range(
|
|
self,
|
|
value,
|
|
in_min=joystick_min,
|
|
in_max=joystick_max,
|
|
out_min=gripper_min,
|
|
out_max=gripper_max,
|
|
):
|
|
"""Преобразует значение из одного диапазона в другой."""
|
|
# Защита от деления на ноль
|
|
if in_max == in_min:
|
|
raise ValueError("Невалидный входной диапазон: in_min и in_max равны")
|
|
# Линейное преобразование
|
|
return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
|
|
|
|
def send_gripper_command(
|
|
self, target_pose: float, tolerance: float = 0.01, timeout: float = 3.0
|
|
) -> bool:
|
|
msg = Float64MultiArray()
|
|
msg.data = [target_pose]
|
|
self.pub.publish(msg)
|
|
self.get_logger().info(f"Sent gripper pose: {target_pose}")
|
|
|
|
start_time = time.time()
|
|
while rclpy.ok():
|
|
# rclpy.spin_once(self, timeout_sec=0.1)
|
|
|
|
joint_name_list = list(self.last_js.name) if self.last_js.name else []
|
|
joint_position_list = (
|
|
list(self.last_js.position) if self.last_js.position else []
|
|
)
|
|
|
|
if self.gripper_joint_name in joint_name_list:
|
|
index = joint_name_list.index(self.gripper_joint_name)
|
|
if index < len(joint_position_list):
|
|
current_pose = joint_position_list[index]
|
|
if abs(current_pose - target_pose) < tolerance:
|
|
self.get_logger().info(
|
|
f"Gripper reached pose: {current_pose:.3f}"
|
|
)
|
|
return True
|
|
else:
|
|
self.get_logger().warn("Gripper joint index out of bounds")
|
|
else:
|
|
self.get_logger().warn(f"Joint '{self.gripper_joint_name}' not found")
|
|
return False
|
|
|
|
if time.time() - start_time > timeout:
|
|
self.get_logger().warn("Timeout waiting for gripper")
|
|
return False
|
|
|
|
def handle_gripper_step(
|
|
self, target_pose: float, timeout: float = 3.0, tolerance: float = 0.01
|
|
):
|
|
try:
|
|
success = self.send_gripper_command(target_pose, tolerance, timeout)
|
|
if success:
|
|
self.get_logger().info("Gripper moved successfully")
|
|
self.waiting_for_gripper = False
|
|
else:
|
|
self.get_logger().warn("Gripper movement failed or timed out.")
|
|
except Exception as e:
|
|
self.get_logger().error(f"Exception in gripper thread: {str(e)}")
|
|
|
|
def run_sequence(self):
|
|
print(f"STEP #{self.sequence_step}")
|
|
if not self.waiting_for_gripper:
|
|
self.waiting_for_gripper = True
|
|
threading.Thread(
|
|
target=self.handle_gripper_step,
|
|
args=(self.map_range(self.last_gello.position[-1]), 10.0, 0.1),
|
|
daemon=True,
|
|
).start()
|
|
self.sequence_step += 1
|
|
|
|
|
|
def main(args=None):
|
|
rclpy.init(args=args)
|
|
|
|
try:
|
|
node = RobotControllerNode()
|
|
executor = rclpy.executors.MultiThreadedExecutor()
|
|
executor.add_node(node)
|
|
|
|
try:
|
|
executor.spin()
|
|
finally:
|
|
node.robot_client.close()
|
|
node.destroy_node()
|
|
|
|
except Exception as e:
|
|
print(f"Fatal error: {str(e)}")
|
|
finally:
|
|
rclpy.shutdown()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|