Add rosbag-to-LeRobot conversion script

This commit is contained in:
shalenikol 2025-06-10 13:11:11 +03:00
parent e45a00d664
commit 000dc1cc08
2 changed files with 387 additions and 0 deletions

View file

@@ -0,0 +1,11 @@
## Скрипт конвертации записанных эпизодов (rosbag) в формат LeRobot-датасет.
Команда запуска:
```bash
python convert_rosbag_to_lerobot.py /path/to/rosbag/episodes
```
При успешном выполнении в рабочей папке образуются два json-файла с данными эпизодов:
- ros2bag_msg.json
- ros2bag_msg_synced.json
Файлы изображений сохраняются в папке 'frames' по эпизодам.
И, наконец, датасет в формате LeRobot сохраняется в папку 'converted_dataset'.

View file

@@ -0,0 +1,376 @@
import os
import sys
import json
import argparse
from typing import List
from pathlib import Path
from PIL import Image
import numpy as np
import cv2
import time
from cv_bridge import CvBridge
from rosbags.highlevel import AnyReader
from rosbags.typesys import Stores, get_typestore
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
# --- Module-level configuration ---
FPS = 30 # default frame rate, used when fps estimation from timestamps fails
REPO_ID = "rbs_ros2bag"
OUTPUT_ROOT = "./converted_dataset"
USE_VIDEOS = False # we use PNG frames, so the dataset format is images
start_time = time.time() # remember the script start time (reported at the end of main)
def convert_seconds(total_seconds) -> str:
    """Format a duration given in seconds as an 'HH:MM:SS' string."""
    whole = int(total_seconds)
    minutes, seconds = divmod(whole, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02}:{minutes:02}:{seconds:02}"
def has_db3_file(directory) -> bool:
    """Return True if *directory* directly contains at least one regular .db3 file."""
    for entry in Path(directory).iterdir():
        if entry.is_file() and entry.suffix == '.db3':
            return True
    return False
def find_folders_with_db3_files(directory: Path) -> List[Path]:
    """Return the sorted direct subfolders of *directory* that contain a .db3 file."""
    matching = []
    # collect every immediate subfolder that holds at least one .db3 bag file
    for entry in directory.iterdir():
        if entry.is_dir() and has_db3_file(entry):
            matching.append(entry)
    matching.sort()
    return matching
def to_lerobot_dataset(json_file: Path):
    """Build a LeRobotDataset from the synced-episodes JSON.

    *json_file* is the "_synced" JSON produced by extract_rosbag_to_json():
    it lists camera topics, the common image shape, the estimated fps and,
    per episode, frames that pair image paths with a joint_state. Every
    frame is added to a freshly created dataset rooted at OUTPUT_ROOT and
    each episode is saved.
    """
    # === LOAD DATA ===
    with open(json_file, "r") as f:
        data = json.load(f)
    camera_keys = data["cameras"]
    image_shape = data["image_shape"]
    # topic names like '/robot_camera/image' become feature keys like 'robot_camera.image'
    cam_features = [cam.lstrip('/').replace('/', '.') for cam in camera_keys]
    robot_joint_names = data["episodes"][0]["frames"][0]["joint_state"]["name"]
    nof_joints = len(robot_joint_names)
    estimated_fps = data["estimated_fps"]
    # fall back to the default FPS when estimation failed (value near zero)
    fps = FPS if estimated_fps < 0.1 else estimated_fps
    # === FEATURE DEFINITION ===
    features = {
        cam: {
            "dtype": "image",
            "shape": image_shape,
            "names": ["height", "width", "channels"]
        }
        for cam in cam_features
    }
    features.update({
        "observation.state": {
            "dtype": "float32",
            "shape": (nof_joints,),
            "names": robot_joint_names,
        },
        "action": {
            "dtype": "float32",
            "shape": (nof_joints,),
            "names": robot_joint_names,
        },
    })
    # === CREATE LeRobotDataset ===
    dataset = LeRobotDataset.create(
        repo_id=REPO_ID,
        fps=fps,
        root=OUTPUT_ROOT,
        features=features,
        use_videos=USE_VIDEOS,
    )
    for episode in data["episodes"]:
        # === FRAME CONVERSION ===
        for frame in episode["frames"]:
            frame_data = {
                "task": "default_task",
                "observation.state": np.array(frame["joint_state"]["pos"], dtype=np.float32),
                # NOTE(review): action duplicates the observed joint positions —
                # the bag carries no separate command signal; confirm this is intended.
                "action": np.array(frame["joint_state"]["pos"], dtype=np.float32),
            }
            for cam_idx, cam in enumerate(camera_keys):
                image_path = Path(frame[cam])
                image = Image.open(image_path).convert("RGB")
                frame_data[cam_features[cam_idx]] = np.array(image)
            dataset.add_frame(frame_data)
        # === SAVE EPISODE ===
        dataset.save_episode()
