refactor move_linear_cartesian method
This commit is contained in:
parent
fb7ededef7
commit
68a6c712f9
1 changed files with 49 additions and 11 deletions
60
utils.py
60
utils.py
|
@ -6,6 +6,9 @@ import time
|
||||||
import threading
|
import threading
|
||||||
import queue
|
import queue
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from scipy.spatial.transform import Rotation as R, Slerp
|
||||||
|
|
||||||
DEBUG: bool = True
|
DEBUG: bool = True
|
||||||
RESET = "\033[0m"
|
RESET = "\033[0m"
|
||||||
CYAN = "\033[36m"
|
CYAN = "\033[36m"
|
||||||
|
@ -154,12 +157,39 @@ class RobotClient:
|
||||||
if not any(r.get("msg") == "succ" for r in ret):
|
if not any(r.get("msg") == "succ" for r in ret):
|
||||||
raise RuntimeError(f"move_track failed: {ret}")
|
raise RuntimeError(f"move_track failed: {ret}")
|
||||||
|
|
||||||
def move_linear_cartesian(self, pose: dict):
|
def move_linear_cartesian(self, pose: dict, steps: int = 50):
|
||||||
joint_start, _, _ = self.get_current_waypoint()
|
joint_start, pos_start, ori_start_wxyz = self.get_current_waypoint()
|
||||||
self.add_waypoint(joint_start)
|
|
||||||
joint_target = self.inverse_kin(joint_start, pose["pos"], pose["ori"])
|
# Позиции
|
||||||
self.add_waypoint(joint_target)
|
pos_start = np.array(pos_start)
|
||||||
|
pos_end = np.array(pose["pos"])
|
||||||
|
|
||||||
|
# Ориентации: перевести из [w, x, y, z] в [x, y, z, w]
|
||||||
|
ori_start_xyzw = [ori_start_wxyz[1], ori_start_wxyz[2], ori_start_wxyz[3], ori_start_wxyz[0]]
|
||||||
|
ori_end_xyzw = [pose["ori"][1], pose["ori"][2], pose["ori"][3], pose["ori"][0]]
|
||||||
|
|
||||||
|
# SLERP подготовка
|
||||||
|
key_times = [0, 1]
|
||||||
|
key_rots = R.from_quat([ori_start_xyzw, ori_end_xyzw])
|
||||||
|
slerp = Slerp(key_times, key_rots)
|
||||||
|
|
||||||
|
for i in range(steps + 1):
|
||||||
|
alpha = i / steps
|
||||||
|
|
||||||
|
# Интерполяция позиции
|
||||||
|
interp_pos = (1 - alpha) * pos_start + alpha * pos_end
|
||||||
|
|
||||||
|
# SLERP ориентации
|
||||||
|
interp_rot = slerp([alpha])[0]
|
||||||
|
interp_ori_xyzw = interp_rot.as_quat(canonical=True)
|
||||||
|
interp_ori_wxyz = [interp_ori_xyzw[3], *interp_ori_xyzw[:3]]
|
||||||
|
|
||||||
|
# Инверсная кинематика
|
||||||
|
joint_interp = self.inverse_kin(joint_start, interp_pos.tolist(), interp_ori_wxyz)
|
||||||
|
self.add_waypoint(joint_interp)
|
||||||
|
|
||||||
self.move_track()
|
self.move_track()
|
||||||
|
time.sleep(10)
|
||||||
|
|
||||||
def get_robot_state(self) -> int:
|
def get_robot_state(self) -> int:
|
||||||
cmd = {"command": "get_robot_state"}
|
cmd = {"command": "get_robot_state"}
|
||||||
|
@ -171,7 +201,7 @@ class RobotClient:
|
||||||
def move_line(self, joint_radian: list) -> None:
|
def move_line(self, joint_radian: list) -> None:
|
||||||
cmd = {"command": "move_line", "joint_radian": joint_radian}
|
cmd = {"command": "move_line", "joint_radian": joint_radian}
|
||||||
ret = self._send_single_json(cmd, 10.0)
|
ret = self._send_single_json(cmd, 10.0)
|
||||||
if (ret.get("ret") != 0):
|
if ret.get("ret") != 0:
|
||||||
raise RuntimeError(f"move_line failed: {ret}")
|
raise RuntimeError(f"move_line failed: {ret}")
|
||||||
self.wait_untill_motion_is_done(10.0)
|
self.wait_untill_motion_is_done(10.0)
|
||||||
|
|
||||||
|
@ -194,21 +224,29 @@ class RobotClient:
|
||||||
# First type: {"addr":9, "status":0.0}
|
# First type: {"addr":9, "status":0.0}
|
||||||
if payload.get("addr") == 9 and payload.get("status") == 0.0:
|
if payload.get("addr") == 9 and payload.get("status") == 0.0:
|
||||||
if DEBUG:
|
if DEBUG:
|
||||||
print(f"{CYAN}DEBUG: Motion complete (addr/status) event received{RESET}")
|
print(
|
||||||
|
f"{CYAN}DEBUG: Motion complete (addr/status) event received{RESET}"
|
||||||
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Second type: {"code":0, "text":"RobotMoveControlStopDone."}
|
# Second type: {"code":0, "text":"RobotMoveControlStopDone."}
|
||||||
if payload.get("code") == 0 and payload.get("text") == "RobotMoveControlStopDone.":
|
if (
|
||||||
|
payload.get("code") == 0
|
||||||
|
and payload.get("text") == "RobotMoveControlStopDone."
|
||||||
|
):
|
||||||
if DEBUG:
|
if DEBUG:
|
||||||
print(f"{CYAN}DEBUG: Motion complete (code/text) event received{RESET}")
|
print(
|
||||||
|
f"{CYAN}DEBUG: Motion complete (code/text) event received{RESET}"
|
||||||
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if DEBUG:
|
if DEBUG:
|
||||||
print(f"{RED}DEBUG: Failed to parse callback payload: {e}{RESET}")
|
print(
|
||||||
|
f"{RED}DEBUG: Failed to parse callback payload: {e}{RESET}"
|
||||||
|
)
|
||||||
raise TimeoutError("Timeout waiting for motion to complete.")
|
raise TimeoutError("Timeout waiting for motion to complete.")
|
||||||
|
|
||||||
|
|
||||||
def set_joint_maxacc(self, acc: float):
|
def set_joint_maxacc(self, acc: float):
|
||||||
cmd = {"command": "set_joint_maxacc", "joint_maxacc": [acc] * 6}
|
cmd = {"command": "set_joint_maxacc", "joint_maxacc": [acc] * 6}
|
||||||
ret = self._send_single_json(cmd)
|
ret = self._send_single_json(cmd)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue