from typing import Any, List, TypeVar, Type, cast, Callable import numpy as np import pybullet as p import time import pybullet_data import os import json from time import sleep T = TypeVar("T") def from_str(x): assert isinstance(x, str) return x def from_float(x: Any) -> float: assert isinstance(x, (float, int)) and not isinstance(x, bool) return float(x) def to_float(x: Any) -> float: assert isinstance(x, float) return x def from_int(x: Any) -> int: assert isinstance(x, int) and not isinstance(x, bool) return x def to_class(c: Type[T], x: Any) -> dict: assert isinstance(x, c) return cast(Any, x).to_dict() def from_list(f: Callable[[Any], T], x: Any) -> List[T]: assert isinstance(x, list) return [f(y) for y in x] class Coords: x: float y: float z: float def __init__(self, x: float, y: float, z: float) -> None: self.x = x self.y = y self.z = z @staticmethod def from_dict(obj: Any) -> 'Coords': assert isinstance(obj, dict) x = from_float(obj.get("x")) y = from_float(obj.get("y")) z = from_float(obj.get("z")) return Coords(x, y, z) def to_dict(self) -> dict: result: dict = {} result["x"] = to_float(self.x) result["y"] = to_float(self.y) result["z"] = to_float(self.z) return result class SimulatorStabilityResultModel: id: str quaternion: Coords position: Coords def __init__(self, id: int, quaternion: Coords, position: Coords) -> None: self.id = id self.quaternion = quaternion self.position = position @staticmethod def from_dict(obj: Any) -> 'SimulatorStabilityResultModel': assert isinstance(obj, dict) id = from_str(obj.get("id")) quaternion = Coords.from_dict(obj.get("quaternion")) position = Coords.from_dict(obj.get("position")) return SimulatorStabilityResultModel(id, quaternion, position) def to_dict(self) -> dict: result: dict = {} result["id"] = from_str(self.id) result["quaternion"] = to_class(Coords, self.quaternion) result["position"] = to_class(Coords, self.position) return result def SimulatorStabilityModelfromdict(s: Any) -> List[SimulatorStabilityResultModel]: return from_list(SimulatorStabilityResultModel.from_dict, s) def SimulatorStabilityModeltodict(x: List[SimulatorStabilityResultModel]) -> Any: return from_list(lambda x: to_class(SimulatorStabilityResultModel, x), x) class StabilityCheckUseCase: def urdfLoader(self, assembly: list[str], outPath: str, urdfGeneration: dict[str:str]): urdfs = [] for assemblyCount in range(len(assembly)): urdf = urdfGeneration.get(assembly[assemblyCount]) file_to_open = outPath + '/sdf-generation/' + \ str(assemblyCount) + '.urdf' f = open(file_to_open, 'w', encoding='utf-8', errors='ignore') f.write(urdf) f.close() urdfs.append(os.path.abspath(f.name)) return urdfs def executeSimulation(self, assembly: list[str], outPath: str, urdfGeneration: dict[str:str], duration: int) -> list['SimulatorStabilityResultModel']: p.connect(p.DIRECT) p.setGravity(0, 0, -10) p.setAdditionalSearchPath(pybullet_data.getDataPath()) p.loadURDF("plane.urdf") resultCoords = [] urdfs = self.urdfLoader(assembly=assembly, urdfGeneration=urdfGeneration, outPath=outPath) bulletIds = [] for el in urdfs: id = p.loadURDF(el) bulletIds.append(id) for i in range(duration): if (i + 200 == duration): inc = 0 for bulletUUID in bulletIds: pos, rot = p.getBasePositionAndOrientation(bulletUUID) resultCoords.append(SimulatorStabilityResultModel(id=assembly[inc], quaternion=Coords( x=rot[0], y=rot[1], z=rot[2]), position=Coords(x=pos[0], y=pos[1], z=pos[2]))) p.removeBody(bulletUUID) inc += 1 p.stepSimulation() time.sleep(1./240.) return resultCoords def call(self, aspPath: str): try: assemblyFolder = aspPath assemblesStructures = json.loads( (open(assemblyFolder + 'sequences.json')).read()).get('sequences') tasks = len(assemblesStructures) * len(assemblesStructures[0]) taskCounter = 0 urdfGeneration = json.loads( (open(assemblyFolder + 'sdf-generation/urdf-generation.json')).read()) for activeAssemblyNumber in range(len(assemblesStructures)): pathSaveResultAssemblyFolder = aspPath + 'stability' + \ '/' + str(activeAssemblyNumber + 1) + '/' if not os.path.exists(pathSaveResultAssemblyFolder): os.makedirs(pathSaveResultAssemblyFolder) for subAssemblyNumber in range(len(assemblesStructures[activeAssemblyNumber])): taskCounter += 1 subAssembly = assemblesStructures[activeAssemblyNumber][0:subAssemblyNumber+1] asm = [] for el in subAssembly: asm.append(el) resultSimulationStates = self.executeSimulation( assembly=asm, outPath=aspPath, urdfGeneration=urdfGeneration, duration=1000) pathSaveResultSubAssemblyFolder = aspPath + 'stability' + '/' + \ str(activeAssemblyNumber + 1) + \ '/' + str(subAssemblyNumber) + '/' if not os.path.exists(pathSaveResultSubAssemblyFolder): os.makedirs(pathSaveResultSubAssemblyFolder) results = {} for state in resultSimulationStates: results[state.id] = state.to_dict() f = open(pathSaveResultSubAssemblyFolder + '/' + 'motion_result.json', 'w', encoding='utf-8', errors='ignore') f.write(json.dumps(results, ensure_ascii=False, indent=4)) f.close() percentageOfCompletion = taskCounter / tasks * 100 print('process complete: ' + str(percentageOfCompletion) + '%') except Exception as e: print(e)