def add_episode(dir: Path) -> None:
    """Read one rosbag folder and append it as a new episode.

    Appends the episode record to extract_rosbag_to_json.output and saves
    every camera frame as a PNG under
    image_dir/episode<N>/<camera_name>/frame_NNNNNN.png. Relies on the
    shared state stored as attributes on extract_rosbag_to_json (typestore,
    bridge, topic lists, image_dir) and on add_episode.counter for the
    episode index.
    """
    def clean_array(arr):
        # Replace NaN with 0.0 so the values stay JSON-serializable
        # (v != v is the NaN test).
        return [0.0 if isinstance(v, float) and (v != v) else v for v in arr]

    image_shape = None
    episode = {
        "bag": str(dir.resolve()),
        "eidx": add_episode.counter,
        "num_joint_state": 0,
        "joint_states": [],
        "num_frame": 0,
        "frames": []
    }
    e_image_dir = "episode" + str(add_episode.counter + 1)  # folder for images of the episode
    with AnyReader([dir], default_typestore=extract_rosbag_to_json.typestore) as reader:
        camera_topics = []  # topics actually seen; used to shrink the global list
        frame_index = -1
        start_topic = ""
        num_js = 0
        for conn, timestamp, rawdata in reader.messages():
            topic = conn.topic
            msg = reader.deserialize(rawdata, conn.msgtype)
            if topic == extract_rosbag_to_json.joint_topic:
                num_js += 1
                episode["joint_states"].append({
                    "timestamp": timestamp,
                    "joint_state": {
                        "name": list(msg.name),
                        "pos": clean_array(list(msg.position)),
                        "vel": clean_array(list(msg.velocity)),
                        "eff": clean_array(list(msg.effort)),
                    }
                })
            elif topic in extract_rosbag_to_json.camera_topics:
                img_cv = None  # so the except-handler never hits an unbound name
                try:
                    if topic in extract_rosbag_to_json.cim_topic:
                        img_cv = extract_rosbag_to_json.bridge.compressed_imgmsg_to_cv2(msg)
                    else:
                        img_cv = extract_rosbag_to_json.bridge.imgmsg_to_cv2(msg)
                    # normalize every image to 3-channel RGB
                    if img_cv is None:
                        continue
                    elif len(img_cv.shape) == 2:
                        img_cv = cv2.cvtColor(img_cv, cv2.COLOR_GRAY2RGB)
                    elif img_cv.shape[2] == 4:
                        img_cv = cv2.cvtColor(img_cv, cv2.COLOR_BGRA2RGB)
                    else:
                        img_cv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2RGB)
                    # the first camera topic seen defines the frame boundaries
                    if frame_index < 0:
                        start_topic = topic
                    if topic == start_topic:
                        if image_shape is None:
                            image_shape = img_cv.shape
                        frame_index += 1
                    camera_name = topic.lstrip('/').replace('/', '_')
                    img_filename = f"{e_image_dir}/{camera_name}/frame_{frame_index:06d}.png"
                    img_path = extract_rosbag_to_json.image_dir / img_filename
                    img_path.parent.mkdir(parents=True, exist_ok=True)
                    Image.fromarray(img_cv).save(img_path)
                    episode["frames"].append({
                        "timestamp": timestamp,
                        topic: str(img_path)
                    })
                    if topic not in camera_topics:
                        camera_topics.append(topic)
                except Exception as e:
                    print(f"[!] Failed to process image from {topic} @ {timestamp}: {img_cv=} {e}")
    if len(episode["frames"]):
        # only those topics will remain for which there are frames
        print(f"{camera_topics=}")
        if len(camera_topics) < len(extract_rosbag_to_json.camera_topics):
            extract_rosbag_to_json.camera_topics = camera_topics
            extract_rosbag_to_json.output["cameras"] = camera_topics
        episode["num_frame"] = frame_index + 1
        episode["num_joint_state"] = num_js
        extract_rosbag_to_json.output["episodes"].append(episode)
        extract_rosbag_to_json.image_shape = image_shape  # applies to the entire dataset
    add_episode.counter += 1
