aubo-old-sdk-test/utils.py

255 lines
9.3 KiB
Python
Raw Normal View History

2025-05-21 19:24:55 +03:00
#! /usr/bin/env python
import socket
import json
2025-05-22 16:33:05 +03:00
import time
2025-05-22 17:36:24 +03:00
import threading
import queue
2025-05-23 17:05:46 +03:00
import numpy as np
from scipy.spatial.transform import Rotation as R, Slerp
2025-05-21 19:52:20 +03:00
DEBUG: bool = True
2025-05-22 14:44:04 +03:00
RESET = "\033[0m"
CYAN = "\033[36m"
YELLOW = "\033[33m"
2025-05-22 16:10:21 +03:00
RED = "\033[31m"
2025-05-21 19:52:20 +03:00
2025-05-22 16:33:05 +03:00
class RobotStatus:
Stopped = 0
Running = 1
Paused = 2
Resumed = 3
2025-05-22 17:18:19 +03:00
class RobotJasonServerError:
RESUTL_SUCC = 0
RESUTL_BASE = 3000
RESUTL_ERROR_BAD_COMMAND_STRING = RESUTL_BASE + 1
RESUTL_ERROR_UNKNOWN_COMMAND = RESUTL_BASE + 2
RESUTL_ERROR_ARGS = RESUTL_BASE + 2
RESUTL_ERROR_MOVE = RESUTL_BASE + 3
RESUTL_CALL_BACK = RESUTL_BASE + 4
RESUTL_ERROR_CLIENTS_NUMBER = RESUTL_BASE + 5
RESUTL_ERROR_DISCONNECT_TO_MODBUS_SERVER = RESUTL_BASE + 6
RESUTL_ERROR_CONNECT_TO_MODBUS_SERVER = RESUTL_BASE + 7
2025-05-21 19:24:55 +03:00
class RobotClient:
def __init__(self, host, port):
self.host = host
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
2025-05-22 17:36:24 +03:00
self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
self._response_queue = queue.Queue()
self._callback_queue = queue.Queue()
self._recv_thread.start()
def _recv_loop(self):
2025-05-22 17:47:26 +03:00
decoder = json.JSONDecoder()
buffer = ""
2025-05-22 17:36:24 +03:00
while True:
try:
data = self.sock.recv(4096).decode("utf-8")
if not data:
if DEBUG:
print("DEBUG: socket closed by server")
break
2025-05-22 17:47:26 +03:00
buffer += data
while buffer:
try:
obj, index = decoder.raw_decode(buffer)
buffer = buffer[index:].lstrip()
if DEBUG:
print(f"{YELLOW}DEBUG: received:{RESET} {obj}")
if obj["ret"] == RobotJasonServerError.RESUTL_CALL_BACK:
self._callback_queue.put(obj)
else:
self._response_queue.put(obj)
except json.JSONDecodeError:
break
2025-05-22 17:36:24 +03:00
except Exception as e:
if DEBUG:
print(f"{RED}DEBUG: Exception in recv loop: {e}{RESET}")
break
2025-05-23 14:54:12 +03:00
def _send_single_json(self, cmd, timeout=5.0) -> dict:
2025-05-22 14:44:04 +03:00
if DEBUG:
print(f"{CYAN}DEBUG: sended:{RESET} {cmd}")
2025-05-21 19:24:55 +03:00
self.sock.sendall(json.dumps(cmd).encode("utf-8"))
2025-05-22 17:47:26 +03:00
try:
2025-05-23 14:54:12 +03:00
response = self._response_queue.get(timeout=timeout)
2025-05-22 17:47:26 +03:00
try:
extra = self._response_queue.get_nowait()
2025-05-23 14:41:54 +03:00
raise RuntimeError(
f"Expected single JSON response, but got extra: {extra}"
)
2025-05-22 17:47:26 +03:00
except queue.Empty:
return response
except queue.Empty:
raise RuntimeError("Timeout waiting for single-json response")
def _send_multi_json(self, cmd) -> list:
if DEBUG:
print(f"{CYAN}DEBUG: sended:{RESET} {cmd}")
self.sock.sendall(json.dumps(cmd).encode("utf-8"))
results = []
start_time = time.time()
2025-05-22 17:18:19 +03:00
while True:
2025-05-22 17:36:24 +03:00
try:
2025-05-22 17:47:26 +03:00
result = self._response_queue.get(timeout=5)
results.append(result)
if result.get("msg") == "succ" or result.get("ret") == 0:
break
2025-05-22 17:36:24 +03:00
except queue.Empty:
2025-05-22 17:47:26 +03:00
if time.time() - start_time > 10:
raise RuntimeError("Timeout waiting for multi-json response")
break
return results
2025-05-22 17:36:24 +03:00
def get_callback_event(self, timeout=0.1):
try:
return self._callback_queue.get(timeout=timeout)
except queue.Empty:
return None
2025-05-21 19:24:55 +03:00
def rpy_to_quat(self, arr):
cmd = {"command": "rpy_to_quaternion", "rpy": arr}
2025-05-22 17:47:26 +03:00
ret = self._send_single_json(cmd)
2025-05-21 19:24:55 +03:00
return ret["ret"]
def get_current_waypoint(self):
cmd = {"command": "get_current_waypoint"}
2025-05-22 17:47:26 +03:00
ret = self._send_single_json(cmd)
2025-05-21 19:24:55 +03:00
return ret["ret"]["joint"], ret["ret"]["pos"], ret["ret"]["ori"]
def inverse_kin(self, joint_radian: list, pos: list, ori: list) -> list:
cmd = {
"command": "inverse_kin",
"joint_radian": joint_radian,
"pos": pos,
"ori": ori,
}
2025-05-22 17:47:26 +03:00
ret = self._send_single_json(cmd)
2025-05-21 19:24:55 +03:00
if ret["msg"] != "succ":
raise RuntimeError(f"Inverse kinematics failed: {ret}. \n Was sended {cmd}")
2025-05-22 17:47:26 +03:00
return ret["ret"]["joint"]
2025-05-21 19:24:55 +03:00
def move_to_pose_joint(self, pose: dict):
2025-05-21 19:24:55 +03:00
joint_start, _, _ = self.get_current_waypoint()
joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"])
2025-05-23 17:44:16 +03:00
self.move_joint(joint_target)
2025-05-22 16:21:40 +03:00
def move_to_pose_cart(self, pose: dict):
joint_start, _, _ = self.get_current_waypoint()
joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"])
self.move_line(joint_target)
2025-05-21 19:24:55 +03:00
def add_waypoint(self, joint_radian: list):
cmd = {"command": "add_waypoint", "joint_radian": joint_radian}
2025-05-22 17:47:26 +03:00
ret = self._send_single_json(cmd)
2025-05-23 14:41:54 +03:00
if ret.get("msg") != "succ":
2025-05-21 19:24:55 +03:00
raise RuntimeError(f"add_waypoint failed: {ret}")
2025-05-23 14:41:54 +03:00
def move_track(self, track="CARTESIAN_MOVEP"):
2025-05-21 19:24:55 +03:00
cmd = {"command": "move_track", "track": track}
2025-05-22 17:47:26 +03:00
ret = self._send_multi_json(cmd)
if not any(r.get("msg") == "succ" for r in ret):
2025-05-21 19:24:55 +03:00
raise RuntimeError(f"move_track failed: {ret}")
2025-05-22 16:21:40 +03:00
def get_robot_state(self) -> int:
cmd = {"command": "get_robot_state"}
2025-05-22 17:47:26 +03:00
ret = self._send_single_json(cmd)
2025-05-22 16:21:40 +03:00
if ret.get("msg") != "succ":
raise RuntimeError(f"get_robot_state failed: {ret}")
return ret["ret"]
2025-05-21 19:24:55 +03:00
def move_line(self, joint_radian: list) -> None:
cmd = {"command": "move_line", "joint_radian": joint_radian}
2025-05-23 14:54:12 +03:00
ret = self._send_single_json(cmd, 10.0)
2025-05-23 17:05:46 +03:00
if ret.get("ret") != 0:
2025-05-21 19:24:55 +03:00
raise RuntimeError(f"move_line failed: {ret}")
2025-05-23 17:44:16 +03:00
self.wait_untill_motion_is_done(10.0)
def move_joint(self, joint_radian: list) -> None:
cmd = {"command": "move_joint", "joint_radian": joint_radian}
ret = self._send_single_json(cmd, 10.0)
if ret.get("ret") != 0:
raise RuntimeError(f"move_joint failed: {ret}")
2025-05-23 14:41:54 +03:00
self.wait_untill_motion_is_done(10.0)
def wait_untill_motion_is_done(self, timeout=10.0):
"""
Blocks until a callback with addr=9/status=0.0 or code=0/text="RobotMoveControlStopDone." is received,
or until timeout.
"""
start_time = time.time()
while time.time() - start_time < timeout:
event = self.get_callback_event(timeout=0.1)
if event and event.get("ret") == RobotJasonServerError.RESUTL_CALL_BACK:
try:
msg = event["msg"]
if not msg.startswith("robot callback event="):
continue
payload_str = msg.split("robot callback event=")[-1]
payload = json.loads(payload_str)
# First type: {"addr":9, "status":0.0}
if payload.get("addr") == 9 and payload.get("status") == 0.0:
if DEBUG:
2025-05-23 17:05:46 +03:00
print(
f"{CYAN}DEBUG: Motion complete (addr/status) event received{RESET}"
)
2025-05-23 14:41:54 +03:00
return
# Second type: {"code":0, "text":"RobotMoveControlStopDone."}
2025-05-23 17:05:46 +03:00
if (
payload.get("code") == 0
and payload.get("text") == "RobotMoveControlStopDone."
):
2025-05-23 14:41:54 +03:00
if DEBUG:
2025-05-23 17:05:46 +03:00
print(
f"{CYAN}DEBUG: Motion complete (code/text) event received{RESET}"
)
2025-05-23 14:41:54 +03:00
return
except Exception as e:
if DEBUG:
2025-05-23 17:05:46 +03:00
print(
f"{RED}DEBUG: Failed to parse callback payload: {e}{RESET}"
)
2025-05-23 14:41:54 +03:00
raise TimeoutError("Timeout waiting for motion to complete.")
2025-05-21 19:24:55 +03:00
def set_joint_maxacc(self, acc: float):
cmd = {"command": "set_joint_maxacc", "joint_maxacc": [acc] * 6}
2025-05-22 17:47:26 +03:00
ret = self._send_single_json(cmd)
2025-05-21 19:24:55 +03:00
if ret.get("msg") != "succ":
raise RuntimeError(f"set_joint_maxacc failed: {ret}")
def set_joint_maxvelc(self, vel: float):
cmd = {"command": "set_joint_maxvelc", "joint_maxvelc": [vel] * 6}
2025-05-22 17:47:26 +03:00
ret = self._send_single_json(cmd)
2025-05-21 19:24:55 +03:00
if ret.get("msg") != "succ":
raise RuntimeError(f"set_joint_maxvelc failed: {ret}")
2025-05-23 17:13:26 +03:00
def set_end_max_line_velc(self, vel: float):
cmd = {"command": "set_end_max_line_velc", "end_maxvelc": vel}
ret = self._send_single_json(cmd)
if ret.get("msg") != "succ":
raise RuntimeError(f"set_end_max_line_velc failed: {ret}")
def set_end_max_line_acc(self, vel: float):
cmd = {"command": "set_end_max_line_acc", "end_maxacc": vel}
ret = self._send_single_json(cmd)
if ret.get("msg") != "succ":
raise RuntimeError(f"set_end_max_line_acc failed: {ret}")
2025-05-21 19:24:55 +03:00
def close(self):
self.sock.close()