aubo-old-sdk-test/utils.py
2025-05-22 14:44:04 +03:00

148 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#! /usr/bin/env python
import socket
import json
DEBUG: bool = True
RESET = "\033[0m"
CYAN = "\033[36m"
YELLOW = "\033[33m"
class RobotClient:
def __init__(self, host, port):
self.host = host
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
def _send(self, cmd) -> dict:
if DEBUG:
print(f"{CYAN}DEBUG: sended:{RESET} {cmd}")
self.sock.sendall(json.dumps(cmd).encode("utf-8"))
data = self.sock.recv(1024).decode("utf-8")
ret = json.loads(data)
if DEBUG:
print(f"{YELLOW}DEBUG: received:{RESET} {ret}")
return ret
def rpy_to_quat(self, arr):
cmd = {"command": "rpy_to_quaternion", "rpy": arr}
ret = self._send(cmd)
return ret["ret"]
def get_current_waypoint(self):
cmd = {"command": "get_current_waypoint"}
ret = self._send(cmd)
return ret["ret"]["joint"], ret["ret"]["pos"], ret["ret"]["ori"]
def inverse_kin(self, joint_radian: list, pos: list, ori: list) -> list:
"""
Выполняет обратную кинематику, возвращает углы суставов (в радианах).
:param joint_radian: текущая поза (6 углов в рад)
:param pos: целевая позиция [x, y, z] в метрах
:param ori: ориентация в виде кватерниона [w, x, y, z]
:return: список из 6 углов суставов (в радианах)
"""
cmd = {
"command": "inverse_kin",
"joint_radian": joint_radian,
"pos": pos,
"ori": ori,
}
ret = self._send(cmd)
if ret["msg"] != "succ":
raise RuntimeError(f"Inverse kinematics failed: {ret}. \n Was sended {cmd}")
return ret["ret"][
"joint"
] # возвращается как {"joint": [...], "pos": [...], "ori": [...]}
def move_to_pose(self, pose: dict):
"""
Перемещает робота в указанную позицию и ориентацию.
:param command: словарь с ключами "pos" и "ori" (ори — в радианах, RPY)
"""
# quat = self.rpy_to_quat(command["ori"])
# 2. Получаем текущее состояние робота
joint_start, _, _ = self.get_current_waypoint()
# 3. Вычисляем инверсную кинематику
joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"])
# 4. Выполняем линейное перемещение
self.move_line(joint_target)
def add_waypoint(self, joint_radian: list):
cmd = {"command": "add_waypoint", "joint_radian": joint_radian}
ret = self._send(cmd)
if (ret.get("msg") != "succ"):
raise RuntimeError(f"add_waypoint failed: {ret}")
def move_track(self, track = "CARTESIAN_MOVEP"):
cmd = {"command": "move_track", "track": track}
ret = self._send(cmd)
if (ret.get("msg") != "succ"):
raise RuntimeError(f"move_track failed: {ret}")
def move_linear_cartesian(self, pose: dict):
"""
Выполняет линейное движение в декартовом пространстве между текущей позицией и заданной позой.
:param pose: словарь с ключами "pos" (XYZ в метрах) и "ori" (кватернион [w, x, y, z])
"""
# 1. Получаем текущее состояние
joint_start, _, _ = self.get_current_waypoint()
# 2. Добавляем текущую позицию как waypoint
self.add_waypoint(joint_start)
# 3. Вычисляем IK
joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"])
# 4. Добавляем целевую позицию как waypoint
self.add_waypoint(joint_target)
# 5. Выполняем траекторию
self.move_track()
def move_line(self, joint_radian: list) -> None:
"""
Выполняет линейное перемещение в заданную позу, сохраняя текущую ориентацию.
:param joint_radian: целевая поза (список из 6 значений в радианах)
:raises RuntimeError: если команда не была выполнена успешно
"""
cmd = {"command": "move_line", "joint_radian": joint_radian}
ret = self._send(cmd)
if ret.get("ret") != 0:
raise RuntimeError(f"move_line failed: {ret}")
def set_joint_maxacc(self, acc: float):
"""
Устанавливает одинаковое максимальное ускорение для всех суставов (в rad/s²).
:param acc: ускорение для всех 6 суставов
:raises RuntimeError: если команда завершилась с ошибкой
"""
cmd = {"command": "set_joint_maxacc", "joint_maxacc": [acc] * 6}
ret = self._send(cmd)
if ret.get("msg") != "succ":
raise RuntimeError(f"set_joint_maxacc failed: {ret}")
def set_joint_maxvelc(self, vel: float):
"""
Устанавливает одинаковую максимальную скорость для всех суставов (в rad/s).
:param vel: скорость для всех 6 суставов
:raises RuntimeError: если команда завершилась с ошибкой
"""
cmd = {"command": "set_joint_maxvelc", "joint_maxvelc": [vel] * 6}
ret = self._send(cmd)
if ret.get("msg") != "succ":
raise RuntimeError(f"set_joint_maxvelc failed: {ret}")
def close(self):
self.sock.close()