Refactor robot arm and runtime initialization logic

- Updated error message in `rbs_arm.py` to use the complete joint names for improved clarity.
- Refactored `scene.py` to replace asynchronous parameter retrieval with synchronous methods for reliability and simplified implementation.
- Enhanced `runtime.launch.py`:
  - Added new launch arguments `use_rbs_utils` and `assembly_config_name`.
  - Included additional parameters in runtime node initialization.
  - Adjusted runtime node arguments for debugging and clarity.
- Updated `runtime.py`:
  - Introduced `MultiThreadedExecutor` for improved threading and node handling.
  - Refactored the `main` function for cleaner node lifecycle management.
This commit is contained in:
Ilya Uraev 2024-12-06 12:10:35 +03:00
parent 1addd4f595
commit b1e20696fe
4 changed files with 61 additions and 58 deletions

View file

@ -150,4 +150,4 @@ class RbsArm(RobotWrapper):
for positions, joint_names in joint_position_data:
print(f"Setting joint positions for {joint_names}: {positions}")
if not model.reset_joint_positions(positions, joint_names):
raise RuntimeError(f"Failed to set initial positions of {joint_names[0].split('_')[0]}'s joints")
raise RuntimeError(f"Failed to set initial positions of {joint_names}'s joints")

View file

@ -90,12 +90,12 @@ class Scene:
self.param_client = node.create_client(
GetParameters, "/robot_state_publisher/get_parameters"
)
asyncio.run(self.get_parameter_async())
scene.robot.urdf_string = (
node.get_parameter("robot_description")
.get_parameter_value()
.string_value
)
self.get_parameter_sync()
# scene.robot.urdf_string = (
# node.get_parameter("robot_description")
# .get_parameter_value()
# .string_value
# )
else:
scene.robot.urdf_string = robot_urdf_string
@ -122,44 +122,32 @@ class Scene:
# self.marker_array = MarkerArray()
async def get_parameter_async(self):
def get_parameter_sync(self):
"""
Asynchronously retrieves the robot_description parameter from the robot state publisher.
This method waits for the parameter service to become available and then sends
a request to retrieve the robot_description parameter. If the parameter is found,
it updates the local node parameter.
Raises:
RuntimeError: If the robot_description parameter is not found or if the
response type is unexpected.
Retrieves the `robot_description` parameter synchronously, waiting for the asynchronous call to complete.
"""
while not self.param_client.wait_for_service(timeout_sec=1.0):
self.node.get_logger().info(
"Waiting for parameter service [robot_state_publisher]..."
)
while not self.param_client.wait_for_service(timeout_sec=5.0):
self.node.get_logger().info("Service /robot_state_publisher/get_parameters is unavailable, waiting ...")
self.node.get_logger().info("Sending request for parameter...")
future = self.param_client.call_async(self.create_get_parameters_request())
request = self.create_get_parameters_request()
future = self.param_client.call_async(request)
rclpy.spin_until_future_complete(self.node, future)
response = await future
if future.result() is None:
raise RuntimeError("Failed to retrieve the robot_description parameter")
match response:
case None:
raise RuntimeError(
"robot_description parameter not found in robot_state_publisher"
)
case GetParameters.Response(values=[value]) if value.string_value:
self.node.get_logger().info(f"Got parameter: {value.string_value}")
param = Parameter(
"robot_description", Parameter.Type.STRING, value.string_value
)
self.node.set_parameters([param])
case _:
raise RuntimeError(
f"I don't know what but the type of robot_description parameter is {type(response)}"
)
response = future.result()
if not response.values or not response.values[0].string_value:
raise RuntimeError("The robot_description parameter is missing or empty")
value = response.values[0]
self.node.get_logger().info("Succesfully got parameter response from robot_state_publisher")
param = Parameter(
"robot_description", Parameter.Type.STRING, value.string_value
)
self.robot.urdf_string = value.string_value
self.node.set_parameters([param])
def create_get_parameters_request(self) -> GetParameters.Request:
"""

View file

@ -32,17 +32,16 @@ def launch_setup(context, *args, **kwargs):
moveit_config_file = LaunchConfiguration("moveit_config_file")
use_sim_time = LaunchConfiguration("use_sim_time")
scene_config_file = LaunchConfiguration("scene_config_file").perform(context)
ee_link_name = LaunchConfiguration("ee_link_name").perform(context)
base_link_name = LaunchConfiguration("base_link_name").perform(context)
control_space = LaunchConfiguration("control_space").perform(context)
control_strategy = LaunchConfiguration("control_strategy").perform(context)
interactive = LaunchConfiguration("interactive").perform(context)
real_robot = LaunchConfiguration("real_robot").perform(context)
use_rbs_utils = LaunchConfiguration("use_rbs_utils")
assembly_config_name = LaunchConfiguration("assembly_config_name")
if not scene_config_file == "":
config_file = {"config_file": scene_config_file}
else:
@ -152,6 +151,8 @@ def launch_setup(context, *args, **kwargs):
"control_space": control_space,
"control_strategy": control_strategy,
"interactive_control": interactive,
"use_rbs_utils": use_rbs_utils,
"assembly_config_name": assembly_config_name
}.items(),
)
@ -170,9 +171,7 @@ def launch_setup(context, *args, **kwargs):
rbs_runtime = Node(
package="rbs_runtime",
executable="runtime",
output='log',
emulate_tty=True,
# arguments=[('log_level:=debug')],
arguments=[('--ros-args --remap log_level:=debug')],
parameters=[robot_description, config_file, {"use_sim_time": True}],
)
@ -333,6 +332,23 @@ def generate_launch_description():
)
declared_arguments.append(
DeclareLaunchArgument(
"use_rbs_utils",
default_value="true",
description="Wheter to use rbs_utils",
),
)
declared_arguments.append(
DeclareLaunchArgument(
"assembly_config_name",
default_value="",
description="Assembly config name from rbs_assets_library",
),
)
return LaunchDescription(
declared_arguments + [OpaqueFunction(function=launch_setup)]
)

View file

@ -7,10 +7,10 @@ from env_manager_interfaces.srv import ResetEnv
from rbs_assets_library import get_world_file
from rclpy.node import Node
from scenario.bindings.gazebo import GazeboSimulator
from rclpy.executors import MultiThreadedExecutor, ExternalShutdownException
from .. import scene_config_loader
class GazeboRuntime(Node):
def __init__(self) -> None:
super().__init__(node_name="rbs_gz_runtime")
@ -61,14 +61,13 @@ class GazeboRuntime(Node):
return res
def main(args=None):
rclpy.init(args=args)
runtime_node = GazeboRuntime()
rclpy.spin(runtime_node)
runtime_node.gazebo.close()
runtime_node.destroy_node()
rclpy.shutdown()
def main():
rclpy.init()
if __name__ == "__main__":
main()
executor = MultiThreadedExecutor()
node = GazeboRuntime()
executor.add_node(node)
try:
executor.spin()
except (KeyboardInterrupt, ExternalShutdownException):
node.destroy_node()