def common_part_json(dir: Path) -> None:
    """Scan one bag to discover topics and initialize the shared output skeleton."""
    with AnyReader([dir], default_typestore=extract_rosbag_to_json.typestore) as reader:
        # === Classify image topics by message type ===
        for conn in reader.connections:
            if conn.msgtype == "sensor_msgs/msg/Image":
                extract_rosbag_to_json.im_topic.append(conn.topic)
            elif conn.msgtype == "sensor_msgs/msg/CompressedImage":
                extract_rosbag_to_json.cim_topic.append(conn.topic)
        # all camera topics (raw + compressed)
        camera_topics = extract_rosbag_to_json.im_topic + extract_rosbag_to_json.cim_topic
        # keep only camera topics that actually carry messages,
        # preserving first-seen order (dict keys stay ordered)
        active = dict.fromkeys(
            conn.topic for conn, _, _ in reader.messages() if conn.topic in camera_topics
        )
        camera_topics = list(active)
        extract_rosbag_to_json.joint_topic = next(
            (conn.topic for conn in reader.connections if conn.msgtype == "sensor_msgs/msg/JointState"), None
        )
        if not extract_rosbag_to_json.joint_topic:
            raise ValueError("JointState topic not found in bag")
        # === JSON skeleton shared by the whole conversion ===
        extract_rosbag_to_json.output = {
            "cameras": camera_topics,
            "image_shape": [],
            "robots": [extract_rosbag_to_json.joint_topic],
            "num_episodes": 0,
            "episodes": []
        }
        extract_rosbag_to_json.camera_topics = camera_topics
