#! /usr/bin/env python import socket import json import time import threading import queue DEBUG: bool = True RESET = "\033[0m" CYAN = "\033[36m" YELLOW = "\033[33m" RED = "\033[31m" class RobotStatus: Stopped = 0 Running = 1 Paused = 2 Resumed = 3 class RobotJasonServerError: RESUTL_SUCC = 0 RESUTL_BASE = 3000 RESUTL_ERROR_BAD_COMMAND_STRING = RESUTL_BASE + 1 RESUTL_ERROR_UNKNOWN_COMMAND = RESUTL_BASE + 2 RESUTL_ERROR_ARGS = RESUTL_BASE + 2 RESUTL_ERROR_MOVE = RESUTL_BASE + 3 RESUTL_CALL_BACK = RESUTL_BASE + 4 RESUTL_ERROR_CLIENTS_NUMBER = RESUTL_BASE + 5 RESUTL_ERROR_DISCONNECT_TO_MODBUS_SERVER = RESUTL_BASE + 6 RESUTL_ERROR_CONNECT_TO_MODBUS_SERVER = RESUTL_BASE + 7 class RobotClient: def __init__(self, host, port): self.host = host self.port = port self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True) self._response_queue = queue.Queue() self._callback_queue = queue.Queue() self._recv_thread.start() def _recv_loop(self): decoder = json.JSONDecoder() buffer = "" while True: try: data = self.sock.recv(4096).decode("utf-8") if not data: if DEBUG: print("DEBUG: socket closed by server") break buffer += data while buffer: try: obj, index = decoder.raw_decode(buffer) buffer = buffer[index:].lstrip() if DEBUG: print(f"{YELLOW}DEBUG: received:{RESET} {obj}") if obj["ret"] == RobotJasonServerError.RESUTL_CALL_BACK: self._callback_queue.put(obj) else: self._response_queue.put(obj) except json.JSONDecodeError: break except Exception as e: if DEBUG: print(f"{RED}DEBUG: Exception in recv loop: {e}{RESET}") break def _send_single_json(self, cmd) -> dict: if DEBUG: print(f"{CYAN}DEBUG: sended:{RESET} {cmd}") self.sock.sendall(json.dumps(cmd).encode("utf-8")) try: response = self._response_queue.get(timeout=5) try: extra = self._response_queue.get_nowait() raise RuntimeError(f"Expected single JSON response, but got extra: {extra}") except queue.Empty: return response except queue.Empty: raise RuntimeError("Timeout waiting for single-json response") def _send_multi_json(self, cmd) -> list: if DEBUG: print(f"{CYAN}DEBUG: sended:{RESET} {cmd}") self.sock.sendall(json.dumps(cmd).encode("utf-8")) results = [] start_time = time.time() while True: try: result = self._response_queue.get(timeout=5) results.append(result) if result.get("msg") == "succ" or result.get("ret") == 0: break except queue.Empty: if time.time() - start_time > 10: raise RuntimeError("Timeout waiting for multi-json response") break return results def get_callback_event(self, timeout=0.1): try: return self._callback_queue.get(timeout=timeout) except queue.Empty: return None def rpy_to_quat(self, arr): cmd = {"command": "rpy_to_quaternion", "rpy": arr} ret = self._send_single_json(cmd) return ret["ret"] def get_current_waypoint(self): cmd = {"command": "get_current_waypoint"} ret = self._send_single_json(cmd) return ret["ret"]["joint"], ret["ret"]["pos"], ret["ret"]["ori"] def inverse_kin(self, joint_radian: list, pos: list, ori: list) -> list: cmd = { "command": "inverse_kin", "joint_radian": joint_radian, "pos": pos, "ori": ori, } ret = self._send_single_json(cmd) if ret["msg"] != "succ": raise RuntimeError(f"Inverse kinematics failed: {ret}. \n Was sended {cmd}") return ret["ret"]["joint"] def move_to_pose(self, pose: dict): joint_start, _, _ = self.get_current_waypoint() joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"]) self.move_line(joint_target) def add_waypoint(self, joint_radian: list): cmd = {"command": "add_waypoint", "joint_radian": joint_radian} ret = self._send_single_json(cmd) if (ret.get("msg") != "succ"): raise RuntimeError(f"add_waypoint failed: {ret}") def move_track(self, track = "CARTESIAN_MOVEP"): cmd = {"command": "move_track", "track": track} ret = self._send_multi_json(cmd) if not any(r.get("msg") == "succ" for r in ret): raise RuntimeError(f"move_track failed: {ret}") def move_linear_cartesian(self, pose: dict): joint_start, _, _ = self.get_current_waypoint() self.add_waypoint(joint_start) joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"]) self.add_waypoint(joint_target) self.move_track() def get_robot_state(self) -> int: cmd = {"command": "get_robot_state"} ret = self._send_single_json(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"get_robot_state failed: {ret}") return ret["ret"] def move_line(self, joint_radian: list) -> None: cmd = {"command": "move_line", "joint_radian": joint_radian} ret = self._send_multi_json(cmd) if not any(r.get("msg") == "succ" or r.get("ret") == 0 for r in ret): raise RuntimeError(f"move_line failed: {ret}") def set_joint_maxacc(self, acc: float): cmd = {"command": "set_joint_maxacc", "joint_maxacc": [acc] * 6} ret = self._send_single_json(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"set_joint_maxacc failed: {ret}") def set_joint_maxvelc(self, vel: float): cmd = {"command": "set_joint_maxvelc", "joint_maxvelc": [vel] * 6} ret = self._send_single_json(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"set_joint_maxvelc failed: {ret}") def close(self): self.sock.close()