add dynamic terrain and TF broadcaster support

- Added `RandomGround` terrain model for random ground generation with texture randomization.
- Integrated `Tf2DynamicBroadcaster` for dynamic TF broadcasting.
- Enhanced the `Scene` class to support dynamic terrain and marker publishing.
- Updated imports to include new utilities and modules for dynamic broadcasting and terrain handling.
This commit is contained in:
Ilya Uraev 2024-11-30 19:26:09 +03:00
parent 1f879e2dd6
commit 48e8668136
5 changed files with 437 additions and 12 deletions

View file

@ -8,6 +8,7 @@ from .objects import (
ObjectData, ObjectData,
PlaneObjectData, PlaneObjectData,
SphereObjectData, SphereObjectData,
ObjectRandomizerData,
) )
from .robot import RobotData from .robot import RobotData
from .terrain import TerrainData from .terrain import TerrainData

View file

@ -0,0 +1,137 @@
import os
import numpy as np
from gym_gz.scenario import model_wrapper
from gym_gz.utils.scenario import get_unique_model_name
from scenario import core as scenario_core
from scenario import gazebo as scenario_gazebo
from rbs_assets_library import get_textures_path
class RandomGround(model_wrapper.ModelWrapper):
def __init__(
self,
world: scenario_gazebo.World,
name: str = "random_ground",
position: tuple[float, float, float] = (0, 0, 0),
orientation: tuple[float, float, float, float] = (1, 0, 0, 0),
size: tuple[float, float] = (1.0, 1.0),
friction: float = 5.0,
texture_dir: str | None = None,
**kwargs,
):
np_random = np.random.default_rng()
# Get a unique model name
model_name = get_unique_model_name(world, name)
# Initial pose
initial_pose = scenario_core.Pose(position, orientation)
# Get textures from ENV variable if not directly specified
if not texture_dir:
texture_dir = os.environ.get("TEXTURE_DIRS", default="")
if not texture_dir:
texture_dir = get_textures_path()
# Find random PBR texture
albedo_map = None
normal_map = None
roughness_map = None
metalness_map = None
if texture_dir:
if ":" in texture_dir:
textures = []
for d in texture_dir.split(":"):
textures.extend([os.path.join(d, f) for f in os.listdir(d)])
else:
# Get list of the available textures
textures = os.listdir(texture_dir)
# Choose a random texture from these
random_texture_dir = str(np_random.choice(textures))
random_texture_dir = texture_dir + random_texture_dir
# List all files
texture_files = os.listdir(random_texture_dir)
# Extract the appropriate files
for texture in texture_files:
texture_lower = texture.lower()
if "color" in texture_lower or "albedo" in texture_lower:
albedo_map = os.path.join(random_texture_dir, texture)
elif "normal" in texture_lower:
normal_map = os.path.join(random_texture_dir, texture)
elif "roughness" in texture_lower:
roughness_map = os.path.join(random_texture_dir, texture)
elif "specular" in texture_lower or "metalness" in texture_lower:
metalness_map = os.path.join(random_texture_dir, texture)
# Create SDF string for the model
sdf = f"""<sdf version="1.9">
<model name="{model_name}">
<static>true</static>
<link name="{model_name}_link">
<collision name="{model_name}_collision">
<geometry>
<plane>
<normal>0 0 1</normal>
<size>{size[0]} {size[1]}</size>
</plane>
</geometry>
<surface>
<friction>
<ode>
<mu>{friction}</mu>
<mu2>{friction}</mu2>
<fdir1>0 0 0</fdir1>
<slip1>0.0</slip1>
<slip2>0.0</slip2>
</ode>
</friction>
</surface>
</collision>
<visual name="{model_name}_visual">
<geometry>
<plane>
<normal>0 0 1</normal>
<size>{size[0]} {size[1]}</size>
</plane>
</geometry>
<material>
<ambient>1 1 1 1</ambient>
<diffuse>1 1 1 1</diffuse>
<specular>1 1 1 1</specular>
<pbr>
<metal>
{"<albedo_map>%s</albedo_map>"
% albedo_map if albedo_map is not None else ""}
{"<normal_map>%s</normal_map>"
% normal_map if normal_map is not None else ""}
{"<roughness_map>%s</roughness_map>"
% roughness_map if roughness_map is not None else ""}
{"<metalness_map>%s</metalness_map>"
% metalness_map if metalness_map is not None else ""}
</metal>
</pbr>
</material>
</visual>
</link>
</model>
</sdf>"""
# Insert the model
ok_model = world.to_gazebo().insert_model_from_string(
sdf, initial_pose, model_name
)
if not ok_model:
raise RuntimeError("Failed to insert " + model_name)
# Get the model
model = world.get_model(model_name)
# Initialize base class
model_wrapper.ModelWrapper.__init__(self, model=model)

