framework/stability_process_predicate/usecases/stability_check_usecase.py
2023-09-12 19:09:33 +00:00

192 lines
6.5 KiB
Python

from typing import Any, List, TypeVar, Type, cast, Callable
import numpy as np
import pybullet as p
import time
import pybullet_data
import os
import json
from time import sleep
T = TypeVar("T")
def from_str(x):
assert isinstance(x, str)
return x
def from_float(x: Any) -> float:
assert isinstance(x, (float, int)) and not isinstance(x, bool)
return float(x)
def to_float(x: Any) -> float:
assert isinstance(x, float)
return x
def from_int(x: Any) -> int:
assert isinstance(x, int) and not isinstance(x, bool)
return x
def to_class(c: Type[T], x: Any) -> dict:
assert isinstance(x, c)
return cast(Any, x).to_dict()
def from_list(f: Callable[[Any], T], x: Any) -> List[T]:
assert isinstance(x, list)
return [f(y) for y in x]
class Coords:
x: float
y: float
z: float
def __init__(self, x: float, y: float, z: float) -> None:
self.x = x
self.y = y
self.z = z
@staticmethod
def from_dict(obj: Any) -> 'Coords':
assert isinstance(obj, dict)
x = from_float(obj.get("x"))
y = from_float(obj.get("y"))
z = from_float(obj.get("z"))
return Coords(x, y, z)
def to_dict(self) -> dict:
result: dict = {}
result["x"] = to_float(self.x)
result["y"] = to_float(self.y)
result["z"] = to_float(self.z)
return result
class SimulatorStabilityResultModel:
id: str
quaternion: Coords
position: Coords
def __init__(self, id: int, quaternion: Coords, position: Coords) -> None:
self.id = id
self.quaternion = quaternion
self.position = position
@staticmethod
def from_dict(obj: Any) -> 'SimulatorStabilityResultModel':
assert isinstance(obj, dict)
id = from_str(obj.get("id"))
quaternion = Coords.from_dict(obj.get("quaternion"))
position = Coords.from_dict(obj.get("position"))
return SimulatorStabilityResultModel(id, quaternion, position)
def to_dict(self) -> dict:
result: dict = {}
result["id"] = from_str(self.id)
result["quaternion"] = to_class(Coords, self.quaternion)
result["position"] = to_class(Coords, self.position)
return result
def SimulatorStabilityModelfromdict(s: Any) -> List[SimulatorStabilityResultModel]:
return from_list(SimulatorStabilityResultModel.from_dict, s)
def SimulatorStabilityModeltodict(x: List[SimulatorStabilityResultModel]) -> Any:
return from_list(lambda x: to_class(SimulatorStabilityResultModel, x), x)
class StabilityCheckUseCase:
def urdfLoader(self, assembly: list[str], outPath: str, urdfGeneration: dict[str:str]):
urdfs = []
for assemblyCount in range(len(assembly)):
urdf = urdfGeneration.get(assembly[assemblyCount])
file_to_open = outPath + '/sdf-generation/' + \
str(assemblyCount) + '.urdf'
f = open(file_to_open, 'w', encoding='utf-8',
errors='ignore')
f.write(urdf)
f.close()
urdfs.append(os.path.abspath(f.name))
return urdfs
def executeSimulation(self, assembly: list[str], outPath: str, urdfGeneration: dict[str:str], duration: int) -> list['SimulatorStabilityResultModel']:
p.connect(p.DIRECT)
p.setGravity(0, 0, -10)
p.setAdditionalSearchPath(pybullet_data.getDataPath())
p.loadURDF("plane.urdf")
resultCoords = []
urdfs = self.urdfLoader(assembly=assembly,
urdfGeneration=urdfGeneration, outPath=outPath)
bulletIds = []
for el in urdfs:
id = p.loadURDF(el)
bulletIds.append(id)
for i in range(duration):
if (i + 200 == duration):
inc = 0
for bulletUUID in bulletIds:
pos, rot = p.getBasePositionAndOrientation(bulletUUID)
resultCoords.append(SimulatorStabilityResultModel(id=assembly[inc], quaternion=Coords(
x=rot[0], y=rot[1], z=rot[2]), position=Coords(x=pos[0], y=pos[1], z=pos[2])))
p.removeBody(bulletUUID)
inc += 1
p.stepSimulation()
time.sleep(1./240.)
return resultCoords
def call(self, aspPath: str):
try:
assemblyFolder = aspPath
assemblesStructures = json.loads(
(open(assemblyFolder + 'sequences.json')).read()).get('sequences')
tasks = len(assemblesStructures) * len(assemblesStructures[0])
taskCounter = 0
urdfGeneration = json.loads(
(open(assemblyFolder + 'sdf-generation/urdf-generation.json')).read())
for activeAssemblyNumber in range(len(assemblesStructures)):
pathSaveResultAssemblyFolder = aspPath + 'stability' + \
'/' + str(activeAssemblyNumber + 1) + '/'
if not os.path.exists(pathSaveResultAssemblyFolder):
os.makedirs(pathSaveResultAssemblyFolder)
for subAssemblyNumber in range(len(assemblesStructures[activeAssemblyNumber])):
taskCounter += 1
subAssembly = assemblesStructures[activeAssemblyNumber][0:subAssemblyNumber+1]
asm = []
for el in subAssembly:
asm.append(el)
resultSimulationStates = self.executeSimulation(
assembly=asm, outPath=aspPath, urdfGeneration=urdfGeneration, duration=1000)
pathSaveResultSubAssemblyFolder = aspPath + 'stability' + '/' + \
str(activeAssemblyNumber + 1) + \
'/' + str(subAssemblyNumber) + '/'
if not os.path.exists(pathSaveResultSubAssemblyFolder):
os.makedirs(pathSaveResultSubAssemblyFolder)
results = {}
for state in resultSimulationStates:
results[state.id] = state.to_dict()
f = open(pathSaveResultSubAssemblyFolder + '/' +
'motion_result.json', 'w', encoding='utf-8', errors='ignore')
f.write(json.dumps(results,
ensure_ascii=False, indent=4))
f.close()
percentageOfCompletion = taskCounter / tasks * 100
print('process complete: ' +
str(percentageOfCompletion) + '%')
except Exception as e:
print(e)