add gripper handle

This commit is contained in:
Ilya Uraev 2025-05-23 20:19:36 +03:00
parent d506f35cf4
commit 77f4e1a831

76
node.py
View file

@ -24,11 +24,13 @@ class RobotControllerNode(Node):
self.robot_client = RobotClient(HOST, PORT)
self.get_logger().info("Robot client connected")
self.loop = asyncio.get_event_loop()
self.waiting_for_gripper = False
# Настройка скорости и ускорения
self.robot_client.set_end_max_line_velc(0.6)
self.robot_client.set_end_max_line_acc(0.8)
self.gripper_joint_name = "left_outer_knuckle_joint" # Укажите правильное имя joint'а
self.gripper_joint_name = "left_outer_knuckle_joint"
self.sequence_step = 0
# Запуск основного сценария через таймер
@ -39,44 +41,67 @@ class RobotControllerNode(Node):
def js_callback(self, msg: JointState):
self.last_js = msg
def send_gripper_command(self, target_pose: float, tolerance: float = 0.01, timeout: float = 3.0):
# Создаем и запускаем асинхронную задачу
return asyncio.get_event_loop().run_until_complete(
self.send_gripper_command_async(target_pose, tolerance, timeout)
)
async def send_gripper_command_async(self, target_pose: float, tolerance: float = 0.01, timeout: float = 3.0):
async def send_gripper_command_async(
self, target_pose: float, tolerance: float = 0.01, timeout: float = 3.0
):
msg = Float64MultiArray()
msg.data = [target_pose]
self.pub.publish(msg)
self.get_logger().info(f"Sent gripper pose: {target_pose}")
# Ждём данных от joint_states
start_time = time.time()
while rclpy.ok():
rclpy.spin_once(self, timeout_sec=0.1)
joint_name_list = list(self.last_js.name) if self.last_js.name else []
joint_position_list = list(self.last_js.position) if self.last_js.position else []
joint_position_list = (
list(self.last_js.position) if self.last_js.position else []
)
if self.gripper_joint_name in joint_name_list:
index = joint_name_list.index(self.gripper_joint_name)
if index < len(joint_position_list):
current_pose = joint_position_list[index]
if abs(current_pose - target_pose) < tolerance:
self.get_logger().info(f"Gripper reached pose: {current_pose:.3f}")
self.get_logger().info(
f"Gripper reached pose: {current_pose:.3f}"
)
return True
else:
self.get_logger().warn("Gripper joint index out of bounds in position list")
self.get_logger().warn(
"Gripper joint index out of bounds in position list"
)
else:
self.get_logger().warn(f"Gripper joint '{self.gripper_joint_name}' not found in joint_states")
self.get_logger().warn(
f"Gripper joint '{self.gripper_joint_name}' not found in joint_states"
)
return False
if time.time() - start_time > timeout:
self.get_logger().warn("Timeout waiting for gripper to reach position")
return False
async def handle_gripper_step(
self, target_pose: float, timeout: float = 3.0, tolerance: float = 0.01
):
try:
success = await self.send_gripper_command_async(
target_pose, tolerance, timeout
)
if success:
self.sequence_step += 1
self.get_logger().info(
f"Gripper action succeeded, moving to step {self.sequence_step}"
)
else:
self.get_logger().warn(
f"Gripper action failed or timed out at pose {target_pose}"
)
except Exception as e:
self.get_logger().error(f"Exception in handle_gripper_step: {str(e)}")
finally:
self.waiting_for_gripper = False
def run_sequence(self):
print(f"STEP #{self.sequence_step}")
if self.sequence_step == 0:
@ -90,9 +115,13 @@ class RobotControllerNode(Node):
self.sequence_step += 1
elif self.sequence_step == 2:
if self.send_gripper_command(target_pose=0.582, timeout=10.0, tolerance=0.1):
self.get_logger().info("Gripper closed")
self.sequence_step += 1
if not self.waiting_for_gripper:
self.waiting_for_gripper = True
self.loop.create_task(
self.handle_gripper_step(
target_pose=0.582, timeout=10.0, tolerance=0.1
)
)
elif self.sequence_step == 3:
self.get_logger().info("Moving to post-grasp")
@ -115,14 +144,13 @@ class RobotControllerNode(Node):
self.sequence_step += 1
elif self.sequence_step == 7:
if self.send_gripper_command(target_pose=0.54, timeout=10.0):
self.get_logger().info("Gripper closed")
self.sequence_step += 1
if not self.waiting_for_gripper:
self.waiting_for_gripper = True
self.loop.create_task(
self.handle_gripper_step(target_pose=0.54, timeout=10.0)
)
#
# ... и так далее для остальных шагов
elif self.sequence_step >= 8: # Измените на последний шаг + 1
elif self.sequence_step >= 8:
self.get_logger().info("Sequence complete")
self.robot_client.close()
self.destroy_timer(self.timer)