aubo-old-sdk-test/utils.py

228 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#! /usr/bin/env python
import socket
import json
import time
import threading
import queue
DEBUG: bool = True
RESET = "\033[0m"
CYAN = "\033[36m"
YELLOW = "\033[33m"
RED = "\033[31m"
class RobotStatus:
Stopped = 0
Running = 1
Paused = 2
Resumed = 3
class RobotJasonServerError:
RESUTL_SUCC = 0
RESUTL_BASE = 3000
RESUTL_ERROR_BAD_COMMAND_STRING = RESUTL_BASE + 1
RESUTL_ERROR_UNKNOWN_COMMAND = RESUTL_BASE + 2
RESUTL_ERROR_ARGS = RESUTL_BASE + 2
RESUTL_ERROR_MOVE = RESUTL_BASE + 3
RESUTL_CALL_BACK = RESUTL_BASE + 4
RESUTL_ERROR_CLIENTS_NUMBER = RESUTL_BASE + 5
RESUTL_ERROR_DISCONNECT_TO_MODBUS_SERVER = RESUTL_BASE + 6
RESUTL_ERROR_CONNECT_TO_MODBUS_SERVER = RESUTL_BASE + 7
class RobotClient:
def __init__(self, host, port):
self.host = host
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
self._response_queue = queue.Queue()
self._callback_queue = queue.Queue()
self._recv_thread.start()
def _recv_loop(self):
while True:
try:
data = self.sock.recv(4096).decode("utf-8")
if not data:
if DEBUG:
print("DEBUG: socket closed by server")
break
ret = json.loads(data)
if DEBUG:
print(f"{YELLOW}DEBUG: received:{RESET} {ret}")
if ret["ret"] == RobotJasonServerError.RESUTL_CALL_BACK:
self._callback_queue.put(ret)
if DEBUG:
print(f"{YELLOW}DEBUG: received callback event: {ret['msg']}")
else:
self._response_queue.put(ret)
except Exception as e:
if DEBUG:
print(f"{RED}DEBUG: Exception in recv loop: {e}{RESET}")
break
def _send(self, cmd) -> dict:
if DEBUG:
print(f"{CYAN}DEBUG: sended:{RESET} {cmd}")
self.sock.sendall(json.dumps(cmd).encode("utf-8"))
while True:
try:
ret = self._response_queue.get(timeout=5)
if ret:
return ret
except queue.Empty:
raise RuntimeError("Timeout waiting for response")
def get_callback_event(self, timeout=0.1):
try:
return self._callback_queue.get(timeout=timeout)
except queue.Empty:
return None
def rpy_to_quat(self, arr):
cmd = {"command": "rpy_to_quaternion", "rpy": arr}
ret = self._send(cmd)
return ret["ret"]
def get_current_waypoint(self):
cmd = {"command": "get_current_waypoint"}
ret = self._send(cmd)
return ret["ret"]["joint"], ret["ret"]["pos"], ret["ret"]["ori"]
def inverse_kin(self, joint_radian: list, pos: list, ori: list) -> list:
"""
Выполняет обратную кинематику, возвращает углы суставов (в радианах).
:param joint_radian: текущая поза (6 углов в рад)
:param pos: целевая позиция [x, y, z] в метрах
:param ori: ориентация в виде кватерниона [w, x, y, z]
:return: список из 6 углов суставов (в радианах)
"""
cmd = {
"command": "inverse_kin",
"joint_radian": joint_radian,
"pos": pos,
"ori": ori,
}
ret = self._send(cmd)
if ret["msg"] != "succ":
raise RuntimeError(f"Inverse kinematics failed: {ret}. \n Was sended {cmd}")
return ret["ret"][
"joint"
] # возвращается как {"joint": [...], "pos": [...], "ori": [...]}
def move_to_pose(self, pose: dict):
"""
Перемещает робота в указанную позицию и ориентацию.
:param command: словарь с ключами "pos" и "ori" (ори — в радианах, RPY)
"""
# quat = self.rpy_to_quat(command["ori"])
# 2. Получаем текущее состояние робота
joint_start, _, _ = self.get_current_waypoint()
# 3. Вычисляем инверсную кинематику
joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"])
# 4. Выполняем линейное перемещение
self.move_line(joint_target)
def add_waypoint(self, joint_radian: list):
cmd = {"command": "add_waypoint", "joint_radian": joint_radian}
ret = self._send(cmd)
if (ret.get("msg") != "succ"):
raise RuntimeError(f"add_waypoint failed: {ret}")
def move_track(self, track = "CARTESIAN_MOVEP"):
cmd = {"command": "move_track", "track": track}
ret = self._send(cmd)
if (ret.get("msg") != "succ"):
raise RuntimeError(f"move_track failed: {ret}")
def move_linear_cartesian(self, pose: dict):
"""
Выполняет линейное движение в декартовом пространстве между текущей позицией и заданной позой.
:param pose: словарь с ключами "pos" (XYZ в метрах) и "ori" (кватернион [w, x, y, z])
"""
# 1. Получаем текущее состояние
joint_start, _, _ = self.get_current_waypoint()
# 2. Добавляем текущую позицию как waypoint
self.add_waypoint(joint_start)
# 3. Вычисляем IK
joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"])
# 4. Добавляем целевую позицию как waypoint
self.add_waypoint(joint_target)
# 5. Выполняем траекторию
self.move_track()
def get_robot_state(self) -> int:
"""
Получает текущее состояние робота.
:return: Одно из значений:
0 — Stopped
1 — Running
2 — Paused
3 — Resumed
:raises RuntimeError: если не удалось получить состояние
"""
cmd = {"command": "get_robot_state"}
ret = self._send(cmd)
if ret.get("msg") != "succ":
raise RuntimeError(f"get_robot_state failed: {ret}")
return ret["ret"]
def move_line(self, joint_radian: list) -> None:
"""
Выполняет линейное перемещение в заданную позу, сохраняя текущую ориентацию.
:param joint_radian: целевая поза (список из 6 значений в радианах)
:raises RuntimeError: если команда не была выполнена успешно
"""
cmd = {"command": "move_line", "joint_radian": joint_radian}
ret = self._send(cmd)
if ret.get("ret") == 3004:
return
if ret.get("msg") != "succ":
raise RuntimeError(f"move_line failed: {ret}")
def set_joint_maxacc(self, acc: float):
"""
Устанавливает одинаковое максимальное ускорение для всех суставов (в rad/s²).
:param acc: ускорение для всех 6 суставов
:raises RuntimeError: если команда завершилась с ошибкой
"""
cmd = {"command": "set_joint_maxacc", "joint_maxacc": [acc] * 6}
ret = self._send(cmd)
if ret.get("msg") != "succ":
raise RuntimeError(f"set_joint_maxacc failed: {ret}")
def set_joint_maxvelc(self, vel: float):
"""
Устанавливает одинаковую максимальную скорость для всех суставов (в rad/s).
:param vel: скорость для всех 6 суставов
:raises RuntimeError: если команда завершилась с ошибкой
"""
cmd = {"command": "set_joint_maxvelc", "joint_maxvelc": [vel] * 6}
ret = self._send(cmd)
if ret.get("msg") != "succ":
raise RuntimeError(f"set_joint_maxvelc failed: {ret}")
def close(self):
self.sock.close()