rbs_gym multi camera and multi object setup
This commit is contained in:
parent
b4b452297d
commit
1a562b05ca
34 changed files with 1082 additions and 816 deletions
BIN
env_manager/rbs_gym/img_color.png
Normal file
BIN
env_manager/rbs_gym/img_color.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 62 KiB |
|
@ -13,7 +13,7 @@ except:
|
|||
pass
|
||||
|
||||
from os import environ, path
|
||||
from typing import Dict, Tuple
|
||||
from typing import Dict, Tuple, Any
|
||||
|
||||
import numpy as np
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
@ -22,15 +22,13 @@ from gymnasium.envs.registration import register
|
|||
from rbs_gym.utils.utils import str2bool
|
||||
|
||||
from . import tasks
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
######################
|
||||
# Runtime Entrypoint #
|
||||
######################
|
||||
# Entrypoint for tasks (can be simulated or real)
|
||||
|
||||
RBS_ENVS_TASK_ENTRYPOINT: str = (
|
||||
"gym_gz.runtimes.gazebo_runtime:GazeboRuntime"
|
||||
)
|
||||
RBS_ENVS_TASK_ENTRYPOINT: str = "gym_gz.runtimes.gazebo_runtime:GazeboRuntime"
|
||||
|
||||
|
||||
###################
|
||||
|
@ -43,9 +41,7 @@ RBS_ENVS_ROBOT_MODEL: str = "rbs_arm"
|
|||
# Datasets and paths #
|
||||
######################
|
||||
# Path to directory containing base SDF worlds
|
||||
RBS_ENVS_WORLDS_DIR: str = path.join(
|
||||
get_package_share_directory("rbs_gym"), "worlds"
|
||||
)
|
||||
RBS_ENVS_WORLDS_DIR: str = path.join(get_package_share_directory("rbs_gym"), "worlds")
|
||||
|
||||
|
||||
###########
|
||||
|
@ -53,7 +49,7 @@ RBS_ENVS_WORLDS_DIR: str = path.join(
|
|||
###########
|
||||
# Gravity preset for Earth
|
||||
GRAVITY_EARTH: Tuple[float, float, float] = (0.0, 0.0, -9.80665)
|
||||
GRAVITY_EARTH_STD: Tuple[float, float, float] = (0.0, 0.0, 0.0232)
|
||||
GRAVITY_EARTH_STD: Tuple[float, float, float] = (0.0, 0.0, 0.0232)
|
||||
|
||||
############################
|
||||
# Additional Configuration #
|
||||
|
@ -67,7 +63,7 @@ BROADCAST_GUI: bool = str2bool(
|
|||
# Reach #
|
||||
#########
|
||||
REACH_MAX_EPISODE_STEPS: int = 100
|
||||
REACH_KWARGS: Dict[str, any] = {
|
||||
REACH_KWARGS: Dict[str, Any] = {
|
||||
"agent_rate": 4.0,
|
||||
"robot_model": RBS_ENVS_ROBOT_MODEL,
|
||||
"workspace_frame_id": "world",
|
||||
|
@ -85,20 +81,84 @@ REACH_KWARGS: Dict[str, any] = {
|
|||
"required_accuracy": 0.05,
|
||||
"num_threads": 3,
|
||||
}
|
||||
REACH_KWARGS_SIM: Dict[str, any] = {
|
||||
REACH_KWARGS_SIM: Dict[str, Any] = {
|
||||
"physics_rate": 1000.0,
|
||||
"real_time_factor": float(np.finfo(np.float32).max),
|
||||
"world": path.join(RBS_ENVS_WORLDS_DIR, "default.sdf"),
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ObjectRandomizerData:
|
||||
count: int = field(default=0)
|
||||
random_pose: bool = field(default=False)
|
||||
random_position: bool = field(default=False)
|
||||
random_orientation: bool = field(default=False)
|
||||
random_model: bool = field(default=False)
|
||||
random_spawn_position_segments: list = field(default_factory=list)
|
||||
random_spawn_position_update_workspace_centre: bool = field(default=False)
|
||||
random_spawn_volume: tuple = field(default=(0.5, 0.5, 0.5))
|
||||
models_rollouts_num: int = field(default=0)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ObjectData:
|
||||
name: str = field(default="object")
|
||||
relative_to: str = field(default="world")
|
||||
position: tuple = field(default=(0.0, 0.0, 0.0))
|
||||
orientation: tuple = field(default=(1.0, 0.0, 0.0, 0.0))
|
||||
static: bool = field(default=False)
|
||||
randomize: ObjectRandomizerData = field(default_factory=ObjectRandomizerData)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrimitiveObjectData(ObjectData):
|
||||
collision: bool = field(default=True)
|
||||
visual: bool = field(default=True)
|
||||
color: tuple = field(default=(0.8, 0.8, 0.8, 1.0))
|
||||
mass: float = field(default=0.1)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SphereObjectData(PrimitiveObjectData):
|
||||
radius: float = field(default=0.025)
|
||||
friction: float = field(default=1.0)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PlaneObjectData(PrimitiveObjectData):
|
||||
size: tuple = field(default=(1.0, 1.0))
|
||||
direction: tuple = field(default=(0.0, 0.0, 1.0))
|
||||
friction: float = field(default=1.0)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CylinderObjectData(PrimitiveObjectData):
|
||||
radius: float = field(default=0.025)
|
||||
length: float = field(default=0.1)
|
||||
friction: float = field(default=1.0)
|
||||
|
||||
|
||||
@dataclass
|
||||
class BoxObjectData(PrimitiveObjectData):
|
||||
size: tuple = field(default=(0.05, 0.05, 0.05))
|
||||
friction: float = field(default=1.0)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MeshObjectData(ObjectData):
|
||||
texture: list[float] = field(default_factory=list)
|
||||
|
||||
|
||||
REACH_RANDOMIZER: str = "rbs_gym.envs.randomizers:ManipulationGazeboEnvRandomizer"
|
||||
REACH_KWARGS_RANDOMIZER: Dict[str, any] = {
|
||||
REACH_KWARGS_RANDOMIZER: Dict[str, Any] = {
|
||||
"gravity": GRAVITY_EARTH,
|
||||
"gravity_std": GRAVITY_EARTH_STD,
|
||||
"plugin_scene_broadcaster": BROADCAST_GUI,
|
||||
"plugin_user_commands": BROADCAST_GUI,
|
||||
"plugin_sensors_render_engine": "ogre2",
|
||||
"robot_random_pose": False,
|
||||
"robot_random_joint_positions": True, # FIXME:
|
||||
"robot_random_joint_positions": True, # FIXME:
|
||||
"robot_random_joint_positions_std": 0.2,
|
||||
"robot_random_joint_positions_above_object_spawn": False,
|
||||
"robot_random_joint_positions_above_object_spawn_elevation": 0.0,
|
||||
|
@ -111,42 +171,136 @@ REACH_KWARGS_RANDOMIZER: Dict[str, any] = {
|
|||
"light_visual": False,
|
||||
"light_radius": 25.0,
|
||||
"light_model_rollouts_num": 1,
|
||||
"object_enable": True,
|
||||
"object_type": "sphere",
|
||||
"objects_relative_to": "base_link",
|
||||
"object_static": True,
|
||||
"object_collision": False,
|
||||
"object_visual": True,
|
||||
"object_color": (0.0, 0.0, 1.0, 1.0),
|
||||
"object_dimensions": [0.025, 0.025, 0.025],
|
||||
"object_count": 1,
|
||||
"object_spawn_position": (-0.4, 0, 0.3),
|
||||
"object_random_pose": True,
|
||||
"object_random_spawn_position_segments": [],
|
||||
"object_random_spawn_volume": (0.2, 0.2, 0.2),
|
||||
"object_models_rollouts_num": 0,
|
||||
"underworld_collision_plane": False,
|
||||
}
|
||||
REACH_KWARGS_RANDOMIZER_CAMERA: Dict[str, any] = {
|
||||
"camera_enable": True,
|
||||
"camera_width": 64,
|
||||
"camera_height": 64,
|
||||
"camera_update_rate": 1.2 * REACH_KWARGS["agent_rate"],
|
||||
"camera_horizontal_fov": np.pi / 3.0,
|
||||
"camera_vertical_fov": np.pi / 3.0,
|
||||
"camera_noise_mean": 0.0,
|
||||
"camera_noise_stddev": 0.001,
|
||||
"camera_relative_to": "base_link",
|
||||
"camera_spawn_position": (0.85, -0.4, 0.45),
|
||||
"camera_spawn_quat_xyzw": (-0.0402991, -0.0166924, 0.9230002, 0.3823192),
|
||||
"camera_random_pose_rollouts_num": 0,
|
||||
"camera_random_pose_mode": "orbit",
|
||||
"camera_random_pose_orbit_distance": 1.0,
|
||||
"camera_random_pose_orbit_height_range": (0.1, 0.7),
|
||||
"camera_random_pose_orbit_ignore_arc_behind_robot": np.pi / 8,
|
||||
"camera_random_pose_select_position_options": [],
|
||||
"camera_random_pose_focal_point_z_offset": 0.0,
|
||||
}
|
||||
REACH_KWARGS_OBJECT_RANDOMIZER: list = [
|
||||
MeshObjectData(
|
||||
"bishop",
|
||||
"base_link",
|
||||
(-0.3, 0.0, 0.0),
|
||||
randomize=ObjectRandomizerData(
|
||||
random_position=True, random_spawn_volume=(0.2, 0.5, 0.0)
|
||||
),
|
||||
),
|
||||
SphereObjectData(
|
||||
"sphere",
|
||||
"base_link",
|
||||
(-0.3, 0.0, 0.3),
|
||||
mass=1.0,
|
||||
color=(0.0, 0.0, 1.0, 1.0),
|
||||
# randomize=ObjectRandomizerData(
|
||||
# random_pose=True, random_spawn_volume=(0.2, 0.2, 0.0)
|
||||
# ),
|
||||
),
|
||||
]
|
||||
# REACH_KWARGS_RANDOMIZER_CAMERA: Dict[str, Any] = {
|
||||
# "camera_enable": True,
|
||||
# "camera_width": 64,
|
||||
# "camera_height": 64,
|
||||
# "camera_update_rate": 1.2 * REACH_KWARGS["agent_rate"],
|
||||
# "camera_horizontal_fov": np.pi / 3.0,
|
||||
# "camera_vertical_fov": np.pi / 3.0,
|
||||
# "camera_noise_mean": 0.0,
|
||||
# "camera_noise_stddev": 0.001,
|
||||
# "camera_relative_to": "base_link",
|
||||
# "camera_spawn_position": (0.85, -0.4, 0.45),
|
||||
# "camera_spawn_quat_xyzw": (-0.0402991, -0.0166924, 0.9230002, 0.3823192),
|
||||
# "camera_random_pose_rollouts_num": 0,
|
||||
# "camera_random_pose_mode": "orbit",
|
||||
# "camera_random_pose_orbit_distance": 1.0,
|
||||
# "camera_random_pose_orbit_height_range": (0.1, 0.7),
|
||||
# "camera_random_pose_orbit_ignore_arc_behind_robot": np.pi / 8,
|
||||
# "camera_random_pose_select_position_options": [],
|
||||
# "camera_random_pose_focal_point_z_offset": 0.0,
|
||||
# }
|
||||
|
||||
|
||||
@dataclass
|
||||
class CameraData:
|
||||
name: str | None = None
|
||||
enable: bool = field(default=True)
|
||||
type: str = field(default="rgbd_camera")
|
||||
relative_to: str = field(default="base_link")
|
||||
|
||||
width: int = field(default=128)
|
||||
height: int = field(default=128)
|
||||
image_format: str = field(default="R8G8B8")
|
||||
update_rate: int = field(default=10)
|
||||
horizontal_fov: float = field(default=np.pi / 3.0)
|
||||
vertical_fov: float = field(default=np.pi / 3.0)
|
||||
|
||||
clip_color: tuple[float, float] = field(default=(0.01, 1000.0))
|
||||
clip_depth: tuple[float, float] = field(default=(0.05, 10.0))
|
||||
|
||||
noise_mean: float | None = None
|
||||
noise_stddev: float | None = None
|
||||
|
||||
publish_color: bool = field(default=False)
|
||||
publish_depth: bool = field(default=False)
|
||||
publish_points: bool = field(default=False)
|
||||
|
||||
spawn_position: tuple[float, float, float] = field(default=(0, 0, 1))
|
||||
spawn_quat_xyzw: tuple[float, float, float, float] = field(
|
||||
default=(0, 0.70710678118, 0, 0.70710678118)
|
||||
)
|
||||
|
||||
random_pose_rollouts_num: int = field(default=1)
|
||||
random_pose_mode: str = field(default="orbit")
|
||||
random_pose_orbit_distance: float = field(default=1.0)
|
||||
random_pose_orbit_height_range: tuple[float, float] = field(default=(0.1, 0.7))
|
||||
random_pose_orbit_ignore_arc_behind_robot: float = field(default=np.pi / 8)
|
||||
random_pose_select_position_options: list[tuple[float, float, float]] = field(
|
||||
default_factory=list
|
||||
)
|
||||
random_pose_focal_point_z_offset: float = field(default=0.0)
|
||||
|
||||
|
||||
REACH_KWARGS_RANDOMIZER_CAMERAS: list[CameraData] = [
|
||||
CameraData(
|
||||
name="inner_robot_camera",
|
||||
enable=True,
|
||||
type="rgbd_camera",
|
||||
relative_to="rbs_gripper_rot_base_link",
|
||||
width=320,
|
||||
height=240,
|
||||
update_rate=1.2 * REACH_KWARGS["agent_rate"],
|
||||
horizontal_fov=np.pi / 3.0,
|
||||
vertical_fov=np.pi / 3.0,
|
||||
noise_mean=0.0,
|
||||
publish_color=True,
|
||||
noise_stddev=0.001,
|
||||
spawn_position=(-0.02, 0.0, 0.03),
|
||||
spawn_quat_xyzw=(0.0, -0.707, 0.0, 0.707),
|
||||
random_pose_mode="orbit",
|
||||
random_pose_rollouts_num=0,
|
||||
random_pose_orbit_distance=1.0,
|
||||
random_pose_orbit_height_range=(0.1, 0.7),
|
||||
random_pose_focal_point_z_offset=0.0,
|
||||
random_pose_orbit_ignore_arc_behind_robot=np.pi / 8,
|
||||
),
|
||||
CameraData(
|
||||
name="outer_robot_camera",
|
||||
enable=True,
|
||||
type="rgbd_camera",
|
||||
relative_to="base_link",
|
||||
width=320,
|
||||
height=240,
|
||||
update_rate=1.2 * REACH_KWARGS["agent_rate"],
|
||||
publish_color=True,
|
||||
horizontal_fov=np.pi / 3.0,
|
||||
vertical_fov=np.pi / 3.0,
|
||||
noise_mean=0.0,
|
||||
noise_stddev=0.001,
|
||||
spawn_position=(-0.2, 1.0, 1.0),
|
||||
spawn_quat_xyzw=(0.183, 0.183, -0.683, 0.683),
|
||||
random_pose_mode="orbit",
|
||||
random_pose_rollouts_num=0,
|
||||
random_pose_orbit_distance=1.0,
|
||||
random_pose_orbit_height_range=(0.1, 0.7),
|
||||
random_pose_focal_point_z_offset=0.0,
|
||||
random_pose_orbit_ignore_arc_behind_robot=np.pi / 8,
|
||||
),
|
||||
]
|
||||
|
||||
# Task
|
||||
register(
|
||||
|
@ -186,7 +340,7 @@ register(
|
|||
"env": "Reach-v0",
|
||||
**REACH_KWARGS_SIM,
|
||||
**REACH_KWARGS_RANDOMIZER,
|
||||
"camera_enable": False,
|
||||
"object_args": REACH_KWARGS_OBJECT_RANDOMIZER,
|
||||
},
|
||||
)
|
||||
register(
|
||||
|
@ -197,9 +351,8 @@ register(
|
|||
"env": "Reach-ColorImage-v0",
|
||||
**REACH_KWARGS_SIM,
|
||||
**REACH_KWARGS_RANDOMIZER,
|
||||
**REACH_KWARGS_RANDOMIZER_CAMERA,
|
||||
"camera_type": "rgbd_camera",
|
||||
"camera_publish_color": True,
|
||||
"camera_args": REACH_KWARGS_RANDOMIZER_CAMERAS,
|
||||
"object_args": REACH_KWARGS_OBJECT_RANDOMIZER,
|
||||
},
|
||||
)
|
||||
register(
|
||||
|
@ -210,8 +363,7 @@ register(
|
|||
"env": "Reach-DepthImage-v0",
|
||||
**REACH_KWARGS_SIM,
|
||||
**REACH_KWARGS_RANDOMIZER,
|
||||
**REACH_KWARGS_RANDOMIZER_CAMERA,
|
||||
"camera_type": "depth_camera",
|
||||
"camera_publish_depth": True,
|
||||
"camera_args": REACH_KWARGS_RANDOMIZER_CAMERAS,
|
||||
"object_args": REACH_KWARGS_OBJECT_RANDOMIZER,
|
||||
},
|
||||
)
|
||||
|
|
|
@ -1,35 +1,51 @@
|
|||
from enum import Enum
|
||||
|
||||
from gym_gz.scenario.model_wrapper import ModelWrapper
|
||||
|
||||
from .model import Mesh
|
||||
from .primitives import Box, Cylinder, Plane, Sphere
|
||||
from .random_lunar_rock import RandomLunarRock
|
||||
from .random_object import RandomObject
|
||||
from .random_primitive import RandomPrimitive
|
||||
from .rock import Rock
|
||||
|
||||
|
||||
def get_object_model_class(object_type: str) -> ModelWrapper:
|
||||
# TODO: Refactor into enum
|
||||
# Enum для типов объектов
|
||||
class ObjectType(Enum):
|
||||
BOX = "box"
|
||||
SPHERE = "sphere"
|
||||
CYLINDER = "cylinder"
|
||||
PLANE = "plane"
|
||||
RANDOM_PRIMITIVE = "random_primitive"
|
||||
RANDOM_MESH = "random_mesh"
|
||||
MESH = "mesh"
|
||||
|
||||
if "box" == object_type:
|
||||
return Box
|
||||
elif "sphere" == object_type:
|
||||
return Sphere
|
||||
elif "cylinder" == object_type:
|
||||
return Cylinder
|
||||
elif "random_primitive" == object_type:
|
||||
return RandomPrimitive
|
||||
elif "random_mesh" == object_type:
|
||||
return RandomObject
|
||||
elif "rock" == object_type:
|
||||
return Rock
|
||||
elif "random_lunar_rock" == object_type:
|
||||
return RandomLunarRock
|
||||
OBJECT_MODEL_CLASSES = {
|
||||
ObjectType.BOX: Box,
|
||||
ObjectType.SPHERE: Sphere,
|
||||
ObjectType.CYLINDER: Cylinder,
|
||||
ObjectType.PLANE: Plane,
|
||||
ObjectType.RANDOM_PRIMITIVE: RandomPrimitive,
|
||||
ObjectType.RANDOM_MESH: RandomObject,
|
||||
ObjectType.MESH: Mesh,
|
||||
}
|
||||
|
||||
|
||||
RANDOMIZABLE_TYPES = {
|
||||
ObjectType.RANDOM_PRIMITIVE,
|
||||
ObjectType.RANDOM_MESH,
|
||||
}
|
||||
|
||||
|
||||
def get_object_model_class(object_type: str) -> type[ModelWrapper]:
|
||||
try:
|
||||
object_enum = ObjectType(object_type)
|
||||
return OBJECT_MODEL_CLASSES[object_enum]
|
||||
except KeyError:
|
||||
raise ValueError(f"Unsupported object type: {object_type}")
|
||||
|
||||
|
||||
def is_object_type_randomizable(object_type: str) -> bool:
|
||||
|
||||
return (
|
||||
"random_primitive" == object_type
|
||||
or "random_mesh" == object_type
|
||||
or "random_lunar_rock" == object_type
|
||||
)
|
||||
try:
|
||||
object_enum = ObjectType(object_type)
|
||||
return object_enum in RANDOMIZABLE_TYPES
|
||||
except ValueError:
|
||||
return False
|
||||
|
|
51
env_manager/rbs_gym/rbs_gym/envs/models/objects/model.py
Normal file
51
env_manager/rbs_gym/rbs_gym/envs/models/objects/model.py
Normal file
|
@ -0,0 +1,51 @@
|
|||
from typing import List
|
||||
|
||||
from gym_gz.scenario import model_wrapper
|
||||
from gym_gz.utils import misc
|
||||
from gym_gz.utils.scenario import get_unique_model_name
|
||||
from scenario import core as scenario_core
|
||||
from scenario import gazebo as scenario_gazebo
|
||||
from rbs_assets_library import get_model_file
|
||||
|
||||
|
||||
class Mesh(model_wrapper.ModelWrapper):
|
||||
def __init__(
|
||||
self,
|
||||
world: scenario_gazebo.World,
|
||||
name: str = "object",
|
||||
position: tuple[float, float, float] = (0, 0, 0),
|
||||
orientation: tuple[float, float, float, float] = (1, 0, 0, 0),
|
||||
gui_only: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
# Get a unique model name
|
||||
model_name = get_unique_model_name(world, name)
|
||||
|
||||
# Initial pose
|
||||
initial_pose = scenario_core.Pose(position, orientation)
|
||||
|
||||
# Create SDF string for the model
|
||||
sdf = self.get_sdf(
|
||||
model_name=name,
|
||||
)
|
||||
|
||||
# Insert the model
|
||||
ok_model = world.to_gazebo().insert_model(
|
||||
sdf, initial_pose, model_name
|
||||
)
|
||||
if not ok_model:
|
||||
raise RuntimeError("Failed to insert " + model_name)
|
||||
|
||||
# Get the model
|
||||
model = world.get_model(model_name)
|
||||
|
||||
# Initialize base class
|
||||
super().__init__(model=model)
|
||||
|
||||
@classmethod
|
||||
def get_sdf(
|
||||
cls,
|
||||
model_name: str,
|
||||
) -> str:
|
||||
return get_model_file(model_name)
|
|
@ -1,26 +1,24 @@
|
|||
from typing import List
|
||||
|
||||
from gym_gz.scenario import model_wrapper
|
||||
from gym_gz.utils import misc
|
||||
from gym_gz.utils.scenario import get_unique_model_name
|
||||
from scenario import core as scenario
|
||||
from scenario import core as scenario_core
|
||||
from scenario import gazebo as scenario_gazebo
|
||||
|
||||
|
||||
class Box(model_wrapper.ModelWrapper):
|
||||
def __init__(
|
||||
self,
|
||||
world: scenario.World,
|
||||
world: scenario_gazebo.World,
|
||||
name: str = "box",
|
||||
position: List[float] = (0, 0, 0),
|
||||
orientation: List[float] = (1, 0, 0, 0),
|
||||
size: List[float] = (0.05, 0.05, 0.05),
|
||||
position: tuple[float, float, float] = (0, 0, 0),
|
||||
orientation: tuple[float, float, float, float] = (1, 0, 0, 0),
|
||||
size: tuple[float, float, float] = (0.05, 0.05, 0.05),
|
||||
mass: float = 0.1,
|
||||
static: bool = False,
|
||||
collision: bool = True,
|
||||
friction: float = 1.0,
|
||||
visual: bool = True,
|
||||
gui_only: bool = False,
|
||||
color: List[float] = (0.8, 0.8, 0.8, 1.0),
|
||||
color: tuple[float, float, float, float] = (0.8, 0.8, 0.8, 1.0),
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
|
@ -28,7 +26,7 @@ class Box(model_wrapper.ModelWrapper):
|
|||
model_name = get_unique_model_name(world, name)
|
||||
|
||||
# Initial pose
|
||||
initial_pose = scenario.Pose(position, orientation)
|
||||
initial_pose = scenario_core.Pose(position, orientation)
|
||||
|
||||
# Create SDF string for the model
|
||||
sdf = self.get_sdf(
|
||||
|
@ -48,26 +46,26 @@ class Box(model_wrapper.ModelWrapper):
|
|||
sdf, initial_pose, model_name
|
||||
)
|
||||
if not ok_model:
|
||||
raise RuntimeError("Failed to insert " + model_name)
|
||||
raise RuntimeError(f"Failed to insert {model_name}")
|
||||
|
||||
# Get the model
|
||||
model = world.get_model(model_name)
|
||||
|
||||
# Initialize base class
|
||||
model_wrapper.ModelWrapper.__init__(self, model=model)
|
||||
super().__init__(model=model)
|
||||
|
||||
@classmethod
|
||||
def get_sdf(
|
||||
self,
|
||||
cls,
|
||||
model_name: str,
|
||||
size: List[float],
|
||||
size: tuple[float, float, float],
|
||||
mass: float,
|
||||
static: bool,
|
||||
collision: bool,
|
||||
friction: float,
|
||||
visual: bool,
|
||||
gui_only: bool,
|
||||
color: List[float],
|
||||
color: tuple[float, float, float, float],
|
||||
) -> str:
|
||||
return f'''<sdf version="1.7">
|
||||
<model name="{model_name}">
|
||||
|
@ -108,7 +106,7 @@ class Box(model_wrapper.ModelWrapper):
|
|||
<diffuse>{color[0]} {color[1]} {color[2]} {color[3]}</diffuse>
|
||||
<specular>{color[0]} {color[1]} {color[2]} {color[3]}</specular>
|
||||
</material>
|
||||
<transparency>{1.0-color[3]}</transparency>
|
||||
<transparency>{1.0 - color[3]}</transparency>
|
||||
{'<visibility_flags>1</visibility_flags> <cast_shadows>false</cast_shadows>' if gui_only else ''}
|
||||
</visual>
|
||||
""" if visual else ""
|
||||
|
@ -116,9 +114,9 @@ class Box(model_wrapper.ModelWrapper):
|
|||
<inertial>
|
||||
<mass>{mass}</mass>
|
||||
<inertia>
|
||||
<ixx>{(size[1]**2 + size[2]**2)*mass/12}</ixx>
|
||||
<iyy>{(size[0]**2 + size[2]**2)*mass/12}</iyy>
|
||||
<izz>{(size[0]**2 + size[1]**2)*mass/12}</izz>
|
||||
<ixx>{(size[1]**2 + size[2]**2) * mass / 12}</ixx>
|
||||
<iyy>{(size[0]**2 + size[2]**2) * mass / 12}</iyy>
|
||||
<izz>{(size[0]**2 + size[1]**2) * mass / 12}</izz>
|
||||
<ixy>0.0</ixy>
|
||||
<ixz>0.0</ixz>
|
||||
<iyz>0.0</iyz>
|
||||
|
|
|
@ -1,18 +1,16 @@
|
|||
from typing import List
|
||||
|
||||
from gym_gz.scenario import model_wrapper
|
||||
from gym_gz.utils import misc
|
||||
from gym_gz.utils.scenario import get_unique_model_name
|
||||
from scenario import core as scenario
|
||||
from scenario import core as scenario_core
|
||||
from scenario import gazebo as scenario_gazebo
|
||||
|
||||
|
||||
class Cylinder(model_wrapper.ModelWrapper):
|
||||
def __init__(
|
||||
self,
|
||||
world: scenario.World,
|
||||
world: scenario_gazebo.World,
|
||||
name: str = "cylinder",
|
||||
position: List[float] = (0, 0, 0),
|
||||
orientation: List[float] = (1, 0, 0, 0),
|
||||
position: tuple[float, float, float] = (0, 0, 0),
|
||||
orientation: tuple[float, float, float, float] = (1, 0, 0, 0),
|
||||
radius: float = 0.025,
|
||||
length: float = 0.1,
|
||||
mass: float = 0.1,
|
||||
|
@ -21,17 +19,17 @@ class Cylinder(model_wrapper.ModelWrapper):
|
|||
friction: float = 1.0,
|
||||
visual: bool = True,
|
||||
gui_only: bool = False,
|
||||
color: List[float] = (0.8, 0.8, 0.8, 1.0),
|
||||
color: tuple[float, float, float, float] = (0.8, 0.8, 0.8, 1.0),
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
# Get a unique model name
|
||||
# Получаем уникальное имя модели
|
||||
model_name = get_unique_model_name(world, name)
|
||||
|
||||
# Initial pose
|
||||
initial_pose = scenario.Pose(position, orientation)
|
||||
# Инициализируем начальную позу
|
||||
initial_pose = scenario_core.Pose(position, orientation)
|
||||
|
||||
# Create SDF string for the model
|
||||
# Создаем строку SDF для модели
|
||||
sdf = self.get_sdf(
|
||||
model_name=model_name,
|
||||
radius=radius,
|
||||
|
@ -45,22 +43,22 @@ class Cylinder(model_wrapper.ModelWrapper):
|
|||
color=color,
|
||||
)
|
||||
|
||||
# Insert the model
|
||||
# Вставляем модель
|
||||
ok_model = world.to_gazebo().insert_model_from_string(
|
||||
sdf, initial_pose, model_name
|
||||
)
|
||||
if not ok_model:
|
||||
raise RuntimeError("Failed to insert " + model_name)
|
||||
raise RuntimeError(f"Failed to insert {model_name}")
|
||||
|
||||
# Get the model
|
||||
# Получаем модель
|
||||
model = world.get_model(model_name)
|
||||
|
||||
# Initialize base class
|
||||
model_wrapper.ModelWrapper.__init__(self, model=model)
|
||||
# Вызов конструктора родительского класса
|
||||
super().__init__(model=model)
|
||||
|
||||
@classmethod
|
||||
def get_sdf(
|
||||
self,
|
||||
cls,
|
||||
model_name: str,
|
||||
radius: float,
|
||||
length: float,
|
||||
|
@ -70,9 +68,9 @@ class Cylinder(model_wrapper.ModelWrapper):
|
|||
friction: float,
|
||||
visual: bool,
|
||||
gui_only: bool,
|
||||
color: List[float],
|
||||
color: tuple[float, float, float, float],
|
||||
) -> str:
|
||||
# Inertia is identical for xx and yy components, compute only once
|
||||
# Инерция одинакова для xx и yy, вычисляем ее один раз
|
||||
inertia_xx_yy = (3 * radius**2 + length**2) * mass / 12
|
||||
|
||||
return f'''<sdf version="1.7">
|
||||
|
|
|
@ -1,33 +1,30 @@
|
|||
from typing import List
|
||||
|
||||
from gym_gz.scenario import model_wrapper
|
||||
from gym_gz.utils import misc
|
||||
from gym_gz.utils.scenario import get_unique_model_name
|
||||
from scenario import core as scenario
|
||||
from scenario import core as scenario_core
|
||||
from scenario import gazebo as scenario_gazebo
|
||||
|
||||
|
||||
class Plane(model_wrapper.ModelWrapper):
|
||||
def __init__(
|
||||
self,
|
||||
world: scenario.World,
|
||||
world: scenario_gazebo.World,
|
||||
name: str = "plane",
|
||||
position: List[float] = (0, 0, 0),
|
||||
orientation: List[float] = (1, 0, 0, 0),
|
||||
size: List[float] = (1.0, 1.0),
|
||||
direction: List[float] = (0.0, 0.0, 1.0),
|
||||
position: tuple[float, float, float] = (0, 0, 0),
|
||||
orientation: tuple[float, float, float, float] = (1, 0, 0, 0),
|
||||
size: tuple[float, float] = (1.0, 1.0),
|
||||
direction: tuple[float, float, float] = (0.0, 0.0, 1.0),
|
||||
collision: bool = True,
|
||||
friction: float = 1.0,
|
||||
visual: bool = True,
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
# Get a unique model name
|
||||
# Получаем уникальное имя модели
|
||||
model_name = get_unique_model_name(world, name)
|
||||
|
||||
# Initial pose
|
||||
initial_pose = scenario.Pose(position, orientation)
|
||||
# Инициализируем начальную позу
|
||||
initial_pose = scenario_core.Pose(position, orientation)
|
||||
|
||||
# Create SDF string for the model
|
||||
# Генерация строки SDF для модели
|
||||
sdf = f'''<sdf version="1.7">
|
||||
<model name="{model_name}">
|
||||
<static>true</static>
|
||||
|
@ -76,15 +73,15 @@ class Plane(model_wrapper.ModelWrapper):
|
|||
</model>
|
||||
</sdf>'''
|
||||
|
||||
# Insert the model
|
||||
# Вставка модели
|
||||
ok_model = world.to_gazebo().insert_model_from_string(
|
||||
sdf, initial_pose, model_name
|
||||
)
|
||||
if not ok_model:
|
||||
raise RuntimeError("Failed to insert " + model_name)
|
||||
raise RuntimeError(f"Failed to insert {model_name}")
|
||||
|
||||
# Get the model
|
||||
# Получение модели
|
||||
model = world.get_model(model_name)
|
||||
|
||||
# Initialize base class
|
||||
model_wrapper.ModelWrapper.__init__(self, model=model)
|
||||
# Вызов конструктора родительского класса
|
||||
super().__init__(model=model)
|
||||
|
|
|
@ -1,18 +1,16 @@
|
|||
from typing import List
|
||||
|
||||
from gym_gz.scenario import model_wrapper
|
||||
from gym_gz.utils import misc
|
||||
from gym_gz.utils.scenario import get_unique_model_name
|
||||
from scenario import core as scenario
|
||||
from scenario import core as scenario_core
|
||||
from scenario import gazebo as scenario_gazebo
|
||||
|
||||
|
||||
class Sphere(model_wrapper.ModelWrapper):
|
||||
def __init__(
|
||||
self,
|
||||
world: scenario.World,
|
||||
world: scenario_gazebo.World,
|
||||
name: str = "sphere",
|
||||
position: List[float] = (0, 0, 0),
|
||||
orientation: List[float] = (1, 0, 0, 0),
|
||||
position: tuple[float, float, float] = (0, 0, 0),
|
||||
orientation: tuple[float, float, float, float] = (1, 0, 0, 0),
|
||||
radius: float = 0.025,
|
||||
mass: float = 0.1,
|
||||
static: bool = False,
|
||||
|
@ -20,7 +18,7 @@ class Sphere(model_wrapper.ModelWrapper):
|
|||
friction: float = 1.0,
|
||||
visual: bool = True,
|
||||
gui_only: bool = False,
|
||||
color: List[float] = (0.8, 0.8, 0.8, 1.0),
|
||||
color: tuple[float, float, float, float] = (0.8, 0.8, 0.8, 1.0),
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
|
@ -28,7 +26,7 @@ class Sphere(model_wrapper.ModelWrapper):
|
|||
model_name = get_unique_model_name(world, name)
|
||||
|
||||
# Initial pose
|
||||
initial_pose = scenario.Pose(position, orientation)
|
||||
initial_pose = scenario_core.Pose(position, orientation)
|
||||
|
||||
# Create SDF string for the model
|
||||
sdf = self.get_sdf(
|
||||
|
@ -48,17 +46,17 @@ class Sphere(model_wrapper.ModelWrapper):
|
|||
sdf, initial_pose, model_name
|
||||
)
|
||||
if not ok_model:
|
||||
raise RuntimeError("Failed to insert " + model_name)
|
||||
raise RuntimeError(f"Failed to insert {model_name}")
|
||||
|
||||
# Get the model
|
||||
model = world.get_model(model_name)
|
||||
|
||||
# Initialize base class
|
||||
model_wrapper.ModelWrapper.__init__(self, model=model)
|
||||
super().__init__(model=model)
|
||||
|
||||
@classmethod
|
||||
def get_sdf(
|
||||
self,
|
||||
cls,
|
||||
model_name: str,
|
||||
radius: float,
|
||||
mass: float,
|
||||
|
@ -67,7 +65,7 @@ class Sphere(model_wrapper.ModelWrapper):
|
|||
friction: float,
|
||||
visual: bool,
|
||||
gui_only: bool,
|
||||
color: List[float],
|
||||
color: tuple[float, float, float, float],
|
||||
) -> str:
|
||||
# Inertia is identical for all axes
|
||||
inertia_xx_yy_zz = (mass * radius**2) * 2 / 5
|
||||
|
@ -111,7 +109,7 @@ class Sphere(model_wrapper.ModelWrapper):
|
|||
<diffuse>{color[0]} {color[1]} {color[2]} {color[3]}</diffuse>
|
||||
<specular>{color[0]} {color[1]} {color[2]} {color[3]}</specular>
|
||||
</material>
|
||||
<transparency>{1.0-color[3]}</transparency>
|
||||
<transparency>{1.0 - color[3]}</transparency>
|
||||
{'<visibility_flags>1</visibility_flags> <cast_shadows>false</cast_shadows>' if gui_only else ''}
|
||||
</visual>
|
||||
""" if visual else ""
|
||||
|
|
|
@ -1,57 +0,0 @@
|
|||
import os
|
||||
from typing import List, Optional
|
||||
|
||||
import numpy as np
|
||||
from gym_gz.scenario import model_wrapper
|
||||
from gym_gz.utils.scenario import get_unique_model_name
|
||||
from numpy.random import RandomState
|
||||
from scenario import core as scenario
|
||||
|
||||
|
||||
class RandomLunarRock(model_wrapper.ModelWrapper):
|
||||
def __init__(
|
||||
self,
|
||||
world: scenario.World,
|
||||
name: str = "rock",
|
||||
position: List[float] = (0, 0, 0),
|
||||
orientation: List[float] = (1, 0, 0, 0),
|
||||
models_dir: Optional[str] = None,
|
||||
np_random: Optional[RandomState] = None,
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
if np_random is None:
|
||||
np_random = np.random.default_rng()
|
||||
|
||||
# Get a unique model name
|
||||
model_name = get_unique_model_name(world, name)
|
||||
|
||||
# Initial pose
|
||||
initial_pose = scenario.Pose(position, orientation)
|
||||
|
||||
# Get path to all lunar rock models
|
||||
if not models_dir:
|
||||
models_dir = os.environ.get("SDF_PATH_LUNAR_ROCK", default="")
|
||||
|
||||
# Make sure the path exists
|
||||
if not os.path.exists(models_dir):
|
||||
raise ValueError(
|
||||
f"Invalid path '{models_dir}' pointed by 'SDF_PATH_LUNAR_ROCK' environment variable."
|
||||
)
|
||||
|
||||
# Select a single model at random
|
||||
model_dir = np_random.choice(os.listdir(models_dir))
|
||||
sdf_filepath = os.path.join(model_dir, "model.sdf")
|
||||
|
||||
# Insert the model
|
||||
ok_model = world.to_gazebo().insert_model_from_file(
|
||||
sdf_filepath, initial_pose, model_name
|
||||
)
|
||||
if not ok_model:
|
||||
raise RuntimeError("Failed to insert " + model_name)
|
||||
|
||||
# Get the model
|
||||
model = world.get_model(model_name)
|
||||
|
||||
# Initialize base class
|
||||
model_wrapper.ModelWrapper.__init__(self, model=model)
|
|
@ -1,52 +0,0 @@
|
|||
from typing import List
|
||||
|
||||
from gym_gz.scenario import model_with_file, model_wrapper
|
||||
from gym_gz.utils.scenario import get_unique_model_name
|
||||
from scenario import core as scenario
|
||||
from scenario import gazebo as scenario_gazebo
|
||||
|
||||
|
||||
class Rock(model_wrapper.ModelWrapper, model_with_file.ModelWithFile):
|
||||
def __init__(
|
||||
self,
|
||||
world: scenario.World,
|
||||
name: str = "rock",
|
||||
position: List[float] = (0, 0, 0),
|
||||
orientation: List[float] = (1, 0, 0, 0),
|
||||
model_file: str = None,
|
||||
use_fuel: bool = True,
|
||||
variant: int = 6,
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
# Allow passing of custom model file as an argument
|
||||
if model_file is None:
|
||||
model_file = self.get_model_file(fuel=use_fuel, variant=variant)
|
||||
|
||||
# Get a unique model name
|
||||
model_name = get_unique_model_name(world, name)
|
||||
|
||||
# Setup initial pose
|
||||
initial_pose = scenario.Pose(position, orientation)
|
||||
|
||||
# Insert the model
|
||||
ok_model = world.to_gazebo().insert_model_from_file(
|
||||
model_file, initial_pose, model_name
|
||||
)
|
||||
if not ok_model:
|
||||
raise RuntimeError("Failed to insert " + model_name)
|
||||
|
||||
# Get the model
|
||||
model = world.get_model(model_name)
|
||||
|
||||
# Initialize base class
|
||||
super().__init__(model=model)
|
||||
|
||||
@classmethod
|
||||
def get_model_file(self, fuel: bool = False, variant: int = 6) -> str:
|
||||
if fuel:
|
||||
return scenario_gazebo.get_model_file_from_fuel(
|
||||
f"https://fuel.ignitionrobotics.org/1.0/OpenRobotics/models/Falling Rock {variant}"
|
||||
)
|
||||
else:
|
||||
return "lunar_surface"
|
|
@ -6,10 +6,12 @@ from .rbs_arm import RbsArm
|
|||
# TODO: When adding new a robot, create abstract classes to simplify such process
|
||||
|
||||
|
||||
def get_robot_model_class(robot_model: str) -> ModelWrapper:
|
||||
def get_robot_model_class(robot_model: str) -> type[RbsArm]:
|
||||
# TODO: Refactor into enum
|
||||
|
||||
if "rbs_arm" == robot_model:
|
||||
return RbsArm
|
||||
else:
|
||||
return RbsArm
|
||||
# elif "panda" == robot_model:
|
||||
# return Panda
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Dict, List, Optional, Tuple
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
|
||||
from gym_gz.scenario import model_with_file, model_wrapper
|
||||
from gym_gz.utils.scenario import get_unique_model_name
|
||||
|
@ -8,6 +8,10 @@ import numpy as np
|
|||
from scenario import core as scenario
|
||||
from scenario import gazebo as scenario_gazebo
|
||||
|
||||
# from robot_builder.elements.robot import Robot
|
||||
# from robot_builder.writer.urdf import URDF_writer
|
||||
# from robot_builder.parser.urdf import URDF_parser
|
||||
|
||||
class RbsArm(model_wrapper.ModelWrapper, model_with_file.ModelWithFile):
|
||||
|
||||
DEFAULT_ARM_JOINT_POSITIONS: List[float] = (
|
||||
|
@ -34,13 +38,13 @@ class RbsArm(model_wrapper.ModelWrapper, model_with_file.ModelWithFile):
|
|||
self,
|
||||
world: scenario.World,
|
||||
name: str = "rbs_arm",
|
||||
position: List[float] = (0.0, 0.0, 0.0),
|
||||
orientation: List[float] = (1.0, 0, 0, 0),
|
||||
model_file: Optional[str] = None,
|
||||
position: tuple[float, float, float] = (0.0, 0.0, 0.0),
|
||||
orientation: tuple[float, float, float, float] = (1.0, 0, 0, 0),
|
||||
model_file: str | None = None,
|
||||
use_fuel: bool = False,
|
||||
use_xacro: bool = False,
|
||||
xacro_file: str = "",
|
||||
xacro_mappings: Dict[str, any] = {},
|
||||
xacro_mappings: Dict[str, Any] = {},
|
||||
initial_arm_joint_positions: List[float] = DEFAULT_ARM_JOINT_POSITIONS,
|
||||
initial_gripper_joint_positions: List[float] = DEFAULT_GRIPPER_JOINT_POSITIONS,
|
||||
**kwargs
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import os
|
||||
from threading import Thread
|
||||
from typing import List, Optional, Union
|
||||
|
||||
from gym_gz.scenario import model_wrapper
|
||||
from gym_gz.utils.scenario import get_unique_model_name
|
||||
from scenario import core as scenario
|
||||
from scenario import core as scenario_core
|
||||
from scenario import gazebo as scenario_gz
|
||||
|
||||
from rbs_gym.envs.models.utils import ModelCollectionRandomizer
|
||||
|
||||
|
@ -12,10 +12,10 @@ from rbs_gym.envs.models.utils import ModelCollectionRandomizer
|
|||
class Camera(model_wrapper.ModelWrapper):
|
||||
def __init__(
|
||||
self,
|
||||
world: scenario.World,
|
||||
name: Union[str, None] = None,
|
||||
position: List[float] = (0, 0, 0),
|
||||
orientation: List[float] = (1, 0, 0, 0),
|
||||
world: scenario_gz.World,
|
||||
name: str | None = None,
|
||||
position: tuple[float, float, float] = (0, 0, 0),
|
||||
orientation: tuple[float, float, float, float] = (1, 0, 0, 0),
|
||||
static: bool = True,
|
||||
camera_type: str = "rgbd_camera",
|
||||
width: int = 212,
|
||||
|
@ -24,31 +24,37 @@ class Camera(model_wrapper.ModelWrapper):
|
|||
update_rate: int = 15,
|
||||
horizontal_fov: float = 1.567821,
|
||||
vertical_fov: float = 1.022238,
|
||||
clip_color: List[float] = (0.02, 1000.0),
|
||||
clip_depth: List[float] = (0.02, 10.0),
|
||||
noise_mean: float = None,
|
||||
noise_stddev: float = None,
|
||||
clip_color: tuple[float, float] = (0.02, 1000.0),
|
||||
clip_depth: tuple[float, float] = (0.02, 10.0),
|
||||
noise_mean: float | None = None,
|
||||
noise_stddev: float | None = None,
|
||||
ros2_bridge_color: bool = False,
|
||||
ros2_bridge_depth: bool = False,
|
||||
ros2_bridge_points: bool = False,
|
||||
visibility_mask: int = 0,
|
||||
visual: Optional[str] = None,
|
||||
visual: str | None = None,
|
||||
# visual: Optional[str] = "intel_realsense_d435",
|
||||
):
|
||||
|
||||
# Get a unique model name
|
||||
if name is not None:
|
||||
model_name = get_unique_model_name(world, name)
|
||||
else:
|
||||
model_name = get_unique_model_name(world, camera_type)
|
||||
self._model_name = model_name
|
||||
self._camera_type = camera_type
|
||||
|
||||
# Initial pose
|
||||
initial_pose = scenario.Pose(position, orientation)
|
||||
initial_pose = scenario_core.Pose(position, orientation)
|
||||
|
||||
use_mesh: bool = False
|
||||
mesh_path_visual: str = ""
|
||||
|
||||
albedo_map = None
|
||||
normal_map = None
|
||||
roughness_map = None
|
||||
metalness_map = None
|
||||
# Get resources for visual (if enabled)
|
||||
if visual:
|
||||
use_mesh: bool = False
|
||||
if "intel_realsense_d435" == visual:
|
||||
use_mesh = True
|
||||
|
||||
|
@ -71,10 +77,6 @@ class Camera(model_wrapper.ModelWrapper):
|
|||
)
|
||||
|
||||
# Find PBR textures
|
||||
albedo_map = None
|
||||
normal_map = None
|
||||
roughness_map = None
|
||||
metalness_map = None
|
||||
if texture_dir:
|
||||
# List all files
|
||||
texture_files = os.listdir(texture_dir)
|
||||
|
@ -94,7 +96,7 @@ class Camera(model_wrapper.ModelWrapper):
|
|||
metalness_map = os.path.join(texture_dir, texture)
|
||||
|
||||
if not (albedo_map and normal_map and roughness_map and metalness_map):
|
||||
raise ValueError(f"Not all textures for Camera model were found.")
|
||||
raise ValueError("Not all textures for Camera model were found.")
|
||||
|
||||
# Create SDF string for the model
|
||||
sdf = f'''<sdf version="1.9">
|
||||
|
@ -225,6 +227,17 @@ class Camera(model_wrapper.ModelWrapper):
|
|||
|
||||
if ros2_bridge_color or ros2_bridge_depth or ros2_bridge_points:
|
||||
self.__threads = []
|
||||
self.__threads.append(
|
||||
Thread(
|
||||
target=self.construct_ros2_bridge,
|
||||
args=(
|
||||
self.info_topic,
|
||||
"sensor_msgs/msg/CameraInfo",
|
||||
"gz.msgs.CameraInfo",
|
||||
),
|
||||
daemon=True,
|
||||
)
|
||||
)
|
||||
if ros2_bridge_color:
|
||||
self.__threads.append(
|
||||
Thread(
|
||||
|
@ -232,7 +245,7 @@ class Camera(model_wrapper.ModelWrapper):
|
|||
args=(
|
||||
self.color_topic,
|
||||
"sensor_msgs/msg/Image",
|
||||
"ignition.msgs.Image",
|
||||
"gz.msgs.Image",
|
||||
),
|
||||
daemon=True,
|
||||
)
|
||||
|
@ -245,7 +258,7 @@ class Camera(model_wrapper.ModelWrapper):
|
|||
args=(
|
||||
self.depth_topic,
|
||||
"sensor_msgs/msg/Image",
|
||||
"ignition.msgs.Image",
|
||||
"gz.msgs.Image",
|
||||
),
|
||||
daemon=True,
|
||||
)
|
||||
|
@ -258,7 +271,7 @@ class Camera(model_wrapper.ModelWrapper):
|
|||
args=(
|
||||
self.points_topic,
|
||||
"sensor_msgs/msg/PointCloud2",
|
||||
"ignition.msgs.PointCloudPacked",
|
||||
"gz.msgs.PointCloudPacked",
|
||||
),
|
||||
daemon=True,
|
||||
)
|
||||
|
@ -273,10 +286,10 @@ class Camera(model_wrapper.ModelWrapper):
|
|||
thread.join()
|
||||
|
||||
@classmethod
|
||||
def construct_ros2_bridge(self, topic: str, ros_msg: str, ign_msg: str):
|
||||
def construct_ros2_bridge(cls, topic: str, ros_msg: str, ign_msg: str):
|
||||
node_name = "parameter_bridge" + topic.replace("/", "_")
|
||||
command = (
|
||||
f"ros2 run ros_ign_bridge parameter_bridge {topic}@{ros_msg}[{ign_msg} "
|
||||
f"ros2 run ros_gz_bridge parameter_bridge {topic}@{ros_msg}[{ign_msg} "
|
||||
+ f"--ros-args --remap __node:={node_name} --ros-args -p use_sim_time:=true"
|
||||
)
|
||||
os.system(command)
|
||||
|
@ -290,22 +303,22 @@ class Camera(model_wrapper.ModelWrapper):
|
|||
return self.get_frame_id(self._model_name)
|
||||
|
||||
@classmethod
|
||||
def get_color_topic(cls, model_name: str) -> str:
|
||||
return f"/{model_name}/image" if "rgbd" in model_name else f"/{model_name}"
|
||||
def get_color_topic(cls, model_name: str, camera_type: str) -> str:
|
||||
return f"/{model_name}/image" if "rgbd" in camera_type else f"/{model_name}"
|
||||
|
||||
@property
|
||||
def color_topic(self) -> str:
|
||||
return self.get_color_topic(self._model_name)
|
||||
return self.get_color_topic(self._model_name, self._camera_type)
|
||||
|
||||
@classmethod
|
||||
def get_depth_topic(cls, model_name: str) -> str:
|
||||
def get_depth_topic(cls, model_name: str, camera_type: str) -> str:
|
||||
return (
|
||||
f"/{model_name}/depth_image" if "rgbd" in model_name else f"/{model_name}"
|
||||
f"/{model_name}/depth_image" if "rgbd" in camera_type else f"/{model_name}"
|
||||
)
|
||||
|
||||
@property
|
||||
def depth_topic(self) -> str:
|
||||
return self.get_depth_topic(self._model_name)
|
||||
return self.get_depth_topic(self._model_name, self._camera_type)
|
||||
|
||||
@classmethod
|
||||
def get_points_topic(cls, model_name: str) -> str:
|
||||
|
@ -315,6 +328,10 @@ class Camera(model_wrapper.ModelWrapper):
|
|||
def points_topic(self) -> str:
|
||||
return self.get_points_topic(self._model_name)
|
||||
|
||||
@property
|
||||
def info_topic(self):
|
||||
return f"/{self._model_name}/camera_info"
|
||||
|
||||
@classmethod
|
||||
def get_link_name(cls, model_name: str) -> str:
|
||||
return f"{model_name}_link"
|
||||
|
|
|
@ -7,7 +7,7 @@ from .random_ground import RandomGround
|
|||
from .random_lunar_surface import RandomLunarSurface
|
||||
|
||||
|
||||
def get_terrain_model_class(terrain_type: str) -> ModelWrapper:
|
||||
def get_terrain_model_class(terrain_type: str) -> type[ModelWrapper]:
|
||||
# TODO: Refactor into enum
|
||||
|
||||
if "flat" == terrain_type:
|
||||
|
@ -20,6 +20,8 @@ def get_terrain_model_class(terrain_type: str) -> ModelWrapper:
|
|||
return LunarSurface
|
||||
elif "random_lunar_surface" == terrain_type:
|
||||
return RandomLunarSurface
|
||||
else:
|
||||
raise AttributeError(f"Unsupported terrain [{terrain_type}]")
|
||||
|
||||
|
||||
def is_terrain_type_randomizable(terrain_type: str) -> bool:
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -0,0 +1,120 @@
|
|||
import abc
|
||||
|
||||
import gymnasium as gym
|
||||
import numpy as np
|
||||
from gym_gz.utils.typing import Observation, ObservationSpace
|
||||
|
||||
from rbs_gym.envs.models.sensors import Camera
|
||||
from rbs_gym.envs.observation import CameraSubscriber
|
||||
from rbs_gym.envs.tasks.reach import Reach
|
||||
|
||||
|
||||
class ReachColorImage(Reach, abc.ABC):
|
||||
def __init__(
|
||||
self,
|
||||
camera_info: list[dict] = [],
|
||||
monochromatic: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
# Initialize the Task base class
|
||||
Reach.__init__(
|
||||
self,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
# Store parameters for later use
|
||||
self._camera_width = sum(w.get("width", 0) for w in camera_info)
|
||||
self._camera_height = camera_info[0]["height"]
|
||||
self._monochromatic = monochromatic
|
||||
self._saved = False
|
||||
|
||||
self.camera_subs: list[CameraSubscriber] = []
|
||||
for camera in camera_info:
|
||||
# Perception (RGB camera)
|
||||
self.camera_subs.append(
|
||||
CameraSubscriber(
|
||||
node=self,
|
||||
topic=Camera.get_color_topic(camera["name"], camera["type"]),
|
||||
is_point_cloud=False,
|
||||
callback_group=self._callback_group,
|
||||
)
|
||||
)
|
||||
|
||||
def create_observation_space(self) -> ObservationSpace:
|
||||
"""
|
||||
Creates the observation space for the Imitation Learning algorithm.
|
||||
|
||||
This method returns a dictionary-based observation space that includes:
|
||||
|
||||
- **image**: A 2D image captured from the robot's camera. The image's shape is determined by the camera's
|
||||
width and height, and the number of channels (either monochromatic or RGB).
|
||||
- Dimensions: `(camera_height, camera_width, channels)` where `channels` is 1 for monochromatic images
|
||||
and 3 for RGB images.
|
||||
- Pixel values are in the range `[0, 255]` with data type `np.uint8`.
|
||||
|
||||
- **joint_states**: The joint positions of the robot arm, represented as continuous values within the
|
||||
range `[-1.0, 1.0]`, where `nDoF` refers to the number of degrees of freedom of the robot arm.
|
||||
|
||||
Returns:
|
||||
gym.spaces.Dict: A dictionary defining the observation space for the learning algorithm,
|
||||
containing both image and joint_states information.
|
||||
"""
|
||||
return gym.spaces.Dict(
|
||||
{
|
||||
"image": gym.spaces.Box(
|
||||
low=0,
|
||||
high=255,
|
||||
shape=(
|
||||
self._camera_height,
|
||||
self._camera_width,
|
||||
1 if self._monochromatic else 3,
|
||||
),
|
||||
dtype=np.uint8,
|
||||
),
|
||||
"joint_states": gym.spaces.Box(low=-1.0, high=1.0),
|
||||
}
|
||||
)
|
||||
|
||||
def get_observation(self) -> Observation:
|
||||
# Get the latest images
|
||||
image = []
|
||||
|
||||
for sub in self.camera_subs:
|
||||
image.append(sub.get_observation())
|
||||
|
||||
image_width = sum(i.width for i in image)
|
||||
image_height = image[0].height
|
||||
# if image_width == self._camera_width and image_height == self._camera_height:
|
||||
|
||||
# image_data = np.concatenate([i.data for i in image], axis=1)
|
||||
|
||||
assert (
|
||||
image_width == self._camera_width and image_height == self._camera_height
|
||||
), f"Error: Resolution of the input image does not match the configured observation space. ({image_width}x{image_height} instead of {self._camera_width}x{self._camera_height})"
|
||||
|
||||
# Reshape and create the observation
|
||||
# color_image = np.array([i.data for i in image], dtype=np.uint8).reshape(
|
||||
# self._camera_height, self._camera_width, 3
|
||||
# )
|
||||
color_image = np.concatenate(
|
||||
[
|
||||
np.array(i.data, dtype=np.uint8).reshape(i.height, i.width, 3)
|
||||
for i in image
|
||||
],
|
||||
axis=1,
|
||||
)
|
||||
|
||||
# # Debug save images
|
||||
# from PIL import Image
|
||||
# img_color = Image.fromarray(color_image)
|
||||
# img_color.save("img_color.png")
|
||||
|
||||
if self._monochromatic:
|
||||
observation = Observation(color_image[:, :, 0])
|
||||
else:
|
||||
observation = Observation(color_image)
|
||||
|
||||
self.get_logger().debug(f"\nobservation: {observation}")
|
||||
|
||||
# Return the observation
|
||||
return observation
|
|
@ -12,9 +12,7 @@ from rbs_gym.envs.tasks.reach import Reach
|
|||
class ReachColorImage(Reach, abc.ABC):
|
||||
def __init__(
|
||||
self,
|
||||
camera_width: int,
|
||||
camera_height: int,
|
||||
camera_type: str = "camera",
|
||||
camera_info: list[dict] = [],
|
||||
monochromatic: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
|
@ -26,17 +24,20 @@ class ReachColorImage(Reach, abc.ABC):
|
|||
)
|
||||
|
||||
# Store parameters for later use
|
||||
self._camera_width = camera_width
|
||||
self._camera_height = camera_height
|
||||
self._camera_width = sum(w.get('width', 0) for w in camera_info)
|
||||
self._camera_height = camera_info[0]["height"]
|
||||
self._monochromatic = monochromatic
|
||||
self._saved = False
|
||||
|
||||
# Perception (RGB camera)
|
||||
self.camera_sub = CameraSubscriber(
|
||||
node=self,
|
||||
topic=Camera.get_color_topic(camera_type),
|
||||
is_point_cloud=False,
|
||||
callback_group=self._callback_group,
|
||||
)
|
||||
self.camera_subs: list[CameraSubscriber] = []
|
||||
for camera in camera_info:
|
||||
# Perception (RGB camera)
|
||||
self.camera_subs.append(CameraSubscriber(
|
||||
node=self,
|
||||
topic=Camera.get_color_topic(camera["name"], camera["type"]),
|
||||
is_point_cloud=False,
|
||||
callback_group=self._callback_group,
|
||||
))
|
||||
|
||||
def create_observation_space(self) -> ObservationSpace:
|
||||
|
||||
|
@ -54,17 +55,22 @@ class ReachColorImage(Reach, abc.ABC):
|
|||
)
|
||||
|
||||
def get_observation(self) -> Observation:
|
||||
# Get the latest images
|
||||
image = []
|
||||
|
||||
# Get the latest image
|
||||
image = self.camera_sub.get_observation()
|
||||
for sub in self.camera_subs:
|
||||
image.append(sub.get_observation())
|
||||
|
||||
image_width = sum(i.width for i in image)
|
||||
image_height = image[0].height
|
||||
|
||||
assert (
|
||||
image.width == self._camera_width and image.height == self._camera_height
|
||||
), f"Error: Resolution of the input image does not match the configured observation space. ({image.width}x{image.height} instead of {self._camera_width}x{self._camera_height})"
|
||||
image_width == self._camera_width and image_height == self._camera_height
|
||||
), f"Error: Resolution of the input image does not match the configured observation space. ({image_width}x{image_height} instead of {self._camera_width}x{self._camera_height})"
|
||||
|
||||
# Reshape and create the observation
|
||||
color_image = np.array(image.data, dtype=np.uint8).reshape(
|
||||
self._camera_height, self._camera_width, 3
|
||||
color_image = np.concatenate(
|
||||
[np.array(i.data, dtype=np.uint8).reshape(i.height, i.width, 3) for i in image],
|
||||
axis=1
|
||||
)
|
||||
|
||||
# # Debug save images
|
||||
|
|
|
@ -164,13 +164,12 @@ def orientation_quat_to_6d(
|
|||
|
||||
|
||||
def quat_to_wxyz(
|
||||
xyzw: Union[numpy.ndarray, Tuple[float, float, float, float]]
|
||||
) -> numpy.ndarray:
|
||||
xyzw: tuple[float, float, float, float]
|
||||
) -> tuple[float, float, float, float]:
|
||||
|
||||
if isinstance(xyzw, tuple):
|
||||
return (xyzw[3], xyzw[0], xyzw[1], xyzw[2])
|
||||
return (xyzw[3], xyzw[0], xyzw[1], xyzw[2])
|
||||
|
||||
return xyzw[[3, 0, 1, 2]]
|
||||
# return xyzw[[3, 0, 1, 2]]
|
||||
|
||||
|
||||
def quat_to_xyzw(
|
||||
|
|
|
@ -103,12 +103,12 @@ def get_model_orientation(
|
|||
|
||||
def transform_move_to_model_pose(
|
||||
world: World,
|
||||
position: Tuple[float, float, float],
|
||||
quat: Tuple[float, float, float, float],
|
||||
target_model: Union[ModelWrapper, str],
|
||||
target_link: Union[Link, str, None] = None,
|
||||
position: tuple[float, float, float],
|
||||
quat: tuple[float, float, float, float],
|
||||
target_model: ModelWrapper | str,
|
||||
target_link: Link | str | None = None,
|
||||
xyzw: bool = False,
|
||||
) -> Tuple[Tuple[float, float, float], Tuple[float, float, float, float]]:
|
||||
) -> tuple[tuple[float, float, float], tuple[float, float, float, float]]:
|
||||
"""
|
||||
Transform such that original `position` and `quat` are represented with respect to `target_model::target_link`.
|
||||
The resulting pose is still represented in world coordinate system.
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
<!-- -->
|
||||
<!-- Scene -->
|
||||
<!-- -->
|
||||
<gravity>0 0 0</gravity>
|
||||
<!-- <gravity>0 0 0</gravity> -->
|
||||
<scene>
|
||||
<ambient>1.0 1.0 1.0</ambient>
|
||||
<grid>false</grid>
|
||||
|
|
0
env_manager/rbs_gym/runtime/__init__.py
Normal file
0
env_manager/rbs_gym/runtime/__init__.py
Normal file
|
@ -10,10 +10,13 @@ from rbs_gym import envs as gz_envs
|
|||
from rbs_gym.utils.utils import StoreDict, str2bool
|
||||
|
||||
|
||||
def main(args: Dict):
|
||||
def main(args: argparse.Namespace):
|
||||
|
||||
# Create the environment
|
||||
env = gym.make(args.env, **args.env_kwargs)
|
||||
if args.env_kwargs is not None:
|
||||
env = gym.make(args.env, **args.env_kwargs)
|
||||
else:
|
||||
env = gym.make(args.env)
|
||||
|
||||
# Initialize random seed
|
||||
env.seed(args.seed)
|
||||
|
@ -58,7 +61,7 @@ if __name__ == "__main__":
|
|||
|
||||
# Environment and its parameters
|
||||
parser.add_argument(
|
||||
"--env", type=str, default="Reach-Gazebo-v0", help="Environment ID"
|
||||
"--env", type=str, default="Reach-ColorImage-Gazebo-v0", help="Environment ID"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--env-kwargs",
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue