add API for recording_demo

This commit is contained in:
shalenikol 2025-05-16 16:07:39 +03:00
parent ac265f74a3
commit 52b0fa5378
3 changed files with 205 additions and 20 deletions

View file

@ -0,0 +1,37 @@
{
"skills": [
{
"sid": "RecordingDemo_cfg",
"SkillPackage": { "name": "Robossembler", "version": "1.0", "format": "1" },
"Module": { "name": "RecordingDemo", "description": "Recording a demo via RosBag", "node_name": "lc_record_demo" },
"Launch": { "package": "rbs_utils", "executable": "recording_demo_via_rosbag.py" },
"BTAction": [
{
"name": "rdConfigure",
"type_action": "ACTION",
"type": "run",
"param": []
},
{ "name": "rdStop", "type_action": "ACTION", "type": "stop",
"param": [
{
"type": "formBuilder",
"dependency": {
"output": {
"do": "stop_and_cancel"
}
}
}
]
}
],
"Settings": {
"output": {
"params": [
{ "name": "output_path", "value": "rbs_demo", "description": "!!! Путь, куда запишутся эпизоды в формате rosbag" }
]
}
}
}
]
}

View file

@ -0,0 +1,129 @@
"""
recording_demo_lib
ROS 2 program for recording a demo via RosBag
@shalenikol release 0.1
"""
import os
import json
from enum import Enum
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from rbs_skill_interfaces.action import RbsBt
from ament_index_python.packages import get_package_share_directory
class CommandType(str, Enum):
RUN = "run"
SAVE_AND_NEXT = "save_and_next"
CANCEL_AND_NEXT = "cancel_and_next"
STOP_AND_SAVE = "stop_and_save"
STOP_AND_CANCEL = "stop_and_cancel"
# list of all values
COMMAND_TYPES = [cmd.value for cmd in CommandType]
SERVER_NAME = "rbs_interface_a"
FILE_TEMPLATE_SKILL = "recording_demo_skill.json"
FILE_SKILL = "skill.json"
KEY_SIDPATH = "@path@" # prefix for filepath
class RbsActionClient(Node):
def __init__(self):
self.res_ok = False
super().__init__("rbs_action_client")
self._action_client = ActionClient(self, RbsBt, SERVER_NAME)
def send_goal(self, sid: str, action: str, command: str):
self.res_ok = False
self.get_logger().info(f"Waiting for action server...")
self._action_client.wait_for_server()
goal_msg = RbsBt.Goal()
goal_msg.sid = sid
goal_msg.action = action
goal_msg.command = command
self.get_logger().info(f"Sending goal: sid={sid}, action={action}, command={command}")
send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
rclpy.spin_until_future_complete(self, send_goal_future)
goal_handle = send_goal_future.result()
if not goal_handle.accepted:
self.get_logger().warn('Goal rejected')
return
self.get_logger().info('Goal accepted, waiting for result...')
get_result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self, get_result_future)
result = get_result_future.result().result
self.res_ok = result.ok
if self.res_ok:
self.get_logger().info('Result: SUCCESS')
else:
self.get_logger().error('Result: FAILURE')
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(f'Received feedback: {feedback}')
def get_config_path() -> str:
return os.path.join(get_package_share_directory("rbs_utils"), "config")
def get_config_file() -> str:
return os.path.join(get_config_path(), FILE_TEMPLATE_SKILL)
def recording_demo(command:CommandType = CommandType.RUN) -> bool:
if not command in COMMAND_TYPES:
return False
# read template of skill description ("RecordingDemo")
with open(get_config_file(), "r") as f:
cfg = json.load(f)
file_skill = os.path.join(get_config_path(), FILE_SKILL)
sid = KEY_SIDPATH + file_skill
rec_demo = cfg["skills"][0]
rec_demo["sid"] = sid
if not command == CommandType.RUN:
for act in rec_demo["BTAction"]:
if act["name"] == "rdStop":
act["param"] = [
{
"type": "formBuilder",
"dependency": {
"output": {
"do": command
}
}
}
]
# write params of skill description
with open(file_skill, "w") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
ret = False
rclpy.init()
action_client = RbsActionClient()
if command == CommandType.RUN:
action_client.send_goal(sid=sid, action="RecordingDemo", command="rdConfigure")
elif command in [CommandType.SAVE_AND_NEXT, CommandType.CANCEL_AND_NEXT]:
action_client.send_goal(sid=sid, action="RecordingDemo", command="rdStop")
if action_client.res_ok:
action_client.send_goal(sid=sid, action="RecordingDemo", command="rdConfigure")
else: # command in [CommandType.STOP_AND_CANCEL, CommandType.STOP_AND_SAVE]:
action_client.send_goal(sid=sid, action="RecordingDemo", command="rdStop")
ret = action_client.res_ok
action_client.destroy_node()
rclpy.shutdown()
return ret
if __name__ == '__main__':
recording_demo()

