#! /usr/bin/env python import socket import json import time import threading import queue import numpy as np from scipy.spatial.transform import Rotation as R, Slerp DEBUG: bool = True RESET = "\033[0m" CYAN = "\033[36m" YELLOW = "\033[33m" RED = "\033[31m" class RobotStatus: Stopped = 0 Running = 1 Paused = 2 Resumed = 3 class RobotJasonServerError: RESUTL_SUCC = 0 RESUTL_BASE = 3000 RESUTL_ERROR_BAD_COMMAND_STRING = RESUTL_BASE + 1 RESUTL_ERROR_UNKNOWN_COMMAND = RESUTL_BASE + 2 RESUTL_ERROR_ARGS = RESUTL_BASE + 2 RESUTL_ERROR_MOVE = RESUTL_BASE + 3 RESUTL_CALL_BACK = RESUTL_BASE + 4 RESUTL_ERROR_CLIENTS_NUMBER = RESUTL_BASE + 5 RESUTL_ERROR_DISCONNECT_TO_MODBUS_SERVER = RESUTL_BASE + 6 RESUTL_ERROR_CONNECT_TO_MODBUS_SERVER = RESUTL_BASE + 7 class RobotClient: def __init__(self, host, port): self.host = host self.port = port self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True) self._response_queue = queue.Queue() self._callback_queue = queue.Queue() self._recv_thread.start() def _recv_loop(self): decoder = json.JSONDecoder() buffer = "" while True: try: data = self.sock.recv(4096).decode("utf-8") if not data: if DEBUG: print("DEBUG: socket closed by server") break buffer += data while buffer: try: obj, index = decoder.raw_decode(buffer) buffer = buffer[index:].lstrip() if DEBUG: print(f"{YELLOW}DEBUG: received:{RESET} {obj}") if obj["ret"] == RobotJasonServerError.RESUTL_CALL_BACK: self._callback_queue.put(obj) else: self._response_queue.put(obj) except json.JSONDecodeError: break except Exception as e: if DEBUG: print(f"{RED}DEBUG: Exception in recv loop: {e}{RESET}") break def _send_single_json(self, cmd, timeout=5.0) -> dict: if DEBUG: print(f"{CYAN}DEBUG: sended:{RESET} {cmd}") self.sock.sendall(json.dumps(cmd).encode("utf-8")) try: response = self._response_queue.get(timeout=timeout) try: extra = self._response_queue.get_nowait() raise RuntimeError( f"Expected single JSON response, but got extra: {extra}" ) except queue.Empty: return response except queue.Empty: raise RuntimeError("Timeout waiting for single-json response") def _send_multi_json(self, cmd) -> list: if DEBUG: print(f"{CYAN}DEBUG: sended:{RESET} {cmd}") self.sock.sendall(json.dumps(cmd).encode("utf-8")) results = [] start_time = time.time() while True: try: result = self._response_queue.get(timeout=5) results.append(result) if result.get("msg") == "succ" or result.get("ret") == 0: break except queue.Empty: if time.time() - start_time > 10: raise RuntimeError("Timeout waiting for multi-json response") break return results def get_callback_event(self, timeout=0.1): try: return self._callback_queue.get(timeout=timeout) except queue.Empty: return None def rpy_to_quat(self, arr): cmd = {"command": "rpy_to_quaternion", "rpy": arr} ret = self._send_single_json(cmd) return ret["ret"] def get_current_waypoint(self): cmd = {"command": "get_current_waypoint"} ret = self._send_single_json(cmd) return ret["ret"]["joint"], ret["ret"]["pos"], ret["ret"]["ori"] def inverse_kin(self, joint_radian: list, pos: list, ori: list) -> list: cmd = { "command": "inverse_kin", "joint_radian": joint_radian, "pos": pos, "ori": ori, } ret = self._send_single_json(cmd) if ret["msg"] != "succ": raise RuntimeError(f"Inverse kinematics failed: {ret}. \n Was sended {cmd}") return ret["ret"]["joint"] def move_to_pose_joint(self, pose: dict): joint_start, _, _ = self.get_current_waypoint() joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"]) self.move_joint(joint_target) def move_to_pose_cart(self, pose: dict): joint_start, _, _ = self.get_current_waypoint() joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"]) self.move_line(joint_target) def add_waypoint(self, joint_radian: list): cmd = {"command": "add_waypoint", "joint_radian": joint_radian} ret = self._send_single_json(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"add_waypoint failed: {ret}") def move_track(self, track="CARTESIAN_MOVEP"): cmd = {"command": "move_track", "track": track} ret = self._send_multi_json(cmd) if not any(r.get("msg") == "succ" for r in ret): raise RuntimeError(f"move_track failed: {ret}") def get_robot_state(self) -> int: cmd = {"command": "get_robot_state"} ret = self._send_single_json(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"get_robot_state failed: {ret}") return ret["ret"] def move_line(self, joint_radian: list) -> None: cmd = {"command": "move_line", "joint_radian": joint_radian} ret = self._send_single_json(cmd, 10.0) if ret.get("ret") != 0: raise RuntimeError(f"move_line failed: {ret}") self.wait_untill_motion_is_done(10.0) def move_joint(self, joint_radian: list) -> None: cmd = {"command": "move_joint", "joint_radian": joint_radian} ret = self._send_single_json(cmd, 10.0) if ret.get("ret") != 0: raise RuntimeError(f"move_joint failed: {ret}") self.wait_untill_motion_is_done(10.0) def wait_untill_motion_is_done(self, timeout=10.0): """ Blocks until a callback with addr=9/status=0.0 or code=0/text="RobotMoveControlStopDone." is received, or until timeout. """ start_time = time.time() while time.time() - start_time < timeout: event = self.get_callback_event(timeout=0.1) if event and event.get("ret") == RobotJasonServerError.RESUTL_CALL_BACK: try: msg = event["msg"] if not msg.startswith("robot callback event="): continue payload_str = msg.split("robot callback event=")[-1] payload = json.loads(payload_str) # First type: {"addr":9, "status":0.0} if payload.get("addr") == 9 and payload.get("status") == 0.0: if DEBUG: print( f"{CYAN}DEBUG: Motion complete (addr/status) event received{RESET}" ) return # Second type: {"code":0, "text":"RobotMoveControlStopDone."} if ( payload.get("code") == 0 and payload.get("text") == "RobotMoveControlStopDone." ): if DEBUG: print( f"{CYAN}DEBUG: Motion complete (code/text) event received{RESET}" ) return except Exception as e: if DEBUG: print( f"{RED}DEBUG: Failed to parse callback payload: {e}{RESET}" ) raise TimeoutError("Timeout waiting for motion to complete.") def set_joint_maxacc(self, acc: float): cmd = {"command": "set_joint_maxacc", "joint_maxacc": [acc] * 6} ret = self._send_single_json(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"set_joint_maxacc failed: {ret}") def set_joint_maxvelc(self, vel: float): cmd = {"command": "set_joint_maxvelc", "joint_maxvelc": [vel] * 6} ret = self._send_single_json(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"set_joint_maxvelc failed: {ret}") def set_end_max_line_velc(self, vel: float): cmd = {"command": "set_end_max_line_velc", "end_maxvelc": vel} ret = self._send_single_json(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"set_end_max_line_velc failed: {ret}") def set_end_max_line_acc(self, vel: float): cmd = {"command": "set_end_max_line_acc", "end_maxacc": vel} ret = self._send_single_json(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"set_end_max_line_acc failed: {ret}") def set_end_max_angle_acc(self, acc: float): cmd = {"command": "set_end_max_angle_acc", "end_maxacc": acc} ret = self._send_single_json(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"set_end_max_line_acc failed: {ret}") def set_end_max_angle_velc(self, vel: float): cmd = {"command": "set_end_max_angle_velc", "end_maxvelc": vel} ret = self._send_single_json(cmd) if ret.get("msg") != "succ": raise RuntimeError(f"set_end_max_line_acc failed: {ret}") def close(self): self.sock.close()