ADD: Node for recording demo in BT

This commit is contained in:
shalenikol 2024-09-17 08:48:20 +00:00 committed by Igor Brylev
parent b4b452297d
commit 6ad8e387a3
4 changed files with 255 additions and 0 deletions

View file

@ -13,9 +13,11 @@ find_package(tf2_ros REQUIRED)
find_package(tf2_eigen REQUIRED)
find_package(rviz_visual_tools REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(rbs_utils_interfaces REQUIRED)
find_package(dynmsg REQUIRED)
find_package(rclpy REQUIRED)
set(deps
rclcpp
@ -24,6 +26,7 @@ set(deps
tf2_eigen
rviz_visual_tools
geometry_msgs
sensor_msgs
rbs_utils_interfaces
dynmsg
)
@ -34,6 +37,7 @@ ament_python_install_package(${PROJECT_NAME})
install(PROGRAMS
scripts/assembly_config_service.py
scripts/recording_demo_via_rosbag.py
DESTINATION lib/${PROJECT_NAME}
)

View file

@ -9,6 +9,7 @@
<buildtool_depend>ament_cmake</buildtool_depend>
<depend>rclpy</depend>
<depend>rclcpp</depend>
<depend>rclcpp_lifecycle</depend>
<depend>tf2_ros</depend>
@ -19,6 +20,7 @@
<depend>std_msgs</depend>
<depend>rbs_utils_interfaces</depend>
<depend>dynmsg</depend>
<depend>sensor_msgs</depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>

View file

@ -0,0 +1,89 @@
### Пример запуска записи демонстрации в RosBag2
Подготовить папку с файлами BT v.4
* Папка /path/to/bt/
* bt.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<root BTCPP_format="4">
<BehaviorTree ID="Main">
<Sequence>
<Action ID="RbsAction" do="RecordingDemo" command="rdConfigure" sid="a"></Action>
<Action ID="RbsAction" do="MoveToPose" command="move" sid="c"></Action>
<Action ID="RbsAction" do="RecordingDemo" command="rdStop" sid="a"></Action>
</Sequence>
</BehaviorTree>
<TreeNodesModel>
<Action ID="RbsAction">
<input_port name="do"/>
<input_port name="command"/>
<input_port name="sid"/>
</Action>
</TreeNodesModel>
</root>
```
* skills.json
```json
{
"skills": [
{
"sid": "a",
"SkillPackage": { "name": "Robossembler", "version": "1.0", "format": "1" },
"Module": { "name": "RecordingDemo", "description": "Recording a demo via RosBag", "node_name": "lc_record_demo" },
"Launch": { "package": "rbs_utils", "executable": "recording_demo_via_rosbag.py" },
"BTAction": [
{
"name": "rdConfigure",
"type_action": "action",
"type": "run",
"param": [],
"result":[]
},
{ "name": "rdStop", "type_action": "action", "type": "stop", "param": [], "result": [] }
],
"Settings": [
{ "name": "output_path", "value": "rbs_testbag", "description": "!!! Путь, куда запишется rosbag. Если он уже есть - перетрётся!" }
]
},
{
"sid": "c",
"SkillPackage": { "name": "Robossembler", "version": "1.0", "format": "1" },
"Module": { "name": "MoveToPose", "description": "Move to Pose skill with cartesian controllers", "node_name": "mtp_cartesian" },
"Launch": { "package": "rbss_movetopose", "executable": "mtp_cartesian.py" },
"BTAction": [
{
"name": "move",
"type_action": "action",
"type": "action",
"param": [
{
"type": "move_to_pose",
"dependency": { "robot_name": "arm0",
"pose": { "position": {"x":0.1, "y":0.1, "z":0.9}, "orientation": {"x":0.55, "y":0.0, "z":0.45, "w": 1.0} } }
}
]
}
],
"Settings": [
{ "name": "server_name", "value": "cartesian_move_to_pose" },
{ "name": "end_effector_velocity", "value": 1.0 },
{ "name": "end_effector_acceleration", "value": 1.0 }
]
}
]
}
```
Запустить последовательно три терминала и выполнить запуск 3-х launch-файлов:
* 1 - запуск интерфейсной ноды с серверами навыков
```bash
ros2 launch rbs_interface interface.launch.py bt_path:=/path/to/bt
```
* 2 - запуск робота в сцене
```bash
ros2 launch rbs_bringup single_robot.launch.py
```
* 3 - запуск дерева поведения
```bash
ros2 launch rbs_bt_executor rbs_bt_web.launch.py bt_path:=/path/to/bt
```

View file

@ -0,0 +1,160 @@
#!/usr/bin/env python3
"""
recording_demo_lifecycle_node_via_RosBag
ROS 2 program for recording a demo via RosBag
@shalenikol release 0.1
"""
import os
import shutil
import json
from functools import partial
import rclpy
from rclpy.lifecycle import Node
from rclpy.lifecycle import State
from rclpy.lifecycle import TransitionCallbackReturn
import rosbag2_py
from ros2topic.api import get_topic_names_and_types
from rclpy.serialization import serialize_message
from sensor_msgs.msg import Image, JointState
NODE_NAME_DEFAULT = "lc_record_demo"
PARAM_SKILL_CFG = "lc_record_demo_cfg"
OUTPUT_PATH_DEFAULT = "my_bag"
WRITER_ID = "sqlite3"
TOPIC_TYPES = ["sensor_msgs/msg/JointState", "sensor_msgs/msg/Image"]
TOPIC_CLASSES = [JointState, Image]
class RecordingDemo(Node):
""" lifecycle node """
def __init__(self, **kwargs):
"""Construct the node."""
self.mode: int = 0 # 0 - нет записи, 1 - есть запись
self.topics = []
self._sub = []
self.output_path = OUTPUT_PATH_DEFAULT
# for other nodes
kwargs["allow_undeclared_parameters"] = True
kwargs["automatically_declare_parameters_from_overrides"] = True
super().__init__(NODE_NAME_DEFAULT, **kwargs)
str_cfg = self.get_parameter(PARAM_SKILL_CFG).get_parameter_value().string_value
self.skill_cfg = json.loads(str_cfg)
sets = self.skill_cfg["Settings"]
for s in sets:
if s["name"] == "output_path":
self.output_path = s["value"]
if os.path.isdir(self.output_path):
shutil.rmtree(self.output_path)
self.writer = rosbag2_py.SequentialWriter()
storage_options = rosbag2_py._storage.StorageOptions(
uri=self.output_path,
storage_id=WRITER_ID)
converter_options = rosbag2_py._storage.ConverterOptions("", "")
self.writer.open(storage_options, converter_options)
def get_list_topics(self) -> list:
topic_names_and_types = get_topic_names_and_types(node=self, include_hidden_topics=False)
topic_list = []
for (topic_name, topic_types) in topic_names_and_types:
if topic_types[0] in TOPIC_TYPES:
i = TOPIC_TYPES.index(topic_types[0])
topic_list.append({"name": topic_name, "type": topic_types[0], "class": TOPIC_CLASSES[i]})
return topic_list
def on_configure(self, state: State) -> TransitionCallbackReturn:
"""
Configure the node, after a configuring transition is requested.
:return: The state machine either invokes a transition to the "inactive" state or stays
in "unconfigured" depending on the return value.
TransitionCallbackReturn.SUCCESS transitions to "inactive".
TransitionCallbackReturn.FAILURE transitions to "unconfigured".
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
"""
self.topics = self.get_list_topics()
for id, topic in enumerate(self.topics):
topic_info = rosbag2_py._storage.TopicMetadata(
name=topic["name"],
type=topic["type"],
serialization_format="cdr")
self.writer.create_topic(topic_info)
self.get_logger().info(f"on_configure: {id}) {topic}")
# self.get_logger().info("on_configure() is called.")
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: State) -> TransitionCallbackReturn:
self.get_logger().info('on_activate() is called.')
# Create the subscribers.
for id, topic in enumerate(self.topics):
sub = self.create_subscription(topic["class"], topic["name"],
partial(self.topic_callback, topic["name"]),
10)
self._sub.append(sub)
self.mode += 1
return super().on_activate(state)
def on_deactivate(self, state: State) -> TransitionCallbackReturn:
self.get_logger().info('on_deactivate() is called.')
# Destroy the subscribers.
for sub in self._sub:
self.destroy_subscription(sub)
self._sub.clear()
self.mode -= 1
return super().on_deactivate(state)
def on_cleanup(self, state: State) -> TransitionCallbackReturn:
"""
Cleanup the node.
"""
for sub in self._sub:
self.destroy_subscription(sub)
self._sub.clear()
self.get_logger().info('on_cleanup() is called.')
return TransitionCallbackReturn.SUCCESS
def on_shutdown(self, state: State) -> TransitionCallbackReturn:
"""
Shutdown the node.
"""
for sub in self._sub:
self.destroy_subscription(sub)
self._sub.clear()
self.get_logger().info('on_shutdown() is called.')
return TransitionCallbackReturn.SUCCESS
def topic_callback(self, topic_name, msg):
self.writer.write(
topic_name,
serialize_message(msg),
self.get_clock().now().nanoseconds)
def main():
rclpy.init()
executor = rclpy.executors.SingleThreadedExecutor()
lc_node = RecordingDemo()
executor.add_node(lc_node)
try:
executor.spin()
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
lc_node.destroy_node()
if __name__ == '__main__':
main()