From 64b80be028d53b8d058d487eb415537c43011ac6 Mon Sep 17 00:00:00 2001 From: IDONTSUDO Date: Wed, 19 Jul 2023 18:50:01 +0300 Subject: [PATCH] multiple evaluation of assemblies by sequences --- stability_process_predicate/main.py | 10 +- .../usecases/stability_check_usecase.py | 213 ++++++++++++++---- 2 files changed, 175 insertions(+), 48 deletions(-) diff --git a/stability_process_predicate/main.py b/stability_process_predicate/main.py index e1c6182..06028a9 100644 --- a/stability_process_predicate/main.py +++ b/stability_process_predicate/main.py @@ -1,16 +1,12 @@ -# Алгоритм для вычисления стабильности при помощи PyBullet import argparse from usecases.stability_check_usecase import StabilityCheckUseCase - -# python3 main.py --aspPath /home/idontsudo/t/framework/asp/out/sdf-generation --buildNumber 3 + +# python3 main.py --aspPath /Users/idontsudo/framework/asp/out/ def main(): parser = argparse.ArgumentParser() parser.add_argument('--aspPath', help='asp folder generation path') - parser.add_argument('--buildNumber', help='FreeCad generation buildNumber') args = parser.parse_args() - StabilityCheckUseCase().call(args.aspPath, args.buildNumber) - - + StabilityCheckUseCase().call(args.aspPath) main() diff --git a/stability_process_predicate/usecases/stability_check_usecase.py b/stability_process_predicate/usecases/stability_check_usecase.py index 561f885..86d6004 100644 --- a/stability_process_predicate/usecases/stability_check_usecase.py +++ b/stability_process_predicate/usecases/stability_check_usecase.py @@ -1,3 +1,4 @@ +from typing import Any, List, TypeVar, Type, cast, Callable import numpy as np import pybullet as p import time @@ -5,59 +6,189 @@ import pybullet_data import os import json +from time import sleep + + +T = TypeVar("T") + + +def from_str(x): + assert isinstance(x, str) + return x + + +def from_float(x: Any) -> float: + assert isinstance(x, (float, int)) and not isinstance(x, bool) + return float(x) + + +def to_float(x: Any) -> float: + assert isinstance(x, float) + return x + + +def from_int(x: Any) -> int: + assert isinstance(x, int) and not isinstance(x, bool) + return x + + +def to_class(c: Type[T], x: Any) -> dict: + assert isinstance(x, c) + return cast(Any, x).to_dict() + + +def from_list(f: Callable[[Any], T], x: Any) -> List[T]: + assert isinstance(x, list) + return [f(y) for y in x] + + +class Coords: + x: float + y: float + z: float + + def __init__(self, x: float, y: float, z: float) -> None: + self.x = x + self.y = y + self.z = z + + @staticmethod + def from_dict(obj: Any) -> 'Coords': + assert isinstance(obj, dict) + x = from_float(obj.get("x")) + y = from_float(obj.get("y")) + z = from_float(obj.get("z")) + return Coords(x, y, z) + + def to_dict(self) -> dict: + result: dict = {} + result["x"] = to_float(self.x) + result["y"] = to_float(self.y) + result["z"] = to_float(self.z) + return result + + +class SimulatorStabilityResultModel: + id: str + quaternion: Coords + position: Coords + + def __init__(self, id: int, quaternion: Coords, position: Coords) -> None: + self.id = id + self.quaternion = quaternion + self.position = position + + @staticmethod + def from_dict(obj: Any) -> 'SimulatorStabilityResultModel': + assert isinstance(obj, dict) + id = from_str(obj.get("id")) + quaternion = Coords.from_dict(obj.get("quaternion")) + position = Coords.from_dict(obj.get("position")) + return SimulatorStabilityResultModel(id, quaternion, position) + + def to_dict(self) -> dict: + result: dict = {} + result["id"] = from_str(self.id) + result["quaternion"] = to_class(Coords, self.quaternion) + result["position"] = to_class(Coords, self.position) + return result + + +def SimulatorStabilityModelfromdict(s: Any) -> List[SimulatorStabilityResultModel]: + return from_list(SimulatorStabilityResultModel.from_dict, s) + + +def SimulatorStabilityModeltodict(x: List[SimulatorStabilityResultModel]) -> Any: + return from_list(lambda x: to_class(SimulatorStabilityResultModel, x), x) + class StabilityCheckUseCase: - def call(self, outPath: str, buildNumber: int, duration=500): - DURATION = duration - try: - # Получение сгенерированного URDF для сборки - assemblyUrdf = json.loads( - (open(outPath + 'urdf-generation.json')).read()).get(buildNumber) - except: - return TypeError('not found urfd file or not found build number') - inc = 0 + def urdfLoader(self, assembly: list[str], outPath: str, urdfGeneration: dict[str:str]): + urdfs = [] + for assemblyCount in range(len(assembly)): - for el in assemblyUrdf: - inc += 1 - file_to_open = outPath + str(inc) + '.urdf' - + urdf = urdfGeneration.get(assembly[assemblyCount]) + file_to_open = outPath + '/sdf-generation/' + \ + str(assemblyCount) + '.urdf' f = open(file_to_open, 'w', encoding='utf-8', errors='ignore') - f.write(el) - urdfs.append(os.path.abspath(f.name)) + f.write(urdf) f.close() + urdfs.append(os.path.abspath(f.name)) + + return urdfs + + def executeSimulation(self, assembly: list[str], outPath: str, urdfGeneration: dict[str:str], duration: int) -> list['SimulatorStabilityResultModel']: p.connect(p.DIRECT) - p.setGravity(0, 0, -10) p.setAdditionalSearchPath(pybullet_data.getDataPath()) - bulletIds = [] - for el in urdfs: - bulletIds.append(p.loadURDF(el)) p.loadURDF("plane.urdf") resultCoords = [] - for i in range(DURATION): - if (i + 200 == DURATION): - inc = 0 - for el in bulletIds: - inc += 1 - # Получение расположения деталей - pos, rot = p.getBasePositionAndOrientation(el) - resultCoords.append({ - 'id': inc, - "quaternion": rot, - "position": pos - }) - p.stepSimulation() - time.sleep(1./240.) - file_to_open = outPath + buildNumber + "_" + 'stability_coords.json' - - f = open(file_to_open, 'w', encoding='utf-8',errors='ignore') - # Запись результатов тестирования - f.write(json.dumps(resultCoords)) + urdfs = self.urdfLoader(assembly=assembly, + urdfGeneration=urdfGeneration, outPath=outPath) + bulletIds = [] for el in urdfs: - os.remove(el) - f.close() - p.disconnect() + id = p.loadURDF(el) + bulletIds.append(id) + + for i in range(duration): + if (i + 200 == duration): + inc = 0 + for bulletUUID in bulletIds: + pos, rot = p.getBasePositionAndOrientation(bulletUUID) + resultCoords.append(SimulatorStabilityResultModel(id=assembly[inc], quaternion=Coords( + x=rot[0], y=rot[1], z=rot[2]), position=Coords(x=pos[0], y=pos[1], z=pos[2]))) + p.removeBody(bulletUUID) + inc += 1 + + p.stepSimulation() + + time.sleep(1./240.) + return resultCoords + + def call(self, aspPath: str): + try: + assemblyFolder = aspPath + assemblesStructures = json.loads( + (open(assemblyFolder + 'sequences.json')).read()).get('sequences') + tasks = len(assemblesStructures) * len(assemblesStructures[0]) + taskCounter = 0 + urdfGeneration = json.loads( + (open(assemblyFolder + 'sdf-generation/urdf-generation.json')).read()) + for activeAssemblyNumber in range(len(assemblesStructures)): + pathSaveResultAssemblyFolder = aspPath + 'stability' + \ + '/' + str(activeAssemblyNumber + 1) + '/' + if not os.path.exists(pathSaveResultAssemblyFolder): + os.makedirs(pathSaveResultAssemblyFolder) + + for subAssemblyNumber in range(len(assemblesStructures[activeAssemblyNumber])): + taskCounter += 1 + subAssembly = assemblesStructures[activeAssemblyNumber][0:subAssemblyNumber+1] + asm = [] + for el in subAssembly: + asm.append(el) + + resultSimulationStates = self.executeSimulation( + assembly=asm, outPath=aspPath, urdfGeneration=urdfGeneration, duration=1000) + + pathSaveResultSubAssemblyFolder = aspPath + 'stability' + '/' + \ + str(activeAssemblyNumber + 1) + \ + '/' + str(subAssemblyNumber) + '/' + if not os.path.exists(pathSaveResultSubAssemblyFolder): + os.makedirs(pathSaveResultSubAssemblyFolder) + results = {} + for state in resultSimulationStates: + results[state.id] = state.to_dict() + f = open(pathSaveResultSubAssemblyFolder + '/' + + 'motion_result.json', 'w', encoding='utf-8', errors='ignore') + f.write(json.dumps(results, + ensure_ascii=False, indent=4)) + f.close() + percentageOfCompletion = taskCounter / tasks * 100 + print('process complete: ' + + str(percentageOfCompletion) + '%') + except Exception as e: + print(e)