fix: rewrite recording_demo.py as node

This commit is contained in:
Ilya Uraev 2025-05-28 18:20:46 +03:00
parent 3e36193606
commit b22a64e5e9
2 changed files with 76 additions and 31 deletions

View file

@ -38,6 +38,7 @@ install(PROGRAMS
scripts/assembly_config_service.py
scripts/recording_demo_via_rosbag.py
scripts/example_recording_demo.py
rbs_utils/recording_demo.py
DESTINATION lib/${PROJECT_NAME}
)

106
rbs_utils/rbs_utils/rbs_utils/recording_demo.py Normal file → Executable file
View file

@ -1,18 +1,23 @@
#!/usr/bin/env python3
"""
recording_demo_lib
ROS 2 program for recording a demo via RosBag
recording_demo_lib
ROS 2 program for recording a demo via RosBag
@shalenikol release 0.1
@shalenikol release 0.1
"""
import os
import json
from enum import Enum
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.action.client import ActionClient
from rbs_skill_interfaces.action import RbsBt
from ament_index_python.packages import get_package_share_directory
from rbs_utils_interfaces.msg import RecorderCommands
from rbs_utils_interfaces.srv import Recorder
class CommandType(str, Enum):
RUN = "run"
@ -20,23 +25,51 @@ class CommandType(str, Enum):
CANCEL_AND_NEXT = "cancel_and_next"
STOP_AND_SAVE = "stop_and_save"
STOP_AND_CANCEL = "stop_and_cancel"
# list of all values
COMMAND_TYPES = [cmd.value for cmd in CommandType]
SERVER_NAME = "rbs_interface_a"
SERVER_NAME = "rbs_interface_a"
FILE_TEMPLATE_SKILL = "recording_demo_skill.json"
FILE_SKILL = "skill.json"
KEY_SIDPATH = "@path@" # prefix for filepath
KEY_SIDPATH = "@path@" # prefix for filepath
class RbsActionClient(Node):
def __init__(self):
self.res_ok = False
super().__init__("rbs_action_client")
self._action_client = ActionClient(self, RbsBt, SERVER_NAME)
self._recorder_service = self.create_service(
Recorder, "recorder", self.recorder
)
def recorder(
self, request: Recorder.Request, response: Recorder.Response
) -> Recorder.Response:
# response.ok = recording_demo(request.command.command)
if request.command.command == RecorderCommands.RUN:
response.ok = recording_demo(CommandType.RUN)
elif request.command.command == RecorderCommands.SAVE_AND_NEXT:
response.ok = recording_demo(CommandType.SAVE_AND_NEXT)
elif request.command.command == RecorderCommands.CANCEL_AND_NEXT:
response.ok = recording_demo(CommandType.CANCEL_AND_NEXT)
elif request.command.command == RecorderCommands.STOP_AND_SAVE:
response.ok = recording_demo(CommandType.STOP_AND_SAVE)
elif request.command.command == RecorderCommands.STOP_AND_CANCEL:
response.ok = recording_demo(CommandType.STOP_AND_CANCEL)
else:
response.ok = False
return response
def send_goal(self, sid: str, action: str, command: str):
self.res_ok = False
self.get_logger().info(f"Waiting for action server...")
self.get_logger().info("Waiting for action server...")
self._action_client.wait_for_server()
goal_msg = RbsBt.Goal()
@ -44,17 +77,21 @@ class RbsActionClient(Node):
goal_msg.action = action
goal_msg.command = command
self.get_logger().info(f"Sending goal: sid={sid}, action={action}, command={command}")
send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
self.get_logger().info(
f"Sending goal: sid={sid}, action={action}, command={command}"
)
send_goal_future = self._action_client.send_goal_async(
goal_msg, feedback_callback=self.feedback_callback
)
rclpy.spin_until_future_complete(self, send_goal_future)
goal_handle = send_goal_future.result()
if not goal_handle.accepted:
self.get_logger().warn('Goal rejected')
self.get_logger().warn("Goal rejected")
return
self.get_logger().info('Goal accepted, waiting for result...')
self.get_logger().info("Goal accepted, waiting for result...")
get_result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self, get_result_future)
result = get_result_future.result().result
@ -62,23 +99,25 @@ class RbsActionClient(Node):
self.res_ok = result.ok
if self.res_ok:
self.get_logger().info('Result: SUCCESS')
self.get_logger().info("Result: SUCCESS")
else:
self.get_logger().error('Result: FAILURE')
self.get_logger().error("Result: FAILURE")
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(f'Received feedback: {feedback}')
self.get_logger().info(f"Received feedback: {feedback}")
def get_config_path() -> str:
return os.path.join(get_package_share_directory("rbs_utils"), "config")
def get_config_file() -> str:
return os.path.join(get_config_path(), FILE_TEMPLATE_SKILL)
def recording_demo(command:CommandType = CommandType.RUN) -> bool:
if not command in COMMAND_TYPES:
def recording_demo(command: CommandType = CommandType.RUN) -> bool:
if command not in COMMAND_TYPES:
return False
# read template of skill description ("RecordingDemo")
@ -93,14 +132,7 @@ def recording_demo(command:CommandType = CommandType.RUN) -> bool:
for act in rec_demo["BTAction"]:
if act["name"] == "rdStop":
act["param"] = [
{
"type": "formBuilder",
"dependency": {
"output": {
"do": command
}
}
}
{"type": "formBuilder", "dependency": {"output": {"do": command}}}
]
# write params of skill description
@ -108,7 +140,6 @@ def recording_demo(command:CommandType = CommandType.RUN) -> bool:
json.dump(cfg, f, indent=2, ensure_ascii=False)
ret = False
rclpy.init()
action_client = RbsActionClient()
if command == CommandType.RUN:
@ -116,14 +147,27 @@ def recording_demo(command:CommandType = CommandType.RUN) -> bool:
elif command in [CommandType.SAVE_AND_NEXT, CommandType.CANCEL_AND_NEXT]:
action_client.send_goal(sid=sid, action="RecordingDemo", command="rdStop")
if action_client.res_ok:
action_client.send_goal(sid=sid, action="RecordingDemo", command="rdConfigure")
else: # command in [CommandType.STOP_AND_CANCEL, CommandType.STOP_AND_SAVE]:
action_client.send_goal(
sid=sid, action="RecordingDemo", command="rdConfigure"
)
else: # command in [CommandType.STOP_AND_CANCEL, CommandType.STOP_AND_SAVE]:
action_client.send_goal(sid=sid, action="RecordingDemo", command="rdStop")
ret = action_client.res_ok
action_client.destroy_node()
rclpy.shutdown()
return ret
if __name__ == '__main__':
recording_demo()
def main():
rclpy.init()
executor = rclpy.executors.SingleThreadedExecutor()
lc_node = RbsActionClient()
executor.add_node(lc_node)
try:
executor.spin()
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
lc_node.destroy_node()
if __name__ == "__main__":
main()