#! /usr/bin/env python import socket import json import time DEBUG: bool = True RESET = "\033[0m" CYAN = "\033[36m" YELLOW = "\033[33m" RED = "\033[31m" class RobotStatus: Stopped = 0 Running = 1 Paused = 2 Resumed = 3 class RobotClient: def __init__(self, host, port): self.host = host self.port = port self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) def _send(self, cmd) -> dict: if DEBUG: print(f"{CYAN}DEBUG: sended:{RESET} {cmd}") self.sock.sendall(json.dumps(cmd).encode("utf-8")) try: ret = self._recv_full_json() if DEBUG: print(f"{YELLOW}DEBUG: received:{RESET} {ret}") return ret except Exception as e: print(f"{RED}ERROR: {e}{RESET}") return {} def _recv_full_json(self) -> dict: buffer = "" while True: chunk = self.sock.recv(4096).decode("utf-8") if not chunk: raise ConnectionError("Connection closed") buffer += chunk try: return json.loads(buffer) except json.JSONDecodeError: continue # not полный JSON, продолжаем читать def rpy_to_quat(self, arr): cmd = {"command": "rpy_to_quaternion", "rpy": arr} ret = self._send(cmd) return ret["ret"] def get_current_waypoint(self): cmd = {"command": "get_current_waypoint"} ret = self._send(cmd) return ret["ret"]["joint"], ret["ret"]["pos"], ret["ret"]["ori"] def inverse_kin(self, joint_radian: list, pos: list, ori: list) -> list: """ Выполняет обратную кинематику, возвращает углы суставов (в радианах). :param joint_radian: текущая поза (6 углов в рад) :param pos: целевая позиция [x, y, z] в метрах :param ori: ориентация в виде кватерниона [w, x, y, z] :return: список из 6 углов суставов (в радианах) """ cmd = { "command": "inverse_kin", "joint_radian": joint_radian, "pos": pos, "ori": ori, } ret = self._send(cmd) if ret["msg"] != "succ": raise RuntimeError(f"Inverse kinematics failed: {ret}. \n Was sended {cmd}") return ret["ret"][ "joint" ] # возвращается как {"joint": [...], "pos": [...], "ori": [...]} def move_to_pose(self, pose: dict): """ Перемещает робота в указанную позицию и ориентацию. :param command: словарь с ключами "pos" и "ori" (ори — в радианах, RPY) """ # quat = self.rpy_to_quat(command["ori"]) # 2. Получаем текущее состояние робота joint_start, _, _ = self.get_current_waypoint() # 3. Вычисляем инверсную кинематику joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"]) # 4. Выполняем линейное перемещение self.move_line(joint_target) self.wait_until_stopped() def wait_until_stopped(self, timeout: float = 10.0, poll_interval: float = 0.3): """ Ожидает, пока робот не перейдёт в состояние Stopped (0). :param timeout: максимальное время ожидания в секундах :param poll_interval: интервал опроса в секундах :raises TimeoutError: если робот не остановился за указанное время """ start = time.time() while time.time() - start < timeout: try: status = self.get_robot_state() if DEBUG: print(f"{CYAN}DEBUG: robot status:{RESET} {status}") if status == RobotStatus.Stopped: return except Exception as e: if DEBUG: print(f"{YELLOW}DEBUG: failed to get robot status:{RESET} {e}") time.sleep(poll_interval) raise TimeoutError("Ожидание состояния 'Stopped' превысило таймаут") def add_waypoint(self, joint_radian: list): cmd = {"command": "add_waypoint", "joint_radian": joint_radian} ret = self._send(cmd) if (ret.get("msg") != "succ"): raise RuntimeError(f"add_waypoint failed: {ret}") def move_track(self, track = "CARTESIAN_MOVEP"): cmd = {"command": "move_track", "track": track} ret = self._send(cmd) if (ret.get("msg") != "succ"): raise RuntimeError(f"move_track failed: {ret}") def move_linear_cartesian(self, pose: dict): """ Выполняет линейное движение в декартовом пространстве между текущей позицией и заданной позой. :param pose: словарь с ключами "pos" (XYZ в метрах) и "ori" (кватернион [w, x, y, z]) """ # 1. Получаем текущее состояние joint_start, _, _ = self.get_current_waypoint() # 2. Добавляем текущую позицию как waypoint self.add_waypoint(joint_start) # 3. Вычисляем IK joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"]) # 4. Добавляем целевую позицию как waypoint self.add_waypoint(joint_target) # 5. Выполняем траекторию self.move_track() def get_robot_state(self) -> int: """ Получает текущее состояние робота. :return: Одно из значений: 0 — Stopped 1 — Running 2 — Paused 3 — Resumed :raises RuntimeError: если не удалось получить состояние """ cmd = {"command": "get_robot_state"} ret = self._send(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"get_robot_state failed: {ret}") return ret["ret"] def move_line(self, joint_radian: list) -> None: """ Выполняет линейное перемещение в заданную позу, сохраняя текущую ориентацию. :param joint_radian: целевая поза (список из 6 значений в радианах) :raises RuntimeError: если команда не была выполнена успешно """ cmd = {"command": "move_line", "joint_radian": joint_radian} ret = self._send(cmd) if ret.get("ret") == 3004: return if ret.get("msg") != "succ": raise RuntimeError(f"move_line failed: {ret}") def set_joint_maxacc(self, acc: float): """ Устанавливает одинаковое максимальное ускорение для всех суставов (в rad/s²). :param acc: ускорение для всех 6 суставов :raises RuntimeError: если команда завершилась с ошибкой """ cmd = {"command": "set_joint_maxacc", "joint_maxacc": [acc] * 6} ret = self._send(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"set_joint_maxacc failed: {ret}") def set_joint_maxvelc(self, vel: float): """ Устанавливает одинаковую максимальную скорость для всех суставов (в rad/s). :param vel: скорость для всех 6 суставов :raises RuntimeError: если команда завершилась с ошибкой """ cmd = {"command": "set_joint_maxvelc", "joint_maxvelc": [vel] * 6} ret = self._send(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"set_joint_maxvelc failed: {ret}") def close(self): self.sock.close()