runtime/rbs_perception/scripts/rbs_interface.py

222 lines
No EOL
9.1 KiB
Python
Executable file

#!/usr/bin/env python3
"""
rbsInterface_node
ROS 2 program for Robossembler
@shalenikol release 0.1
"""
import os
import json
import yaml
import rclpy
from rclpy.node import Node
# from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.callback_groups import ReentrantCallbackGroup
from ament_index_python.packages import get_package_share_directory
from rclpy.parameter import Parameter
from rcl_interfaces.srv import SetParameters #, GetParameters
# from rcl_interfaces.msg import Parameter, ParameterValue
# rcl_interfaces.msg.ParameterValue
from lifecycle_msgs.srv import ChangeState, GetState
from lifecycle_msgs.msg import Transition
# from lifecycle_msgs.msg import State
from rbs_skill_interfaces.srv import RbsBt
# from ros2node.api import get_node_names as get_all_node_names
PARAM_NAME = "str_param"
PARAM_SKILL_CONFIG = "skill_cfg"
def get_transfer_path_():
return os.path.join(get_package_share_directory("rbs_perception"), "config")
class rbsInterface(Node):
def __init__(self, node_name):
"""Construct the node."""
self._i_exe = 0 # run index
self._timer = None # defer run after callback
# self._cli_changestate = None # client for lifecycle node
self.cfg_data = None # config for current action
super().__init__(node_name)
# self.declare_parameter(PARAM_NAME, rclpy.Parameter.Type.STRING)
self.cb_group = ReentrantCallbackGroup()
self._service = self.create_service(RbsBt, "rbs_interface", self.service_callback, callback_group=self.cb_group)
# def get_remote_parameter(self, remote_node_name, param_name):
# cli = self.create_client(GetParameters, remote_node_name + '/get_parameters')
# while not cli.wait_for_service(timeout_sec=1.0):
# self.get_logger().info('service not available, waiting again...')
# req = GetParameters.Request()
# req.names = [param_name]
# future = cli.call_async(req)
# while rclpy.ok():
# rclpy.spin_once(self)
# if future.done():
# try:
# res = future.result()
# return getattr(res.values[0], self.type_arr[res.values[0].type])
# except Exception as e:
# self.get_logger().warn('Service call failed %r' % (e,))
# break
def set_remote_parameter(self, remote_node_name: str, parameter_name: str, new_parameter_value):
self.cli = self.create_client(SetParameters, remote_node_name + "/set_parameters")
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info("'" + remote_node_name + "' service not available, waiting again...")
req = SetParameters.Request()
req.parameters = [Parameter(parameter_name, value=new_parameter_value).to_parameter_msg()]
future = self.cli.call_async(req)
self.executor.spin_until_future_complete(future)
res = future.result()
# self.get_logger().info(f"result : {type(res)}/{res}")
return res.results[0].successful
# while rclpy.ok():
# rclpy.spin_once(self)
# if future.done():
# try:
# res = future.result()
# return res.results[0].successful
# except Exception as e:
# self.get_logger().warn("Service call failed %r" % (e,))
# break
def _deserialize(self, file_path: str):
with open(file_path, "r") as f:
if file_path.split() == ".yaml":
s = yaml.load(f, Loader=yaml.FullLoader)
else: # ".json"
s = json.load(f)
return s
def _load_config(self, action: str): #, command: str):
p = os.path.join(get_transfer_path_(), action+".json")
# load config
return self._deserialize(p)
def run_action(self, command_data: dict) -> bool:
p_list = command_data["param"]
oper_type = command_data["type"]
node_name = self.cfg_data["ROS2"]["node_name"]
if len(p_list) > 0:
ext = command_data["format"] # 'yaml' or 'json'
param_file = os.path.join(get_transfer_path_(), command_data["name"]+"."+ext)
with open(param_file, "r") as f:
data = f.read()
if not self.set_remote_parameter(node_name, PARAM_NAME, data):
return False
if not self.set_remote_parameter(node_name, PARAM_SKILL_CONFIG, yaml.dump(self.cfg_data)):
return False
ret = False # default return value
if oper_type == "run":
# if not self._cli_changestate:
# self.cb_group = ReentrantCallbackGroup()
self.cli_changestate = self.create_client(ChangeState, f"{node_name}/change_state") #, callback_group=self.cb_group)
# self._cli_changestate = self.create_client(GetState, f"{node_name}/get_state")
while not self.cli_changestate.wait_for_service(timeout_sec=1.0):
self.get_logger().info(f"'{node_name}' not available... wait")
req = ChangeState.Request()
# self._req = GetState.Request()
req.transition.id = Transition.TRANSITION_CONFIGURE
# self._i_exe = 1 # call_async(self._req)
# self._timer = self.create_timer(0.0001, self.timer_callback)
future = self.cli_changestate.call_async(req)
self.executor.spin_until_future_complete(future)
res = future.result()
if res: # is not None:
if res.success:
req = ChangeState.Request()
req.transition.id = Transition.TRANSITION_ACTIVATE
future = self.cli_changestate.call_async(req)
self.executor.spin_until_future_complete(future)
res = future.result()
if res: # is not None:
ret = res.success
# self.cli_changestate.destroy()
# else:
# return False # state = future.exception()
# self.get_logger().info(f"state : {type(state)}/{state}")
elif oper_type == "stop":
self.cli_changestate = self.create_client(ChangeState, f"{node_name}/change_state") #, callback_group=self.cb_group)
while not self.cli_changestate.wait_for_service(timeout_sec=1.0):
self.get_logger().info(f"'{node_name}' not available... wait")
req = ChangeState.Request()
req.transition.id = Transition.TRANSITION_DEACTIVATE
future = self.cli_changestate.call_async(req)
self.executor.spin_until_future_complete(future)
res = future.result()
if res: # is not None:
if res.success:
req = ChangeState.Request()
req.transition.id = Transition.TRANSITION_CLEANUP
future = self.cli_changestate.call_async(req)
self.executor.spin_until_future_complete(future)
res = future.result()
if res: # is not None:
ret = res.success
# self.cli_changestate.destroy()
return ret
def timer_callback(self):
if self._i_exe == 1:
self._i_exe = 2
# self.get_logger().info(f"{self._oper_type} timer_callback 1")
# self._future = self._cli_changestate.call_async(self._req)
# self.get_logger().info(f"self._future : {type(self._future)}/{self._future}")
elif self._i_exe == 2:
# self.get_logger().info(f"{self._oper_type} timer_callback 2")
responce = True #self._future.result()
if responce:
# self.get_logger().info(f"responce : {responce}")
self._i_exe == 0
self._timer.cancel()
else:
self._timer.cancel()
# def changestate_callback(self):
# self.get_logger().info(f"changestate_callback is")
def service_callback(self, request, response):
self.get_logger().info(f"Incoming request for Action ({request.action}/{request.command})")
self.cfg_data = self._load_config(request.action) #, request.command)
self.get_logger().info(f'Config: Ok ({self.cfg_data["Module"]["description"]})')
is_action = False
for comm in self.cfg_data["BTAction"]:
if comm["name"] == request.command:
is_action = self.run_action(comm)
# if not os.path.isfile(request.object.mesh_path):
# response.call_status = False
# response.error_msg = f"{request.object.mesh_path}: no such file"
response.ok = is_action #True
return response
def main():
rclpy.init()
# i_node = rbsInterface("rbs_interface")
# rclpy.spin(i_node)
# rclpy.shutdown()
# executor = rclpy.executors.SingleThreadedExecutor()
executor = rclpy.executors.MultiThreadedExecutor()
i_node = rbsInterface("rbs_interface")
executor.add_node(i_node)
try:
executor.spin()
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
i_node.destroy_node()
if __name__ == '__main__':
main()