#!/usr/bin/env python3 """ rbsInterface_node ROS 2 program for Robossembler @shalenikol release 0.1 """ import os import json import yaml import rclpy from rclpy.node import Node # from rclpy.action import ActionServer, CancelResponse, GoalResponse from rclpy.callback_groups import ReentrantCallbackGroup from ament_index_python.packages import get_package_share_directory from rclpy.parameter import Parameter from rcl_interfaces.srv import SetParameters #, GetParameters # from rcl_interfaces.msg import Parameter, ParameterValue # rcl_interfaces.msg.ParameterValue from lifecycle_msgs.srv import ChangeState, GetState from lifecycle_msgs.msg import Transition # from lifecycle_msgs.msg import State from rbs_skill_interfaces.srv import RbsBt # from ros2node.api import get_node_names as get_all_node_names PARAM_NAME = "str_param" PARAM_SKILL_CONFIG = "skill_cfg" def get_transfer_path_(): return os.path.join(get_package_share_directory("rbs_perception"), "config") class rbsInterface(Node): def __init__(self, node_name): """Construct the node.""" self._i_exe = 0 # run index self._timer = None # defer run after callback # self._cli_changestate = None # client for lifecycle node self.cfg_data = None # config for current action super().__init__(node_name) # self.declare_parameter(PARAM_NAME, rclpy.Parameter.Type.STRING) self.cb_group = ReentrantCallbackGroup() self._service = self.create_service(RbsBt, "rbs_interface", self.service_callback, callback_group=self.cb_group) # def get_remote_parameter(self, remote_node_name, param_name): # cli = self.create_client(GetParameters, remote_node_name + '/get_parameters') # while not cli.wait_for_service(timeout_sec=1.0): # self.get_logger().info('service not available, waiting again...') # req = GetParameters.Request() # req.names = [param_name] # future = cli.call_async(req) # while rclpy.ok(): # rclpy.spin_once(self) # if future.done(): # try: # res = future.result() # return getattr(res.values[0], self.type_arr[res.values[0].type]) # except Exception as e: # self.get_logger().warn('Service call failed %r' % (e,)) # break def set_remote_parameter(self, remote_node_name: str, parameter_name: str, new_parameter_value): self.cli = self.create_client(SetParameters, remote_node_name + "/set_parameters") while not self.cli.wait_for_service(timeout_sec=1.0): self.get_logger().info("'" + remote_node_name + "' service not available, waiting again...") req = SetParameters.Request() req.parameters = [Parameter(parameter_name, value=new_parameter_value).to_parameter_msg()] future = self.cli.call_async(req) self.executor.spin_until_future_complete(future) res = future.result() # self.get_logger().info(f"result : {type(res)}/{res}") return res.results[0].successful # while rclpy.ok(): # rclpy.spin_once(self) # if future.done(): # try: # res = future.result() # return res.results[0].successful # except Exception as e: # self.get_logger().warn("Service call failed %r" % (e,)) # break def _deserialize(self, file_path: str): with open(file_path, "r") as f: if file_path.split() == ".yaml": s = yaml.load(f, Loader=yaml.FullLoader) else: # ".json" s = json.load(f) return s def _load_config(self, action: str): #, command: str): p = os.path.join(get_transfer_path_(), action+".json") # load config return self._deserialize(p) def run_action(self, command_data: dict) -> bool: p_list = command_data["param"] oper_type = command_data["type"] node_name = self.cfg_data["ROS2"]["node_name"] if len(p_list) > 0: ext = command_data["format"] # 'yaml' or 'json' param_file = os.path.join(get_transfer_path_(), command_data["name"]+"."+ext) with open(param_file, "r") as f: data = f.read() if not self.set_remote_parameter(node_name, PARAM_NAME, data): return False if not self.set_remote_parameter(node_name, PARAM_SKILL_CONFIG, yaml.dump(self.cfg_data)): return False ret = False # default return value if oper_type == "run": # if not self._cli_changestate: # self.cb_group = ReentrantCallbackGroup() self.cli_changestate = self.create_client(ChangeState, f"{node_name}/change_state") #, callback_group=self.cb_group) # self._cli_changestate = self.create_client(GetState, f"{node_name}/get_state") while not self.cli_changestate.wait_for_service(timeout_sec=1.0): self.get_logger().info(f"'{node_name}' not available... wait") req = ChangeState.Request() # self._req = GetState.Request() req.transition.id = Transition.TRANSITION_CONFIGURE # self._i_exe = 1 # call_async(self._req) # self._timer = self.create_timer(0.0001, self.timer_callback) future = self.cli_changestate.call_async(req) self.executor.spin_until_future_complete(future) res = future.result() if res: # is not None: if res.success: req = ChangeState.Request() req.transition.id = Transition.TRANSITION_ACTIVATE future = self.cli_changestate.call_async(req) self.executor.spin_until_future_complete(future) res = future.result() if res: # is not None: ret = res.success # self.cli_changestate.destroy() # else: # return False # state = future.exception() # self.get_logger().info(f"state : {type(state)}/{state}") elif oper_type == "stop": self.cli_changestate = self.create_client(ChangeState, f"{node_name}/change_state") #, callback_group=self.cb_group) while not self.cli_changestate.wait_for_service(timeout_sec=1.0): self.get_logger().info(f"'{node_name}' not available... wait") req = ChangeState.Request() req.transition.id = Transition.TRANSITION_DEACTIVATE future = self.cli_changestate.call_async(req) self.executor.spin_until_future_complete(future) res = future.result() if res: # is not None: if res.success: req = ChangeState.Request() req.transition.id = Transition.TRANSITION_CLEANUP future = self.cli_changestate.call_async(req) self.executor.spin_until_future_complete(future) res = future.result() if res: # is not None: ret = res.success # self.cli_changestate.destroy() return ret def timer_callback(self): if self._i_exe == 1: self._i_exe = 2 # self.get_logger().info(f"{self._oper_type} timer_callback 1") # self._future = self._cli_changestate.call_async(self._req) # self.get_logger().info(f"self._future : {type(self._future)}/{self._future}") elif self._i_exe == 2: # self.get_logger().info(f"{self._oper_type} timer_callback 2") responce = True #self._future.result() if responce: # self.get_logger().info(f"responce : {responce}") self._i_exe == 0 self._timer.cancel() else: self._timer.cancel() # def changestate_callback(self): # self.get_logger().info(f"changestate_callback is") def service_callback(self, request, response): self.get_logger().info(f"Incoming request for Action ({request.action}/{request.command})") self.cfg_data = self._load_config(request.action) #, request.command) self.get_logger().info(f'Config: Ok ({self.cfg_data["Module"]["description"]})') is_action = False for comm in self.cfg_data["BTAction"]: if comm["name"] == request.command: is_action = self.run_action(comm) # if not os.path.isfile(request.object.mesh_path): # response.call_status = False # response.error_msg = f"{request.object.mesh_path}: no such file" response.ok = is_action #True return response def main(): rclpy.init() # i_node = rbsInterface("rbs_interface") # rclpy.spin(i_node) # rclpy.shutdown() # executor = rclpy.executors.SingleThreadedExecutor() executor = rclpy.executors.MultiThreadedExecutor() i_node = rbsInterface("rbs_interface") executor.add_node(i_node) try: executor.spin() except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): i_node.destroy_node() if __name__ == '__main__': main()