View file

@ -1,23 +1,34 @@
import asyncio import asyncio
from dataclasses import asdict from dataclasses import asdict
from gym_gz.scenario.model_wrapper import ModelWrapper
from rbs_assets_library import get_model_meshes_info
import rclpy
import time
from threading import Thread
import numpy as np import numpy as np
from rcl_interfaces.srv import GetParameters from rcl_interfaces.srv import GetParameters
from rclpy.node import Node, Parameter from rclpy.node import Node, Parameter
from scenario.bindings.gazebo import GazeboSimulator, PhysicsEngine_dart, World from scenario.bindings.gazebo import GazeboSimulator, PhysicsEngine_dart, World
from scipy.spatial import distance from scipy.spatial import distance
from scipy.spatial.transform import Rotation from scipy.spatial.transform import Rotation
from yaml import Mark
from env_manager.models.terrains.random_ground import RandomGround
from ..models import Box, Camera, Cylinder, Ground, Mesh, Model, Plane, Sphere, Sun from ..models import Box, Camera, Cylinder, Ground, Mesh, Model, Plane, Sphere, Sun
from ..models.configs import CameraData, ObjectData, SceneData from ..models.configs import CameraData, ObjectData, SceneData
from ..models.robots import get_robot_model_class from ..models.robots import get_robot_model_class
from ..utils import Tf2Broadcaster from ..utils import Tf2Broadcaster, Tf2DynamicBroadcaster
from ..utils.conversions import quat_to_wxyz from ..utils.conversions import quat_to_wxyz
from ..utils.gazebo import ( from ..utils.gazebo import (
transform_move_to_model_pose, transform_move_to_model_pose,
transform_move_to_model_position, transform_move_to_model_position,
) )
from ..utils.types import Point, Pose from ..utils.types import Point, Pose
from visualization_msgs.msg import MarkerArray, Marker
from geometry_msgs.msg import Point as RosPoint
from geometry_msgs.msg import Quaternion as RosQuat
# TODO: Split scene randomizer and scene # TODO: Split scene randomizer and scene
@ -71,6 +82,7 @@ class Scene:
self.world: World = gazebo.get_world() self.world: World = gazebo.get_world()
self.__objects_unique_names = {} self.__objects_unique_names = {}
self.object_models: list[ModelWrapper] = []
self.__object_positions: dict[str, Point] = {} self.__object_positions: dict[str, Point] = {}
self.__objects_workspace_centre: Point = (0.0, 0.0, 0.0) self.__objects_workspace_centre: Point = (0.0, 0.0, 0.0)
@ -88,6 +100,7 @@ class Scene:
scene.robot.urdf_string = robot_urdf_string scene.robot.urdf_string = robot_urdf_string
self.tf2_broadcaster = Tf2Broadcaster(node=node) self.tf2_broadcaster = Tf2Broadcaster(node=node)
self.tf2_broadcaster_dyn = Tf2DynamicBroadcaster(node=node)
self.__workspace_center = (0.0, 0.0, 0.0) self.__workspace_center = (0.0, 0.0, 0.0)
self.__workspace_dimensions = (1.5, 1.5, 1.5) self.__workspace_dimensions = (1.5, 1.5, 1.5)
@ -107,6 +120,9 @@ class Scene:
self.__workspace_center[2] + half_depth, self.__workspace_center[2] + half_depth,
) )
# self.marker_array = MarkerArray()
async def get_parameter_async(self): async def get_parameter_async(self):
""" """
Asynchronously retrieves the robot_description parameter from the robot state publisher. Asynchronously retrieves the robot_description parameter from the robot state publisher.
@ -165,6 +181,7 @@ class Scene:
This method must be called after the Scene object has been initialized This method must be called after the Scene object has been initialized
and before running the simulation. and before running the simulation.
""" """
self.gazebo_paused_run() self.gazebo_paused_run()
self.init_world_plugins() self.init_world_plugins()
@ -494,16 +511,199 @@ class Scene:
self.__objects_unique_names[obj.name].append(model_name) self.__objects_unique_names[obj.name].append(model_name)
self.__object_positions[model_name] = obj.position self.__object_positions[model_name] = obj.position
self.object_models.append(object_wrapper.model)
# Enable contact detection # Enable contact detection
for link_name in object_wrapper.link_names(): for link_name in object_wrapper.link_names():
link = object_wrapper.to_gazebo().get_link(link_name=link_name) link = object_wrapper.to_gazebo().get_link(link_name=link_name)
link.enable_contact_detection(True) link.enable_contact_detection(True)
self.node.get_logger().info(
f"Add object with name [{obj.name}], relative_to [{obj.relative_to}]"
)
self.node.get_logger().info(f"On position: {obj.position}")
self.node.get_logger().info(f"On orientation: {obj.orientation}")
publish_tf = True
publish_marker = True
if publish_tf:
if not hasattr(self, "tf_broadcaster_and_markers"):
self.tf_broadcaster_and_markers = self.node.create_timer(0.1, self.publish_marker)
if publish_marker:
if not hasattr(self, "marker_array"):
self.marker_array = MarkerArray()
self.marker_publisher = self.node.create_publisher(MarkerArray, "/markers", 10)
if not hasattr(self, "tf_broadcaster_and_markers"):
self.tf_broadcaster_and_markers = self.node.create_timer(0.1, self.publish_marker)
marker = self.create_marker(obj, object_wrapper, len(self.object_models))
self.marker_array.markers.append(marker)
except Exception as ex: except Exception as ex:
self.node.get_logger().warn(f"Model could not be inserted. Reason: {ex}") self.node.get_logger().warn(f"Model could not be inserted. Reason: {ex}")
self.gazebo_paused_run self.gazebo_paused_run
def create_marker(self, object: ObjectData, model: ModelWrapper, id):
"""
Create a single marker.
"""
marker = Marker()
marker.header.frame_id = object.relative_to
marker.header.stamp = self.node.get_clock().now().to_msg()
marker.ns = "assembly"
marker.id = id
if object.type == "mesh" or object.type == "model":
marker.type = Marker.MESH_RESOURCE
marker.mesh_resource = f"file://{get_model_meshes_info(object.name)['visual']}"
marker.mesh_use_embedded_materials = True
marker.scale.x = 1.0
marker.scale.y = 1.0
marker.scale.z = 1.0
marker.color.r = 0.0
marker.color.g = 0.5
marker.color.b = 1.0
marker.color.a = 1.0
self.node.get_logger().info(f"Created mesh marker for {object.name}")
elif object.type == "sphere":
marker.type = Marker.SPHERE
marker.scale.x = object.radius
marker.scale.y = object.radius
marker.scale.z = object.radius
marker.color.r = 0.0
marker.color.g = 0.5
marker.color.b = 1.0
marker.color.a = 1.0
self.node.get_logger().info(f"Created sphere marker for {object.name}")
elif object.type == "box":
marker.type = Marker.CUBE
marker.scale.x = object.size[0]
marker.scale.y = object.size[1]
marker.scale.z = object.size[2]
marker.color.r = 0.0
marker.color.g = 0.5
marker.color.b = 1.0
marker.color.a = 1.0
self.node.get_logger().info(f"Created box marker for {object.name}")
elif object.type == "cylinder":
marker.type = Marker.CYLINDER
marker.scale.x = object.radius * 2
marker.scale.y = object.radius * 2
marker.scale.z = object.length
marker.color.r = 0.0
marker.color.g = 0.5
marker.color.b = 1.0
marker.color.a = 1.0
self.node.get_logger().info(f"Created cylinder marker for {object.name}")
else:
self.node.get_logger().fatal(f"Unsupported object type: {object.type}")
return None
marker.action = Marker.ADD
model_position, model_orientation = self.rotate_around_parent(model.base_position(), model.base_orientation(), (0.0, 0.0, 0.0), (1.0, 0.0, 0.0, 0.0))
position = RosPoint()
position.x = model_position[0]
position.y = model_position[1]
position.z = model_position[2]
orientation = RosQuat()
orientation.x = model_orientation[3]
orientation.y = model_orientation[1]
orientation.z = model_orientation[2]
orientation.w = model_orientation[0]
marker.pose.position = position
marker.pose.orientation = orientation
return marker
def publish_marker(self):
for idx, model in enumerate(self.object_models):
# model_position, model_orientation = self.rotate_around_parent(model.base_position(), model.base_orientation(), (0.0, 0.0, 0.0), (1.0, 0.0, 0.0, 0.0))
model_position, model_orientation = (model.base_position(), model.base_orientation())
self.marker_array.markers[idx].pose.position.x = float(model_position[0])
self.marker_array.markers[idx].pose.position.y = float(model_position[1])
self.marker_array.markers[idx].pose.position.z = float(model_position[2])
self.marker_array.markers[idx].pose.orientation.w = float(model_orientation[0])
self.marker_array.markers[idx].pose.orientation.x = float(model_orientation[1])
self.marker_array.markers[idx].pose.orientation.y = float(model_orientation[2])
self.marker_array.markers[idx].pose.orientation.z = float(model_orientation[3])
self.tf2_broadcaster_dyn.broadcast_tf(
parent_frame_id="world",
child_frame_id=model.base_frame(),
translation=model_position,
rotation=model_orientation,
xyzw=False,
)
self.marker_publisher.publish(self.marker_array)
def rotate_around_parent(self, position, orientation, parent_position, parent_orientation):
"""
Apply a 180-degree rotation around the parent's Z-axis to the object's pose.
Args:
position: Tuple (x, y, z) - the object's position relative to the parent.
orientation: Tuple (w, x, y, z) - the object's orientation relative to the parent.
parent_position: Tuple (x, y, z) - the parent's position.
parent_orientation: Tuple (w, x, y, z) - the parent's orientation.
Returns:
A tuple containing the updated position and orientation:
(new_position, new_orientation)
"""
# Define a quaternion for 180-degree rotation around Z-axis (w, x, y, z)
rotation_180_z = (0.0, 0.0, 0.0, 1.0)
w1, x1, y1, z1 = orientation
# Кватернион для 180° поворота вокруг Z
w2, x2, y2, z2 = (0, 0, 0, 1)
# Умножение кватернионов
w3 = w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2
x3 = w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2
y3 = w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2
z3 = w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2
new_orientation = (w3, x3, y3, z3)
# Compute the new orientation: parent_orientation * rotation_180_z * orientation
# new_orientation = (
# parent_orientation[0] * rotation_180_z[0] - parent_orientation[1] * rotation_180_z[1] -
# parent_orientation[2] * rotation_180_z[2] - parent_orientation[3] * rotation_180_z[3],
# parent_orientation[0] * rotation_180_z[1] + parent_orientation[1] * rotation_180_z[0] +
# parent_orientation[2] * rotation_180_z[3] - parent_orientation[3] * rotation_180_z[2],
# parent_orientation[0] * rotation_180_z[2] - parent_orientation[1] * rotation_180_z[3] +
# parent_orientation[2] * rotation_180_z[0] + parent_orientation[3] * rotation_180_z[1],
# parent_orientation[0] * rotation_180_z[3] + parent_orientation[1] * rotation_180_z[2] -
# parent_orientation[2] * rotation_180_z[1] + parent_orientation[3] * rotation_180_z[0],
# )
# Apply rotation to the position
rotated_position = (
-position[0], # Rotate 180 degrees around Z-axis: x -> -x
-position[1], # y -> -y
position[2] # z remains unchanged
)
# Add parent position
new_position = (
parent_position[0] + rotated_position[0],
parent_position[1] + rotated_position[1],
parent_position[2] + rotated_position[2],
)
return new_position, new_orientation
def add_light(self): def add_light(self):
""" """
Configure and insert a default light source into the simulation. Configure and insert a default light source into the simulation.
@ -566,7 +766,7 @@ class Scene:
changes are reflected in the environment. changes are reflected in the environment.
""" """
# TODO: add another terrains if self.terrain.type == "flat":
self.terrain_wrapper = Ground( self.terrain_wrapper = Ground(
self.world, self.world,
name=self.terrain.name, name=self.terrain.name,
@ -577,6 +777,18 @@ class Scene:
diffuse=self.terrain.diffuse, diffuse=self.terrain.diffuse,
specular=self.terrain.specular, specular=self.terrain.specular,
) )
elif self.terrain.type == "random_flat":
self.terrain_wrapper = RandomGround(
self.world,
name=self.terrain.name,
position=self.terrain.spawn_position,
orientation=quat_to_wxyz(self.terrain.spawn_quat_xyzw),
size=self.terrain.size,
)
else:
raise ValueError(
"Type of ground is not supported, supported type is: [ground, random_ground]"
)
# Enable contact detection # Enable contact detection
for link_name in self.terrain_wrapper.link_names(): for link_name in self.terrain_wrapper.link_names():

