#!/usr/bin/env python3 """ rbsInterface_node ROS 2 program for Robossembler @shalenikol release 0.1 @shalenikol release 0.2 BT v.4 @shalenikol release 0.3 synchronize """ import os import json import rclpy from rclpy.node import Node from rclpy.action import ActionServer from rclpy.callback_groups import ReentrantCallbackGroup from ament_index_python.packages import get_package_share_directory from rclpy.parameter import Parameter from rcl_interfaces.srv import SetParameters #, GetParameters from rcl_interfaces.msg import SetParametersResult #, ParameterEvent from lifecycle_msgs.srv import ChangeState from lifecycle_msgs.msg import Transition from rbs_skill_interfaces.srv import RbsBt from rbs_skill_interfaces.action import RbsBt as RbsBtAction # from rclpy.parameter_client import AsyncParameterClient # only Iron CONDITION_SRV_NAME = "/condition" COMPLETION_SRV_NAME = "/completion" BT_PARAM = "bt_path" NODE_NAME = "rbs_interface" SERVICE_NAME = "rbs_interface_s" SERVER_NAME = "rbs_interface_a" FILE_SKILLS = "skills.json" PARAM_SUFFIX = "_cfg" KEY_BTPARAM = "BTAction" class rbsInterface(Node): def __init__(self, node_name): """Construct the node.""" self.bt_path = "" # path to the current BehaviorTree self.cfg_data = None # config for current action super().__init__(node_name) self.declare_parameter(BT_PARAM, rclpy.Parameter.Type.STRING) self.cb_group = ReentrantCallbackGroup() # for Condition self._service = self.create_service(RbsBt, SERVICE_NAME, self.service_callback, callback_group=self.cb_group) # for Action self._action = ActionServer(self, RbsBtAction, SERVER_NAME, self.action_callback, callback_group=self.cb_group) # self.client = AsyncParameterClient(self.client_node, 'test_parameter_client_target') # only Iron self.add_on_set_parameters_callback(self._on_set_btpath_param) def get_transfer_path(self): if self.bt_path: return self.bt_path return os.path.join(get_package_share_directory("rbs_bt_executor"), "config") def _on_set_btpath_param(self, parameter_list): for parameter in parameter_list: if parameter.name == BT_PARAM: self.bt_path = parameter.value # self.get_logger().info(f'$ {parameter.name}={parameter.value}') return SetParametersResult(successful=True) # def get_remote_parameter(self, remote_node_name, param_name): # cli = self.create_client(GetParameters, remote_node_name + '/get_parameters') # while not cli.wait_for_service(timeout_sec=1.0): # self.get_logger().info('service not available, waiting again...') # req = GetParameters.Request() # req.names = [param_name] # future = cli.call_async(req) # while rclpy.ok(): # rclpy.spin_once(self) # if future.done(): # try: # res = future.result() # return getattr(res.values[0], self.type_arr[res.values[0].type]) # except Exception as e: # self.get_logger().warn('Service call failed %r' % (e,)) # break def set_remote_parameter(self, remote_node_name: str, parameter_name: str, new_parameter_value) -> bool: self.cli = self.create_client(SetParameters, remote_node_name + "/set_parameters") while not self.cli.wait_for_service(timeout_sec=1.0): self.get_logger().info("'" + remote_node_name + "' service not available, waiting again...") req = SetParameters.Request() req.parameters = [Parameter(parameter_name, value=new_parameter_value).to_parameter_msg()] future = self.cli.call_async(req) self.executor.spin_until_future_complete(future) res = future.result() return res.results[0].successful def _deserialize(self, file_path: str, sid: str): with open(file_path, "r") as f: # if file_path.split() == ".yaml": # s = yaml.load(f, Loader=yaml.FullLoader) # else: # ".json" data = json.load(f) for skill in data["skills"]: if skill["sid"] == sid: return skill assert False, f"Error: sid not valid '{sid}'" def _load_config(self, sid: str): p = os.path.join(self.get_transfer_path(), FILE_SKILLS) # action+".json") # load config return self._deserialize(p,sid) def run_action(self, command_data: dict) -> bool: p_list = command_data["param"] oper_type = command_data["type"] node_name = self.cfg_data["Module"]["node_name"] par_name = node_name + PARAM_SUFFIX if len(p_list) > 0: data = json.dumps(self.cfg_data) if not self.set_remote_parameter(node_name, par_name, data): return False ret = False # default return value if oper_type == "run": self.cli_changestate = self.create_client(ChangeState, f"/{node_name}/change_state") #, callback_group=self.cb_group) while not self.cli_changestate.wait_for_service(timeout_sec=1.0): self.get_logger().info(f"'{node_name}' not available... wait") req = ChangeState.Request() req.transition.id = Transition.TRANSITION_CONFIGURE future = self.cli_changestate.call_async(req) self.executor.spin_until_future_complete(future) res = future.result() if res: # is not None: if res.success: req = ChangeState.Request() req.transition.id = Transition.TRANSITION_ACTIVATE future = self.cli_changestate.call_async(req) self.executor.spin_until_future_complete(future) res = future.result() if res: # is not None: ret = res.success elif oper_type == "stop": self.cli_changestate = self.create_client(ChangeState, f"{node_name}/change_state") #, callback_group=self.cb_group) while not self.cli_changestate.wait_for_service(timeout_sec=1.0): self.get_logger().info(f"'{node_name}' not available... wait") req = ChangeState.Request() req.transition.id = Transition.TRANSITION_DEACTIVATE future = self.cli_changestate.call_async(req) self.executor.spin_until_future_complete(future) res = future.result() if res: # is not None: if res.success: req = ChangeState.Request() req.transition.id = Transition.TRANSITION_CLEANUP future = self.cli_changestate.call_async(req) self.executor.spin_until_future_complete(future) res = future.result() if res: # is not None: ret = res.success elif oper_type == "action": # cnt = 0 while not self.run_condition(COMPLETION_SRV_NAME, command_data["name"]): pass # cnt += 1 # self.get_logger().info(f"action + run_condition: {cnt}") ret = True return ret def run_condition(self, srv_name: str, command: str) -> bool: srv_topic = self.cfg_data["Module"]["node_name"] + srv_name self.cli = self.create_client(RbsBt, srv_topic) while not self.cli.wait_for_service(timeout_sec=1.0): self.get_logger().info(f"'{srv_topic}' service not available, waiting again...") req = RbsBt.Request() req.command = command future = self.cli.call_async(req) self.executor.spin_until_future_complete(future) res = future.result() return res.ok def _interface(self, sid: str, action: str, command: str, isCondition: bool) -> bool: self.cfg_data = self._load_config(sid) typeBTnode = "Condition" if isCondition else "Action" self.get_logger().info(f'Incoming request ({action}/{command}) {typeBTnode} ({self.cfg_data["Module"]["description"]})') is_success = False for comm in self.cfg_data[KEY_BTPARAM]: if comm["name"] == command: is_success = self.run_condition(CONDITION_SRV_NAME, command) if isCondition else self.run_action(comm) return is_success def action_callback(self, goal_h): goal = goal_h.request is_success = self._interface(goal.sid, goal.action, goal.command, isCondition=False) res = RbsBtAction.Result() res.ok = is_success if is_success: goal_h.succeed() else: goal_h.abort() return res def service_callback(self, request, response): response.ok = self._interface(request.sid, request.action, request.command, isCondition=True) return response def main(): rclpy.init() executor = rclpy.executors.SingleThreadedExecutor() # executor = rclpy.executors.MultiThreadedExecutor() # can't be used i_node = rbsInterface(NODE_NAME) executor.add_node(i_node) try: executor.spin() except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): i_node.destroy_node() if __name__ == '__main__': main()