add wait_untill_motion_is_done method

This commit is contained in:
Ilya Uraev 2025-05-23 14:41:54 +03:00
parent 36a9889a1c
commit 4ebd4a6206

View file

@ -82,7 +82,9 @@ class RobotClient:
response = self._response_queue.get(timeout=5)
try:
extra = self._response_queue.get_nowait()
raise RuntimeError(f"Expected single JSON response, but got extra: {extra}")
raise RuntimeError(
f"Expected single JSON response, but got extra: {extra}"
)
except queue.Empty:
return response
except queue.Empty:
@ -143,10 +145,10 @@ class RobotClient:
def add_waypoint(self, joint_radian: list):
cmd = {"command": "add_waypoint", "joint_radian": joint_radian}
ret = self._send_single_json(cmd)
if (ret.get("msg") != "succ"):
if ret.get("msg") != "succ":
raise RuntimeError(f"add_waypoint failed: {ret}")
def move_track(self, track = "CARTESIAN_MOVEP"):
def move_track(self, track="CARTESIAN_MOVEP"):
cmd = {"command": "move_track", "track": track}
ret = self._send_multi_json(cmd)
if not any(r.get("msg") == "succ" for r in ret):
@ -168,9 +170,44 @@ class RobotClient:
def move_line(self, joint_radian: list) -> None:
cmd = {"command": "move_line", "joint_radian": joint_radian}
ret = self._send_multi_json(cmd)
if not any(r.get("msg") == "succ" or r.get("ret") == 0 for r in ret):
ret = self._send_single_json(cmd)
if (ret.get("ret") != 0):
raise RuntimeError(f"move_line failed: {ret}")
self.wait_untill_motion_is_done(10.0)
def wait_untill_motion_is_done(self, timeout=10.0):
"""
Blocks until a callback with addr=9/status=0.0 or code=0/text="RobotMoveControlStopDone." is received,
or until timeout.
"""
start_time = time.time()
while time.time() - start_time < timeout:
event = self.get_callback_event(timeout=0.1)
if event and event.get("ret") == RobotJasonServerError.RESUTL_CALL_BACK:
try:
msg = event["msg"]
if not msg.startswith("robot callback event="):
continue
payload_str = msg.split("robot callback event=")[-1]
payload = json.loads(payload_str)
# First type: {"addr":9, "status":0.0}
if payload.get("addr") == 9 and payload.get("status") == 0.0:
if DEBUG:
print(f"{CYAN}DEBUG: Motion complete (addr/status) event received{RESET}")
return
# Second type: {"code":0, "text":"RobotMoveControlStopDone."}
if payload.get("code") == 0 and payload.get("text") == "RobotMoveControlStopDone.":
if DEBUG:
print(f"{CYAN}DEBUG: Motion complete (code/text) event received{RESET}")
return
except Exception as e:
if DEBUG:
print(f"{RED}DEBUG: Failed to parse callback payload: {e}{RESET}")
raise TimeoutError("Timeout waiting for motion to complete.")
def set_joint_maxacc(self, acc: float):
cmd = {"command": "set_joint_maxacc", "joint_maxacc": [acc] * 6}