View file

@ -1,3 +1,4 @@
from . import conversions, gazebo, logging, math, types from . import conversions, gazebo, logging, math, types
from .tf2_broadcaster import Tf2Broadcaster, Tf2BroadcasterStandalone from .tf2_broadcaster import Tf2Broadcaster, Tf2BroadcasterStandalone
from .tf2_dynamic_broadcaster import Tf2DynamicBroadcaster
from .tf2_listener import Tf2Listener, Tf2ListenerStandalone from .tf2_listener import Tf2Listener, Tf2ListenerStandalone

View file

@ -0,0 +1,74 @@
import sys
from typing import Tuple
import rclpy
from geometry_msgs.msg import TransformStamped
from rclpy.node import Node
from rclpy.parameter import Parameter
from tf2_ros import TransformBroadcaster
class Tf2DynamicBroadcaster:
def __init__(
self,
node: Node,
):
self._node = node
self.__tf2_broadcaster = TransformBroadcaster(node=self._node)
self._transform_stamped = TransformStamped()
def broadcast_tf(
self,
parent_frame_id: str,
child_frame_id: str,
translation: Tuple[float, float, float],
rotation: Tuple[float, float, float, float],
xyzw: bool = True,
):
"""
Broadcast transformation of the camera
"""
self._transform_stamped.header.frame_id = parent_frame_id
self._transform_stamped.child_frame_id = child_frame_id
self._transform_stamped.header.stamp = self._node.get_clock().now().to_msg()
self._transform_stamped.transform.translation.x = float(translation[0])
self._transform_stamped.transform.translation.y = float(translation[1])
self._transform_stamped.transform.translation.z = float(translation[2])
if xyzw:
self._transform_stamped.transform.rotation.x = float(rotation[0])
self._transform_stamped.transform.rotation.y = float(rotation[1])
self._transform_stamped.transform.rotation.z = float(rotation[2])
self._transform_stamped.transform.rotation.w = float(rotation[3])
else:
self._transform_stamped.transform.rotation.w = float(rotation[0])
self._transform_stamped.transform.rotation.x = float(rotation[1])
self._transform_stamped.transform.rotation.y = float(rotation[2])
self._transform_stamped.transform.rotation.z = float(rotation[3])
self.__tf2_broadcaster.sendTransform(self._transform_stamped)
class Tf2BroadcasterStandalone(Node, Tf2DynamicBroadcaster):
def __init__(
self,
node_name: str = "env_manager_tf_broadcaster",
use_sim_time: bool = True,
):
try:
rclpy.init()
except Exception as e:
if not rclpy.ok():
sys.exit(f"ROS 2 context could not be initialised: {e}")
Node.__init__(self, node_name)
self.set_parameters(
[Parameter("use_sim_time", type_=Parameter.Type.BOOL, value=use_sim_time)]
)
Tf2DynamicBroadcaster.__init__(self, node=self)