View file

@ -4,7 +4,7 @@
ROS 2 program for recording a demo via RosBag
@shalenikol release 0.3
!!! In "Humble" writer.close() not worked !!! Waiting "Jazzy"
@shalenikol release 0.4 In "Jazzy" writer.close()
"""
import os
import shutil
@ -16,16 +16,19 @@ from rclpy.lifecycle import Node
from rclpy.lifecycle import State
from rclpy.lifecycle import TransitionCallbackReturn
import rosbag2_py
import rosbag2_py # 0.26.7 (jazzy)
from ros2topic.api import get_topic_names_and_types
from rclpy.serialization import serialize_message
from sensor_msgs.msg import Image, JointState
from rbs_utils.recording_demo import CommandType
NODE_NAME_DEFAULT = "lc_record_demo"
PARAM_SKILL_CFG = "lc_record_demo_cfg"
OUTPUT_PATH_DEFAULT = "rbs_bag"
RECORD_PATH = "episode"
WRITER_ID = "sqlite3"
TOPIC_TYPES = ["sensor_msgs/msg/JointState", "sensor_msgs/msg/Image"]
TOPIC_CLASSES = [JointState, Image]
@ -62,32 +65,42 @@ class RecordingDemo(Node):
return topic_list
def check_output_folder(self, path: str) -> None:
def init_output_folder(self, path: str) -> str:
# check for output folder existence
if os.path.isdir(path):
if not os.path.isdir(path):
os.makedirs(path)
rec_folder = os.path.join(path, RECORD_PATH)
if os.path.isdir(rec_folder):
shutil.rmtree(rec_folder)
return rec_folder
def save_episode(self) -> None:
if os.path.isdir(self.rec_folder):
x = 1
while os.path.isdir(path + str(x)):
while os.path.isdir(self.rec_folder + str(x)):
x += 1
shutil.move(path, path + str(x))
shutil.move(self.rec_folder, self.rec_folder + str(x))
def on_configure(self, state: State) -> TransitionCallbackReturn:
self.check_output_folder(self.output_path)
self.rec_folder = self.init_output_folder(self.output_path)
self.writer = rosbag2_py.SequentialWriter()
storage_options = rosbag2_py._storage.StorageOptions(uri=self.output_path, storage_id=WRITER_ID)
storage_options = rosbag2_py._storage.StorageOptions(uri=self.rec_folder, storage_id=WRITER_ID)
converter_options = rosbag2_py._storage.ConverterOptions("", "")
self.writer.open(storage_options, converter_options)
self.topics = self.get_list_topics()
for id, topic in enumerate(self.topics):
topic_info = rosbag2_py._storage.TopicMetadata(
id,
name=topic["name"],
type=topic["type"],
serialization_format="cdr")
self.writer.create_topic(topic_info)
self.get_logger().info(f"on_configure: {id}) {topic}")
self.get_logger().info(f"Topics: {id}) {topic}")
# self.get_logger().info("on_configure() is called.")
self.get_logger().info("on_configure() is called.")
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: State) -> TransitionCallbackReturn:
@ -117,10 +130,24 @@ class RecordingDemo(Node):
self.destroy_subscription(sub)
self._sub.clear()
# self.writer.close() # / del self.writer
self.writer.close()
do_param = CommandType.STOP_AND_SAVE # default
json_param = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
self.skill_cfg = json.loads(json_param)
for comm in self.skill_cfg["BTAction"]:
if comm["name"] == "rdStop":
for par in comm["param"]:
if par["type"] == "formBuilder":
do_param = par["dependency"]["output"]["do"]
if do_param in [CommandType.STOP_AND_SAVE, CommandType.SAVE_AND_NEXT]:
self.save_episode()
elif do_param == CommandType.STOP_AND_CANCEL:
shutil.rmtree(self.rec_folder)
self.get_logger().info("on_cleanup() is called")
self.timer = self.create_timer(0.01, self.timer_callback)
return TransitionCallbackReturn.SUCCESS
def on_shutdown(self, state: State) -> TransitionCallbackReturn:
@ -137,14 +164,6 @@ class RecordingDemo(Node):
serialize_message(msg),
self.get_clock().now().nanoseconds)
def timer_callback(self):
self.get_logger().info("Restarting node..")
self.destroy_node()
rclpy.shutdown()
rclpy.init()
lc_node = RecordingDemo()
rclpy.spin(lc_node)
def main():
rclpy.init()