def extract_rosbag_to_json(bag_path, output_json, output_image_dir) -> Path:
    """Export all rosbag episodes under *bag_path* to JSON plus PNG frames.

    Writes *output_json* with the raw extracted messages, then a second
    "<stem>_synced.json" where every reference-camera frame is paired with
    the nearest JointState (and nearest frame of every other camera) by
    timestamp. Returns the path to the synced JSON, which is the input
    expected by to_lerobot_dataset().

    Shared state for the helpers (common_part_json / add_episode) is stored
    as attributes on this function object.
    """
    print("Starting the export procedure..")
    bag_path = Path(bag_path)
    output_json = Path(output_json)
    extract_rosbag_to_json.image_dir = Path(output_image_dir)
    extract_rosbag_to_json.image_dir.mkdir(exist_ok=True, parents=True)
    extract_rosbag_to_json.bridge = CvBridge()
    extract_rosbag_to_json.typestore = get_typestore(Stores.ROS2_JAZZY)
    # shared state filled in by common_part_json() and add_episode()
    extract_rosbag_to_json.output = {}
    extract_rosbag_to_json.im_topic = []
    extract_rosbag_to_json.cim_topic = []
    extract_rosbag_to_json.camera_topics = []
    extract_rosbag_to_json.joint_topic = []
    extract_rosbag_to_json.image_shape = None
    list_bags = find_folders_with_db3_files(bag_path)
    common_part_json(list_bags[0])  # topics are discovered from the first bag only
    add_episode.counter = 0
    for bag in list_bags:
        add_episode(bag)
        print(f"episode {add_episode.counter}: ok")
    extract_rosbag_to_json.output["image_shape"] = extract_rosbag_to_json.image_shape
    extract_rosbag_to_json.output["num_episodes"] = len(list_bags)  # same as add_episode.counter
    with open(output_json, "w") as f:
        json.dump(extract_rosbag_to_json.output, f, indent=2)
    print(f"✅ Export complete!\nJSON saved to: {output_json.resolve()}\nImages saved to: {extract_rosbag_to_json.image_dir.resolve()}")
    # === SAVE THE SYNCED VARIANT ===
    fps = 0.0  # estimated below from the first episode with >= 2 frames
    synced_output = {
        "bags": str(bag_path.resolve()),
        "estimated_fps": fps,
        "cameras": extract_rosbag_to_json.camera_topics,
        "image_shape": extract_rosbag_to_json.image_shape,
        "robots": [extract_rosbag_to_json.joint_topic],
        "num_episodes": len(list_bags),
        "episodes": []
    }
    rgb_topic = extract_rosbag_to_json.camera_topics[0]  # reference camera that defines frame timing
    for episode in extract_rosbag_to_json.output["episodes"]:
        e_sync = {
            "bag": episode["bag"],
            "eidx": episode["eidx"],
            "num_frames": 0,
            "frames": []
        }
        rgb_msgs = [m for m in episode["frames"] if rgb_topic in m]
        joint_msgs = [m for m in episode["joint_states"]]
        num_frames = 0
        for im in rgb_msgs:
            # find the joint_state message closest in timestamp
            closest = min(joint_msgs, key=lambda j: abs(j["timestamp"] - im["timestamp"]))
            # add all images (the reference rgb_topic and the other cameras)
            cam_images = {}
            for cam_topic in extract_rosbag_to_json.camera_topics:
                # pick this camera's image nearest to the reference timestamp
                candidates = [m for m in episode["frames"] if cam_topic in m]
                match = min(candidates, key=lambda m: abs(m["timestamp"] - im["timestamp"])) if candidates else None
                if match:
                    cam_key = cam_topic
                    cam_images[cam_key] = match[cam_topic]
            e_sync["frames"].append({
                "timestamp": im["timestamp"],
                "idx": num_frames,
                "joint_state": closest["joint_state"],
                **cam_images  # merge in every camera's image path
            })
            num_frames += 1
        e_sync["num_frames"] = num_frames
        if num_frames >= 2 and fps <= 0.0:
            t0 = e_sync["frames"][0]["timestamp"]
            t1 = e_sync["frames"][-1]["timestamp"]
            duration_sec = (t1 - t0) / 1e9  # nanoseconds -> seconds
            fps = (num_frames - 1) / duration_sec if duration_sec > 0 else 0.0
            synced_output["estimated_fps"] = fps
            print(f"📈 Estimated FPS (synced): {fps:.1f}")
        synced_output["episodes"].append(e_sync)
    synced_json_path = output_json.with_name(output_json.stem + "_synced.json")
    with open(synced_json_path, "w") as f:
        json.dump(synced_output, f, indent=2)
    print(f"🧩 Synced JSON saved to: {synced_json_path.resolve()}")
    return synced_json_path
def main():
    """CLI entry point: convert a folder of rosbag episodes to a LeRobot dataset."""
    parser = argparse.ArgumentParser(description="Convert ROS2 bag to JSON + image folder (LeRobot format)")
    parser.add_argument("bag", help="Path to folder with ROS2 bag episode files")
    parser.add_argument("--json", default="ros2bag_msg.json", help="Path to output JSON file")
    parser.add_argument("--images", default="frames", help="Directory to store extracted images")
    args = parser.parse_args()
    if not os.path.isdir(args.bag):
        print(f"[!] {args.bag} is not a folder")
        sys.exit(1)  # non-zero status: the bare sys.exit() reported success on a usage error
    out_json = extract_rosbag_to_json(args.bag, args.json, args.images)
    to_lerobot_dataset(out_json)
    end_time = time.time()  # finish time
    execution_time = end_time - start_time
    print(f"*****************\nВремя выполнения: {convert_seconds(execution_time)}\n")
if __name__ == "__main__":
    main()