diff --git a/env_manager/rbs_gym/img_color.png b/env_manager/rbs_gym/img_color.png new file mode 100644 index 0000000..e08e0c8 Binary files /dev/null and b/env_manager/rbs_gym/img_color.png differ diff --git a/env_manager/rbs_gym/rbs_gym/envs/__init__.py b/env_manager/rbs_gym/rbs_gym/envs/__init__.py index be5ce99..fd29d15 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/__init__.py +++ b/env_manager/rbs_gym/rbs_gym/envs/__init__.py @@ -13,7 +13,7 @@ except: pass from os import environ, path -from typing import Dict, Tuple +from typing import Dict, Tuple, Any import numpy as np from ament_index_python.packages import get_package_share_directory @@ -22,15 +22,13 @@ from gymnasium.envs.registration import register from rbs_gym.utils.utils import str2bool from . import tasks +from dataclasses import dataclass, field ###################### # Runtime Entrypoint # ###################### -# Entrypoint for tasks (can be simulated or real) -RBS_ENVS_TASK_ENTRYPOINT: str = ( - "gym_gz.runtimes.gazebo_runtime:GazeboRuntime" -) +RBS_ENVS_TASK_ENTRYPOINT: str = "gym_gz.runtimes.gazebo_runtime:GazeboRuntime" ################### @@ -43,9 +41,7 @@ RBS_ENVS_ROBOT_MODEL: str = "rbs_arm" # Datasets and paths # ###################### # Path to directory containing base SDF worlds -RBS_ENVS_WORLDS_DIR: str = path.join( - get_package_share_directory("rbs_gym"), "worlds" -) +RBS_ENVS_WORLDS_DIR: str = path.join(get_package_share_directory("rbs_gym"), "worlds") ########### @@ -53,7 +49,7 @@ RBS_ENVS_WORLDS_DIR: str = path.join( ########### # Gravity preset for Earth GRAVITY_EARTH: Tuple[float, float, float] = (0.0, 0.0, -9.80665) -GRAVITY_EARTH_STD: Tuple[float, float, float] = (0.0, 0.0, 0.0232) +GRAVITY_EARTH_STD: Tuple[float, float, float] = (0.0, 0.0, 0.0232) ############################ # Additional Configuration # @@ -67,7 +63,7 @@ BROADCAST_GUI: bool = str2bool( # Reach # ######### REACH_MAX_EPISODE_STEPS: int = 100 -REACH_KWARGS: Dict[str, any] = { +REACH_KWARGS: Dict[str, Any] = { "agent_rate": 4.0, "robot_model": RBS_ENVS_ROBOT_MODEL, "workspace_frame_id": "world", @@ -85,20 +81,84 @@ REACH_KWARGS: Dict[str, any] = { "required_accuracy": 0.05, "num_threads": 3, } -REACH_KWARGS_SIM: Dict[str, any] = { +REACH_KWARGS_SIM: Dict[str, Any] = { "physics_rate": 1000.0, "real_time_factor": float(np.finfo(np.float32).max), "world": path.join(RBS_ENVS_WORLDS_DIR, "default.sdf"), } + + +@dataclass +class ObjectRandomizerData: + count: int = field(default=0) + random_pose: bool = field(default=False) + random_position: bool = field(default=False) + random_orientation: bool = field(default=False) + random_model: bool = field(default=False) + random_spawn_position_segments: list = field(default_factory=list) + random_spawn_position_update_workspace_centre: bool = field(default=False) + random_spawn_volume: tuple = field(default=(0.5, 0.5, 0.5)) + models_rollouts_num: int = field(default=0) + + +@dataclass +class ObjectData: + name: str = field(default="object") + relative_to: str = field(default="world") + position: tuple = field(default=(0.0, 0.0, 0.0)) + orientation: tuple = field(default=(1.0, 0.0, 0.0, 0.0)) + static: bool = field(default=False) + randomize: ObjectRandomizerData = field(default_factory=ObjectRandomizerData) + + +@dataclass +class PrimitiveObjectData(ObjectData): + collision: bool = field(default=True) + visual: bool = field(default=True) + color: tuple = field(default=(0.8, 0.8, 0.8, 1.0)) + mass: float = field(default=0.1) + + +@dataclass +class SphereObjectData(PrimitiveObjectData): + radius: float = field(default=0.025) + friction: float = field(default=1.0) + + +@dataclass +class PlaneObjectData(PrimitiveObjectData): + size: tuple = field(default=(1.0, 1.0)) + direction: tuple = field(default=(0.0, 0.0, 1.0)) + friction: float = field(default=1.0) + + +@dataclass +class CylinderObjectData(PrimitiveObjectData): + radius: float = field(default=0.025) + length: float = field(default=0.1) + friction: float = field(default=1.0) + + +@dataclass +class BoxObjectData(PrimitiveObjectData): + size: tuple = field(default=(0.05, 0.05, 0.05)) + friction: float = field(default=1.0) + + +@dataclass +class MeshObjectData(ObjectData): + texture: list[float] = field(default_factory=list) + + REACH_RANDOMIZER: str = "rbs_gym.envs.randomizers:ManipulationGazeboEnvRandomizer" -REACH_KWARGS_RANDOMIZER: Dict[str, any] = { +REACH_KWARGS_RANDOMIZER: Dict[str, Any] = { "gravity": GRAVITY_EARTH, "gravity_std": GRAVITY_EARTH_STD, "plugin_scene_broadcaster": BROADCAST_GUI, "plugin_user_commands": BROADCAST_GUI, "plugin_sensors_render_engine": "ogre2", "robot_random_pose": False, - "robot_random_joint_positions": True, # FIXME: + "robot_random_joint_positions": True, # FIXME: "robot_random_joint_positions_std": 0.2, "robot_random_joint_positions_above_object_spawn": False, "robot_random_joint_positions_above_object_spawn_elevation": 0.0, @@ -111,42 +171,136 @@ REACH_KWARGS_RANDOMIZER: Dict[str, any] = { "light_visual": False, "light_radius": 25.0, "light_model_rollouts_num": 1, - "object_enable": True, - "object_type": "sphere", - "objects_relative_to": "base_link", - "object_static": True, - "object_collision": False, - "object_visual": True, - "object_color": (0.0, 0.0, 1.0, 1.0), - "object_dimensions": [0.025, 0.025, 0.025], - "object_count": 1, - "object_spawn_position": (-0.4, 0, 0.3), - "object_random_pose": True, - "object_random_spawn_position_segments": [], - "object_random_spawn_volume": (0.2, 0.2, 0.2), - "object_models_rollouts_num": 0, "underworld_collision_plane": False, } -REACH_KWARGS_RANDOMIZER_CAMERA: Dict[str, any] = { - "camera_enable": True, - "camera_width": 64, - "camera_height": 64, - "camera_update_rate": 1.2 * REACH_KWARGS["agent_rate"], - "camera_horizontal_fov": np.pi / 3.0, - "camera_vertical_fov": np.pi / 3.0, - "camera_noise_mean": 0.0, - "camera_noise_stddev": 0.001, - "camera_relative_to": "base_link", - "camera_spawn_position": (0.85, -0.4, 0.45), - "camera_spawn_quat_xyzw": (-0.0402991, -0.0166924, 0.9230002, 0.3823192), - "camera_random_pose_rollouts_num": 0, - "camera_random_pose_mode": "orbit", - "camera_random_pose_orbit_distance": 1.0, - "camera_random_pose_orbit_height_range": (0.1, 0.7), - "camera_random_pose_orbit_ignore_arc_behind_robot": np.pi / 8, - "camera_random_pose_select_position_options": [], - "camera_random_pose_focal_point_z_offset": 0.0, -} +REACH_KWARGS_OBJECT_RANDOMIZER: list = [ + MeshObjectData( + "bishop", + "base_link", + (-0.3, 0.0, 0.0), + randomize=ObjectRandomizerData( + random_position=True, random_spawn_volume=(0.2, 0.5, 0.0) + ), + ), + SphereObjectData( + "sphere", + "base_link", + (-0.3, 0.0, 0.3), + mass=1.0, + color=(0.0, 0.0, 1.0, 1.0), + # randomize=ObjectRandomizerData( + # random_pose=True, random_spawn_volume=(0.2, 0.2, 0.0) + # ), + ), +] +# REACH_KWARGS_RANDOMIZER_CAMERA: Dict[str, Any] = { +# "camera_enable": True, +# "camera_width": 64, +# "camera_height": 64, +# "camera_update_rate": 1.2 * REACH_KWARGS["agent_rate"], +# "camera_horizontal_fov": np.pi / 3.0, +# "camera_vertical_fov": np.pi / 3.0, +# "camera_noise_mean": 0.0, +# "camera_noise_stddev": 0.001, +# "camera_relative_to": "base_link", +# "camera_spawn_position": (0.85, -0.4, 0.45), +# "camera_spawn_quat_xyzw": (-0.0402991, -0.0166924, 0.9230002, 0.3823192), +# "camera_random_pose_rollouts_num": 0, +# "camera_random_pose_mode": "orbit", +# "camera_random_pose_orbit_distance": 1.0, +# "camera_random_pose_orbit_height_range": (0.1, 0.7), +# "camera_random_pose_orbit_ignore_arc_behind_robot": np.pi / 8, +# "camera_random_pose_select_position_options": [], +# "camera_random_pose_focal_point_z_offset": 0.0, +# } + + +@dataclass +class CameraData: + name: str | None = None + enable: bool = field(default=True) + type: str = field(default="rgbd_camera") + relative_to: str = field(default="base_link") + + width: int = field(default=128) + height: int = field(default=128) + image_format: str = field(default="R8G8B8") + update_rate: int = field(default=10) + horizontal_fov: float = field(default=np.pi / 3.0) + vertical_fov: float = field(default=np.pi / 3.0) + + clip_color: tuple[float, float] = field(default=(0.01, 1000.0)) + clip_depth: tuple[float, float] = field(default=(0.05, 10.0)) + + noise_mean: float | None = None + noise_stddev: float | None = None + + publish_color: bool = field(default=False) + publish_depth: bool = field(default=False) + publish_points: bool = field(default=False) + + spawn_position: tuple[float, float, float] = field(default=(0, 0, 1)) + spawn_quat_xyzw: tuple[float, float, float, float] = field( + default=(0, 0.70710678118, 0, 0.70710678118) + ) + + random_pose_rollouts_num: int = field(default=1) + random_pose_mode: str = field(default="orbit") + random_pose_orbit_distance: float = field(default=1.0) + random_pose_orbit_height_range: tuple[float, float] = field(default=(0.1, 0.7)) + random_pose_orbit_ignore_arc_behind_robot: float = field(default=np.pi / 8) + random_pose_select_position_options: list[tuple[float, float, float]] = field( + default_factory=list + ) + random_pose_focal_point_z_offset: float = field(default=0.0) + + +REACH_KWARGS_RANDOMIZER_CAMERAS: list[CameraData] = [ + CameraData( + name="inner_robot_camera", + enable=True, + type="rgbd_camera", + relative_to="rbs_gripper_rot_base_link", + width=320, + height=240, + update_rate=1.2 * REACH_KWARGS["agent_rate"], + horizontal_fov=np.pi / 3.0, + vertical_fov=np.pi / 3.0, + noise_mean=0.0, + publish_color=True, + noise_stddev=0.001, + spawn_position=(-0.02, 0.0, 0.03), + spawn_quat_xyzw=(0.0, -0.707, 0.0, 0.707), + random_pose_mode="orbit", + random_pose_rollouts_num=0, + random_pose_orbit_distance=1.0, + random_pose_orbit_height_range=(0.1, 0.7), + random_pose_focal_point_z_offset=0.0, + random_pose_orbit_ignore_arc_behind_robot=np.pi / 8, + ), + CameraData( + name="outer_robot_camera", + enable=True, + type="rgbd_camera", + relative_to="base_link", + width=320, + height=240, + update_rate=1.2 * REACH_KWARGS["agent_rate"], + publish_color=True, + horizontal_fov=np.pi / 3.0, + vertical_fov=np.pi / 3.0, + noise_mean=0.0, + noise_stddev=0.001, + spawn_position=(-0.2, 1.0, 1.0), + spawn_quat_xyzw=(0.183, 0.183, -0.683, 0.683), + random_pose_mode="orbit", + random_pose_rollouts_num=0, + random_pose_orbit_distance=1.0, + random_pose_orbit_height_range=(0.1, 0.7), + random_pose_focal_point_z_offset=0.0, + random_pose_orbit_ignore_arc_behind_robot=np.pi / 8, + ), +] # Task register( @@ -186,7 +340,7 @@ register( "env": "Reach-v0", **REACH_KWARGS_SIM, **REACH_KWARGS_RANDOMIZER, - "camera_enable": False, + "object_args": REACH_KWARGS_OBJECT_RANDOMIZER, }, ) register( @@ -197,9 +351,8 @@ register( "env": "Reach-ColorImage-v0", **REACH_KWARGS_SIM, **REACH_KWARGS_RANDOMIZER, - **REACH_KWARGS_RANDOMIZER_CAMERA, - "camera_type": "rgbd_camera", - "camera_publish_color": True, + "camera_args": REACH_KWARGS_RANDOMIZER_CAMERAS, + "object_args": REACH_KWARGS_OBJECT_RANDOMIZER, }, ) register( @@ -210,8 +363,7 @@ register( "env": "Reach-DepthImage-v0", **REACH_KWARGS_SIM, **REACH_KWARGS_RANDOMIZER, - **REACH_KWARGS_RANDOMIZER_CAMERA, - "camera_type": "depth_camera", - "camera_publish_depth": True, + "camera_args": REACH_KWARGS_RANDOMIZER_CAMERAS, + "object_args": REACH_KWARGS_OBJECT_RANDOMIZER, }, ) diff --git a/env_manager/rbs_gym/rbs_gym/envs/models/objects/__init__.py b/env_manager/rbs_gym/rbs_gym/envs/models/objects/__init__.py index 6914549..70a220c 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/models/objects/__init__.py +++ b/env_manager/rbs_gym/rbs_gym/envs/models/objects/__init__.py @@ -1,35 +1,51 @@ +from enum import Enum + from gym_gz.scenario.model_wrapper import ModelWrapper +from .model import Mesh from .primitives import Box, Cylinder, Plane, Sphere -from .random_lunar_rock import RandomLunarRock from .random_object import RandomObject from .random_primitive import RandomPrimitive -from .rock import Rock -def get_object_model_class(object_type: str) -> ModelWrapper: - # TODO: Refactor into enum +# Enum для типов объектов +class ObjectType(Enum): + BOX = "box" + SPHERE = "sphere" + CYLINDER = "cylinder" + PLANE = "plane" + RANDOM_PRIMITIVE = "random_primitive" + RANDOM_MESH = "random_mesh" + MESH = "mesh" - if "box" == object_type: - return Box - elif "sphere" == object_type: - return Sphere - elif "cylinder" == object_type: - return Cylinder - elif "random_primitive" == object_type: - return RandomPrimitive - elif "random_mesh" == object_type: - return RandomObject - elif "rock" == object_type: - return Rock - elif "random_lunar_rock" == object_type: - return RandomLunarRock +OBJECT_MODEL_CLASSES = { + ObjectType.BOX: Box, + ObjectType.SPHERE: Sphere, + ObjectType.CYLINDER: Cylinder, + ObjectType.PLANE: Plane, + ObjectType.RANDOM_PRIMITIVE: RandomPrimitive, + ObjectType.RANDOM_MESH: RandomObject, + ObjectType.MESH: Mesh, +} + + +RANDOMIZABLE_TYPES = { + ObjectType.RANDOM_PRIMITIVE, + ObjectType.RANDOM_MESH, +} + + +def get_object_model_class(object_type: str) -> type[ModelWrapper]: + try: + object_enum = ObjectType(object_type) + return OBJECT_MODEL_CLASSES[object_enum] + except KeyError: + raise ValueError(f"Unsupported object type: {object_type}") def is_object_type_randomizable(object_type: str) -> bool: - - return ( - "random_primitive" == object_type - or "random_mesh" == object_type - or "random_lunar_rock" == object_type - ) + try: + object_enum = ObjectType(object_type) + return object_enum in RANDOMIZABLE_TYPES + except ValueError: + return False diff --git a/env_manager/rbs_gym/rbs_gym/envs/models/objects/model.py b/env_manager/rbs_gym/rbs_gym/envs/models/objects/model.py new file mode 100644 index 0000000..cb6cd9f --- /dev/null +++ b/env_manager/rbs_gym/rbs_gym/envs/models/objects/model.py @@ -0,0 +1,51 @@ +from typing import List + +from gym_gz.scenario import model_wrapper +from gym_gz.utils import misc +from gym_gz.utils.scenario import get_unique_model_name +from scenario import core as scenario_core +from scenario import gazebo as scenario_gazebo +from rbs_assets_library import get_model_file + + +class Mesh(model_wrapper.ModelWrapper): + def __init__( + self, + world: scenario_gazebo.World, + name: str = "object", + position: tuple[float, float, float] = (0, 0, 0), + orientation: tuple[float, float, float, float] = (1, 0, 0, 0), + gui_only: bool = False, + **kwargs, + ): + + # Get a unique model name + model_name = get_unique_model_name(world, name) + + # Initial pose + initial_pose = scenario_core.Pose(position, orientation) + + # Create SDF string for the model + sdf = self.get_sdf( + model_name=name, + ) + + # Insert the model + ok_model = world.to_gazebo().insert_model( + sdf, initial_pose, model_name + ) + if not ok_model: + raise RuntimeError("Failed to insert " + model_name) + + # Get the model + model = world.get_model(model_name) + + # Initialize base class + super().__init__(model=model) + + @classmethod + def get_sdf( + cls, + model_name: str, + ) -> str: + return get_model_file(model_name) diff --git a/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/box.py b/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/box.py index 340544e..2bf6327 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/box.py +++ b/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/box.py @@ -1,26 +1,24 @@ -from typing import List - from gym_gz.scenario import model_wrapper -from gym_gz.utils import misc from gym_gz.utils.scenario import get_unique_model_name -from scenario import core as scenario +from scenario import core as scenario_core +from scenario import gazebo as scenario_gazebo class Box(model_wrapper.ModelWrapper): def __init__( self, - world: scenario.World, + world: scenario_gazebo.World, name: str = "box", - position: List[float] = (0, 0, 0), - orientation: List[float] = (1, 0, 0, 0), - size: List[float] = (0.05, 0.05, 0.05), + position: tuple[float, float, float] = (0, 0, 0), + orientation: tuple[float, float, float, float] = (1, 0, 0, 0), + size: tuple[float, float, float] = (0.05, 0.05, 0.05), mass: float = 0.1, static: bool = False, collision: bool = True, friction: float = 1.0, visual: bool = True, gui_only: bool = False, - color: List[float] = (0.8, 0.8, 0.8, 1.0), + color: tuple[float, float, float, float] = (0.8, 0.8, 0.8, 1.0), **kwargs, ): @@ -28,7 +26,7 @@ class Box(model_wrapper.ModelWrapper): model_name = get_unique_model_name(world, name) # Initial pose - initial_pose = scenario.Pose(position, orientation) + initial_pose = scenario_core.Pose(position, orientation) # Create SDF string for the model sdf = self.get_sdf( @@ -48,26 +46,26 @@ class Box(model_wrapper.ModelWrapper): sdf, initial_pose, model_name ) if not ok_model: - raise RuntimeError("Failed to insert " + model_name) + raise RuntimeError(f"Failed to insert {model_name}") # Get the model model = world.get_model(model_name) # Initialize base class - model_wrapper.ModelWrapper.__init__(self, model=model) + super().__init__(model=model) @classmethod def get_sdf( - self, + cls, model_name: str, - size: List[float], + size: tuple[float, float, float], mass: float, static: bool, collision: bool, friction: float, visual: bool, gui_only: bool, - color: List[float], + color: tuple[float, float, float, float], ) -> str: return f''' @@ -108,7 +106,7 @@ class Box(model_wrapper.ModelWrapper): {color[0]} {color[1]} {color[2]} {color[3]} {color[0]} {color[1]} {color[2]} {color[3]} - {1.0-color[3]} + {1.0 - color[3]} {'1 false' if gui_only else ''} """ if visual else "" @@ -116,9 +114,9 @@ class Box(model_wrapper.ModelWrapper): {mass} - {(size[1]**2 + size[2]**2)*mass/12} - {(size[0]**2 + size[2]**2)*mass/12} - {(size[0]**2 + size[1]**2)*mass/12} + {(size[1]**2 + size[2]**2) * mass / 12} + {(size[0]**2 + size[2]**2) * mass / 12} + {(size[0]**2 + size[1]**2) * mass / 12} 0.0 0.0 0.0 diff --git a/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/cylinder.py b/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/cylinder.py index 1d60aee..34aa17b 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/cylinder.py +++ b/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/cylinder.py @@ -1,18 +1,16 @@ -from typing import List - from gym_gz.scenario import model_wrapper -from gym_gz.utils import misc from gym_gz.utils.scenario import get_unique_model_name -from scenario import core as scenario +from scenario import core as scenario_core +from scenario import gazebo as scenario_gazebo class Cylinder(model_wrapper.ModelWrapper): def __init__( self, - world: scenario.World, + world: scenario_gazebo.World, name: str = "cylinder", - position: List[float] = (0, 0, 0), - orientation: List[float] = (1, 0, 0, 0), + position: tuple[float, float, float] = (0, 0, 0), + orientation: tuple[float, float, float, float] = (1, 0, 0, 0), radius: float = 0.025, length: float = 0.1, mass: float = 0.1, @@ -21,17 +19,17 @@ class Cylinder(model_wrapper.ModelWrapper): friction: float = 1.0, visual: bool = True, gui_only: bool = False, - color: List[float] = (0.8, 0.8, 0.8, 1.0), + color: tuple[float, float, float, float] = (0.8, 0.8, 0.8, 1.0), **kwargs, ): - # Get a unique model name + # Получаем уникальное имя модели model_name = get_unique_model_name(world, name) - # Initial pose - initial_pose = scenario.Pose(position, orientation) + # Инициализируем начальную позу + initial_pose = scenario_core.Pose(position, orientation) - # Create SDF string for the model + # Создаем строку SDF для модели sdf = self.get_sdf( model_name=model_name, radius=radius, @@ -45,22 +43,22 @@ class Cylinder(model_wrapper.ModelWrapper): color=color, ) - # Insert the model + # Вставляем модель ok_model = world.to_gazebo().insert_model_from_string( sdf, initial_pose, model_name ) if not ok_model: - raise RuntimeError("Failed to insert " + model_name) + raise RuntimeError(f"Failed to insert {model_name}") - # Get the model + # Получаем модель model = world.get_model(model_name) - # Initialize base class - model_wrapper.ModelWrapper.__init__(self, model=model) + # Вызов конструктора родительского класса + super().__init__(model=model) @classmethod def get_sdf( - self, + cls, model_name: str, radius: float, length: float, @@ -70,9 +68,9 @@ class Cylinder(model_wrapper.ModelWrapper): friction: float, visual: bool, gui_only: bool, - color: List[float], + color: tuple[float, float, float, float], ) -> str: - # Inertia is identical for xx and yy components, compute only once + # Инерция одинакова для xx и yy, вычисляем ее один раз inertia_xx_yy = (3 * radius**2 + length**2) * mass / 12 return f''' diff --git a/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/plane.py b/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/plane.py index 0699781..8f6db76 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/plane.py +++ b/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/plane.py @@ -1,33 +1,30 @@ -from typing import List - from gym_gz.scenario import model_wrapper -from gym_gz.utils import misc from gym_gz.utils.scenario import get_unique_model_name -from scenario import core as scenario +from scenario import core as scenario_core +from scenario import gazebo as scenario_gazebo class Plane(model_wrapper.ModelWrapper): def __init__( self, - world: scenario.World, + world: scenario_gazebo.World, name: str = "plane", - position: List[float] = (0, 0, 0), - orientation: List[float] = (1, 0, 0, 0), - size: List[float] = (1.0, 1.0), - direction: List[float] = (0.0, 0.0, 1.0), + position: tuple[float, float, float] = (0, 0, 0), + orientation: tuple[float, float, float, float] = (1, 0, 0, 0), + size: tuple[float, float] = (1.0, 1.0), + direction: tuple[float, float, float] = (0.0, 0.0, 1.0), collision: bool = True, friction: float = 1.0, visual: bool = True, **kwargs, ): - - # Get a unique model name + # Получаем уникальное имя модели model_name = get_unique_model_name(world, name) - # Initial pose - initial_pose = scenario.Pose(position, orientation) + # Инициализируем начальную позу + initial_pose = scenario_core.Pose(position, orientation) - # Create SDF string for the model + # Генерация строки SDF для модели sdf = f''' true @@ -76,15 +73,15 @@ class Plane(model_wrapper.ModelWrapper): ''' - # Insert the model + # Вставка модели ok_model = world.to_gazebo().insert_model_from_string( sdf, initial_pose, model_name ) if not ok_model: - raise RuntimeError("Failed to insert " + model_name) + raise RuntimeError(f"Failed to insert {model_name}") - # Get the model + # Получение модели model = world.get_model(model_name) - # Initialize base class - model_wrapper.ModelWrapper.__init__(self, model=model) + # Вызов конструктора родительского класса + super().__init__(model=model) diff --git a/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/sphere.py b/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/sphere.py index 5e90170..959ddd8 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/sphere.py +++ b/env_manager/rbs_gym/rbs_gym/envs/models/objects/primitives/sphere.py @@ -1,18 +1,16 @@ -from typing import List - from gym_gz.scenario import model_wrapper -from gym_gz.utils import misc from gym_gz.utils.scenario import get_unique_model_name -from scenario import core as scenario +from scenario import core as scenario_core +from scenario import gazebo as scenario_gazebo class Sphere(model_wrapper.ModelWrapper): def __init__( self, - world: scenario.World, + world: scenario_gazebo.World, name: str = "sphere", - position: List[float] = (0, 0, 0), - orientation: List[float] = (1, 0, 0, 0), + position: tuple[float, float, float] = (0, 0, 0), + orientation: tuple[float, float, float, float] = (1, 0, 0, 0), radius: float = 0.025, mass: float = 0.1, static: bool = False, @@ -20,7 +18,7 @@ class Sphere(model_wrapper.ModelWrapper): friction: float = 1.0, visual: bool = True, gui_only: bool = False, - color: List[float] = (0.8, 0.8, 0.8, 1.0), + color: tuple[float, float, float, float] = (0.8, 0.8, 0.8, 1.0), **kwargs, ): @@ -28,7 +26,7 @@ class Sphere(model_wrapper.ModelWrapper): model_name = get_unique_model_name(world, name) # Initial pose - initial_pose = scenario.Pose(position, orientation) + initial_pose = scenario_core.Pose(position, orientation) # Create SDF string for the model sdf = self.get_sdf( @@ -48,17 +46,17 @@ class Sphere(model_wrapper.ModelWrapper): sdf, initial_pose, model_name ) if not ok_model: - raise RuntimeError("Failed to insert " + model_name) + raise RuntimeError(f"Failed to insert {model_name}") # Get the model model = world.get_model(model_name) # Initialize base class - model_wrapper.ModelWrapper.__init__(self, model=model) + super().__init__(model=model) @classmethod def get_sdf( - self, + cls, model_name: str, radius: float, mass: float, @@ -67,7 +65,7 @@ class Sphere(model_wrapper.ModelWrapper): friction: float, visual: bool, gui_only: bool, - color: List[float], + color: tuple[float, float, float, float], ) -> str: # Inertia is identical for all axes inertia_xx_yy_zz = (mass * radius**2) * 2 / 5 @@ -111,7 +109,7 @@ class Sphere(model_wrapper.ModelWrapper): {color[0]} {color[1]} {color[2]} {color[3]} {color[0]} {color[1]} {color[2]} {color[3]} - {1.0-color[3]} + {1.0 - color[3]} {'1 false' if gui_only else ''} """ if visual else "" diff --git a/env_manager/rbs_gym/rbs_gym/envs/models/objects/random_lunar_rock.py b/env_manager/rbs_gym/rbs_gym/envs/models/objects/random_lunar_rock.py deleted file mode 100644 index ff237c8..0000000 --- a/env_manager/rbs_gym/rbs_gym/envs/models/objects/random_lunar_rock.py +++ /dev/null @@ -1,57 +0,0 @@ -import os -from typing import List, Optional - -import numpy as np -from gym_gz.scenario import model_wrapper -from gym_gz.utils.scenario import get_unique_model_name -from numpy.random import RandomState -from scenario import core as scenario - - -class RandomLunarRock(model_wrapper.ModelWrapper): - def __init__( - self, - world: scenario.World, - name: str = "rock", - position: List[float] = (0, 0, 0), - orientation: List[float] = (1, 0, 0, 0), - models_dir: Optional[str] = None, - np_random: Optional[RandomState] = None, - **kwargs, - ): - - if np_random is None: - np_random = np.random.default_rng() - - # Get a unique model name - model_name = get_unique_model_name(world, name) - - # Initial pose - initial_pose = scenario.Pose(position, orientation) - - # Get path to all lunar rock models - if not models_dir: - models_dir = os.environ.get("SDF_PATH_LUNAR_ROCK", default="") - - # Make sure the path exists - if not os.path.exists(models_dir): - raise ValueError( - f"Invalid path '{models_dir}' pointed by 'SDF_PATH_LUNAR_ROCK' environment variable." - ) - - # Select a single model at random - model_dir = np_random.choice(os.listdir(models_dir)) - sdf_filepath = os.path.join(model_dir, "model.sdf") - - # Insert the model - ok_model = world.to_gazebo().insert_model_from_file( - sdf_filepath, initial_pose, model_name - ) - if not ok_model: - raise RuntimeError("Failed to insert " + model_name) - - # Get the model - model = world.get_model(model_name) - - # Initialize base class - model_wrapper.ModelWrapper.__init__(self, model=model) diff --git a/env_manager/rbs_gym/rbs_gym/envs/models/objects/rock.py b/env_manager/rbs_gym/rbs_gym/envs/models/objects/rock.py deleted file mode 100644 index fa3844c..0000000 --- a/env_manager/rbs_gym/rbs_gym/envs/models/objects/rock.py +++ /dev/null @@ -1,52 +0,0 @@ -from typing import List - -from gym_gz.scenario import model_with_file, model_wrapper -from gym_gz.utils.scenario import get_unique_model_name -from scenario import core as scenario -from scenario import gazebo as scenario_gazebo - - -class Rock(model_wrapper.ModelWrapper, model_with_file.ModelWithFile): - def __init__( - self, - world: scenario.World, - name: str = "rock", - position: List[float] = (0, 0, 0), - orientation: List[float] = (1, 0, 0, 0), - model_file: str = None, - use_fuel: bool = True, - variant: int = 6, - **kwargs, - ): - - # Allow passing of custom model file as an argument - if model_file is None: - model_file = self.get_model_file(fuel=use_fuel, variant=variant) - - # Get a unique model name - model_name = get_unique_model_name(world, name) - - # Setup initial pose - initial_pose = scenario.Pose(position, orientation) - - # Insert the model - ok_model = world.to_gazebo().insert_model_from_file( - model_file, initial_pose, model_name - ) - if not ok_model: - raise RuntimeError("Failed to insert " + model_name) - - # Get the model - model = world.get_model(model_name) - - # Initialize base class - super().__init__(model=model) - - @classmethod - def get_model_file(self, fuel: bool = False, variant: int = 6) -> str: - if fuel: - return scenario_gazebo.get_model_file_from_fuel( - f"https://fuel.ignitionrobotics.org/1.0/OpenRobotics/models/Falling Rock {variant}" - ) - else: - return "lunar_surface" diff --git a/env_manager/rbs_gym/rbs_gym/envs/models/robots/__init__.py b/env_manager/rbs_gym/rbs_gym/envs/models/robots/__init__.py index 5822b01..2a117fa 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/models/robots/__init__.py +++ b/env_manager/rbs_gym/rbs_gym/envs/models/robots/__init__.py @@ -6,10 +6,12 @@ from .rbs_arm import RbsArm # TODO: When adding new a robot, create abstract classes to simplify such process -def get_robot_model_class(robot_model: str) -> ModelWrapper: +def get_robot_model_class(robot_model: str) -> type[RbsArm]: # TODO: Refactor into enum if "rbs_arm" == robot_model: return RbsArm + else: + return RbsArm # elif "panda" == robot_model: # return Panda diff --git a/env_manager/rbs_gym/rbs_gym/envs/models/robots/rbs_arm.py b/env_manager/rbs_gym/rbs_gym/envs/models/robots/rbs_arm.py index cdfd929..f7942ba 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/models/robots/rbs_arm.py +++ b/env_manager/rbs_gym/rbs_gym/envs/models/robots/rbs_arm.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Any from gym_gz.scenario import model_with_file, model_wrapper from gym_gz.utils.scenario import get_unique_model_name @@ -8,6 +8,10 @@ import numpy as np from scenario import core as scenario from scenario import gazebo as scenario_gazebo +# from robot_builder.elements.robot import Robot +# from robot_builder.writer.urdf import URDF_writer +# from robot_builder.parser.urdf import URDF_parser + class RbsArm(model_wrapper.ModelWrapper, model_with_file.ModelWithFile): DEFAULT_ARM_JOINT_POSITIONS: List[float] = ( @@ -34,13 +38,13 @@ class RbsArm(model_wrapper.ModelWrapper, model_with_file.ModelWithFile): self, world: scenario.World, name: str = "rbs_arm", - position: List[float] = (0.0, 0.0, 0.0), - orientation: List[float] = (1.0, 0, 0, 0), - model_file: Optional[str] = None, + position: tuple[float, float, float] = (0.0, 0.0, 0.0), + orientation: tuple[float, float, float, float] = (1.0, 0, 0, 0), + model_file: str | None = None, use_fuel: bool = False, use_xacro: bool = False, xacro_file: str = "", - xacro_mappings: Dict[str, any] = {}, + xacro_mappings: Dict[str, Any] = {}, initial_arm_joint_positions: List[float] = DEFAULT_ARM_JOINT_POSITIONS, initial_gripper_joint_positions: List[float] = DEFAULT_GRIPPER_JOINT_POSITIONS, **kwargs diff --git a/env_manager/rbs_gym/rbs_gym/envs/models/sensors/camera.py b/env_manager/rbs_gym/rbs_gym/envs/models/sensors/camera.py index 6a089a1..d5b20d4 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/models/sensors/camera.py +++ b/env_manager/rbs_gym/rbs_gym/envs/models/sensors/camera.py @@ -1,10 +1,10 @@ import os from threading import Thread -from typing import List, Optional, Union from gym_gz.scenario import model_wrapper from gym_gz.utils.scenario import get_unique_model_name -from scenario import core as scenario +from scenario import core as scenario_core +from scenario import gazebo as scenario_gz from rbs_gym.envs.models.utils import ModelCollectionRandomizer @@ -12,10 +12,10 @@ from rbs_gym.envs.models.utils import ModelCollectionRandomizer class Camera(model_wrapper.ModelWrapper): def __init__( self, - world: scenario.World, - name: Union[str, None] = None, - position: List[float] = (0, 0, 0), - orientation: List[float] = (1, 0, 0, 0), + world: scenario_gz.World, + name: str | None = None, + position: tuple[float, float, float] = (0, 0, 0), + orientation: tuple[float, float, float, float] = (1, 0, 0, 0), static: bool = True, camera_type: str = "rgbd_camera", width: int = 212, @@ -24,31 +24,37 @@ class Camera(model_wrapper.ModelWrapper): update_rate: int = 15, horizontal_fov: float = 1.567821, vertical_fov: float = 1.022238, - clip_color: List[float] = (0.02, 1000.0), - clip_depth: List[float] = (0.02, 10.0), - noise_mean: float = None, - noise_stddev: float = None, + clip_color: tuple[float, float] = (0.02, 1000.0), + clip_depth: tuple[float, float] = (0.02, 10.0), + noise_mean: float | None = None, + noise_stddev: float | None = None, ros2_bridge_color: bool = False, ros2_bridge_depth: bool = False, ros2_bridge_points: bool = False, visibility_mask: int = 0, - visual: Optional[str] = None, + visual: str | None = None, # visual: Optional[str] = "intel_realsense_d435", ): - # Get a unique model name if name is not None: model_name = get_unique_model_name(world, name) else: model_name = get_unique_model_name(world, camera_type) self._model_name = model_name + self._camera_type = camera_type # Initial pose - initial_pose = scenario.Pose(position, orientation) + initial_pose = scenario_core.Pose(position, orientation) + use_mesh: bool = False + mesh_path_visual: str = "" + + albedo_map = None + normal_map = None + roughness_map = None + metalness_map = None # Get resources for visual (if enabled) if visual: - use_mesh: bool = False if "intel_realsense_d435" == visual: use_mesh = True @@ -71,10 +77,6 @@ class Camera(model_wrapper.ModelWrapper): ) # Find PBR textures - albedo_map = None - normal_map = None - roughness_map = None - metalness_map = None if texture_dir: # List all files texture_files = os.listdir(texture_dir) @@ -94,7 +96,7 @@ class Camera(model_wrapper.ModelWrapper): metalness_map = os.path.join(texture_dir, texture) if not (albedo_map and normal_map and roughness_map and metalness_map): - raise ValueError(f"Not all textures for Camera model were found.") + raise ValueError("Not all textures for Camera model were found.") # Create SDF string for the model sdf = f''' @@ -225,6 +227,17 @@ class Camera(model_wrapper.ModelWrapper): if ros2_bridge_color or ros2_bridge_depth or ros2_bridge_points: self.__threads = [] + self.__threads.append( + Thread( + target=self.construct_ros2_bridge, + args=( + self.info_topic, + "sensor_msgs/msg/CameraInfo", + "gz.msgs.CameraInfo", + ), + daemon=True, + ) + ) if ros2_bridge_color: self.__threads.append( Thread( @@ -232,7 +245,7 @@ class Camera(model_wrapper.ModelWrapper): args=( self.color_topic, "sensor_msgs/msg/Image", - "ignition.msgs.Image", + "gz.msgs.Image", ), daemon=True, ) @@ -245,7 +258,7 @@ class Camera(model_wrapper.ModelWrapper): args=( self.depth_topic, "sensor_msgs/msg/Image", - "ignition.msgs.Image", + "gz.msgs.Image", ), daemon=True, ) @@ -258,7 +271,7 @@ class Camera(model_wrapper.ModelWrapper): args=( self.points_topic, "sensor_msgs/msg/PointCloud2", - "ignition.msgs.PointCloudPacked", + "gz.msgs.PointCloudPacked", ), daemon=True, ) @@ -273,10 +286,10 @@ class Camera(model_wrapper.ModelWrapper): thread.join() @classmethod - def construct_ros2_bridge(self, topic: str, ros_msg: str, ign_msg: str): + def construct_ros2_bridge(cls, topic: str, ros_msg: str, ign_msg: str): node_name = "parameter_bridge" + topic.replace("/", "_") command = ( - f"ros2 run ros_ign_bridge parameter_bridge {topic}@{ros_msg}[{ign_msg} " + f"ros2 run ros_gz_bridge parameter_bridge {topic}@{ros_msg}[{ign_msg} " + f"--ros-args --remap __node:={node_name} --ros-args -p use_sim_time:=true" ) os.system(command) @@ -290,22 +303,22 @@ class Camera(model_wrapper.ModelWrapper): return self.get_frame_id(self._model_name) @classmethod - def get_color_topic(cls, model_name: str) -> str: - return f"/{model_name}/image" if "rgbd" in model_name else f"/{model_name}" + def get_color_topic(cls, model_name: str, camera_type: str) -> str: + return f"/{model_name}/image" if "rgbd" in camera_type else f"/{model_name}" @property def color_topic(self) -> str: - return self.get_color_topic(self._model_name) + return self.get_color_topic(self._model_name, self._camera_type) @classmethod - def get_depth_topic(cls, model_name: str) -> str: + def get_depth_topic(cls, model_name: str, camera_type: str) -> str: return ( - f"/{model_name}/depth_image" if "rgbd" in model_name else f"/{model_name}" + f"/{model_name}/depth_image" if "rgbd" in camera_type else f"/{model_name}" ) @property def depth_topic(self) -> str: - return self.get_depth_topic(self._model_name) + return self.get_depth_topic(self._model_name, self._camera_type) @classmethod def get_points_topic(cls, model_name: str) -> str: @@ -315,6 +328,10 @@ class Camera(model_wrapper.ModelWrapper): def points_topic(self) -> str: return self.get_points_topic(self._model_name) + @property + def info_topic(self): + return f"/{self._model_name}/camera_info" + @classmethod def get_link_name(cls, model_name: str) -> str: return f"{model_name}_link" diff --git a/env_manager/rbs_gym/rbs_gym/envs/models/terrains/__init__.py b/env_manager/rbs_gym/rbs_gym/envs/models/terrains/__init__.py index d28644e..69d8435 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/models/terrains/__init__.py +++ b/env_manager/rbs_gym/rbs_gym/envs/models/terrains/__init__.py @@ -7,7 +7,7 @@ from .random_ground import RandomGround from .random_lunar_surface import RandomLunarSurface -def get_terrain_model_class(terrain_type: str) -> ModelWrapper: +def get_terrain_model_class(terrain_type: str) -> type[ModelWrapper]: # TODO: Refactor into enum if "flat" == terrain_type: @@ -20,6 +20,8 @@ def get_terrain_model_class(terrain_type: str) -> ModelWrapper: return LunarSurface elif "random_lunar_surface" == terrain_type: return RandomLunarSurface + else: + raise AttributeError(f"Unsupported terrain [{terrain_type}]") def is_terrain_type_randomizable(terrain_type: str) -> bool: diff --git a/env_manager/rbs_gym/rbs_gym/envs/randomizers/manipulation.py b/env_manager/rbs_gym/rbs_gym/envs/randomizers/manipulation.py index e66180f..5de5e3c 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/randomizers/manipulation.py +++ b/env_manager/rbs_gym/rbs_gym/envs/randomizers/manipulation.py @@ -1,6 +1,7 @@ import abc -from typing import List, Tuple, Union +from typing import List, Tuple +from gym_gz.scenario.model_wrapper import ModelWrapper import numpy as np from gym_gz import randomizers from gym_gz.randomizers import gazebo_env_randomizer @@ -8,17 +9,28 @@ from gym_gz.randomizers.gazebo_env_randomizer import MakeEnvCallable from scenario import gazebo as scenario from scipy.spatial import distance from scipy.spatial.transform import Rotation +from dataclasses import asdict -from rbs_gym.envs import models, tasks +from rbs_gym.envs import ( + BoxObjectData, + CylinderObjectData, + MeshObjectData, + PlaneObjectData, + PrimitiveObjectData, + SphereObjectData, + models, + tasks, +) from rbs_gym.envs.utils.conversions import quat_to_wxyz -from rbs_gym.envs.utils.gazebo import * +from rbs_gym.envs.utils.gazebo import ( + transform_move_to_model_pose, + transform_move_to_model_position, +) from rbs_gym.envs.utils.logging import set_log_level +from rbs_gym.envs import CameraData, ObjectData + +SupportedTasks = tasks.Reach | tasks.ReachColorImage | tasks.ReachDepthImage -SupportedTasks = [ - tasks.Reach, - tasks.ReachColorImage, - tasks.ReachDepthImage -] class ManipulationGazeboEnvRandomizer( gazebo_env_randomizer.GazeboEnvRandomizer, @@ -62,39 +74,7 @@ class ManipulationGazeboEnvRandomizer( robot_random_joint_positions_above_object_spawn_elevation: float = 0.2, robot_random_joint_positions_above_object_spawn_xy_randomness: float = 0.2, # Camera # - camera_enable: bool = True, - camera_type: str = "rgbd_camera", - camera_relative_to: str = "base_link", - camera_width: int = 128, - camera_height: int = 128, - camera_image_format: str = "R8G8B8", - camera_update_rate: int = 10, - camera_horizontal_fov: float = np.pi / 3.0, - camera_vertical_fov: float = np.pi / 3.0, - camera_clip_color: Tuple[float, float] = (0.01, 1000.0), - camera_clip_depth: Tuple[float, float] = (0.05, 10.0), - camera_noise_mean: float = None, - camera_noise_stddev: float = None, - camera_publish_color: bool = False, - camera_publish_depth: bool = False, - camera_publish_points: bool = False, - # Note: Camera pose is with respect to the pose of `camera_relative_to` link (or world) - camera_spawn_position: Tuple[float, float, float] = (0, 0, 1), - camera_spawn_quat_xyzw: Tuple[float, float, float, float] = ( - 0, - 0.70710678118, - 0, - 0.70710678118, - ), - camera_random_pose_rollouts_num: int = 1, - camera_random_pose_mode: str = "orbit", - camera_random_pose_orbit_distance: float = 1.0, - camera_random_pose_orbit_height_range: Tuple[float, float] = (0.1, 0.7), - camera_random_pose_orbit_ignore_arc_behind_robot: float = np.pi / 8, - camera_random_pose_select_position_options: List[ - Tuple[float, float, float] - ] = [], - camera_random_pose_focal_point_z_offset: float = 0.0, + camera_args: list[CameraData] = [], # Terrain terrain_enable: bool = True, terrain_type: str = "flat", @@ -113,23 +93,24 @@ class ManipulationGazeboEnvRandomizer( light_radius: float = 25.0, light_model_rollouts_num: int = 1, # Objects - object_enable: bool = True, - object_type: str = "box", - objects_relative_to: str = "base_link", - object_static: bool = False, - object_collision: bool = True, - object_visual: bool = True, - object_color: Tuple[float, float, float, float] = (0.8, 0.8, 0.8, 1.0), - object_dimensions: List[float] = [0.05, 0.05, 0.05], - object_mass: float = 0.1, - object_count: int = 1, - object_randomize_count: bool = False, - object_spawn_position: Tuple[float, float, float] = (0.0, 0.0, 0.0), - object_random_pose: bool = True, - object_random_spawn_position_segments: List[Tuple[float, float, float]] = [], - object_random_spawn_position_update_workspace_centre: bool = False, - object_random_spawn_volume: Tuple[float, float, float] = (0.5, 0.5, 0.5), - object_models_rollouts_num: int = 1, + object_args: list = [], + # object_enable: bool = True, + # object_type: str = "box", + # objects_relative_to: str = "base_link", + # object_static: bool = False, + # object_collision: bool = True, + # object_visual: bool = True, + # object_color: Tuple[float, float, float, float] = (0.8, 0.8, 0.8, 1.0), + # object_dimensions: List[float] = [0.05, 0.05, 0.05], + # object_mass: float = 0.1, + # object_count: int = 1, + # object_randomize_count: bool = False, + # object_spawn_position: Tuple[float, float, float] = (0.0, 0.0, 0.0), + # object_random_pose: bool = True, + # object_random_spawn_position_segments: List[Tuple[float, float, float]] = [], + # object_random_spawn_position_update_workspace_centre: bool = False, + # object_random_spawn_volume: Tuple[float, float, float] = (0.5, 0.5, 0.5), + # object_models_rollouts_num: int = 1, # Collision plane below terrain underworld_collision_plane: bool = True, boundary_collision_walls: bool = False, @@ -139,7 +120,6 @@ class ManipulationGazeboEnvRandomizer( visualise_spawn_volume: bool = False, **kwargs, ): - # TODO (a lot of work): Implement proper physics randomization. if physics_rollouts_num != 0: raise TypeError( @@ -147,13 +127,17 @@ class ManipulationGazeboEnvRandomizer( ) # Update kwargs before passing them to the task constructor (some tasks might need them) - kwargs.update( - { - "camera_type": camera_type, - "camera_width": camera_width, - "camera_height": camera_height, - } - ) + # TODO: update logic when will need choose between cameras + cameras = [] + for camera in camera_args: + camera_info = {} + camera_info["name"] = camera.name + camera_info["type"] = camera.type + camera_info["width"] = camera.width + camera_info["height"] = camera.height + cameras.append(camera_info) + + kwargs.update({"camera_info": cameras}) # Initialize base classes randomizers.abc.TaskRandomizer.__init__(self) @@ -192,42 +176,11 @@ class ManipulationGazeboEnvRandomizer( ) # Camera - self._camera_enable = camera_enable - self._camera_type = camera_type - self._camera_relative_to = camera_relative_to - self._camera_width = camera_width - self._camera_height = camera_height - self._camera_image_format = camera_image_format - self._camera_update_rate = camera_update_rate - self._camera_horizontal_fov = camera_horizontal_fov - self._camera_vertical_fov = camera_vertical_fov - self._camera_clip_color = camera_clip_color - self._camera_clip_depth = camera_clip_depth - self._camera_noise_mean = camera_noise_mean - self._camera_noise_stddev = camera_noise_stddev - self._camera_publish_color = camera_publish_color - self._camera_publish_depth = camera_publish_depth - self._camera_publish_points = camera_publish_points - self._camera_spawn_position = camera_spawn_position - self._camera_spawn_quat_xyzw = camera_spawn_quat_xyzw - self._camera_random_pose_rollouts_num = camera_random_pose_rollouts_num - self._camera_random_pose_mode = camera_random_pose_mode - self._camera_random_pose_orbit_distance = camera_random_pose_orbit_distance - self._camera_random_pose_orbit_height_range = ( - camera_random_pose_orbit_height_range - ) - self._camera_random_pose_orbit_ignore_arc_behind_robot = ( - camera_random_pose_orbit_ignore_arc_behind_robot - ) - self._camera_random_pose_select_position_options = ( - camera_random_pose_select_position_options - ) - self._camera_random_pose_focal_point_z_offset = ( - camera_random_pose_focal_point_z_offset - ) + self._cameras = camera_args + self._camera_enable = False - #FT sensor - self._ft_sensor_enable = True #TODO: add external param + # FT sensor + self._ft_sensor_enable = True # TODO: add external param # Terrain self._terrain_enable = terrain_enable @@ -247,26 +200,7 @@ class ManipulationGazeboEnvRandomizer( self._light_model_rollouts_num = light_model_rollouts_num # Objects - self._object_enable = object_enable - self._objects_relative_to = objects_relative_to - self._object_static = object_static - self._object_collision = object_collision - self._object_visual = object_visual - self._object_color = object_color - self._object_dimensions = object_dimensions - self._object_mass = object_mass - self._object_count = object_count - self._object_randomize_count = object_randomize_count - self._object_spawn_position = object_spawn_position - self._object_random_pose = object_random_pose - self._object_random_spawn_position_segments = ( - object_random_spawn_position_segments - ) - self._object_random_spawn_position_update_workspace_centre = ( - object_random_spawn_position_update_workspace_centre - ) - self._object_random_spawn_volume = object_random_spawn_volume - self._object_models_rollouts_num = object_models_rollouts_num + self._objects = object_args # Collision plane beneath the terrain (prevent objects from falling forever) self._underworld_collision_plane = underworld_collision_plane @@ -289,21 +223,17 @@ class ManipulationGazeboEnvRandomizer( self.__is_light_type_randomizable = models.is_light_type_randomizable( light_type ) - self.__object_model_class = models.get_object_model_class(object_type) - self.__is_object_type_randomizable = models.is_object_type_randomizable( - object_type - ) - - # If object count is randomized, set max allowed count based on passed arg - if self._object_randomize_count: - self.__object_max_count = self._object_count + # self.__object_model_classes: list[ModelWrapper] = [] + # self.__is_object_type_randomizable: list[bool] = [] + # self.__objects_max_count: list[int] = [] # Variable initialisation # # Rollout counters - self.__camera_pose_rollout_counter = camera_random_pose_rollouts_num + self.__objects_unique_names: dict[str, list[str]] = {} + self.__camera_pose_rollout_counter: int = 0 self.__terrain_model_rollout_counter = terrain_model_rollouts_num self.__light_model_rollout_counter = light_model_rollouts_num - self.__object_models_rollout_counter = object_models_rollouts_num + self.__object_models_rollout_counter: int = 0 # Flag that determines whether the camera is attached to the robot via detachable joint self.__is_camera_attached = False @@ -320,15 +250,12 @@ class ManipulationGazeboEnvRandomizer( ########################## def init_physics_preset(self, task: SupportedTasks): - self.set_gravity(task=task) def randomize_physics(self, task: SupportedTasks, **kwargs): - self.set_gravity(task=task) def set_gravity(self, task: SupportedTasks): - if not task.world.to_gazebo().set_gravity( ( task.np_random.normal(loc=self._gravity[0], scale=self._gravity_std[0]), @@ -339,7 +266,6 @@ class ManipulationGazeboEnvRandomizer( raise RuntimeError("Failed to set the gravity") def get_engine(self): - return scenario.PhysicsEngine_dart ####################### @@ -396,22 +322,17 @@ class ManipulationGazeboEnvRandomizer( raise RuntimeError("Failed to execute a paused Gazebo run") # Offset some parameters by the robot base offset - self._object_spawn_position = ( - self._object_spawn_position[0], - self._object_spawn_position[1], - self._object_spawn_position[2] + task.robot_model_class.BASE_LINK_Z_OFFSET, - ) - self._camera_random_pose_focal_point_z_offset += ( - task.robot_model_class.BASE_LINK_Z_OFFSET - ) + for object in self._objects: + object.position = ( + object.position[0], + object.position[1], + object.position[2] + task.robot_model_class.BASE_LINK_Z_OFFSET, + ) - # Substitute frame names if needed - self._camera_relative_to = task.substitute_special_frame( - self._camera_relative_to - ) - self._objects_relative_to = task.substitute_special_frame( - self._objects_relative_to - ) + for camera in self._cameras: + camera.random_pose_focal_point_z_offset += ( + task.robot_model_class.BASE_LINK_Z_OFFSET + ) # Initialise custom physics preset self.init_physics_preset(task=task) @@ -455,6 +376,10 @@ class ManipulationGazeboEnvRandomizer( raise RuntimeError("Failed to execute a paused Gazebo run") # Sensors + for camera in self._cameras: + if camera.enable: + self._camera_enable = True + if self._camera_enable: task.get_logger().info( f"Inserting world plugins for sensors with {self._plugin_sensors_render_engine} rendering engine..." @@ -470,7 +395,7 @@ class ManipulationGazeboEnvRandomizer( if self._ft_sensor_enable: task.world.to_gazebo().insert_world_plugin( "ignition-gazebo-forcetorque-system", - "ignition::gazebo::systems::ForceTorque" + "ignition::gazebo::systems::ForceTorque", ) # Execute a paused run to process world plugin insertion @@ -505,14 +430,21 @@ class ManipulationGazeboEnvRandomizer( self.add_robot(task=task, gazebo=gazebo) # Insert camera - if self._camera_enable: - task.get_logger().info("Inserting camera into the environment...") - self.add_camera(task=task, gazebo=gazebo) + for camera in self._cameras: + if camera.enable: + task.get_logger().info( + f"Inserting [{camera.name}] into the environment ..." + ) + self.add_camera(task=task, gazebo=gazebo, camera=camera) # Insert default object if enabled and object randomization is disabled - if self._object_enable and not self.__object_models_randomizer_enabled(): - task.get_logger().info("Inserting default objects into the environment...") - self.add_default_objects(task=task, gazebo=gazebo) + for object in self._objects: + # TODO: support for random models + # this code is not executable for objects with random model + task.get_logger().info( + f"Inserting default object [{object.name}] to the environment" + ) + self.add_default_objects(task=task, gazebo=gazebo, obj=object) # TODO (medium): Consider replacing invisible planes with removal of all objects that are too low along a certain axis # Insert invisible plane below the terrain to prevent objects from falling into the abyss and causing physics errors @@ -549,9 +481,6 @@ class ManipulationGazeboEnvRandomizer( orientation=quat_to_wxyz(self._robot_spawn_quat_xyzw), initial_arm_joint_positions=task.initial_arm_joint_positions, initial_gripper_joint_positions=task.initial_gripper_joint_positions, - # TODO (medium): Pass xacro mappings to the function - # xacro_mappings={}, - # kwargs={}, ) # The desired name is passed as arg on creation, however, a different name might be selected to be unique @@ -568,11 +497,11 @@ class ManipulationGazeboEnvRandomizer( link = robot_gazebo.get_link(link_name=arm_link_name) link.enable_contact_detection(True) - # If mobile, enable contact detection also for the wheels - if self.robot.is_mobile: - for wheel_link_name in self.robot.wheel_link_names: - wheel = robot_gazebo.get_link(link_name=wheel_link_name) - wheel.enable_contact_detection(True) + # # If mobile, enable contact detection also for the wheels + # if self.robot.is_mobile: + # for wheel_link_name in self.robot.wheel_link_names: + # wheel = robot_gazebo.get_link(link_name=wheel_link_name) + # wheel.enable_contact_detection(True) # Execute a paused run to process robot model insertion if not gazebo.run(paused=True): @@ -584,47 +513,46 @@ class ManipulationGazeboEnvRandomizer( ) def add_camera( - self, - task: SupportedTasks, - gazebo: scenario.GazeboSimulator, + self, task: SupportedTasks, gazebo: scenario.GazeboSimulator, camera: CameraData ): """ Configure and insert camera into the simulation. Camera is places with respect to the robot """ - if task.world.to_gazebo().name() == self._camera_relative_to: - camera_position = self._camera_spawn_position - camera_quat_wxyz = quat_to_wxyz(self._camera_spawn_quat_xyzw) + if task.world.to_gazebo().name() == camera.relative_to: + camera_position = camera.spawn_position + camera_quat_wxyz = quat_to_wxyz(camera.spawn_quat_xyzw) else: # Transform the pose of camera to be with respect to robot - but still represented in world reference frame for insertion into the world camera_position, camera_quat_wxyz = transform_move_to_model_pose( world=task.world, - position=self._camera_spawn_position, - quat=quat_to_wxyz(self._camera_spawn_quat_xyzw), + position=camera.spawn_position, + quat=quat_to_wxyz(camera.spawn_quat_xyzw), target_model=self.robot, - target_link=self._camera_relative_to, + target_link=camera.relative_to, xyzw=False, ) # Create camera - self.camera = models.Camera( + self.camera_wrapper = models.Camera( + name=camera.name, world=task.world, position=camera_position, orientation=camera_quat_wxyz, - camera_type=self._camera_type, - width=self._camera_width, - height=self._camera_height, - image_format=self._camera_image_format, - update_rate=self._camera_update_rate, - horizontal_fov=self._camera_horizontal_fov, - vertical_fov=self._camera_vertical_fov, - clip_color=self._camera_clip_color, - clip_depth=self._camera_clip_depth, - noise_mean=self._camera_noise_mean, - noise_stddev=self._camera_noise_stddev, - ros2_bridge_color=self._camera_publish_color, - ros2_bridge_depth=self._camera_publish_depth, - ros2_bridge_points=self._camera_publish_points, + camera_type=camera.type, + width=camera.width, + height=camera.height, + image_format=camera.image_format, + update_rate=camera.update_rate, + horizontal_fov=camera.horizontal_fov, + vertical_fov=camera.vertical_fov, + clip_color=camera.clip_color, + clip_depth=camera.clip_depth, + noise_mean=camera.noise_mean, + noise_stddev=camera.noise_stddev, + ros2_bridge_color=camera.publish_color, + ros2_bridge_depth=camera.publish_depth, + ros2_bridge_points=camera.publish_points, ) # Execute a paused run to process camera model insertion @@ -632,23 +560,26 @@ class ManipulationGazeboEnvRandomizer( raise RuntimeError("Failed to execute a paused Gazebo run") # Attach to robot if needed - if task.world.to_gazebo().name() != self._camera_relative_to: + if task.world.to_gazebo().name() != camera.relative_to: if not self.robot.to_gazebo().attach_link( - self._camera_relative_to, self.camera.name(), self.camera.link_name + camera.relative_to, + self.camera_wrapper.name(), + self.camera_wrapper.link_name, ): raise Exception("Cannot attach camera link to robot") - self.__is_camera_attached = True - # Execute a paused run to process camera link attachment - if not gazebo.run(paused=True): - raise RuntimeError("Failed to execute a paused Gazebo run") + self.__is_camera_attached = True + + # Execute a paused run to process camera link attachment + if not gazebo.run(paused=True): + raise RuntimeError("Failed to execute a paused Gazebo run") # Broadcast tf task.tf2_broadcaster.broadcast_tf( - parent_frame_id=self._camera_relative_to, - child_frame_id=self.camera.frame_id, - translation=self._camera_spawn_position, - rotation=self._camera_spawn_quat_xyzw, + parent_frame_id=camera.relative_to, + child_frame_id=self.camera_wrapper.frame_id, + translation=camera.spawn_position, + rotation=camera.spawn_quat_xyzw, xyzw=True, ) @@ -703,64 +634,67 @@ class ManipulationGazeboEnvRandomizer( raise RuntimeError("Failed to execute a paused Gazebo run") def add_default_objects( - self, task: SupportedTasks, gazebo: scenario.GazeboSimulator + self, task: SupportedTasks, gazebo: scenario.GazeboSimulator, obj: ObjectData ): """ Configure and insert default object into the simulation """ + # TODO: add randomization when spawning # Insert new models with random pose - while len(self.task.object_names) < self._object_count: - if self._object_count > 1: - object_position, object_quat_wxyz = self.get_random_object_pose( - task=task, - centre=self._object_spawn_position, - volume=self._object_random_spawn_volume, - ) - else: - object_position = self._object_spawn_position - object_quat_wxyz = (1.0, 0.0, 0.0, 0.0) + # while len(self.task.object_names) < self._object_count: + # if self._object_count > 1: + # object_position, object_quat_wxyz = self.get_random_object_pose( + # task=task, + # centre=self._object_spawn_position, + # volume=self._object_random_spawn_volume, + # ) + # else: + # object_position = self._object_spawn_position + # object_quat_wxyz = (1.0, 0.0, 0.0, 0.0) + # + # if task.world.to_gazebo().name() != self._objects_relative_to: + # # Transform the pose of camera to be with respect to robot - but represented in world reference frame for insertion into the world + # object_position, object_quat_wxyz = transform_move_to_model_pose( + # world=task.world, + # position=object_position, + # quat=object_quat_wxyz, + # target_model=self.robot, + # target_link=self._objects_relative_to, + # xyzw=False, + # ) + try: + obj_dict = asdict(obj) + obj_dict["world"] = task.world + match obj: + case BoxObjectData(): + object_wrapper = models.Box(**obj_dict) + case PlaneObjectData(): + object_wrapper = models.Plane(**obj_dict) + case SphereObjectData(): + object_wrapper = models.Sphere(**obj_dict) + case CylinderObjectData(): + object_wrapper = models.Cylinder(**obj_dict) + case MeshObjectData(): + object_wrapper = models.Mesh(**obj_dict) + case _: + raise NotImplementedError("Type is not supported") - if task.world.to_gazebo().name() != self._objects_relative_to: - # Transform the pose of camera to be with respect to robot - but represented in world reference frame for insertion into the world - object_position, object_quat_wxyz = transform_move_to_model_pose( - world=task.world, - position=object_position, - quat=object_quat_wxyz, - target_model=self.robot, - target_link=self._objects_relative_to, - xyzw=False, - ) + model_name = object_wrapper.name() + if obj.name not in self.__objects_unique_names: + self.__objects_unique_names[obj.name] = [] + self.__objects_unique_names[obj.name].append(model_name) - try: + # Expose name of the object for task (append in case of more) + task.object_names.append(model_name) - # Create object - object_model = self.__object_model_class( - world=task.world, - position=object_position, - orientation=object_quat_wxyz, - size=self._object_dimensions, - radius=self._object_dimensions[0], - length=self._object_dimensions[1], - mass=self._object_mass, - collision=self._object_collision, - visual=self._object_visual, - static=self._object_static, - color=self._object_color, - ) + # Enable contact detection + for link_name in object_wrapper.link_names(): + link = object_wrapper.to_gazebo().get_link(link_name=link_name) + link.enable_contact_detection(True) - model_name = object_model.name() - - # Expose name of the object for task (append in case of more) - task.object_names.append(model_name) - - # Enable contact detection - for link_name in object_model.link_names(): - link = object_model.to_gazebo().get_link(link_name=link_name) - link.enable_contact_detection(True) - - except Exception as ex: - task.get_logger().warn(f"Model could not be inserted. Reason: {ex}") + except Exception as ex: + task.get_logger().warn(f"Model could not be inserted. Reason: {ex}") # Execute a paused run to process insertion of object model if not gazebo.run(paused=True): @@ -895,22 +829,37 @@ class ManipulationGazeboEnvRandomizer( randomize=self._robot_random_joint_positions, ) - # Randomize camera if needed - if self._camera_enable and self._camera_pose_expired(): - self.randomize_camera_pose( - task=task, gazebo=gazebo, mode=self._camera_random_pose_mode - ) + # Randomize cameras if needed + for camera in self._cameras: + if camera.enable and self._camera_pose_expired(camera): + self.randomize_camera_pose( + task=task, + gazebo=gazebo, + mode=camera.random_pose_mode, + camera=camera, + ) # Randomize objects if needed - # Note: No need to randomize pose of new models because they are already spawned randomly - if self._object_enable: + # NOTE: No need to randomize pose of new models because they are already spawned randomly + randomized_objects = set() + if self._objects: self.__object_positions.clear() - if self._object_models_expired(): - self.randomize_object_models(task=task, gazebo=gazebo) - elif self._object_random_pose: - self.object_random_pose(task=task, gazebo=gazebo) - else: - self.reset_default_object_pose(task=task, gazebo=gazebo) + for obj in self._objects: + if obj.name in randomized_objects: + continue + # NOTE: currently unimplemented well + # if obj.randomize.random_model: + # if self._object_models_expired(obj): + # self.randomize_object_models(task=task, gazebo=gazebo) + if ( + obj.randomize.random_pose + or obj.randomize.random_position + or obj.randomize.random_orientation + ): + self.object_random_pose(task=task, gazebo=gazebo, object=obj) + else: + self.reset_default_object_pose(task=task, gazebo=gazebo, object=obj) + randomized_objects.add(obj.name) # Randomize terrain if needed if self._terrain_enable and self._terrain_model_expired(): @@ -922,7 +871,6 @@ class ManipulationGazeboEnvRandomizer( gazebo: scenario.GazeboSimulator, randomize: bool = False, ): - if randomize: position = [ self._robot_spawn_position[0] @@ -950,7 +898,6 @@ class ManipulationGazeboEnvRandomizer( gazebo_robot = self.robot.to_gazebo() gazebo_robot.reset_base_pose(position, quat_to_wxyz(quat_xyzw)) - gazebo_robot.reset_base_world_velocity([0.0, 0.0, 0.0], [0.0, 0.0, 0.0]) # Execute a paused run to process model modification if not gazebo.run(paused=True): @@ -963,18 +910,18 @@ class ManipulationGazeboEnvRandomizer( above_object_spawn: bool = False, randomize: bool = False, ): - # TODO: remove moveit # Stop servoing # if task._use_servo: # if task.servo.is_enabled: # task.servo.servo() # task.servo.disable(sync=True) - + # task.controller.publish = False gazebo_robot = self.robot.to_gazebo() + #TODO: select object if above_object_spawn: # If desired, compute IK above object spawn if randomize: @@ -1079,19 +1026,24 @@ class ManipulationGazeboEnvRandomizer( task.gripper.open() def randomize_camera_pose( - self, task: SupportedTasks, gazebo: scenario.GazeboSimulator, mode: str + self, + task: SupportedTasks, + gazebo: scenario.GazeboSimulator, + mode: str, + camera: CameraData, ): - # Get random camera pose, centred at object position (or centre of object spawn box) if "orbit" == mode: camera_position, camera_quat_xyzw = self.get_random_camera_pose_orbit( task=task, centre=self._object_spawn_position, - distance=self._camera_random_pose_orbit_distance, - height=self._camera_random_pose_orbit_height_range, - ignore_arc_behind_robot=self._camera_random_pose_orbit_ignore_arc_behind_robot, - focal_point_z_offset=self._camera_random_pose_focal_point_z_offset, + distance=camera.random_pose_orbit_distance, + height=camera.random_pose_orbit_height_range, + ignore_arc_behind_robot=camera.random_pose_orbit_ignore_arc_behind_robot, + focal_point_z_offset=camera.random_pose_focal_point_z_offset, ) + # elif "off" == mode: + # (camera_position, camera_quat_xyzw) = (self) elif "select_random" == mode: ( camera_position, @@ -1099,7 +1051,7 @@ class ManipulationGazeboEnvRandomizer( ) = self.get_random_camera_pose_sample_random( task=task, centre=self._object_spawn_position, - options=self._camera_random_pose_select_position_options, + options=camera.random_pose_select_position_options, ) elif "select_nearest" == mode: ( @@ -1107,12 +1059,12 @@ class ManipulationGazeboEnvRandomizer( camera_quat_xyzw, ) = self.get_random_camera_pose_sample_nearest( centre=self._object_spawn_position, - options=self._camera_random_pose_select_position_options, + options=camera.random_pose_select_position_options, ) else: raise TypeError("Invalid mode for camera pose randomization.") - if task.world.to_gazebo().name() == self._camera_relative_to: + if task.world.to_gazebo().name() == camera.relative_to: transformed_camera_position = camera_position transformed_camera_quat_wxyz = quat_to_wxyz(camera_quat_xyzw) else: @@ -1125,14 +1077,16 @@ class ManipulationGazeboEnvRandomizer( position=camera_position, quat=quat_to_wxyz(camera_quat_xyzw), target_model=self.robot, - target_link=self._camera_relative_to, + target_link=camera.relative_to, xyzw=False, ) # Detach camera if needed if self.__is_camera_attached: if not self.robot.to_gazebo().detach_link( - self._camera_relative_to, self.camera.name(), self.camera.link_name + camera.relative_to, + self.camera_wrapper.name(), + self.camera_wrapper.link_name, ): raise Exception("Cannot detach camera link from robot") @@ -1153,7 +1107,9 @@ class ManipulationGazeboEnvRandomizer( # Attach to robot again if needed if self.__is_camera_attached: if not self.robot.to_gazebo().attach_link( - self._camera_relative_to, self.camera.name(), self.camera.link_name + camera.relative_to, + self.camera_wrapper.name(), + self.camera_wrapper.link_name, ): raise Exception("Cannot attach camera link to robot") @@ -1163,8 +1119,8 @@ class ManipulationGazeboEnvRandomizer( # Broadcast tf task.tf2_broadcaster.broadcast_tf( - parent_frame_id=self._camera_relative_to, - child_frame_id=self.camera.frame_id, + parent_frame_id=camera.relative_to, + child_frame_id=self.camera_wrapper.frame_id, translation=camera_position, rotation=camera_quat_xyzw, xyzw=True, @@ -1179,7 +1135,6 @@ class ManipulationGazeboEnvRandomizer( ignore_arc_behind_robot: float, focal_point_z_offset: float, ) -> Tuple[Tuple[float, float, float], Tuple[float, float, float, float]]: - # Select a random 3D position (with restricted min height) while True: position = task.np_random.uniform( @@ -1217,7 +1172,6 @@ class ManipulationGazeboEnvRandomizer( centre: Tuple[float, float, float], options: List[Tuple[float, float, float]], ) -> Tuple[Tuple[float, float, float], Tuple[float, float, float, float]]: - # Select a random entry from the options selection = options[task.np_random.randint(len(options))] @@ -1233,7 +1187,6 @@ class ManipulationGazeboEnvRandomizer( centre: Tuple[float, float, float], options: List[Tuple[float, float, float]], ) -> Tuple[Tuple[float, float, float], Tuple[float, float, float, float]]: - # Select the nearest entry dist_sqr = np.sum((np.array(options) - np.array(centre)) ** 2, axis=1) nearest = options[np.argmin(dist_sqr)] @@ -1251,7 +1204,6 @@ class ManipulationGazeboEnvRandomizer( position: Tuple[float, float, float], focal_point_z_offset: float, ) -> Tuple[Tuple[float, float, float], Tuple[float, float, float, float]]: - # Determine orientation such that camera faces the centre rpy = [ 0.0, @@ -1266,7 +1218,6 @@ class ManipulationGazeboEnvRandomizer( return position, quat_xyzw def randomize_terrain(self, task: SupportedTasks, gazebo: scenario.GazeboSimulator): - # Remove existing terrain if hasattr(self, "terrain"): if not task.world.to_gazebo().remove_model(self.terrain.name()): @@ -1302,7 +1253,6 @@ class ManipulationGazeboEnvRandomizer( raise RuntimeError("Failed to execute an unpaused Gazebo run") def randomize_light(self, task: SupportedTasks, gazebo: scenario.GazeboSimulator): - # Remove existing light if hasattr(self, "light"): if not task.world.to_gazebo().remove_model(self.light.name()): @@ -1325,14 +1275,12 @@ class ManipulationGazeboEnvRandomizer( raise RuntimeError("Failed to execute a paused Gazebo run") def reset_default_object_pose( - self, task: SupportedTasks, gazebo: scenario.GazeboSimulator + self, task: SupportedTasks, gazebo: scenario.GazeboSimulator, object: ObjectData ): - - assert len(task.object_names) == 1 - - obj = task.world.to_gazebo().get_model(task.object_names[0]).to_gazebo() - obj.reset_base_pose(self._object_spawn_position, (1.0, 0.0, 0.0, 0.0)) - obj.reset_base_world_velocity([0.0, 0.0, 0.0], [0.0, 0.0, 0.0]) + task_object_names = self.__objects_unique_names[object.name] + for object_name in task_object_names: + obj = task.world.to_gazebo().get_model(object_name).to_gazebo() + obj.reset_base_pose(object.position, object.orientation) # Execute a paused run to process model modification if not gazebo.run(paused=True): @@ -1341,7 +1289,6 @@ class ManipulationGazeboEnvRandomizer( def randomize_object_models( self, task: SupportedTasks, gazebo: scenario.GazeboSimulator ): - # Remove all existing models if len(self.task.object_names) > 0: for object_name in self.task.object_names: @@ -1350,27 +1297,41 @@ class ManipulationGazeboEnvRandomizer( self.task.object_names.clear() # Insert new models with random pose - while len(self.task.object_names) < self._object_count: + i = 0 + while len(self.task.object_names) < len(self._objects): position, quat_random = self.get_random_object_pose( - task=task, - centre=self._object_spawn_position, - volume=self._object_random_spawn_volume, + task=task, obj=self._objects[i] ) try: - model = self.__object_model_class( - world=task.world, - position=position, - orientation=quat_random, - np_random=task.np_random, - ) - model_name = model.name() + obj_dict = asdict(self._objects[i]) + obj_dict["world"] = task.world + obj_dict["position"] = position + obj_dict["orientation"] = quat_random + obj_dict["np_random"] = task.np_random + match self._objects[i]: + case BoxObjectData(): + object_wrapper = models.Box(**obj_dict) + case PlaneObjectData(): + object_wrapper = models.Plane(**obj_dict) + case SphereObjectData(): + object_wrapper = models.Sphere(**obj_dict) + case CylinderObjectData(): + object_wrapper = models.Cylinder(**obj_dict) + case MeshObjectData(): + object_wrapper = models.Mesh(**obj_dict) + case _: + raise NotImplementedError("Type is not supported") + + model_name = object_wrapper.name() self.task.object_names.append(model_name) self.__object_positions[model_name] = position # Enable contact detection - for link_name in model.link_names(): - link = model.to_gazebo().get_link(link_name=link_name) + for link_name in object_wrapper.link_names(): + link = object_wrapper.to_gazebo().get_link(link_name=link_name) link.enable_contact_detection(True) + i += 1 + except Exception as ex: task.get_logger().warn(f"Model could not be inserted: {ex}") @@ -1379,18 +1340,19 @@ class ManipulationGazeboEnvRandomizer( raise RuntimeError("Failed to execute a paused Gazebo run") def object_random_pose( - self, task: SupportedTasks, gazebo: scenario.GazeboSimulator + self, task: SupportedTasks, gazebo: scenario.GazeboSimulator, object: ObjectData ): - - for object_name in self.task.object_names: - position, quat_random = self.get_random_object_pose( - task=task, - centre=self._object_spawn_position, - volume=self._object_random_spawn_volume, - ) + task_object_names = self.__objects_unique_names[object.name] + for object_name in task_object_names: + position, quat_random = self.get_random_object_pose(task=task, obj=object) obj = task.world.to_gazebo().get_model(object_name).to_gazebo() - obj.reset_base_pose(position, quat_random) - obj.reset_base_world_velocity([0.0, 0.0, 0.0], [0.0, 0.0, 0.0]) + if object.randomize.random_pose: + obj.reset_base_pose(position, quat_random) + elif object.randomize.random_orientation: + obj.reset_base_pose(object.position, quat_random) + elif object.randomize.random_position: + obj.reset_base_pose(position, object.orientation) + self.__object_positions[object_name] = position # Execute a paused run to process model modification @@ -1400,28 +1362,38 @@ class ManipulationGazeboEnvRandomizer( def get_random_object_pose( self, task: SupportedTasks, - centre: Tuple[float, float, float], - volume: Tuple[float, float, float], + obj: ObjectData, name: str = "", min_distance_to_other_objects: float = 0.2, min_distance_decay_factor: float = 0.95, ): - is_too_close = True while is_too_close: - object_position = [ - centre[0] + task.np_random.uniform(-volume[0] / 2, volume[0] / 2), - centre[1] + task.np_random.uniform(-volume[1] / 2, volume[1] / 2), - centre[2] + task.np_random.uniform(-volume[2] / 2, volume[2] / 2), - ] + object_position = ( + obj.position[0] + + task.np_random.uniform( + -obj.randomize.random_spawn_volume[0] / 2, + obj.randomize.random_spawn_volume[0] / 2, + ), + obj.position[1] + + task.np_random.uniform( + -obj.randomize.random_spawn_volume[1] / 2, + obj.randomize.random_spawn_volume[1] / 2, + ), + obj.position[2] + + task.np_random.uniform( + -obj.randomize.random_spawn_volume[2] / 2, + obj.randomize.random_spawn_volume[2] / 2, + ), + ) - if task.world.to_gazebo().name() != self._objects_relative_to: + if task.world.to_gazebo().name() != obj.relative_to: # Transform the pose of camera to be with respect to robot - but represented in world reference frame for insertion into the world object_position = transform_move_to_model_position( world=task.world, position=object_position, target_model=self.robot, - target_link=self._objects_relative_to, + target_link=obj.relative_to, ) # Check if position is far enough from other @@ -1451,11 +1423,12 @@ class ManipulationGazeboEnvRandomizer( """ Perform internal overrides if parameters """ + pass - if self._object_randomize_count: - self._object_count = task.np_random.randint( - low=1, high=self.__object_max_count + 1 - ) + # if self._object_randomize_count: + # self._object_count = task.np_random.randint( + # low=1, high=self.__object_max_count + 1 + # ) # External overrides # def external_overrides(self, task: SupportedTasks): @@ -1471,121 +1444,138 @@ class ManipulationGazeboEnvRandomizer( Perform steps that are required before randomization is performed. """ - # If desired, select random spawn position from the segments - # It is performed here because object spawn position might be of interest also for robot and camera randomization - segments_len = len(self._object_random_spawn_position_segments) - if segments_len > 1: - # Randomly select a segment between two points - start_index = task.np_random.randint(segments_len - 1) - segment = ( - self._object_random_spawn_position_segments[start_index], - self._object_random_spawn_position_segments[start_index + 1], - ) + for obj in self._objects: + # If desired, select random spawn position from the segments + # It is performed here because object spawn position might be of interest also for robot and camera randomization - # Randomly select a point on the segment and use it as the new object spawn position - intersect = task.np_random.random() - direction = ( - segment[1][0] - segment[0][0], - segment[1][1] - segment[0][1], - segment[1][2] - segment[0][2], - ) - self._object_spawn_position = ( - segment[0][0] + intersect * direction[0], - segment[0][1] + intersect * direction[1], - segment[0][2] + intersect * direction[2], - ) - - # Update also the workspace centre (and bounding box) if desired - if self._object_random_spawn_position_update_workspace_centre: - task.workspace_centre = ( - self._object_spawn_position[0], - self._object_spawn_position[1], - # Z workspace is currently kept the same on purpose - task.workspace_centre[2], - ) - workspace_volume_half = ( - task.workspace_volume[0] / 2, - task.workspace_volume[1] / 2, - task.workspace_volume[2] / 2, - ) - task.workspace_min_bound = ( - task.workspace_centre[0] - workspace_volume_half[0], - task.workspace_centre[1] - workspace_volume_half[1], - task.workspace_centre[2] - workspace_volume_half[2], - ) - task.workspace_max_bound = ( - task.workspace_centre[0] + workspace_volume_half[0], - task.workspace_centre[1] + workspace_volume_half[1], - task.workspace_centre[2] + workspace_volume_half[2], + segments_len = len(obj.randomize.random_spawn_position_segments) + if segments_len > 1: + # Randomly select a segment between two points + start_index = task.np_random.randint(segments_len - 1) + segment = ( + obj.randomize.random_spawn_position_segments[start_index], + obj.randomize.random_spawn_position_segments[start_index + 1], ) - # Post-randomization # + # Randomly select a point on the segment and use it as the new object spawn position + intersect = task.np_random.random() + direction = ( + segment[1][0] - segment[0][0], + segment[1][1] - segment[0][1], + segment[1][2] - segment[0][2], + ) + obj.position = ( + segment[0][0] + intersect * direction[0], + segment[0][1] + intersect * direction[1], + segment[0][2] + intersect * direction[2], + ) + + # TODO: add bounding box with multiple objects + + # # Update also the workspace centre (and bounding box) if desired + # if self._object_random_spawn_position_update_workspace_centre: + # task.workspace_centre = ( + # self._object_spawn_position[0], + # self._object_spawn_position[1], + # # Z workspace is currently kept the same on purpose + # task.workspace_centre[2], + # ) + # workspace_volume_half = ( + # task.workspace_volume[0] / 2, + # task.workspace_volume[1] / 2, + # task.workspace_volume[2] / 2, + # ) + # task.workspace_min_bound = ( + # task.workspace_centre[0] - workspace_volume_half[0], + # task.workspace_centre[1] - workspace_volume_half[1], + # task.workspace_centre[2] - workspace_volume_half[2], + # ) + # task.workspace_max_bound = ( + # task.workspace_centre[0] + workspace_volume_half[0], + # task.workspace_centre[1] + workspace_volume_half[1], + # task.workspace_centre[2] + workspace_volume_half[2], + # ) + def post_randomization( self, task: SupportedTasks, gazebo: scenario.GazeboSimulator ): """ - Perform steps that are required once randomization is complete and the simulation can be stepped a few times unpaused. + Perform steps required once randomization is complete and the simulation can be stepped unpaused. """ - attempts = 0 - object_overlapping_ok = False + def perform_gazebo_step(): + if not gazebo.step(): + raise RuntimeError("Failed to execute an unpaused Gazebo step") - if self.POST_RANDOMIZATION_MAX_STEPS == attempts: + def wait_for_new_observations(): + attempts = 0 + while True: + attempts += 1 + if attempts % self.POST_RANDOMIZATION_MAX_STEPS == 0: + task.get_logger().debug( + f"Waiting for new joint state after reset. Iteration #{attempts}..." + ) + else: + task.get_logger().debug("Waiting for new joint state after reset.") + + perform_gazebo_step() + + # If camera_sub is defined, ensure all new observations are available + if hasattr(task, "camera_subs") and not all( + sub.new_observation_available for sub in task.camera_subs + ): + continue + + return # Observations are ready + + attempts = 0 + processed_objects = set() + + # Early exit if the maximum number of steps is already reached + if self.POST_RANDOMIZATION_MAX_STEPS == 0: task.get_logger().error( "Robot keeps falling through the terrain. There is something wrong..." ) return - # Make sure no objects are overlapping (intersections between collision geometry) - while ( - not object_overlapping_ok and attempts < self.POST_RANDOMIZATION_MAX_STEPS - ): - attempts += 1 - task.get_logger().debug("Objects overlapping, trying new positions") - object_overlapping_ok = self.check_object_overlapping(task=task) - if not gazebo.step(): - raise RuntimeError("Failed to execute an unpaused Gazebo step") - if self.POST_RANDOMIZATION_MAX_STEPS == attempts: - task.get_logger().warn( - "Objects could not be spawned without any overlapping. The workspace might be too crowded!" - ) - return + # Ensure no objects are overlapping + for obj in self._objects: + if not obj.randomize.random_pose or obj.name in processed_objects: + continue + + # Try repositioning until no overlap or maximum attempts reached + for _ in range(self.POST_RANDOMIZATION_MAX_STEPS): + task.get_logger().debug( + f"Checking overlap for {obj.name}, attempt {attempts + 1}" + ) + if self.check_object_overlapping(task=task, object=obj): + processed_objects.add(obj.name) + break # No overlap, move to next object + else: + task.get_logger().debug( + f"Objects overlapping, trying new positions for {obj.name}" + ) + + perform_gazebo_step() + + else: + task.get_logger().warn( + f"Could not place {obj.name} without overlap after {self.POST_RANDOMIZATION_MAX_STEPS} attempts" + ) + continue # Move to next object in case of failure # Execute steps until new observations are available - observations_ready = False - # task.moveit2.reset_new_joint_state_checker() - if task._enable_gripper: - task.gripper.reset_new_joint_state_checker() - if hasattr(task, "camera_sub"): - task.camera_sub.reset_new_observation_checker() - while not observations_ready: - attempts += 1 - if 0 == attempts % self.POST_RANDOMIZATION_MAX_STEPS: - task.get_logger().debug( - f"Waiting for new joint state after reset. Iteration #{attempts}..." - ) - else: - task.get_logger().debug("Waiting for new joint state after reset.") - if not gazebo.step(): - raise RuntimeError("Failed to execute an unpaused Gazebo step") + if hasattr(task, "camera_subs") or task._enable_gripper: + wait_for_new_observations() - # Break once all observaions are available - # if not task.moveit2.new_joint_state_available: - # continue - # if task._enable_gripper: - # if not task.gripper.new_joint_state_available: - # continue - if hasattr(task, "camera_sub"): - if not task.camera_sub.new_observation_available: - continue - observations_ready = True + # Final check if observations are not available within the maximum steps if self.POST_RANDOMIZATION_MAX_STEPS == attempts: task.get_logger().error("Cannot obtain new observation.") - return def check_object_overlapping( self, task: SupportedTasks, + object: ObjectData, allowed_penetration_depth: float = 0.001, terrain_allowed_penetration_depth: float = 0.002, ) -> bool: @@ -1598,13 +1588,14 @@ class ManipulationGazeboEnvRandomizer( """ # Update object positions - for object_name in self.task.object_names: + task_object_names = self.__objects_unique_names[object.name] + for object_name in task_object_names: model = task.world.get_model(object_name).to_gazebo() self.__object_positions[object_name] = model.get_link( link_name=model.link_names()[0] ).position() - for object_name in self.task.object_names: + for object_name in task_object_names: obj = task.world.get_model(object_name).to_gazebo() # Make sure the object is inside workspace @@ -1612,13 +1603,9 @@ class ManipulationGazeboEnvRandomizer( self.__object_positions[object_name] ): position, quat_random = self.get_random_object_pose( - task=task, - centre=self._object_spawn_position, - volume=self._object_random_spawn_volume, - name=object_name, + task=task, obj=object, name=object_name ) obj.reset_base_pose(position, quat_random) - obj.reset_base_world_velocity([0.0, 0.0, 0.0], [0.0, 0.0, 0.0]) return False # Make sure the object is not intersecting other objects @@ -1636,12 +1623,10 @@ class ManipulationGazeboEnvRandomizer( ): position, quat_random = self.get_random_object_pose( task=task, - centre=self._object_spawn_position, - volume=self._object_random_spawn_volume, + obj=object, name=object_name, ) obj.reset_base_pose(position, quat_random) - obj.reset_base_world_velocity([0.0, 0.0, 0.0], [0.0, 0.0, 0.0]) return False except Exception as e: task.get_logger().error( @@ -1654,7 +1639,7 @@ class ManipulationGazeboEnvRandomizer( # Randomizer rollouts checking # ============================ - def __camera_pose_randomizer_enabled(self) -> bool: + def __camera_pose_randomizer_enabled(self, camera: CameraData) -> bool: """ Checks if camera pose randomizer is enabled. @@ -1662,12 +1647,12 @@ class ManipulationGazeboEnvRandomizer( True if enabled, false otherwise """ - if self._camera_random_pose_rollouts_num == 0: + if camera.random_pose_rollouts_num == 0: return False else: return True - def _camera_pose_expired(self) -> bool: + def _camera_pose_expired(self, camera: CameraData) -> bool: """ Checks if camera pose needs to be randomized. @@ -1675,12 +1660,12 @@ class ManipulationGazeboEnvRandomizer( True if expired, false otherwise """ - if not self.__camera_pose_randomizer_enabled(): + if not self.__camera_pose_randomizer_enabled(camera): return False self.__camera_pose_rollout_counter += 1 - if self.__camera_pose_rollout_counter >= self._camera_random_pose_rollouts_num: + if self.__camera_pose_rollout_counter >= camera.random_pose_rollouts_num: self.__camera_pose_rollout_counter = 0 return True @@ -1750,7 +1735,7 @@ class ManipulationGazeboEnvRandomizer( return False - def __object_models_randomizer_enabled(self) -> bool: + def __object_models_randomizer_enabled(self, obj: ObjectData) -> bool: """ Checks if object model randomizer is enabled. @@ -1758,12 +1743,15 @@ class ManipulationGazeboEnvRandomizer( True if enabled, false otherwise """ - if self._object_models_rollouts_num == 0: - return False + if obj.randomize: + if obj.randomize.models_rollouts_num == 0: + return False + else: + return True else: - return self.__is_object_type_randomizable + return False - def _object_models_expired(self) -> bool: + def _object_models_expired(self, obj: ObjectData) -> bool: """ Checks if object models need to be randomized. @@ -1771,19 +1759,18 @@ class ManipulationGazeboEnvRandomizer( True if expired, false otherwise """ - if not self.__object_models_randomizer_enabled(): + if not self.__object_models_randomizer_enabled(obj): return False self.__object_models_rollout_counter += 1 - if self.__object_models_rollout_counter >= self._object_models_rollouts_num: + if self.__object_models_rollout_counter >= obj.randomize.models_rollouts_num: self.__object_models_rollout_counter = 0 return True return False def __consume_parameter_overrides(self, task: SupportedTasks): - for key, value in task._randomizer_parameter_overrides.items(): if hasattr(self, key): setattr(self, key, value) @@ -1808,7 +1795,6 @@ class ManipulationGazeboEnvRandomizer( gazebo: scenario.GazeboSimulator, color: Tuple[float, float, float, float] = (0, 1, 0, 0.8), ): - # Insert a translucent box visible only in simulation with no physical interactions models.Box( world=task.world, @@ -1833,7 +1819,6 @@ class ManipulationGazeboEnvRandomizer( color: Tuple[float, float, float, float] = (0, 0, 1, 0.8), color_with_height: Tuple[float, float, float, float] = (1, 0, 1, 0.7), ): - # Insert translucent boxes visible only in simulation with no physical interactions models.Box( world=task.world, diff --git a/env_manager/rbs_gym/rbs_gym/envs/tasks/reach/pick_and_place_imitate.py b/env_manager/rbs_gym/rbs_gym/envs/tasks/reach/pick_and_place_imitate.py new file mode 100644 index 0000000..dd2d408 --- /dev/null +++ b/env_manager/rbs_gym/rbs_gym/envs/tasks/reach/pick_and_place_imitate.py @@ -0,0 +1,120 @@ +import abc + +import gymnasium as gym +import numpy as np +from gym_gz.utils.typing import Observation, ObservationSpace + +from rbs_gym.envs.models.sensors import Camera +from rbs_gym.envs.observation import CameraSubscriber +from rbs_gym.envs.tasks.reach import Reach + + +class ReachColorImage(Reach, abc.ABC): + def __init__( + self, + camera_info: list[dict] = [], + monochromatic: bool = False, + **kwargs, + ): + # Initialize the Task base class + Reach.__init__( + self, + **kwargs, + ) + + # Store parameters for later use + self._camera_width = sum(w.get("width", 0) for w in camera_info) + self._camera_height = camera_info[0]["height"] + self._monochromatic = monochromatic + self._saved = False + + self.camera_subs: list[CameraSubscriber] = [] + for camera in camera_info: + # Perception (RGB camera) + self.camera_subs.append( + CameraSubscriber( + node=self, + topic=Camera.get_color_topic(camera["name"], camera["type"]), + is_point_cloud=False, + callback_group=self._callback_group, + ) + ) + + def create_observation_space(self) -> ObservationSpace: + """ + Creates the observation space for the Imitation Learning algorithm. + + This method returns a dictionary-based observation space that includes: + + - **image**: A 2D image captured from the robot's camera. The image's shape is determined by the camera's + width and height, and the number of channels (either monochromatic or RGB). + - Dimensions: `(camera_height, camera_width, channels)` where `channels` is 1 for monochromatic images + and 3 for RGB images. + - Pixel values are in the range `[0, 255]` with data type `np.uint8`. + + - **joint_states**: The joint positions of the robot arm, represented as continuous values within the + range `[-1.0, 1.0]`, where `nDoF` refers to the number of degrees of freedom of the robot arm. + + Returns: + gym.spaces.Dict: A dictionary defining the observation space for the learning algorithm, + containing both image and joint_states information. + """ + return gym.spaces.Dict( + { + "image": gym.spaces.Box( + low=0, + high=255, + shape=( + self._camera_height, + self._camera_width, + 1 if self._monochromatic else 3, + ), + dtype=np.uint8, + ), + "joint_states": gym.spaces.Box(low=-1.0, high=1.0), + } + ) + + def get_observation(self) -> Observation: + # Get the latest images + image = [] + + for sub in self.camera_subs: + image.append(sub.get_observation()) + + image_width = sum(i.width for i in image) + image_height = image[0].height + # if image_width == self._camera_width and image_height == self._camera_height: + + # image_data = np.concatenate([i.data for i in image], axis=1) + + assert ( + image_width == self._camera_width and image_height == self._camera_height + ), f"Error: Resolution of the input image does not match the configured observation space. ({image_width}x{image_height} instead of {self._camera_width}x{self._camera_height})" + + # Reshape and create the observation + # color_image = np.array([i.data for i in image], dtype=np.uint8).reshape( + # self._camera_height, self._camera_width, 3 + # ) + color_image = np.concatenate( + [ + np.array(i.data, dtype=np.uint8).reshape(i.height, i.width, 3) + for i in image + ], + axis=1, + ) + + # # Debug save images + # from PIL import Image + # img_color = Image.fromarray(color_image) + # img_color.save("img_color.png") + + if self._monochromatic: + observation = Observation(color_image[:, :, 0]) + else: + observation = Observation(color_image) + + self.get_logger().debug(f"\nobservation: {observation}") + + # Return the observation + return observation diff --git a/env_manager/rbs_gym/rbs_gym/envs/tasks/reach/reach_color_image.py b/env_manager/rbs_gym/rbs_gym/envs/tasks/reach/reach_color_image.py index 4213cbc..2c934d4 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/tasks/reach/reach_color_image.py +++ b/env_manager/rbs_gym/rbs_gym/envs/tasks/reach/reach_color_image.py @@ -12,9 +12,7 @@ from rbs_gym.envs.tasks.reach import Reach class ReachColorImage(Reach, abc.ABC): def __init__( self, - camera_width: int, - camera_height: int, - camera_type: str = "camera", + camera_info: list[dict] = [], monochromatic: bool = False, **kwargs, ): @@ -26,17 +24,20 @@ class ReachColorImage(Reach, abc.ABC): ) # Store parameters for later use - self._camera_width = camera_width - self._camera_height = camera_height + self._camera_width = sum(w.get('width', 0) for w in camera_info) + self._camera_height = camera_info[0]["height"] self._monochromatic = monochromatic + self._saved = False - # Perception (RGB camera) - self.camera_sub = CameraSubscriber( - node=self, - topic=Camera.get_color_topic(camera_type), - is_point_cloud=False, - callback_group=self._callback_group, - ) + self.camera_subs: list[CameraSubscriber] = [] + for camera in camera_info: + # Perception (RGB camera) + self.camera_subs.append(CameraSubscriber( + node=self, + topic=Camera.get_color_topic(camera["name"], camera["type"]), + is_point_cloud=False, + callback_group=self._callback_group, + )) def create_observation_space(self) -> ObservationSpace: @@ -54,17 +55,22 @@ class ReachColorImage(Reach, abc.ABC): ) def get_observation(self) -> Observation: + # Get the latest images + image = [] - # Get the latest image - image = self.camera_sub.get_observation() + for sub in self.camera_subs: + image.append(sub.get_observation()) + + image_width = sum(i.width for i in image) + image_height = image[0].height assert ( - image.width == self._camera_width and image.height == self._camera_height - ), f"Error: Resolution of the input image does not match the configured observation space. ({image.width}x{image.height} instead of {self._camera_width}x{self._camera_height})" + image_width == self._camera_width and image_height == self._camera_height + ), f"Error: Resolution of the input image does not match the configured observation space. ({image_width}x{image_height} instead of {self._camera_width}x{self._camera_height})" - # Reshape and create the observation - color_image = np.array(image.data, dtype=np.uint8).reshape( - self._camera_height, self._camera_width, 3 + color_image = np.concatenate( + [np.array(i.data, dtype=np.uint8).reshape(i.height, i.width, 3) for i in image], + axis=1 ) # # Debug save images diff --git a/env_manager/rbs_gym/rbs_gym/envs/utils/conversions.py b/env_manager/rbs_gym/rbs_gym/envs/utils/conversions.py index 6831f9b..259d708 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/utils/conversions.py +++ b/env_manager/rbs_gym/rbs_gym/envs/utils/conversions.py @@ -164,13 +164,12 @@ def orientation_quat_to_6d( def quat_to_wxyz( - xyzw: Union[numpy.ndarray, Tuple[float, float, float, float]] -) -> numpy.ndarray: + xyzw: tuple[float, float, float, float] +) -> tuple[float, float, float, float]: - if isinstance(xyzw, tuple): - return (xyzw[3], xyzw[0], xyzw[1], xyzw[2]) + return (xyzw[3], xyzw[0], xyzw[1], xyzw[2]) - return xyzw[[3, 0, 1, 2]] + # return xyzw[[3, 0, 1, 2]] def quat_to_xyzw( diff --git a/env_manager/rbs_gym/rbs_gym/envs/utils/gazebo.py b/env_manager/rbs_gym/rbs_gym/envs/utils/gazebo.py index cd0bcb5..8753c61 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/utils/gazebo.py +++ b/env_manager/rbs_gym/rbs_gym/envs/utils/gazebo.py @@ -103,12 +103,12 @@ def get_model_orientation( def transform_move_to_model_pose( world: World, - position: Tuple[float, float, float], - quat: Tuple[float, float, float, float], - target_model: Union[ModelWrapper, str], - target_link: Union[Link, str, None] = None, + position: tuple[float, float, float], + quat: tuple[float, float, float, float], + target_model: ModelWrapper | str, + target_link: Link | str | None = None, xyzw: bool = False, -) -> Tuple[Tuple[float, float, float], Tuple[float, float, float, float]]: +) -> tuple[tuple[float, float, float], tuple[float, float, float, float]]: """ Transform such that original `position` and `quat` are represented with respect to `target_model::target_link`. The resulting pose is still represented in world coordinate system. diff --git a/env_manager/rbs_gym/rbs_gym/envs/worlds/default.sdf b/env_manager/rbs_gym/rbs_gym/envs/worlds/default.sdf index d8850bc..669f038 100644 --- a/env_manager/rbs_gym/rbs_gym/envs/worlds/default.sdf +++ b/env_manager/rbs_gym/rbs_gym/envs/worlds/default.sdf @@ -14,7 +14,7 @@ - 0 0 0 + 1.0 1.0 1.0 false diff --git a/env_manager/rbs_gym/runtime/__init__.py b/env_manager/rbs_gym/runtime/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/env_manager/rbs_gym/scripts/test_agent.py b/env_manager/rbs_gym/scripts/test_agent.py index 93ed550..285a68a 100755 --- a/env_manager/rbs_gym/scripts/test_agent.py +++ b/env_manager/rbs_gym/scripts/test_agent.py @@ -10,10 +10,13 @@ from rbs_gym import envs as gz_envs from rbs_gym.utils.utils import StoreDict, str2bool -def main(args: Dict): +def main(args: argparse.Namespace): # Create the environment - env = gym.make(args.env, **args.env_kwargs) + if args.env_kwargs is not None: + env = gym.make(args.env, **args.env_kwargs) + else: + env = gym.make(args.env) # Initialize random seed env.seed(args.seed) @@ -58,7 +61,7 @@ if __name__ == "__main__": # Environment and its parameters parser.add_argument( - "--env", type=str, default="Reach-Gazebo-v0", help="Environment ID" + "--env", type=str, default="Reach-ColorImage-Gazebo-v0", help="Environment ID" ) parser.add_argument( "--env-kwargs", diff --git a/rbs_bringup/launch/rbs_robot.launch.py b/rbs_bringup/launch/rbs_robot.launch.py index 5eca9b0..a25321a 100644 --- a/rbs_bringup/launch/rbs_robot.launch.py +++ b/rbs_bringup/launch/rbs_robot.launch.py @@ -158,12 +158,9 @@ def launch_setup(context, *args, **kwargs): ]) ]), launch_arguments={ - 'with_gripper': with_gripper_condition, - 'robot_description': robot_description_content, - 'start_joint_controller': start_joint_controller, - 'initial_joint_controller': initial_joint_controller, - 'controllers_file': controllers_file, - 'cartesian_controllers': cartesian_controllers, + 'control_space': "task", + 'control_strategy': "effort", + 'is_gripper': "false", 'namespace': namespace, }.items(), condition=IfCondition(launch_controllers)) @@ -234,6 +231,7 @@ def launch_setup(context, *args, **kwargs): asm_config = Node( package="rbs_utils", + namespace=namespace, executable="assembly_config_service.py" ) diff --git a/rbs_bringup/launch/single_robot.launch.py b/rbs_bringup/launch/single_robot.launch.py index 528a81a..94517da 100644 --- a/rbs_bringup/launch/single_robot.launch.py +++ b/rbs_bringup/launch/single_robot.launch.py @@ -11,7 +11,7 @@ from launch_ros.substitutions import FindPackageShare from launch_ros.actions import Node import os from ament_index_python.packages import get_package_share_directory -from nav2_common.launch import RewrittenYaml +# from nav2_common.launch import RewrittenYaml def launch_setup(context, *args, **kwargs): # Initialize Arguments @@ -85,6 +85,7 @@ def launch_setup(context, *args, **kwargs): gz_spawner = Node( package='ros_gz_sim', executable='create', + # prefix=['gdbserver localhost:1234'], arguments=[ '-name', robot_name.perform(context), '-topic', namespace + '/robot_description'], @@ -103,7 +104,7 @@ def launch_setup(context, *args, **kwargs): "env_manager": env_manager, "with_gripper": with_gripper_condition, "gripper_name": gripper_name, - "controllers_file": controllers_file, + "controllers_file": initial_joint_controllers_file_path, "robot_type": robot_type, # "controllers_file": controller_paramfile, "cartesian_controllers": cartesian_controllers, diff --git a/rbs_bt_executor/bt_trees/test_tree.xml b/rbs_bt_executor/bt_trees/test_tree.xml index 6c63706..889382f 100644 --- a/rbs_bt_executor/bt_trees/test_tree.xml +++ b/rbs_bt_executor/bt_trees/test_tree.xml @@ -2,10 +2,19 @@ - - - + + + + + + + + + + + + + diff --git a/rbs_bt_executor/bt_trees/workspace_movement.xml b/rbs_bt_executor/bt_trees/workspace_movement.xml index 7b5cae7..bc33a91 100644 --- a/rbs_bt_executor/bt_trees/workspace_movement.xml +++ b/rbs_bt_executor/bt_trees/workspace_movement.xml @@ -2,16 +2,32 @@ - + + + + +