runtime/rbs_bt_executor/scripts/bt_param.py

64 lines
2.2 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
btParam_node only for passing the path to the interface node
ROS 2 program for Robossembler
@shalenikol release 0.1
"""
# import os
import rclpy
from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.parameter import Parameter
from rcl_interfaces.srv import SetParameters
NODE_NAME = "bt_param"
NODE_INTERFACE = "rbs_interface"
PARAM_BT = "bt_path"
class btParam(Node):
def __init__(self, node_name):
"""Construct the node."""
super().__init__(node_name)
self.declare_parameter(PARAM_BT, rclpy.Parameter.Type.STRING)
self.cb_group = ReentrantCallbackGroup()
timer_period = 0.1 # seconds
self._timer = self.create_timer(timer_period, self.timer_callback, self.cb_group)
def timer_callback(self):
bt_path = self.get_parameter(PARAM_BT).get_parameter_value().string_value
if not self.set_remote_parameter(NODE_INTERFACE, PARAM_BT, bt_path):
# self.get_logger().info(f"'{NODE_NAME}' set param '{bt_path}'")
# else:
self.get_logger().info("Error 'set_remote_parameter'")
self.destroy_timer(self._timer)
def set_remote_parameter(self, remote_node_name: str, parameter_name: str, new_parameter_value) -> bool:
self.cli = self.create_client(SetParameters, remote_node_name + "/set_parameters")
while not self.cli.wait_for_service(timeout_sec=1.1):
self.get_logger().info("'" + remote_node_name + "' service not available, waiting again...")
req = SetParameters.Request()
req.parameters = [Parameter(parameter_name, value=new_parameter_value).to_parameter_msg()]
future = self.cli.call_async(req)
self.executor.spin_until_future_complete(future)
res = future.result()
return res.results[0].successful
def main():
rclpy.init()
executor = rclpy.executors.SingleThreadedExecutor()
# executor = rclpy.executors.MultiThreadedExecutor()
i_node = btParam(NODE_NAME)
executor.add_node(i_node)
try:
executor.spin_once()
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
i_node.destroy_node()
if __name__ == '__main__':
main()