FreeCAD: Workbench Refactor

This commit is contained in:
Mark Voltov 2024-04-14 18:54:47 +00:00 committed by Igor Brylyov
parent 037827669a
commit a58dcdafb1
386 changed files with 997 additions and 64533 deletions

1
simulation/asp/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
out

View file

@ -0,0 +1,37 @@
import os
import json
import typing
class FS:
def readJSON(path: str):
return json.loads((open(path)).read())
def writeFile(data, filePath, fileName):
file_to_open = filePath + fileName
f = open(file_to_open, "w", encoding="utf-8", errors="ignore")
f.write(data)
f.close()
def readFile(path: str):
return open(path).read()
def readFilesTypeFolder(pathFolder: str, fileType=".json"):
filesJson = list(
filter(
lambda x: x[-fileType.__len__() :] == fileType, os.listdir(pathFolder)
)
)
return filesJson
def listGetFirstValue(iterable, default=False, pred=None):
return next(filter(pred, iterable), default)
def filterModels(filterModels, filterModelsDescription: list[str]):
models = []
for el in filterModelsDescription:
models.append(listGetFirstValue(filterModels, None, lambda x: x.name == el))
return models

View file

@ -0,0 +1,877 @@
"""
Format and compress XML documents
"""
import getopt
import re
import sys
import xml.parsers.expat
__version__ = "0.2.4"
DEFAULT_BLANKS = False
DEFAULT_COMPRESS = False
DEFAULT_SELFCLOSE = False
DEFAULT_CORRECT = True
DEFAULT_INDENT = 2
DEFAULT_INDENT_CHAR = " "
DEFAULT_INLINE = True
DEFAULT_ENCODING_INPUT = None
DEFAULT_ENCODING_OUTPUT = None
DEFAULT_EOF_NEWLINE = False
class Formatter:
# Use internal encoding:
encoding_internal = None
def __init__(
self,
indent=DEFAULT_INDENT,
preserve=[],
blanks=DEFAULT_BLANKS,
compress=DEFAULT_COMPRESS,
selfclose=DEFAULT_SELFCLOSE,
indent_char=DEFAULT_INDENT_CHAR,
encoding_input=DEFAULT_ENCODING_INPUT,
encoding_output=DEFAULT_ENCODING_OUTPUT,
inline=DEFAULT_INLINE,
correct=DEFAULT_CORRECT,
eof_newline=DEFAULT_EOF_NEWLINE,
):
# Minify the XML document:
self.compress = compress
# Use self-closing tags
self.selfclose = selfclose
# Correct text nodes
self.correct = correct
# Decode the XML document:
self.encoding_input = self.enc_normalize(encoding_input)
# Encode ouput by:
self.encoding_output = self.enc_normalize(encoding_output)
# Insert indent = indent*level*indent_char:
self.indent = int(indent)
# Indent by char:
self.indent_char = indent_char
# Format inline objects:
self.inline = inline
# Don't compress this elements and their descendants:
self.preserve = preserve
# Preserve blanks lines (collapse multiple into one)
self.blanks = blanks
# Always add a newline character at EOF
self.eof_newline = eof_newline
@property
def encoding_effective(self, enc=None):
if self.encoding_output:
return self.encoding_output
elif self.encoding_internal:
return self.encoding_internal
elif self.encoding_input:
return self.encoding_input
else:
return "UTF-8"
def enc_normalize(self, string):
""" Format an Encoding identifier to upper case. """
if isinstance(string, str):
return string.upper()
return None
def enc_encode(self, strg):
""" Encode a formatted XML document in target"""
if sys.version_info > (3, 0):
return strg.encode(self.encoding_effective) # v3
return strg.decode("utf-8").encode(self.encoding_effective) # v2
def enc_output(self, path, strg):
""" Output according to encoding """
fh = sys.stdout
if strg is not None:
if path is not None:
open(path, "w+b").write(strg)
elif sys.version_info > (3, 0):
fh.buffer.write(strg)
else:
fh.write(strg)
def format_string(self, xmldoc=""):
""" Format a XML document given by xmldoc """
token_list = Formatter.TokenList(self)
token_list.parser.Parse(xmldoc)
return self.enc_encode(str(token_list))
def format_file(self, file):
""" Format a XML document given by path name """
fh = open(file, "rb")
token_list = Formatter.TokenList(self)
token_list.parser.ParseFile(fh)
fh.close()
return self.enc_encode(str(token_list))
class TokenList:
# Being in a cdata section:
cdata_section = False
# Lock deletion of leading whitespace:
desc_mixed_level = None
# Lock indenting:
indent_level = None
# Reference the Formatter:
formatter = None
# Count levels:
level_counter = 0
# Lock deletion of whitespaces:
preserve_level = None
def __init__(self, formatter):
# Keep tokens in a list:
self._list = []
self.formatter = formatter
self.parser = xml.parsers.expat.ParserCreate(
encoding=self.formatter.encoding_input
)
self.parser.specified_attributes = 1
self.parser.buffer_text = True
# Push tokens to buffer:
for pattern in [
"XmlDecl%s",
"ElementDecl%s",
"AttlistDecl%s",
"EntityDecl%s",
"StartElement%s",
"EndElement%s",
"ProcessingInstruction%s",
"CharacterData%s",
"Comment%s",
"Default%s",
"StartDoctypeDecl%s",
"EndDoctypeDecl%s",
"StartCdataSection%s",
"EndCdataSection%s",
"NotationDecl%s",
]:
setattr(
self.parser, pattern % "Handler", self.xml_handler(pattern % "")
)
def __iter__(self):
return iter(self._list)
def __len__(self):
return len(self._list)
def __getitem__(self, pos):
if 0 <= pos < len(self._list):
return self._list[pos]
else:
raise IndexError
def __setitem__(self, pos, value):
if 0 <= pos < len(self._list):
self._list[pos] = value
else:
raise IndexError
def __str__(self):
""" Returns the formatted XML document in UTF-8. """
for step in ["configure", "pre_operate", "post_operate"]:
for tk in iter(self):
getattr(tk, step)()
result = ""
for tk in iter(self):
result += str(tk)
if self.formatter.eof_newline and not result.endswith("\n"):
result += "\n"
return result
def append(self, tk):
""" Add token to tokenlist. """
tk.pos = len(self._list)
self._list.append(tk)
def level_increment(self):
""" Increment level counter. """
self.level_counter += 1
def level_decrement(self):
""" Decrement level counter. """
self.level_counter -= 1
def token_descendant_mixed(self, tk):
""" Mark descendants of mixed content. """
if tk.name == "StartElement":
# Mark every descendant:
if tk.content_model in [2, 3] and self.desc_mixed_level is None:
self.desc_mixed_level = tk.level
return False
return self.desc_mixed_level is not None
elif tk.name == "EndElement":
# Stop marking every descendant:
if tk.level is self.desc_mixed_level:
self.desc_mixed_level = None
elif self.desc_mixed_level is not None:
return True
return False
elif self.desc_mixed_level is None:
return False
return self.desc_mixed_level >= tk.level - 1
def sequence(self, tk, scheme=None):
"""Returns sublist of token list.
None: next to last
EndElement: first to previous"""
if scheme == "EndElement" or (scheme is None and tk.end):
return reversed(self._list[: tk.pos])
return self._list[(tk.pos + 1) :]
def token_indent(self, tk):
if self.formatter.inline:
return self.token_indent_inline(tk)
""" Indent outside of text of mixed content. """
if tk.name == "StartElement":
# Block indenting for descendants of text and mixed content:
if tk.content_model in [2, 3] and self.indent_level is None:
self.indent_level = tk.level
elif self.indent_level is not None:
return False
return True
elif tk.name == "EndElement":
# Unblock indenting for descendants of text and mixed content:
if tk.level == self.indent_level:
self.indent_level = None
elif self.indent_level is None:
return True
return False
return self.indent_level is None
def token_indent_inline(self, tk):
""" Indent every element content - no matter enclosed by text or mixed content. """
for itk in iter(self.sequence(tk, "EndElement")):
if itk.level < tk.level and itk.name == "StartElement":
if itk.content_model == 1:
return True
return False
if (
itk.level == tk.level
and tk.name == "EndElement"
and itk.name == "StartElement"
):
if itk.content_model == 1:
return True
return False
return True
def token_model(self, tk):
"""Returns code for content model.
0: empty
1: element
2: text
3: mixed"""
eflag = tflag = 0
for itk in iter(self.sequence(tk)):
# Element boundary found:
if itk.level <= tk.level:
break
# Direct child found:
elif (itk.level - 1) == tk.level:
if itk.start:
eflag = 1
elif itk.not_empty:
tflag = 2
return eflag + tflag
def token_preserve(self, tk):
"""Preseve eyery descendant of an preserved element.
0: not locked
1: just (un)locked
2: locked"""
# Lock perserving for StartElements:
if tk.name == "StartElement":
if self.preserve_level is not None:
return 2
if tk.arg[0] in self.formatter.preserve:
self.preserve_level = tk.level
return 1
return 0
# Unlock preserving for EndElements:
elif tk.name == "EndElement":
if (
tk.arg[0] in self.formatter.preserve
and tk.level == self.preserve_level
):
self.preserve_level = None
return 1
elif self.preserve_level is None:
return 0
return 2
return self.preserve_level is not None
def whitespace_append_trailing(self, tk):
""" Add a trailing whitespace to previous character data. """
if self.formatter.correct and tk.leading and tk.not_empty:
self.whitespace_append(tk, "EndElement", "StartElement", True)
def whitespace_append_leading(self, tk):
""" Add a leading whitespace to previous character data. """
if self.formatter.correct and tk.trailing and tk.not_empty:
self.whitespace_append(tk)
def whitespace_append(
self, tk, start="StartElement", stop="EndElement", direct=False
):
""" Add a whitspace to token list. """
for itk in self.sequence(tk, start):
if (
itk.empty
or (itk.name == stop and itk.descendant_mixed is False)
or (itk.name == start and abs(tk - itk) == 1)
):
break
elif itk.not_empty or (itk.name == start and itk.descendant_mixed):
self.insert_empty(itk, direct)
break
def whitespace_delete_leading(self, tk):
""" Returns True, if no next token or all empty (up to next end element)"""
if (
self.formatter.correct
and tk.leading
and not tk.preserve
and not tk.cdata_section
):
for itk in self.sequence(tk, "EndElement"):
if itk.trailing:
return True
elif itk.name in ["EndElement", "CharacterData", "EndCdataSection"]:
return False
return True
return False
def whitespace_delete_trailing(self, tk):
"""Returns True, if no next token or all empty (up to next end element)"""
if (
self.formatter.correct
and tk.trailing
and not tk.preserve
and not tk.cdata_section
):
for itk in self.sequence(tk, "StartElement"):
if itk.end:
return True
elif (
itk.name in ["StartElement", "StartCdataSection"]
or itk.not_empty
):
return False
return True
return False
def insert_empty(self, tk, before=True):
""" Insert an Empty Token into token list - before or after tk. """
if not (0 < tk.pos < (len(self) - 1)):
return False
ptk = self[tk.pos - 1]
ntk = self.formatter.CharacterData(self, [" "])
ntk.level = max(ptk.level, tk.level)
ntk.descendant_mixed = tk.descendant_mixed
ntk.preserve = ptk.preserve * tk.preserve
ntk.cdata_section = ptk.cdata_section or tk.cdata_section
if before:
self._list.insert(tk.pos + 1, ntk)
else:
self._list.insert(tk.pos, ntk)
for i in range((tk.pos - 1), len(self._list)):
self._list[i].pos = i
def xml_handler(self, key):
""" Returns lambda function which adds token to token list"""
return lambda *arg: self.append(getattr(self.formatter, key)(self, arg))
class Token(object):
def __init__(self, tklist, arg):
# Reference Token List:
self.list = tklist
# Token datas:
self.arg = list(arg)
# Token is placed in an CDATA section:
self.cdata_section = False
# Token has content model:
self.content_model = None
# Remove trailing wihtespaces:
self.delete_trailing = False
# Remove leading whitespaces:
self.delete_leading = False
# Token is descendant of text or mixed content element:
self.descendant_mixed = False
# Reference to formatter:
self.formatter = tklist.formatter
# Insert indenting white spaces:
self.indent = False
# N-th generation of roots descendants:
self.level = self.list.level_counter
# Token class:
self.name = self.__class__.__name__
# Preserve white spaces within enclosed tokens:
self.preserve = False
# Position in token list:
self.pos = None
def __sub__(self, other):
return self.pos - other.pos
def __unicode__(self):
return ""
# Workaround, see http://lucumr.pocoo.org/2011/1/22/forwards-compatible-python/:
if sys.version_info > (3, 0):
__str__ = lambda x: x.__unicode__()
else:
__str__ = lambda x: unicode(x).encode("utf-8")
@property
def end(self):
return self.name == "EndElement"
@property
def empty(self):
return self.name == "CharacterData" and re.match(
r"^[\t\s\n]*$", self.arg[0]
)
@property
def leading(self):
return self.name == "CharacterData" and re.search(
r"^[\t\s\n]+", self.arg[0]
)
@property
def not_empty(self):
return (
self.name == "CharacterData"
and not self.cdata_section
and not re.match(r"^[\t\s\n]+$", self.arg[0])
)
@property
def trailing(self):
return self.name == "CharacterData" and re.search(
r"[\t\s\n]+$", self.arg[0]
)
@property
def start(self):
return self.name == "StartElement"
@property
def correct(self):
return self.formatter.correct
def attribute(self, key, value):
if key and value:
return ' %s="%s"' % (key, value)
elif key:
return ' %s=""' % (key)
return ""
def indent_insert(self):
""" Indent token. """
# Child of root and no empty node
if (
self.level > 0 and not (self.end and self.list[self.pos - 1].start)
) or ( # not empty node:
self.end and not self.list[self.pos - 1].start
):
return self.indent_create(self.level)
return ""
def indent_create(self, times=1):
""" Returns indent string. """
if not self.formatter.compress and self.formatter.indent:
return "\n%s" % (
(times * self.formatter.indent) * self.formatter.indent_char
)
return ""
def identifier(self, systemid, publicid):
# TODO add base parameter:
if publicid and systemid:
return ' PUBLIC "%s" "%s"' % (publicid, systemid)
elif publicid:
return ' PUBLIC "%s"' % publicid
elif systemid:
return ' SYSTEM "%s"' % systemid
return ""
def configure(self):
""" Set token properties. """
self.descendant_mixed = self.list.token_descendant_mixed(self)
self.preserve = self.list.token_preserve(self)
self.cdata_section = self.list.cdata_section
def pre_operate(self):
pass
def post_operate(self):
pass
class AttlistDecl(Token):
def __unicode__(self):
str = self.indent_create()
str += "<!ATTLIST %s %s" % (self.arg[0], self.arg[1])
if self.arg[2] is not None:
str += " %s" % self.arg[2]
if self.arg[4] and not self.arg[3]:
str += " #REQUIRED"
elif self.arg[3] and self.arg[4]:
str += " #FIXED"
elif not self.arg[4] and not self.arg[3]:
str += " #IMPLIED"
if self.arg[3]:
str += ' "%s"' % self.arg[3]
str += ">"
return str
class CharacterData(Token):
def __unicode__(self):
str = self.arg[0]
if not self.preserve and not self.cdata_section:
# remove empty tokens always in element content!
if self.empty and not self.descendant_mixed:
if self.formatter.blanks and not self.formatter.compress and re.match(r"\s*\n\s*\n\s*", str):
str = "\n"
else:
str = ""
else:
if self.correct:
str = re.sub(r"\r\n", "\n", str)
str = re.sub(r"\r|\n|\t", " ", str)
str = re.sub(r"\s+", " ", str)
if self.delete_leading:
str = re.sub(r"^\s", "", str)
if self.delete_trailing:
str = re.sub(r"\s$", "", str)
if not self.cdata_section:
str = re.sub(r"&", "&amp;", str)
str = re.sub(r"<", "&lt;", str)
return str
def pre_operate(self):
self.list.whitespace_append_trailing(self)
self.list.whitespace_append_leading(self)
def post_operate(self):
self.delete_leading = self.list.whitespace_delete_leading(self)
self.delete_trailing = self.list.whitespace_delete_trailing(self)
class Comment(Token):
def __unicode__(self):
str = ""
if self.preserve in [0, 1] and self.indent:
str += self.indent_insert()
str += "<!--%s-->" % re.sub(
r"^[\r\n]+$", "\n", re.sub(r"^[\r\n]+", "\n", self.arg[0])
)
return str
def configure(self):
super(Formatter.Comment, self).configure()
self.indent = self.list.token_indent(self)
class Default(Token):
pass
class EndCdataSection(Token):
def __unicode__(self):
return "]]>"
def configure(self):
self.list.cdata_section = False
class ElementDecl(Token):
def __unicode__(self):
str = self.indent_create()
str += "<!ELEMENT %s%s>" % (self.arg[0], self.evaluate_model(self.arg[1]))
return str
def evaluate_model(self, model, modelStr="", concatStr=""):
childSeq = []
mixed = model[0] == xml.parsers.expat.model.XML_CTYPE_MIXED
hasChilds = len(model[3]) or mixed
if model[0] == xml.parsers.expat.model.XML_CTYPE_EMPTY: # 1
modelStr += " EMPTY"
elif model[0] == xml.parsers.expat.model.XML_CTYPE_ANY: # 2
modelStr += " ANY"
elif model[0] == xml.parsers.expat.model.XML_CTYPE_NAME: # 4
modelStr = "%s" % model[2] # new start
elif model[0] in (
xml.parsers.expat.model.XML_CTYPE_CHOICE,
xml.parsers.expat.model.XML_CTYPE_MIXED,
): # 5
concatStr = "|"
elif model[0] == xml.parsers.expat.model.XML_CTYPE_SEQ: # 6
concatStr = ","
if hasChilds:
modelStr += " ("
if mixed:
childSeq.append("#PCDATA")
for child in model[3]:
childSeq.append(self.evaluate_model(child))
modelStr += concatStr.join(childSeq)
if hasChilds:
modelStr += ")"
modelStr += {
xml.parsers.expat.model.XML_CQUANT_NONE: "",
xml.parsers.expat.model.XML_CQUANT_OPT: "?",
xml.parsers.expat.model.XML_CQUANT_PLUS: "+",
xml.parsers.expat.model.XML_CQUANT_REP: "*",
}[model[1]]
return modelStr
class EndDoctypeDecl(Token):
def __unicode__(self):
str = ""
if self.list[self.pos - 1].name != "StartDoctypeDecl":
str += self.indent_create(0)
str += "]"
str += ">"
str += self.indent_create(0)
return str
class EndElement(Token):
def __init__(self, list, arg):
list.level_decrement()
super(Formatter.EndElement, self).__init__(list, arg)
def __unicode__(self):
str = ""
# Don't close empty nodes on compression mode:
if (
not (self.formatter.compress or self.formatter.selfclose)
or self.list[self.pos - 1].name != "StartElement"
):
if self.preserve in [0] and self.indent:
str += self.indent_insert()
str += "</%s>" % self.arg[0]
return str
def configure(self):
self.descendant_mixed = self.list.token_descendant_mixed(self)
self.preserve = self.list.token_preserve(self)
self.indent = self.list.token_indent(self)
class EntityDecl(Token):
def __unicode__(self):
str = self.indent_create()
str += "<!ENTITY "
if self.arg[1]:
str += "% "
str += "%s " % self.arg[0]
if self.arg[2]:
str += '"%s"' % self.arg[2]
else:
str += "%s " % self.identifier(self.arg[4], self.arg[5])
if self.arg[6]:
str += "NDATA %s" % self.arg[6]
str += ">"
return str
class NotationDecl(Token):
def __unicode__(self):
str = self.indent_create()
str += "<!NOTATION %s%s>" % (
self.arg[0],
self.identifier(self.arg[2], self.arg[3]),
)
return str
class ProcessingInstruction(Token):
def __unicode__(self):
str = ""
if self.preserve in [0, 1] and self.indent:
str += self.indent_insert()
str += "<?%s %s?>" % (self.arg[0], self.arg[1])
return str
def configure(self):
super(Formatter.ProcessingInstruction, self).configure()
self.indent = self.list.token_indent(self)
class StartCdataSection(Token):
def __unicode__(self):
return "<![CDATA["
def configure(self):
self.list.cdata_section = True
class StartDoctypeDecl(Token):
def __unicode__(self):
str = "<!DOCTYPE %s" % (self.arg[0])
if self.arg[1]:
str += self.identifier(self.arg[1], self.arg[2])
if self.arg[3]:
str += " ["
return str
class StartElement(Token):
def __init__(self, list, arg):
super(Formatter.StartElement, self).__init__(list, arg)
self.list.level_increment()
def __unicode__(self):
str = ""
if self.preserve in [0, 1] and self.indent:
str += self.indent_insert()
str += "<%s" % self.arg[0]
for attr in sorted(self.arg[1].keys()):
str += self.attribute(attr, self.arg[1][attr])
if self.list[self.pos + 1].end and (self.formatter.compress or self.formatter.selfclose):
str += "/>"
else:
str += ">"
return str
def configure(self):
self.content_model = self.list.token_model(self)
self.descendant_mixed = self.list.token_descendant_mixed(self)
self.preserve = self.list.token_preserve(self)
self.indent = self.list.token_indent(self)
class XmlDecl(Token):
def __init__(self, list, arg):
super(Formatter.XmlDecl, self).__init__(list, arg)
if len(self.arg) > 1:
self.formatter.encoding_internal = self.arg[1]
def __unicode__(self):
str = "<?xml%s%s" % (
self.attribute("version", self.arg[0]),
self.attribute("encoding", self.formatter.encoding_effective),
)
if self.arg[2] > -1:
str += self.attribute("standalone", "yes")
str += "?>\n"
return str
def cli_usage(msg=""):
""" Output usage for command line tool. """
sys.stderr.write(msg + "\n")
sys.stderr.write(
'Usage: xmlformat [--preserve "pre,literal"] [--blanks]\
[--compress] [--selfclose] [--indent num] [--indent-char char]\
[--outfile file] [--encoding enc] [--outencoding enc]\
[--disable-inlineformatting] [--overwrite] [--disable-correction]\
[--eof-newline]\
[--help] <--infile file | file | - >\n'
)
sys.exit(2)
def cli():
""" Launch xmlformatter from command line. """
res = None
indent = DEFAULT_INDENT
indent_char = DEFAULT_INDENT_CHAR
outfile = None
overwrite = False
preserve = []
blanks = False
compress = DEFAULT_COMPRESS
selfclose = DEFAULT_SELFCLOSE
infile = None
encoding = DEFAULT_ENCODING_INPUT
outencoding = DEFAULT_ENCODING_OUTPUT
inline = DEFAULT_INLINE
correct = DEFAULT_CORRECT
eof_newline = DEFAULT_EOF_NEWLINE
try:
opts, args = getopt.getopt(
sys.argv[1:],
"",
[
"compress",
"selfclose",
"disable-correction",
"disable-inlineformatting",
"encoding=",
"help",
"infile=",
"indent=",
"indent-char=",
"outfile=",
"outencoding=",
"overwrite",
"preserve=",
"blanks",
"eof-newline"
],
)
except getopt.GetoptError as err:
cli_usage(str(err))
for key, value in opts:
if key in ["--indent"]:
indent = value
elif key in ["--preserve"]:
preserve = value.replace(",", " ").split()
elif key in ["--blanks"]:
blanks = True
elif key in ["--help"]:
cli_usage()
elif key in ["--compress"]:
compress = True
elif key in ["--selfclose"]:
selfclose = True
elif key in ["--outfile"]:
outfile = value
elif key in ["--infile"]:
infile = value
elif key in ["--encoding"]:
encoding = value
elif key in ["--outencoding"]:
outencoding = value
elif key in ["--indent-char"]:
indent_char = value
elif key in ["--disable-inlineformatting"]:
inline = False
elif key in ["--disable-correction"]:
correct = False
elif key in ["--overwrite"]:
overwrite = True
elif key in ["--eof-newline"]:
eof_newline = True
try:
formatter = Formatter(
indent=indent,
preserve=preserve,
blanks=blanks,
compress=compress,
selfclose=selfclose,
encoding_input=encoding,
encoding_output=outencoding,
indent_char=indent_char,
inline=inline,
correct=correct,
eof_newline=eof_newline,
)
input_file = None
if infile:
input_file = infile
res = formatter.format_file(input_file)
elif len(args) > 0:
if args[0] == "-":
res = formatter.format_string("".join(sys.stdin.readlines()))
else:
input_file = args[0]
res = formatter.format_file(input_file)
except xml.parsers.expat.ExpatError as err:
cli_usage("XML error: %s" % err)
except IOError as err:
cli_usage("IO error: %s" % err)
except:
cli_usage("Unkonwn error")
if overwrite:
formatter.enc_output(input_file, res)
else:
formatter.enc_output(outfile, res)

53
simulation/asp/main.py Normal file
View file

@ -0,0 +1,53 @@
import argparse
import shutil
from src.model.enum import Enum
from helper.fs import FS
from src.usecases.urdf_sub_assembly_usecase import UrdfSubAssemblyUseCase
from src.model.sdf_geometry import GeometryModel
from src.usecases.sdf_sub_assembly_usecase import SdfSubAssemblyUseCase
import os
from pathlib import Path
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--generationFolder", help="FreeCad generation folder")
parser.add_argument("--outPath", help="save SDF path")
parser.add_argument("--world", help="adding sdf world")
parser.add_argument("--format", help="urdf,sdf,mujoco")
args = parser.parse_args()
if args.generationFolder == None or args.outPath == None:
parser.print_help()
outPath = args.outPath
geometryFiles = FS.readFilesTypeFolder(args.generationFolder + "/assets/")
assemblyStructure = FS.readJSON(args.generationFolder + "/step-structure.json")
geometryModels: list[GeometryModel] = []
for el in geometryFiles:
geometryModels.append(
GeometryModel.from_dict(
FS.readJSON(args.generationFolder + "/assets/" + el)
)
)
if os.path.exists(outPath + Enum.folderPath):
shutil.rmtree(outPath + Enum.folderPath)
Path(outPath + Enum.folderPath).mkdir(parents=True, exist_ok=True)
if args.format == "sdf":
SdfSubAssemblyUseCase().call(
geometryModels=geometryModels,
assembly=assemblyStructure,
world=args.world,
generationFolder=args.generationFolder,
outPath=args.outPath,
)
if args.format == "urdf":
UrdfSubAssemblyUseCase().call(
geometryModels=geometryModels,
assembly=assemblyStructure,
world=args.world,
generationFolder=args.generationFolder,
outPath=args.outPath,
)

View file

@ -0,0 +1,18 @@
{
"name": "Cube1",
"ixx": "16.66666666666667",
"ixy": "0.0",
"ixz": "0.0",
"iyy": "16.66666666666667",
"izz": "16.66666666666667",
"massSDF": "0.9999999999999998",
"posX": "0.0",
"posY": "-0.015",
"posZ": "0.0",
"eulerX": "0.0",
"eulerY": "0.0",
"eulerZ": "0.0",
"iyz": "0.0",
"stl": "/meshes/Cube1.stl",
"link": "1554"
}

View file

@ -0,0 +1,18 @@
{
"name": "Cube2",
"ixx": "16.66666666666667",
"ixy": "0.0",
"ixz": "-3.637978807091713e-15",
"iyy": "16.66666666666667",
"izz": "16.66666666666667",
"massSDF": "0.9999999999999998",
"posX": "0.0",
"posY": "-0.009",
"posZ": "0.01",
"eulerX": "0.0",
"eulerY": "0.0",
"eulerZ": "0.0",
"iyz": "-3.637978807091713e-15",
"stl": "/meshes/Cube2.stl",
"link": "8838"
}

View file

@ -0,0 +1,4 @@
<include>
<name>{name}</name>
<uri>{uri}</uri>
</include>

View file

@ -0,0 +1,5 @@
<include>
<name>{name}</name>
<uri>{uri}</uri>
<pose>{posX} {posY} {posZ} {eulerX} {eulerY} {eulerZ}</pose>
</include>

View file

@ -0,0 +1,7 @@
<joint name="{name}" type="fixed">
<parent>base_link</parent>
<child>{child}::{child}</child>
<pose>{posX} {posY} {posZ} {eulerX} {eulerY} {eulerZ}</pose>
</joint>

View file

@ -0,0 +1,36 @@
<link name="{name}">
<pose>{posX} {posY} {posZ} {eulerX} {eulerY} {eulerZ}</pose>
<inertial>
<pose>{posX} {posY} {posZ} {eulerX} {eulerY} {eulerZ}</pose>
<inertia>
<ixx>{ixx}</ixx>
<ixy>{ixy}</ixy>
<ixz>{ixz}</ixz>
<iyy>{iyy}</iyy>
<iyz>{iyz}</iyz>
<izz>{izz}</izz>
</inertia>
<mass>{massSDF}</mass>
</inertial>
<collision name="collision">
<geometry>
<mesh>
<uri>model:/{stl}</uri>
</mesh>
</geometry>
</collision>
<visual name="visual">
<geometry>
<mesh>
<uri>model:/{stl}</uri>
</mesh>
</geometry>
<surface>
<friction>
<ode>
<mu>{friction}</mu>
</ode>
</friction>
</surface>
</visual>
</link>

View file

@ -0,0 +1,5 @@
<?xml version="1.0" ?>
<model>
<sdf version="1.5">model.sdf</sdf>
</model>

View file

@ -0,0 +1,27 @@
<?xml version='1.0'?>
<sdf version="1.4">
<model name="{name}">
<link name="{name}">
<gravity>0</gravity>
<collision name="collision">
<mesh>
<uri>model:/{stl}</uri>
</mesh>
</collision>
<visual name="visual">
<geometry>
<mesh>
<uri>model:/{stl}</uri>
</mesh>
</geometry>
<surface>
<friction>
<ode>
<mu>{friction}</mu>
</ode>
</friction>
</surface>
</visual>
</link>
</model>
</sdf>

View file

@ -0,0 +1,64 @@
<sdf version='1.7'>
<world name='empty'>
<gravity>0 0 -9.8</gravity>
<magnetic_field>6e-06 2.3e-05 -4.2e-05</magnetic_field>
<atmosphere type='adiabatic'/>
<scene>
<ambient>0.4 0.4 0.4 1</ambient>
<background>0.7 0.7 0.7 1</background>
<shadows>true</shadows>
</scene>
<model name='ground_plane'>
<static>true</static>
<link name='link'>
<collision name='collision'>
<geometry>
<plane>
<normal>0 0 1</normal>
<size>100 100</size>
</plane>
</geometry>
<surface>
<friction>
<ode/>
</friction>
<bounce/>
<contact/>
</surface>
</collision>
<visual name='visual'>
<geometry>
<plane>
<normal>0 0 1</normal>
<size>100 100</size>
</plane>
</geometry>
<material>
<ambient>0.8 0.8 0.8 1</ambient>
<diffuse>0.8 0.8 0.8 1</diffuse>
<specular>0.8 0.8 0.8 1</specular>
</material>
</visual>
<pose>0 0 0 0 -0 0</pose>
<inertial>
<pose>0 0 0 0 -0 0</pose>
<mass>1</mass>
<inertia>
<ixx>1</ixx>
<ixy>0</ixy>
<ixz>0</ixz>
<iyy>1</iyy>
<iyz>0</iyz>
<izz>1</izz>
</inertia>
</inertial>
<enable_wind>false</enable_wind>
</link>
<pose>0 0 0 0 -0 0</pose>
<self_collide>false</self_collide>
</model>
</world>
</sdf>

View file

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<robossembler
name="{name}">
</robossembler>

View file

@ -0,0 +1,13 @@
<joint
name="{joint_name}"
type="{joint_type}">
<origin
xyz="{joint_location}"
rpy="{joint_rotation}" />
<parent
link="{parent_part}" />
<child
link="{child_part}" />
<axis
xyz="0 0 1" />
</joint>

View file

@ -0,0 +1,31 @@
<link
name="{part_name}">
<inertial>
<origin
xyz="0 0 0"
rpy="0 0 0" />
<mass
value="{mass}" />
<inertia
ixx="0.0"
ixy="0.0"
ixz="0.0"
iyy="0.0"
iyz="0.0"
izz="0.0" />
</inertial>
<visual>
<geometry>
<mesh
filename="{visual_file}"
scale="1.0 1.0 1.0" />
</geometry>
</visual>
<collision>
<geometry>
<mesh
filename="{collision_file}"
scale="1.0 1.0 1.0" />
</geometry>
</collision>
</link>

View file

@ -0,0 +1,37 @@
<?xml version="1.0"?>
<robot name="{name}">
<link name="baseLink">
<contact>
<friction_anchor />
<lateral_friction value="0.3" />
<rolling_friction value="0.0" />
<contact_cfm value="0.0" />
<contact_erp value="1.0" />
</contact>
<inertial>
<origin xyz="{centerMassX} {centerMassY} {centerMassZ}" />
<mass value="{massSDF}" />
<inertia ixx="{ixx}" ixy="{ixy}" ixz="{ixz}" iyy="{iyy}" iyz="{iyz}" izz="{izz}" />
</inertial>
<visual>
<geometry>
<mesh filename="{stl}" scale="0.001 0.001 0.001" />
</geometry>
<material name="white">
<color rgba="1. 1. 1. 1." />
</material>
</visual>
<collision>
<geometry>
<mesh filename="{stl}" scale="0.001 0.001 0.001" />
</geometry>
</collision>
<friction>
<ode>
<mu>0.2</mu>
<mu2>0.1</mu2>
<fdir1>1 0 0</fdir1>
</ode>
</friction>
</link>
</robot>

View file

@ -0,0 +1,5 @@
argparse
matplotlib
pybullet
argparse
xmlformatter

View file

@ -0,0 +1,16 @@
from distutils.dir_util import copy_tree
from src.model.enum import Enum
class Assembly:
def generateSubAssembly(self, assembly: list[str]):
asm = {}
inc = 0
for el in assembly:
asm[str("asm" + str(inc))] = {
"part": el,
"assembly": assembly[0:inc],
}
inc += 1
return asm
def copy(self,generationFolder,format,outPath ):
copy_tree(generationFolder + format, outPath + Enum.folderPath)

View file

@ -0,0 +1,2 @@
class Enum:
folderPath = "generation/"

View file

@ -0,0 +1,327 @@
import os
from helper.fs import FS
from src.model.sdf_join import SdfJoin
import typing
import uuid
def from_str(x):
assert isinstance(x, str)
return x
def from_none(x):
assert x is None
return x
def from_union(fs, x):
for f in fs:
try:
return f(x)
except:
pass
assert False
def to_class(c, x):
assert isinstance(x, c)
return x.to_dict()
DELIMITER_SCALE = 10000
class GeometryModel:
def __init__(
self,
name,
ixx,
ixy,
ixz,
iyy,
izz,
massSDF,
posX,
posY,
posZ,
eulerX,
eulerY,
eulerZ,
iyz,
stl,
link,
friction,
centerMassX,
centerMassY,
centerMassZ,
):
self.name = name
self.ixx = ixx
self.ixy = ixy
self.ixz = ixz
self.iyy = iyy
self.izz = izz
self.massSDF = massSDF
self.posX = posX
self.posY = posY
self.posZ = posZ
self.eulerX = eulerX
self.eulerY = eulerY
self.eulerZ = eulerZ
self.iyz = iyz
self.stl = stl
self.link = link
self.friction = friction
self.centerMassX = centerMassX
self.centerMassY = centerMassY
self.centerMassZ = centerMassZ
@staticmethod
def from_dict(obj):
assert isinstance(obj, dict)
name = from_union([from_str, from_none], obj.get("name"))
ixx = from_union([from_str, from_none], obj.get("ixx"))
ixy = from_union([from_str, from_none], obj.get("ixy"))
ixz = from_union([from_str, from_none], obj.get("ixz"))
iyy = from_union([from_str, from_none], obj.get("iyy"))
izz = from_union([from_str, from_none], obj.get("izz"))
massSDF = from_union([from_str, from_none], obj.get("massSDF"))
posX = from_union([from_str, from_none], obj.get("posX"))
posY = from_union([from_str, from_none], obj.get("posY"))
posZ = from_union([from_str, from_none], obj.get("posZ"))
eulerX = from_union([from_str, from_none], obj.get("eulerX"))
eulerY = from_union([from_str, from_none], obj.get("eulerY"))
eulerZ = from_union([from_str, from_none], obj.get("eulerZ"))
iyz = from_union([from_str, from_none], obj.get("iyz"))
stl = from_union([from_str, from_none], obj.get("stl"))
link = from_union([from_str, from_none], obj.get("link"))
friction = from_union([from_str, from_none], obj.get("friction"))
centerMassX = from_union([from_str, from_none], obj.get("centerMassX"))
centerMassY = from_union([from_str, from_none], obj.get("centerMassY"))
centerMassZ = from_union([from_str, from_none], obj.get("centerMassZ"))
return GeometryModel(
name,
ixx,
ixy,
ixz,
iyy,
izz,
massSDF,
posX,
posY,
posZ,
eulerX,
eulerY,
eulerZ,
iyz,
stl,
link,
friction,
centerMassX,
centerMassY,
centerMassZ,
)
def to_dict(self):
result = {}
if self.name is not None:
result["name"] = from_union([from_str, from_none], self.name)
if self.ixx is not None:
result["ixx"] = from_union([from_str, from_none], self.ixx)
if self.ixy is not None:
result["ixy"] = from_union([from_str, from_none], self.ixy)
if self.ixz is not None:
result["ixz"] = from_union([from_str, from_none], self.ixz)
if self.iyy is not None:
result["iyy"] = from_union([from_str, from_none], self.iyy)
if self.izz is not None:
result["izz"] = from_union([from_str, from_none], self.izz)
if self.massSDF is not None:
result["massSDF"] = from_union([from_str, from_none], self.massSDF)
if self.posX is not None:
result["posX"] = from_union([from_str, from_none], self.posX)
if self.posY is not None:
result["posY"] = from_union([from_str, from_none], self.posY)
if self.posZ is not None:
result["posZ"] = from_union([from_str, from_none], self.posZ)
if self.eulerX is not None:
result["eulerX"] = from_union([from_str, from_none], self.eulerX)
if self.eulerY is not None:
result["eulerY"] = from_union([from_str, from_none], self.eulerY)
if self.eulerZ is not None:
result["eulerZ"] = from_union([from_str, from_none], self.eulerZ)
if self.iyz is not None:
result["iyz"] = from_union([from_str, from_none], self.iyz)
if self.stl is not None:
result["stl"] = from_union([from_str, from_none], self.stl)
if self.link is not None:
result["link"] = from_union([from_str, from_none], self.link)
if self.friction is not None:
result["friction"] = from_union([from_str, from_none], self.eulerZ)
if self.centerMassX is not None:
result["centerMassX"] = from_union([from_str, from_none], self.centerMassX)
if self.centerMassY is not None:
result["centerMassY"] = from_union([from_str, from_none], self.centerMassY)
if self.centerMassZ is not None:
result["centerMassZ"] = from_union([from_str, from_none], self.centerMassZ)
return result
def toJSON(self) -> str:
return str(self.to_dict()).replace("'", '"')
def toSDF(self):
return (
FS.readFile(
os.path.dirname(os.path.realpath(__file__))
+ "/../../mocks/sdf/model.sdf"
)
.replace(
"{name}",
self.name,
)
.replace("{posX}", self.posX)
.replace("{posY}", self.posY)
.replace("{posZ}", self.posZ)
.replace("{eulerX}", self.eulerX)
.replace("{eulerY}", self.eulerY)
.replace("{eulerZ}", self.eulerZ)
.replace("{ixx}", self.ixx)
.replace("{ixy}", self.ixy)
.replace("{ixz}", self.ixz)
.replace("{iyy}", self.iyy)
.replace("{iyz}", self.iyz)
.replace("{izz}", self.izz)
.replace(
"{massSDF}",
self.massSDF,
)
.replace("{stl}", self.stl)
.replace("{friction}", self.friction)
)
def toSdfLink(self):
return (
FS.readFile(
os.path.dirname(os.path.realpath(__file__))
+ "/../../mocks/sdf/link.sdf"
)
.replace(
"{name}",
self.name,
)
.replace("{posX}", self.posX)
.replace("{posY}", self.posY)
.replace("{posZ}", self.posZ)
.replace("{eulerX}", self.eulerX)
.replace("{eulerY}", self.eulerY)
.replace("{eulerZ}", self.eulerZ)
.replace("{ixx}", self.ixx)
.replace("{ixy}", self.ixy)
.replace("{ixz}", self.ixz)
.replace("{iyy}", self.iyy)
.replace("{iyz}", self.iyz)
.replace("{izz}", self.izz)
.replace(
"{massSDF}",
self.massSDF,
)
.replace("{stl}", self.stl)
.replace("{friction}", self.friction)
)
def includeLink(self, pose=False):
if pose == False:
return (
FS.readFile(
os.path.dirname(os.path.realpath(__file__))
+ "/../../mocks/sdf/include.sdf"
)
.replace("{name}", self.name)
.replace("{uri}", "/" + self.name)
)
return (
FS.readFile(
os.path.dirname(os.path.realpath(__file__))
+ "/../../mocks/sdf/include_pose.sdf"
)
.replace("{name}", self.name)
.replace("{uri}", "/" + self.name)
.replace("{posX}", self.posX)
.replace("{posY}", self.posY)
.replace("{posZ}", self.posZ)
.replace("{eulerX}", self.eulerX)
.replace("{eulerY}", self.eulerY)
.replace("{eulerZ}", self.eulerZ)
.replace("{ixx}", self.ixx)
.replace("{ixy}", self.ixy)
.replace("{ixz}", self.ixz)
.replace("{iyy}", self.iyy)
.replace("{iyz}", self.iyz)
.replace("{izz}", self.izz)
)
def generateSDFatJoinFixed(self, sdfModels: list["GeometryModel"]):
sdf = '\n<model name="assembly">\n'
sdf += ' <link name="base_link">\n'
sdf += " <pose>0 0 0 0 0 0</pose>\n"
sdf += " </link>\n"
link = sdf + self.includeLink(pose=True)
if sdfModels.__len__() == 0:
return link
endTagLinkInc = link.__len__()
beginSDF = link[0:endTagLinkInc]
sdfJoin = beginSDF + "\n"
for el in sdfModels:
if el.name != self.name:
sdfJoin += el.includeLink(pose=True) + "\n"
endSDF = link[endTagLinkInc : link.__len__()]
for el in sdfModels:
if el.name != self.name:
sdfJoin += (
SdfJoin(
name=str(uuid.uuid4()),
parent=self.name,
child=el.name,
modelAt=el,
).toSDF()
+ "\n"
)
sdfJoin += endSDF
sdfJoin += "</model>"
return sdfJoin
def toUrdf(self):
return (
FS.readFile(
os.path.dirname(os.path.realpath(__file__))
+ "/../../mocks/urdf/model.urdf"
)
.replace("{name}", self.name)
.replace("{name}", self.name)
.replace("{uri}", "/" + self.name)
.replace("{posX}", self.posX)
.replace("{posY}", self.posY)
.replace("{posZ}", self.posZ)
.replace("{eulerX}", self.eulerX)
.replace("{eulerY}", self.eulerY)
.replace("{eulerZ}", self.eulerZ)
.replace("{ixx}", self.ixx)
.replace("{ixy}", self.ixy)
.replace("{ixz}", self.ixz)
.replace("{iyy}", self.iyy)
.replace("{iyz}", self.iyz)
.replace("{izz}", self.izz)
.replace("{stl}", self.stl)
.replace("{massSDF}", self.massSDF)
.replace("{centerMassX}", self.centerMassX)
.replace("{centerMassY}", self.centerMassY)
.replace("{centerMassZ}", self.centerMassZ)
)

View file

@ -0,0 +1,16 @@
from helper.fs import FS
import os
class SdfJoin:
def __init__(self, name, parent, modelAt, child) -> None:
self.name = name
self.parent = parent
self.child = child
self.modelAt = modelAt
pass
def toSDF(self):
return (FS.readFile(os.path.dirname(os.path.realpath(__file__)) + '/../../mocks/sdf/joint_fixed.sdf')).replace('{name}', self.name,).replace('{parent}', self.parent).replace('{child}', self.child).replace('{posX}', self.modelAt.posX).replace('{posY}', self.modelAt.posY).replace('{posZ}', self.modelAt.posZ).replace('{eulerX}', self.modelAt.eulerX).replace('{eulerY}', self.modelAt.eulerY).replace('{eulerZ}', self.modelAt.eulerZ).replace('{ixx}', self.modelAt.ixx).replace('{ixy}', self.modelAt.ixy).replace('{ixz}', self.modelAt.ixz).replace('{iyy}', self.modelAt.iyy).replace('{iyz}', self.modelAt.iyz).replace('{izz}', self.modelAt.izz)

View file

@ -0,0 +1,14 @@
from helper.xmlformatter import Formatter
from src.model.enum import Enum
from helper.fs import FS
class FormatterUseCase:
def call(outPath: str, format: str):
formatter = Formatter(
indent="1", indent_char="\t", encoding_output="ISO-8859-1", preserve=["literal"])
files = FS.readFilesTypeFolder(
outPath + Enum.folderPath, fileType=format)
for el in files:
FS.writeFile(data=str(formatter.format_file(outPath + Enum.folderPath + el),
'utf-8'), filePath=outPath + Enum.folderPath, fileName=el)

View file

@ -0,0 +1,18 @@
import os
from helper.fs import FS
class SdfGenerateWorldUseCase:
def call(assembly: str) -> str:
world = FS.readFile(
os.path.dirname(os.path.realpath(__file__)) + "/../../mocks/sdf/world.sdf"
)
beginWorld = world[0 : world.find("</world") - 1]
endWorld = world[world.find("</world") - 1 : world.__len__()]
return beginWorld + assembly + endWorld
class GeometryValidateUseCase:
def call(geometry) -> str:
return

View file

@ -0,0 +1,12 @@
import os
from helper.fs import FS
class SdfGenerateWorldUseCase:
def call(assembly:str) -> str:
world = FS.readFile(os.path.dirname(os.path.realpath(__file__))
+ '/../../mocks/sdf/world.sdf')
beginWorld = world[0:world.find('</world') - 1]
endWorld = world[world.find('</world') - 1: world.__len__()]
return beginWorld + assembly + endWorld

View file

@ -0,0 +1,78 @@
import os
from typing import Optional
from helper.fs import FS
from helper.fs import filterModels, listGetFirstValue
from src.model.asm import Assembly
from src.model.enum import Enum
from src.usecases.formatter_usecase import FormatterUseCase
from src.usecases.sdf_generate_world_usecase import SdfGenerateWorldUseCase
from src.model.sdf_geometry import GeometryModel
from distutils.dir_util import copy_tree
SDF_FILE_FORMAT = ".sdf"
CONFIG_PATH = (
os.path.dirname(os.path.realpath(__file__)) + "/../../mocks/sdf/model.config"
)
class SdfSubAssemblyUseCase(Assembly):
def call(
self,
geometryModels: list[GeometryModel],
assembly: list[str],
outPath: str,
generationFolder: str,
world: bool,
):
asm = {}
generateSubAssemblyModels = self.generateSubAssembly(assembly)
inc = 0
for key, value in generateSubAssemblyModels.items():
inc += 1
if value["assembly"].__len__() != 0:
model: Optional[GeometryModel] = listGetFirstValue(
geometryModels, None, lambda x: x.name == value["assembly"][0]
)
if model != None:
asm[key] = {
"assembly": model.generateSDFatJoinFixed(
filterModels(geometryModels, value["assembly"])
),
"part": (
listGetFirstValue(
geometryModels, None, lambda x: x.name == value["part"]
)
).includeLink(),
}
self.copy(generationFolder=generationFolder, format="/sdf", outPath=outPath)
dirPath = outPath + Enum.folderPath
for el in geometryModels:
path = dirPath + el.name + "/"
os.makedirs(path)
FS.writeFile(
data=el.toSDF(), filePath=path, fileName="/model" + SDF_FILE_FORMAT
)
FS.writeFile(
data=FS.readFile(CONFIG_PATH),
filePath=path,
fileName="/model" + ".config",
)
for key, v in asm.items():
FS.writeFile(
data=v["assembly"],
filePath=dirPath,
fileName="/" + key + SDF_FILE_FORMAT,
)
else:
for key, v in asm.items():
FS.writeFile(
data=SdfGenerateWorldUseCase.call(v["assembly"]),
filePath=dirPath,
fileName="/" + key + SDF_FILE_FORMAT,
)
FormatterUseCase.call(outPath=outPath, format=SDF_FILE_FORMAT)

View file

@ -0,0 +1,30 @@
from helper.fs import FS
from src.model.enum import Enum
from src.model.asm import Assembly
from src.model.sdf_geometry import GeometryModel
import json
import re
URDF_FILE_FORMAT = ".urdf"
URDF_GENERATOR_FILE = "urdf-generation" + ".json"
class UrdfSubAssemblyUseCase(Assembly):
def call(
self,
geometryModels: list[GeometryModel],
assembly: list[str],
outPath: str,
generationFolder: str,
world: bool,
):
dirPath = generationFolder + Enum.folderPath
asm = {}
for el in geometryModels:
asm[el.name] = el.toUrdf()
FS.writeFile(
data=json.dumps(asm, indent=4),
fileName=URDF_GENERATOR_FILE,
filePath=dirPath,
)

View file

@ -0,0 +1,116 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# dotenv
.env
# virtualenv
.venv
venv/
ENV/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
# blender backup files
*.blend1
install_plugin_cad.sh
.vscode
.DS_Store
# emacs backup files
~*
*~
*#
.#*
\#*\#
out/
/env.json

View file

@ -0,0 +1,31 @@
# Start dev
create env.json
```json
{
"cadDoc":"CAD_DOC_PATH_REPLACE",
"sequences":"SEQUENCES_PATH_REPLACE",
"aspDir":"ASP_DIR_REPLACE"
}
```
# Command generation assets
freecad generate.py
# Command generation insertion vectors
Загрузка субмодуля git
```
git submodule update --init
```
Создание и активация виртуального окружения
```
conda env create -f assembly/environment.yml
conda activate assembly
```
Запуск программы
```
python3 main.py
```

View file

@ -0,0 +1,205 @@
from typing import List
import FreeCAD as App
import Part
import Mesh
import Part
import MeshPart
import os
import json
import FreeCADGui as Gui
class FS:
def readJSON(path: str):
return json.loads((open(path)).read())
def writeFile(data, filePath, fileName):
file_to_open = filePath + fileName
f = open(file_to_open, 'w', encoding='utf-8',
errors='ignore')
f.write(data)
f.close()
def createFolder(path: str):
if (not os.path.exists(path)):
return os.mkdir(path)
class SimpleCopyPartModel:
id = None
copyLink = None
label = None
part = None
def getPart(self):
return self.part
def __init__(self, part) -> None:
try:
from random import randrange
self.id = str(randrange(1000000))
childObj = part
print(part)
__shape = Part.getShape(
childObj, '', needSubElement=False, refine=False)
obj = App.ActiveDocument.addObject('Part::Feature', self.id)
obj.Shape = __shape
self.part = obj
self.label = obj.Label
App.ActiveDocument.recompute()
except Exception as e:
print(e)
def remove(self):
App.ActiveDocument.removeObject(self.label)
class MeshPartModel:
id = None
mesh = None
def __init__(self, part) -> None:
try:
from random import randrange
self.id = 'mesh' + str(randrange(1000000))
document = App.ActiveDocument
mesh = document.addObject("Mesh::Feature", self.id)
shape = Part.getShape(part, "")
mesh.Mesh = MeshPart.meshFromShape(
Shape=shape, LinearDeflection=20, AngularDeflection=0.1, Relative=False)
mesh.Label = self.id
self.mesh = mesh
except Exception as e:
print(e)
pass
def remove(self):
try:
App.ActiveDocument.removeObject(self.mesh.Label)
except Exception as e:
print(e)
class JoinMeshModel:
id = None
mesh = None
def __init__(self, meshesPartModels: list['MeshPartModel']) -> None:
meshes = []
from random import randrange
for el in meshesPartModels:
meshes.append(el.mesh.Mesh)
self.id = 'MergedMesh' + str(randrange(1000000))
doc = App.ActiveDocument
merged_mesh = Mesh.Mesh()
for el in meshes:
merged_mesh.addMesh(el)
new_obj = doc.addObject("Mesh::Feature", self.id)
new_obj.Mesh = merged_mesh
new_obj.ViewObject.DisplayMode = "Flat Lines"
self.mesh = new_obj
def remove(self):
try:
App.ActiveDocument.removeObject(self.id)
except Exception as e:
print(e)
class ExportAssemblyThemAllUseCase:
def call(self, path:str, assemblys:list[str]):
assembly = assemblys
asmStructure = {}
inc = 0
for el in assembly:
if (inc != 0):
asmStructure[inc] = {
"child": el,
"parents": assembly[0:inc]
}
inc += 1
objectsFreeCad = App.ActiveDocument.Objects
asmSolids = {}
for k, v in asmStructure.items():
assemblyParentList = v['parents']
assemblyChild = v['child']
for el in assemblyParentList:
for solid in objectsFreeCad:
if (el == solid.Label):
if (asmSolids.get(k) is None):
asmSolids[k] = {'parents': [], 'child': list(
filter(lambda x: x.Label == assemblyChild, objectsFreeCad))[0]}
asmSolids[k]['parents'].append(solid)
inc = 0
for k, v in asmSolids.items():
geometry = {"0": [], "1": []}
if (k != 0):
App.activeDocument().addObject("Part::Compound", "Compound")
copyLinks = list(
map(lambda el: SimpleCopyPartModel(el), v['parents']))
if copyLinks != None:
App.activeDocument().Compound.Links = list(
map(lambda el: el.getPart(), copyLinks))
object = App.activeDocument().getObject('Compound')
boundBox = object.Shape.BoundBox
geometry['0'].append(boundBox.XMax)
geometry['0'].append(boundBox.YMax)
geometry['0'].append(boundBox.ZMax)
boundBoxChild = v['child'].Shape.BoundBox
geometry['1'].append(boundBoxChild.XMax)
geometry['1'].append(boundBoxChild.YMax)
geometry['1'].append(boundBoxChild.ZMax)
meshParents = []
for el in v['parents']:
meshParents.append(MeshPartModel(el))
joinMesh = JoinMeshModel(meshParents)
for el in meshParents:
el.remove()
import importOBJ
importOBJ.export(joinMesh.mesh, path + str(1) + '.obj')
joinMesh.remove()
importOBJ.export(v['child'], path + str(0) + '.obj')
FS.writeFile(json.dumps(geometry), path, 'translation.json')
App.ActiveDocument.removeObject("Compound")
for el in copyLinks:
el.remove()
App.activeDocument().recompute()
inc += 1
def main():
env = FS.readJSON('./env.json')
env.get('cadDoc')
aspDir = env.get('aspDir')
sequences = FS.readJSON(env.get('sequences')).get('sequences')
App.openDocument(env.get('cadDoc'))
for sequencyNumber in range(len(sequences)):
FS.createFolder(aspDir + 'assemblys/')
mainFolder = aspDir + 'assemblys/' + str(sequencyNumber) + '/'
FS.createFolder(mainFolder)
for subSequenceNumber in range(len(sequences[sequencyNumber])):
if(subSequenceNumber != 0):
subFolder = aspDir + 'assemblys/' + \
str(sequencyNumber) + '/' + str(subSequenceNumber) + '/'
FS.createFolder(subFolder)
ExportAssemblyThemAllUseCase().call(path=subFolder,assemblys=sequences[sequencyNumber][0:subSequenceNumber+1])
App.closeDocument(App.ActiveDocument.Name)
freecadQTWindow = Gui.getMainWindow()
freecadQTWindow.close()
main()

View file

@ -0,0 +1,174 @@
import os
import sys
project_base_dir = os.path.abspath(os.path.join(
os.path.dirname(os.path.abspath(__file__)), './')) + '/assembly/'
sys.path.append(project_base_dir)
sys.path.append(project_base_dir + '/baselines/')
sys.path.append(project_base_dir + '/assets/')
from scipy.spatial.transform import Rotation
import shutil
from spatialmath import *
from spatialmath.base import *
from assembly.assets.process_mesh import process_mesh
from assembly.examples.run_joint_plan import get_planner
from assembly.baselines.run_joint_plan import PyPlanner
from assembly.assets.subdivide import subdivide_to_size
import numpy as np
import json
import trimesh
import re
def merge_meshes(meshes):
# Создание пустого меша
merged_mesh = trimesh.Trimesh()
# Объединение каждого меша в один
for mesh in meshes:
merged_mesh = trimesh.util.concatenate(
[merged_mesh, trimesh.load(mesh)])
i = True
while i:
if merged_mesh.fill_holes():
i = False
return merged_mesh
os.environ['OMP_NUM_THREADS'] = '1'
class FS:
def readJSON(path: str):
return json.loads((open(path)).read())
def writeFile(data, filePath, fileName):
file_to_open = filePath + fileName
f = open(file_to_open, 'w', )
f.write(data)
def readFile(path: str):
return open(path).read()
def readFilesTypeFolder(pathFolder: str, fileType='.json'):
return os.listdir(pathFolder)
def readFolder(pathFolder: str):
return list(map(lambda el: pathFolder + '/' + el, os.listdir(pathFolder)))
def createFolder(path: str):
if (not os.path.exists(path)):
return os.mkdir(path)
def listGetFirstValue(iterable, default=False, pred=None):
return next(filter(pred, iterable), default)
def filterModels(filterModels, filterModelsDescription):
models = []
for el in filterModelsDescription:
models.append(listGetFirstValue(
filterModels, None, lambda x: x.name == el))
return models
# mesh1 = trimesh.load('/Users/idontsudo/framework/asp/out/sdf-generation/meshes/Cube.obj')
# mesh2 = trimesh.load('/Users/idontsudo/framework/asp/out/sdf-generation/meshes/Cube001.obj')
# # Объединение мешей
# merged_mesh = merge_meshes([mesh1, mesh2])
# # Сохранение объединенного меша в файл
# merged_mesh.export('merged.obj')
def main():
# from argparse import ArgumentParser
# parser = ArgumentParser()
# parser.add_argument('--asp-path', type=str, required=True)
# args = parser.parse_args()
# aspDir = args.asp_dir
# # Коректировка пути до папки с генерацией ASP
# if (aspDir == None):
# args.print_helper()
# if (aspDir[aspDir.__len__() - 1] != '/'):
# aspDir += '/'
aspDir = '/home/idontsudo/framework/asp/out/'
sequences = FS.readJSON(aspDir + 'sequences.json').get('sequences')
assemblyDirNormalize = []
for el in FS.readFolder(aspDir + 'assemblys'):
for e in FS.readFolder(el):
try:
# Пост обработка .obj обьектов
process_mesh(source_dir=e, target_dir=e +
'/process/', subdivide=e, verbose=True)
assemblyDirNormalize.append(e + '/process/')
except Exception as e:
print('ERRROR:')
print(e)
print(assemblyDirNormalize)
for el in assemblyDirNormalize:
asset_folder = os.path.join(project_base_dir, aspDir)
assembly_dir = os.path.join(asset_folder, el)
planner = get_planner('bfs')(assembly_dir, assembly_dir, 0, [
1], False, 'sdf', 0.05, 0.01, 100, 100, True)
# Планирование пути
status, t_plan, path = planner.plan(
120, seed=1, return_path=True, render=False, record_path=None
)
coords = []
for k in path:
seMatrix = SE3(k)
euler = seMatrix.eul()
coord = seMatrix.A[0:3, 3]
rot = Rotation.from_euler('xyz', euler, degrees=True).as_quat()
coords.append({'quadrelion': [rot[0], rot[1], rot[2], rot[3]], 'xyz': [
coord[0], coord[1], coord[2]], 'euler': [euler[0], euler[1], euler[2]]})
# Запись пути в кортеж
planingObject = {
"time": t_plan,
"insertion_path": coords,
"status": status,
}
# Запись результата планирования
FS.writeFile(json.dumps(planingObject),
el[0:el.__len__() - 8], 'insertion_path.json')
try:
planner = PyPlanner(assembly_dir, 'process', still_ids=[1],)
status, t_plan, path = planner.plan(
planner_name='rrt',
step_size=None,
max_time=None,
seed=1,
return_path=True,
simplify=False,
render=False
)
print(f'Status: {status}, planning time: {t_plan}')
if args.save_dir is not None:
planner.save_path(path, args.save_dir, args.n_save_state)
except Exception as e:
print(e)
main()

View file

@ -0,0 +1,3 @@
spatialmath
scipy
uuid

View file

@ -0,0 +1,37 @@
# Intersection Geometry Predicate
Осуществляется проверка геометрических вершин пересечения файлов .obj на соответствие допустимой погрешности глубины.
### CLI аргументы:
--aspPath путь до папки с асетами сборки
### вывод
на выходе делает файл intersection_geometry.json
в котором записан результат работы предиката в виде ключа status и результат в виде ключа recalculations.
В ключе recalculations, записан объект в который записываются результаты расчета пересечения.
Они состоят из объекта.
- names имена пересекающеюся деталей
- depth глубина пересечения
- point геометрические вершины
```JSON
{
"status": false,
"recalculations": {
"disk_bottom bolt ": [
{
"names": "disk_bottom bolt ",
"depth": 0.5127948565443177,
"point": [
-1.972554,
16.442781,
-9.208569
]
}
]
}
}
```

View file

@ -0,0 +1,86 @@
import trimesh
import os
import json
import argparse
class FS:
def readJSON(path: str):
return json.loads((open(path)).read())
def writeFile(data, filePath, fileName):
file_to_open = filePath + fileName
f = open(file_to_open, 'w')
f.write(data)
f.close()
def readFile(path: str):
return open(path).read()
def readFilesTypeFolder(pathFolder: str, fileType='.json'):
filesJson = list(
filter(lambda x: x[-fileType.__len__():] == fileType, os.listdir(pathFolder)))
return list(map(lambda x: pathFolder + x, filesJson))
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--aspPath', help='asp generation folder')
args = parser.parse_args()
if args.aspPath == None:
parser.print_help()
aspPath = args.aspPath
pathMeshes = 'sdf/meshes/'
permissibleDepth = 0.5
trimeshObjects = []
meshes = FS.readFilesTypeFolder(aspPath + pathMeshes, '.obj')
for el in meshes:
trimeshObjects.append(trimesh.load(el))
manager = trimesh.collision.CollisionManager()
for el in range(len(trimeshObjects)):
manager.add_object(str(meshes[el]), trimeshObjects[el])
def set_to_dict(s):
keys = list(s)
values = [None] * len(s)
return {k: v for k, v in zip(keys, values)}
collisions = manager.in_collision_internal(True, True)
recalculations = {}
for el in collisions[collisions.__len__() - 1]:
if (el.depth > permissibleDepth):
labels = ''
for key in set_to_dict(el.names).keys():
label = key[key.rfind('/') + 1:key.__len__() - 4]
labels+=label + " "
message = {
'names': labels,
'depth': el.depth,
'point': el.point.tolist()
}
if(recalculations.get(labels) != None):
recalculations[labels].append(message)
else:
recalculations[labels] = [message]
if(len(list(recalculations.keys())) >= 1):
messageError = {
'status':False,
'recalculations':recalculations
}
FS.writeFile(json.dumps(messageError, ensure_ascii=False, indent=4), aspPath,'intersection_geometry.json')
else:
message = {
'status':True,
'recalculations': None
}
FS.writeFile(json.dumps(messageError, ensure_ascii=False, indent=4), aspPath,'intersection_geometry.json')
main()

View file

@ -0,0 +1,2 @@
argparse
trimesh

View file

@ -0,0 +1,105 @@
# Инструкция для запуска
Должен быть установлен пакет [BlenderProc](https://github.com/DLR-RM/BlenderProc)
## Создание датасета в формате YoloV4 для заданного объекта
Команда для запуска:
```
blenderproc run obj2Yolov4dataset.py [obj] [output_dir] [--imgs 1]
```
- obj: файл описания объекта *.obj
- output_dir: выходной каталог
- --imgs 1: количество изображений на выходе
## Создание датасета в формате YoloV4 для серии заданных объектов в заданной сцене
Команда для запуска:
```
blenderproc run objs2Yolov4dataset.py [scene] [obj_path] [output_dir] [vhacd_path] [--imgs 1]
```
- scene: путь к файлу описания сцены (*.blend)
- obj_path: путь к каталогу с файлами описания детектируемых объектов *.obj
- output_dir: выходной каталог
- vhacd_path: каталог, в котором должен быть установлен или уже установлен vhacd (по умолчанию blenderproc_resources/vhacd)
- --imgs 1: количество серий рендеринга (по 15 изображений каждая) на выходе (например, если imgs=100, то будет получено 1500 изображений)
Файл описания сцены обязательно должен содержать плоскость (с именем 'floor'), на которую будут сэмплированы объекты для обнаружения.
Должен быть собран пакет [darknet](https://github.com/AlexeyAB/darknet) для работы на заданном ПО и оборудовании (CPU, GPU ...)
---
## Обучение нейросети и получение файла с её весами
Команда для запуска:
```
darknet detector train [data] [cfg] [weight]
```
- data: файл с описанием датасета (*.data)
- cfg: файл с описанием нейросети
- weight: файл весов нейросети
Для обучения нужно загрузить файл с предобученными весами (162 MB): [yolov4.conv.137](https://github.com/AlexeyAB/darknet/releases/download/darknet_yolo_v3_optimal/yolov4.conv.137)
Для разного количества детектируемых объектов в выборке нужны свои файлы [data](https://gitlab.com/robossembler/framework/-/blob/master/ObjectDetection/yolov4_objs2.data) и [cfg](https://gitlab.com/robossembler/framework/-/blob/master/ObjectDetection/yolov4_objs2.cfg).
---
## Команда для обнаружения объектов нейросетью с обученными весами
* вариант 1 (в файле t.txt - список изображений):
```
darknet detector test yolov4_objs2.data yolov4_test.cfg yolov4_objs2_final.weights -dont_show -ext_output < t.txt > res.txt
```
* вариант 2 (файл 000015.jpg - тестовое изображение):
```
darknet detector test yolov4_objs2.data yolov4_test.cfg yolov4_objs2_final.weights -dont_show -ext_output 000015.jpg > res.txt
```
* вариант 3 (в файле t.txt - список изображений):
```
darknet detector test yolov4_objs2.data yolov4_test.cfg yolov4_objs2_final.weights -dont_show -ext_output -out res.json < t.txt
```
Файл res.txt после запуска варианта 2:
> net.optimized_memory = 0
> mini_batch = 1, batch = 1, time_steps = 1, train = 0
> Create CUDA-stream - 0
> Create cudnn-handle 0
> nms_kind: greedynms (1), beta = 0.600000
> nms_kind: greedynms (1), beta = 0.600000
> nms_kind: greedynms (1), beta = 0.600000
>
> seen 64, trained: 768 K-images (12 Kilo-batches_64)
> Detection layer: 139 - type = 28
> Detection layer: 150 - type = 28
> Detection layer: 161 - type = 28
>000015.jpg: Predicted in 620.357000 milli-seconds.
>fork.001: 94% (left_x: 145 top_y: -0 width: 38 height: 18)
>asm_element_edge.001: 28% (left_x: 195 top_y: 320 width: 40 height: 61)
>start_link.001: 87% (left_x: 197 top_y: 313 width: 39 height: 68)
>doking_link.001: 99% (left_x: 290 top_y: 220 width: 32 height: 21)
>start_link.001: 90% (left_x: 342 top_y: 198 width: 33 height: 34)
>doking_link.001: 80% (left_x: 342 top_y: 198 width: 32 height: 34)
>assemb_link.001: 100% (left_x: 426 top_y: 410 width: 45 height: 61)
Файл res.json после запуска варианта 3:
>[
{
"frame_id":1,
"filename":"img_test/000001.jpg",
"objects": [
{"class_id":5, "name":"asm_element_edge.001", "relative_coordinates":{"center_x":0.498933, "center_y":0.502946, "width":0.083075, "height":0.073736}, "confidence":0.999638},
{"class_id":4, "name":"grip-tool.001", "relative_coordinates":{"center_x":0.858856, "center_y":0.031339, "width":0.043919, "height":0.064563}, "confidence":0.996551}
]
},
{
"frame_id":2,
"filename":"img_test/000002.jpg",
"objects": [
{"class_id":1, "name":"start_link.001", "relative_coordinates":{"center_x":0.926026, "center_y":0.728457, "width":0.104029, "height":0.132757}, "confidence":0.995811},
{"class_id":0, "name":"assemb_link.001", "relative_coordinates":{"center_x":0.280403, "center_y":0.129059, "width":0.029980, "height":0.025067}, "confidence":0.916782}
]
}

View file

@ -0,0 +1,144 @@
import blenderproc as bproc
"""
obj2Yolov4dataset
Общая задача: обнаружение объекта (Object detection)
Реализуемая функция: создание датасета в формате YoloV4 для заданного объекта (*.obj)
Используется модуль blenderproc
24.01.2023 @shalenikol release 0.1
22.02.2023 @shalenikol release 0.2 исправлен расчёт x,y в convert2relative
"""
import numpy as np
import argparse
import random
import os
import shutil
import json
def convert2relative(height, width, bbox):
"""
YOLO format use relative coordinates for annotation
"""
x, y, w, h = bbox
x += w/2
y += h/2
return x/width, y/height, w/width, h/height
parser = argparse.ArgumentParser()
parser.add_argument('scene', nargs='?', default="resources/robossembler-asset.obj", help="Path to the object file.")
parser.add_argument('output_dir', nargs='?', default="output", help="Path to where the final files, will be saved")
parser.add_argument('--imgs', default=1, type=int, help="The number of times the objects should be rendered.")
args = parser.parse_args()
if not os.path.isdir(args.output_dir):
os.mkdir(args.output_dir)
bproc.init()
# load the objects into the scene
obj = bproc.loader.load_obj(args.scene)[0]
obj.set_cp("category_id", 1)
# Randomly perturbate the material of the object
mat = obj.get_materials()[0]
mat.set_principled_shader_value("Specular", random.uniform(0, 1))
mat.set_principled_shader_value("Roughness", random.uniform(0, 1))
mat.set_principled_shader_value("Base Color", np.random.uniform([0, 0, 0, 1], [1, 1, 1, 1]))
mat.set_principled_shader_value("Metallic", random.uniform(0, 1))
# Create a new light
light = bproc.types.Light()
light.set_type("POINT")
# Sample its location around the object
light.set_location(bproc.sampler.shell(
center=obj.get_location(),
radius_min=1,
radius_max=5,
elevation_min=1,
elevation_max=89
))
# Randomly set the color and energy
light.set_color(np.random.uniform([0.5, 0.5, 0.5], [1, 1, 1]))
light.set_energy(random.uniform(100, 1000))
bproc.camera.set_resolution(640, 480)
# Sample five camera poses
poses = 0
tries = 0
while tries < 10000 and poses < args.imgs:
# Sample random camera location around the object
location = bproc.sampler.shell(
center=obj.get_location(),
radius_min=1,
radius_max=4,
elevation_min=1,
elevation_max=89
)
# Compute rotation based lookat point which is placed randomly around the object
lookat_point = obj.get_location() + np.random.uniform([-0.5, -0.5, -0.5], [0.5, 0.5, 0.5])
rotation_matrix = bproc.camera.rotation_from_forward_vec(lookat_point - location, inplane_rot=np.random.uniform(-0.7854, 0.7854))
# Add homog cam pose based on location an rotation
cam2world_matrix = bproc.math.build_transformation_mat(location, rotation_matrix)
# Only add camera pose if object is still visible
if obj in bproc.camera.visible_objects(cam2world_matrix):
bproc.camera.add_camera_pose(cam2world_matrix)
poses += 1
tries += 1
# Enable transparency so the background becomes transparent
bproc.renderer.set_output_format(enable_transparency=True)
# add segmentation masks (per class and per instance)
bproc.renderer.enable_segmentation_output(map_by=["category_id", "instance", "name"])
# Render RGB images
data = bproc.renderer.render()
# Write data to coco file
res_dir = os.path.join(args.output_dir, 'coco_data')
bproc.writer.write_coco_annotations(res_dir,
instance_segmaps=data["instance_segmaps"],
instance_attribute_maps=data["instance_attribute_maps"],
color_file_format='JPEG',
colors=data["colors"],
append_to_existing_output=True)
#загрузим аннотацию
with open(os.path.join(res_dir,"coco_annotations.json"), "r") as fh:
y = json.load(fh)
# список имен объектов
with open(os.path.join(res_dir,"obj.names"), "w") as fh:
for cat in y["categories"]:
fh.write(cat["name"]+"\n")
# содадим или очистим папку data для датасета
res_data = os.path.join(res_dir, 'data')
if os.path.isdir(res_data):
for f in os.listdir(res_data):
os.remove(os.path.join(res_data, f))
else:
os.mkdir(res_data)
# список имен файлов с изображениями
s = []
with open(os.path.join(res_dir,"images.txt"), "w") as fh:
for i in y["images"]:
filename = i["file_name"]
shutil.copy(os.path.join(res_dir,filename),res_data)
fh.write(filename.replace('images','data')+"\n")
s.append((os.path.split(filename))[1])
# предполагается, что "images" и "annotations" следуют в одном и том же порядке
c = 0
for i in y["annotations"]:
bbox = i["bbox"]
im_h = i["height"]
im_w = i["width"]
rel = convert2relative(im_h,im_w,bbox)
fn = (os.path.splitext(s[c]))[0] # только имя файла
with open(os.path.join(res_data,fn+".txt"), "w") as fh:
# формат: <target> <x-center> <y-center> <width> <height>
fh.write("0 "+'{:-f} {:-f} {:-f} {:-f}'.format(rel[0],rel[1],rel[2],rel[3])+"\n")
c += 1

View file

@ -0,0 +1,296 @@
import blenderproc as bproc
"""
objs2Yolov4dataset
Общая задача: обнаружение объекта (Object detection)
Реализуемая функция: создание датасета в формате YoloV4 для серии заданных объектов (*.obj) в заданной сцене (*.blend)
Используется модуль blenderproc
17.02.2023 @shalenikol release 0.1
22.02.2023 @shalenikol release 0.2 исправлен расчёт x,y в convert2relative
"""
import sys
import numpy as np
import argparse
import random
import os
import shutil
import json
def convert2relative(height, width, bbox):
"""
YOLO format use relative coordinates for annotation
"""
x, y, w, h = bbox
x += w/2
y += h/2
return x/width, y/height, w/width, h/height
parser = argparse.ArgumentParser()
parser.add_argument('scene', nargs='?', default="resources/sklad.blend", help="Path to the scene object.")
parser.add_argument('obj_path', nargs='?', default="resources/in_obj", help="Path to the object files.")
parser.add_argument('output_dir', nargs='?', default="output", help="Path to where the final files, will be saved")
parser.add_argument('vhacd_path', nargs='?', default="blenderproc_resources/vhacd", help="The directory in which vhacd should be installed or is already installed.")
parser.add_argument('--imgs', default=2, type=int, help="The number of times the objects should be rendered.")
args = parser.parse_args()
if not os.path.isdir(args.obj_path):
print(f"{args.obj_path} : no object directory")
sys.exit()
if not os.path.isdir(args.output_dir):
os.mkdir(args.output_dir)
bproc.init()
# ? загрузим свет из сцены
#cam = bproc.loader.load_blend(args.scene, data_blocks=["cameras"])
#lights = bproc.loader.load_blend(args.scene, data_blocks=["lights"])
# загрузим объекты
list_files = os.listdir(args.obj_path)
meshs = []
i = 0
for f in list_files:
if (os.path.splitext(f))[1] == ".obj":
f = os.path.join(args.obj_path, f) # путь к файлу объекта
if os.path.isfile(f):
meshs += bproc.loader.load_obj(f)
i += 1
if i == 0:
print("Objects not found")
sys.exit()
for i,o in enumerate(meshs):
o.set_cp("category_id", i+1)
# загрузим сцену
scene = bproc.loader.load_blend(args.scene, data_blocks=["objects"])
#scene = bproc.loader.load_obj(args.scene)
# найдём пол
floor = None
for o in scene:
o.set_cp("category_id", 999)
s = o.get_name()
if s.find("floor") >= 0:
floor = o
if floor == None:
print("Floor not found in the scene")
sys.exit()
floor.enable_rigidbody(False, collision_shape='BOX')
objs = meshs + scene
for obj in meshs:
# Make the object actively participate in the physics simulation
obj.enable_rigidbody(active=True, collision_shape="COMPOUND")
# Also use convex decomposition as collision shapes
obj.build_convex_decomposition_collision_shape(args.vhacd_path)
with open(os.path.join(args.output_dir,"res.txt"), "w") as fh:
# fh.write(str(type(scene[0]))+"\n")
i = 0
for o in objs:
i += 1
loc = o.get_location()
euler = o.get_rotation_euler()
fh.write(f"{i} : {o.get_name()} {loc} {euler}\n")
# define a light and set its location and energy level
light = bproc.types.Light()
light.set_type("POINT")
light.set_location([5, -5, 5])
#light.set_energy(900)
#light.set_color([0.7, 0.7, 0.7])
light1 = bproc.types.Light(name="light1")
light1.set_type("SUN")
light1.set_location([0, 0, 0])
light1.set_rotation_euler([-0.063, 0.6177, -0.1985])
#light1.set_energy(7)
light1.set_color([1, 1, 1])
"""
# Sample its location around the object
light.set_location(bproc.sampler.shell(
center=obj.get_location(),
radius_min=2.5,
radius_max=5,
elevation_min=1,
elevation_max=89
))
"""
# define the camera intrinsics
bproc.camera.set_intrinsics_from_blender_params(1, 640, 480, lens_unit="FOV")
bproc.renderer.enable_segmentation_output(map_by=["category_id", "instance", "name"])
res_dir = os.path.join(args.output_dir, 'coco_data')
# Цикл рендеринга
n_cam_location = 5 # количество случайных локаций камеры
n_cam_poses = 3 # количество сэмплов для каждой локации камеры
# Do multiple times: Position the shapenet objects using the physics simulator and render X images with random camera poses
for r in range(args.imgs):
# Randomly set the color and energy
light.set_color(np.random.uniform([0.5, 0.5, 0.5], [1, 1, 1]))
light.set_energy(random.uniform(500, 1000))
light1.set_energy(random.uniform(3, 11))
for i,o in enumerate(objs):
mat = o.get_materials()[0]
mat.set_principled_shader_value("Specular", random.uniform(0, 1))
mat.set_principled_shader_value("Roughness", random.uniform(0, 1))
mat.set_principled_shader_value("Base Color", np.random.uniform([0, 0, 0, 1], [1, 1, 1, 1]))
mat.set_principled_shader_value("Metallic", random.uniform(0, 1))
# Clear all key frames from the previous run
bproc.utility.reset_keyframes()
# Define a function that samples 6-DoF poses
def sample_pose(obj: bproc.types.MeshObject):
obj.set_location(np.random.uniform([-1, -1.5, 0.2], [1, 2, 1.2])) #[-1, -1, 0], [1, 1, 2]))
obj.set_rotation_euler(bproc.sampler.uniformSO3())
# Sample the poses of all shapenet objects above the ground without any collisions in-between
bproc.object.sample_poses(meshs, objects_to_check_collisions = meshs + [floor], sample_pose_func = sample_pose)
# Run the simulation and fix the poses of the shapenet objects at the end
bproc.object.simulate_physics_and_fix_final_poses(min_simulation_time=4, max_simulation_time=20, check_object_interval=1)
# Find point of interest, all cam poses should look towards it
poi = bproc.object.compute_poi(meshs)
coord_max = [0.1, 0.1, 0.1]
coord_min = [0., 0., 0.]
with open(os.path.join(args.output_dir,"res.txt"), "a") as fh:
fh.write("*****************\n")
fh.write(f"{r}) poi = {poi}\n")
i = 0
for o in meshs:
i += 1
loc = o.get_location()
euler = o.get_rotation_euler()
fh.write(f" {i} : {o.get_name()} {loc} {euler}\n")
for j in range(3):
if loc[j] < coord_min[j]:
coord_min[j] = loc[j]
if loc[j] > coord_max[j]:
coord_max[j] = loc[j]
# Sample up to X camera poses
#an = np.random.uniform(0.78, 1.2) #1. #0.35
for i in range(n_cam_location):
# Sample location
location = bproc.sampler.shell(center=[0, 0, 0],
radius_min=1.1,
radius_max=3.3,
elevation_min=5,
elevation_max=89)
# координата, по которой будем сэмплировать положение камеры
j = random.randint(0, 2)
# разовый сдвиг по случайной координате
d = (coord_max[j] - coord_min[j]) / n_cam_poses
if location[j] < 0:
d = -d
for k in range(n_cam_poses):
# Compute rotation based on vector going from location towards poi
rotation_matrix = bproc.camera.rotation_from_forward_vec(poi - location, inplane_rot=np.random.uniform(-0.7854, 0.7854))
# Add homog cam pose based on location an rotation
cam2world_matrix = bproc.math.build_transformation_mat(location, rotation_matrix)
bproc.camera.add_camera_pose(cam2world_matrix)
location[j] -= d
#world_matrix = bproc.math.build_transformation_mat([2.3, -0.4, 0.66], [1.396, 0., an])
#bproc.camera.add_camera_pose(world_matrix)
#an += 0.2
# render the whole pipeline
data = bproc.renderer.render()
# Write data to coco file
bproc.writer.write_coco_annotations(res_dir,
instance_segmaps=data["instance_segmaps"],
instance_attribute_maps=data["instance_attribute_maps"],
color_file_format='JPEG',
colors=data["colors"],
append_to_existing_output=True)
#загрузим аннотацию
with open(os.path.join(res_dir,"coco_annotations.json"), "r") as fh:
y = json.load(fh)
# список имен объектов
n_obj = 0
obj_list = []
with open(os.path.join(res_dir,"obj.names"), "w") as fh:
for cat in y["categories"]:
if cat["id"] < 999:
n = cat["name"]
i = cat["id"]
obj_list.append([n,i,n_obj])
fh.write(n+"\n")
n_obj += 1
# содадим или очистим папку data для датасета
res_data = os.path.join(res_dir, 'data')
if os.path.isdir(res_data):
for f in os.listdir(res_data):
os.remove(os.path.join(res_data, f))
else:
os.mkdir(res_data)
# список имен файлов с изображениями
fn_image = os.path.join(res_dir,"images.txt")
img_list = []
with open(fn_image, "w") as fh:
for i in y["images"]:
filename = i["file_name"]
shutil.copy(os.path.join(res_dir,filename),res_data)
fh.write(filename.replace('images','data')+"\n")
img_list.append([i["id"], (os.path.split(filename))[1]])
# создадим 2 списка имен файлов для train и valid
n_image_in_series = n_cam_location * n_cam_poses # количество изображений в серии
i = 0
fh = open(fn_image, "r")
f1 = open(os.path.join(res_dir,"i_train.txt"), "w")
f2 = open(os.path.join(res_dir,"i_val.txt"), "w")
for line in fh:
i += 1
if i % n_image_in_series == 0:
f2.write(line)
else:
f1.write(line)
fh.close()
f1.close()
f2.close()
# заполним файлы с метками bbox
for i in y["annotations"]:
cat_id = i["category_id"]
if cat_id < 999:
im_id = i["image_id"]
bbox = i["bbox"]
im_h = i["height"]
im_w = i["width"]
rel = convert2relative(im_h,im_w,bbox)
# находим индекс списка с нужным изображением
j = next(k for k, (x, _) in enumerate(img_list) if x == im_id)
filename = img_list[j][1]
fn = (os.path.splitext(filename))[0] # только имя файла
with open(os.path.join(res_data,fn+".txt"), "a") as fh:
# находим индекс списка с нужным объектом
j = next(k for k, (_, x, _) in enumerate(obj_list) if x == cat_id)
# формат: <target> <x-center> <y-center> <width> <height>
fh.write(f"{obj_list[j][2]} {rel[0]} {rel[1]} {rel[2]} {rel[3]}\n")
# создадим файл описания датасета для darknet
with open(os.path.join(res_dir,"yolov4_objs2.data"), "w") as fh:
fh.write(f"classes = {n_obj}\n")
fh.write("train = i_train.txt\n")
fh.write("valid = i_val.txt\n")
fh.write("names = obj.names\n")
fh.write("backup = backup\n")
fh.write("eval = coco\n")

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,7 @@
classes= 1
train = i_train.txt
valid = i_val.txt
names = obj.names
backup = backup
eval=coco

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,7 @@
classes= 6
train = i_train.txt
valid = i_val.txt
names = obj.names
backup = backup
eval=coco

File diff suppressed because it is too large Load diff

View file

View file

@ -0,0 +1,44 @@
---
id: BOP_dataset
title: script for create BOP dataset
---
## Структура входных данных:
```
<example_dir>/
input_obj/asm_element_edge.mtl # файл материала
input_obj/asm_element_edge.obj # меш-объект
input_obj/fork.mtl
input_obj/fork.obj
input_obj/...
resources/sklad.blend # файл сцены
objs2BOPdataset.py # этот скрипт
```
## Пример команды запуска скрипта:
```
cd <example_dir>/
blenderproc run objs2BOPdataset.py resources/sklad.blend input_obj output --imgs 333
```
- resources/sklad.blend : файл сцены
- input_obj : каталог с меш-файлами
- output : выходной каталог
- imgs : количество пакетов по 9 кадров в каждом (в примере 333 * 9 = 2997)
## Структура BOP датасета на выходе:
```
output/
bop_data/
train_pbr/
000000/
depth/... # файлы глубины
mask/... # файлы маски
mask_visib/... # файлы маски видимости
rgb/... # файлы изображений RGB
scene_camera.json
scene_gt.json
scene_gt_coco.json
scene_gt_info.json
camera.json # внутренние параметры камеры (для всего датасета)
res.txt # протокол создания пакетов датасета
```

View file

@ -0,0 +1,261 @@
import blenderproc as bproc
"""
objs2BOPdataset
Общая задача: распознавание 6D позы объекта (6D pose estimation)
Реализуемая функция: создание датасета в формате BOP для серии заданных объектов (*.obj) в заданной сцене (*.blend)
Используется модуль blenderproc
29.08.2023 @shalenikol release 0.1
12.10.2023 @shalenikol release 0.2
"""
import sys
import numpy as np
import argparse
import random
import os
import shutil
import json
Not_Categories_Name = True # наименование категории в аннотации отсутствует
def convert2relative(height, width, bbox):
"""
YOLO format use relative coordinates for annotation
"""
x, y, w, h = bbox
x += w/2
y += h/2
return x/width, y/height, w/width, h/height
parser = argparse.ArgumentParser()
parser.add_argument('scene', nargs='?', default="resources/sklad.blend", help="Path to the scene object.")
parser.add_argument('obj_path', nargs='?', default="resources/in_obj", help="Path to the object files.")
parser.add_argument('output_dir', nargs='?', default="output", help="Path to where the final files, will be saved")
parser.add_argument('vhacd_path', nargs='?', default="blenderproc_resources/vhacd", help="The directory in which vhacd should be installed or is already installed.")
parser.add_argument('-single_object', nargs='?', type= bool, default=True, help="One object per frame.")
parser.add_argument('--imgs', default=2, type=int, help="The number of times the objects should be rendered.")
args = parser.parse_args()
if not os.path.isdir(args.obj_path):
print(f"{args.obj_path} : no object directory")
sys.exit()
if not os.path.isdir(args.output_dir):
os.mkdir(args.output_dir)
single_object = args.single_object
bproc.init()
# ? загрузим свет из сцены
#cam = bproc.loader.load_blend(args.scene, data_blocks=["cameras"])
#lights = bproc.loader.load_blend(args.scene, data_blocks=["lights"])
# загрузим объекты
list_files = os.listdir(args.obj_path)
obj_names = []
obj_filenames = []
all_meshs = []
nObj = 0
for f in list_files:
if (os.path.splitext(f))[1] == ".obj":
f = os.path.join(args.obj_path, f) # путь к файлу объекта
if os.path.isfile(f):
obj = bproc.loader.load_obj(f)
all_meshs += obj
obj_names += [obj[0].get_name()]
obj_filenames += [f]
nObj += 1
if nObj == 0:
print("Objects not found")
sys.exit()
for i,obj in enumerate(all_meshs):
#print(f"{i} *** {obj}")
obj.set_cp("category_id", i+1)
# загрузим сцену
scene = bproc.loader.load_blend(args.scene, data_blocks=["objects"])
# найдём объекты коллизии (пол и т.д.)
obj_type = ["floor", "obj"]
collision_objects = []
#floor = None
for o in scene:
o.set_cp("category_id", 999)
s = o.get_name()
for type in obj_type:
if s.find(type) >= 0:
collision_objects += [o]
o.enable_rigidbody(False, collision_shape='BOX')
if not collision_objects:
print("Collision objects not found in the scene")
sys.exit()
#floor.enable_rigidbody(False, collision_shape='BOX')
for obj in all_meshs:
# Make the object actively participate in the physics simulation
obj.enable_rigidbody(active=True, collision_shape="COMPOUND")
# Also use convex decomposition as collision shapes
obj.build_convex_decomposition_collision_shape(args.vhacd_path)
objs = all_meshs + scene
with open(os.path.join(args.output_dir,"res.txt"), "w") as fh:
# fh.write(str(type(scene[0]))+"\n")
i = 0
for o in objs:
i += 1
loc = o.get_location()
euler = o.get_rotation_euler()
fh.write(f"{i} : {o.get_name()} {loc} {euler} category_id = {o.get_cp('category_id')}\n")
# define a light and set its location and energy level
light = bproc.types.Light()
light.set_type("POINT")
light.set_location([5, -5, 5])
#light.set_energy(900)
#light.set_color([0.7, 0.7, 0.7])
light1 = bproc.types.Light(name="light1")
light1.set_type("SUN")
light1.set_location([0, 0, 0])
light1.set_rotation_euler([-0.063, 0.6177, -0.1985])
#light1.set_energy(7)
light1.set_color([1, 1, 1])
# define the camera intrinsics
bproc.camera.set_intrinsics_from_blender_params(1, 640, 480, lens_unit="FOV")
# add segmentation masks (per class and per instance)
bproc.renderer.enable_segmentation_output(map_by=["category_id", "instance", "name"])
#bproc.renderer.enable_segmentation_output(map_by=["category_id", "instance", "name", "bop_dataset_name"],
# default_values={"category_id": 0, "bop_dataset_name": None})
# activate depth rendering
bproc.renderer.enable_depth_output(activate_antialiasing=False)
res_dir = os.path.join(args.output_dir, "bop_data")
if os.path.isdir(res_dir):
shutil.rmtree(res_dir)
# Цикл рендеринга
n_cam_location = 3 #5 # количество случайных локаций камеры
n_cam_poses = 3 #3 # количество сэмплов для каждой локации камеры
# Do multiple times: Position the shapenet objects using the physics simulator and render X images with random camera poses
for r in range(args.imgs):
# один случайный объект в кадре / все заданные объекты
meshs = [random.choice(all_meshs)] if single_object else all_meshs[:]
# Randomly set the color and energy
light.set_color(np.random.uniform([0.5, 0.5, 0.5], [1, 1, 1]))
light.set_energy(random.uniform(500, 1000))
light1.set_energy(random.uniform(3, 11))
for i,o in enumerate(meshs): #objs
mat = o.get_materials()[0]
mat.set_principled_shader_value("Specular", random.uniform(0, 1))
mat.set_principled_shader_value("Roughness", random.uniform(0, 1))
mat.set_principled_shader_value("Base Color", np.random.uniform([0, 0, 0, 1], [1, 1, 1, 1]))
mat.set_principled_shader_value("Metallic", random.uniform(0, 1))
# Clear all key frames from the previous run
bproc.utility.reset_keyframes()
# Define a function that samples 6-DoF poses
def sample_pose(obj: bproc.types.MeshObject):
obj.set_location(np.random.uniform([-1, -1.5, 0.2], [1, 2, 1.2])) #[-1, -1, 0], [1, 1, 2]))
obj.set_rotation_euler(bproc.sampler.uniformSO3())
# Sample the poses of all shapenet objects above the ground without any collisions in-between
#bproc.object.sample_poses(meshs, objects_to_check_collisions = meshs + [floor], sample_pose_func = sample_pose)
bproc.object.sample_poses(meshs, objects_to_check_collisions = meshs + collision_objects, sample_pose_func = sample_pose)
# Run the simulation and fix the poses of the shapenet objects at the end
bproc.object.simulate_physics_and_fix_final_poses(min_simulation_time=4, max_simulation_time=20, check_object_interval=1)
# Find point of interest, all cam poses should look towards it
poi = bproc.object.compute_poi(meshs)
coord_max = [0.1, 0.1, 0.1]
coord_min = [0., 0., 0.]
with open(os.path.join(args.output_dir,"res.txt"), "a") as fh:
fh.write("*****************\n")
fh.write(f"{r}) poi = {poi}\n")
i = 0
for o in meshs:
i += 1
loc = o.get_location()
euler = o.get_rotation_euler()
fh.write(f" {i} : {o.get_name()} {loc} {euler}\n")
for j in range(3):
if loc[j] < coord_min[j]:
coord_min[j] = loc[j]
if loc[j] > coord_max[j]:
coord_max[j] = loc[j]
# Sample up to X camera poses
#an = np.random.uniform(0.78, 1.2) #1. #0.35
for i in range(n_cam_location):
# Sample location
location = bproc.sampler.shell(center=[0, 0, 0],
radius_min=1.1,
radius_max=2.2,
elevation_min=5,
elevation_max=89)
# координата, по которой будем сэмплировать положение камеры
j = random.randint(0, 2)
# разовый сдвиг по случайной координате
d = (coord_max[j] - coord_min[j]) / n_cam_poses
if location[j] < 0:
d = -d
for k in range(n_cam_poses):
# Compute rotation based on vector going from location towards poi
rotation_matrix = bproc.camera.rotation_from_forward_vec(poi - location, inplane_rot=np.random.uniform(-0.7854, 0.7854))
# Add homog cam pose based on location an rotation
cam2world_matrix = bproc.math.build_transformation_mat(location, rotation_matrix)
bproc.camera.add_camera_pose(cam2world_matrix)
location[j] -= d
#world_matrix = bproc.math.build_transformation_mat([2.3, -0.4, 0.66], [1.396, 0., an])
#bproc.camera.add_camera_pose(world_matrix)
#an += 0.2
# render the whole pipeline
data = bproc.renderer.render()
# Write data to bop format
bproc.writer.write_bop(res_dir,
target_objects = all_meshs, # Optional[List[MeshObject]] = None
depths = data["depth"],
depth_scale = 1.0,
colors = data["colors"],
color_file_format='JPEG',
append_to_existing_output = (r>0),
save_world2cam = False) # world coords are arbitrary in most real BOP datasets
# dataset="robo_ds",
"""
!!! categories -> name берётся из category_id !!!
см.ниже
blenderproc.python.writer : BopWriterUtility.py
class _BopWriterUtility
def calc_gt_coco
...
CATEGORIES = [{'id': obj.get_cp('category_id'), 'name': str(obj.get_cp('category_id')), 'supercategory':
dataset_name} for obj in dataset_objects]
поэтому заменим наименование категории в аннотации
"""
if Not_Categories_Name:
coco_file = os.path.join(res_dir,"train_pbr/000000/scene_gt_coco.json")
with open(coco_file, "r") as fh:
data = json.load(fh)
cats = data["categories"]
#print(f"type(cat) = {type(cat)} cat : {cat}")
i = 0
for cat in cats:
cat["name"] = obj_names[i]
i += 1
#print(cat)
with open(coco_file, "w") as fh:
json.dump(data, fh, indent=0)

View file

@ -0,0 +1,97 @@
from types import LambdaType, UnionType
from returns.pipeline import is_successful
from typing import List, TypeVar
from returns.result import Result, Success, Failure
import os
from model.robossembler_assets import (
MappingInstanceAtModel,
RobossemblerAssets,
Instance,
)
import re
import pathlib
from repository.file_system import FileSystemRepository
from model.robossembler_assets import Physics
from argparse import ArgumentParser
T = TypeVar("T")
class JsonReaderAndModelMapperUseCase:
def call(path: str, model: T) -> Result[T, str]:
try:
if not re.search("^(.+)\/([^\/]+)$", path):
return Failure("path not valid")
if model.from_dict == None:
return Failure("Model is not have mapping method from_dict")
return Success(model.from_dict(FileSystemRepository.readJSON(path=path)))
except:
return Failure("JsonReaderAndModelMapperUseCase unknown error")
class MappingInstanceAtModelToSdfUseCase:
def call(instances: List[MappingInstanceAtModel]) -> Result[List[str], str]:
try:
return Success(list(map(lambda el: el.toSDF(), instances)))
except:
return Failure("MappingInstanceAtModelToSdfUseCase unknown error")
class MappingSdfWorldToPhysicsModelUseCase:
def call(physicModel: Physics) -> Result[List[str], str]:
try:
return Success(Physics.toSDF(physicModel))
except:
return Failure("MappingInstanceAtModelToSdfUseCase unknown error")
class FormationOfTheSDFUseCase:
def call(worldTag: str, modelsTags: List[str], path: str) -> Result[bool, str]:
path = str(pathlib.Path(path).parent.resolve()) + "/"
if modelsTags == None:
return Failure("FormationOfTheSDFUseCase modelsTags is None")
if worldTag == None:
return Failure("FormationOfTheSDFUseCase worldTag is None")
FileSystemRepository.writeFile(
data=worldTag.replace("{models}", "\n".join(modelsTags)),
filePath=path,
fileName="world.sdf",
)
return Success(True)
def main():
parser = ArgumentParser()
parser.add_argument("--path", help="need path .json")
args = parser.parse_args()
if args.path == None:
parser.print_help()
return
path = args.path
jsonReaderAndModelMapperUseCase = JsonReaderAndModelMapperUseCase.call(
path=path, model=RobossemblerAssets
)
if not is_successful(jsonReaderAndModelMapperUseCase):
return
robossemblerAssets = jsonReaderAndModelMapperUseCase.value_or(None)
instanceSdfModel = MappingInstanceAtModelToSdfUseCase.call(
instances=robossemblerAssets.getAllAssetsInstanceAtModel()
)
sdfWorld = MappingSdfWorldToPhysicsModelUseCase.call(
physicModel=robossemblerAssets.physics
)
FormationOfTheSDFUseCase.call(
worldTag=sdfWorld.value_or(None),
modelsTags=instanceSdfModel.value_or(None),
path=path,
)
main()

View file

@ -0,0 +1,12 @@
<light type="{type_light}" name="{name_light}">
<pose>{x} {y} {z} {roll} {pitch} {yaw}</pose>
<diffuse>{r} {g} {b} {a}</diffuse>
<specular>.1 .1 .1 1</specular>
<attenuation>
<range>20</range>
<linear>0.2</linear>
<constant>0.8</constant>
<quadratic>0.01</quadratic>
</attenuation>
<cast_shadows>false</cast_shadows>
</light>

View file

@ -0,0 +1,7 @@
<model name="{name}">
<pose>{x} {y} {z} {roll} {pitch} {yaw}</pose>
<include>
<name>{name}</name>
<uri>model://{uri}</uri>
</include>
</model>

View file

@ -0,0 +1,105 @@
<?xml version="1.0"?>
<sdf version='1.9'>
<world name='mir'>
<physics name='1ms' type='{engine_type}'>
<max_step_size>0.001</max_step_size>
<real_time_factor>1.0</real_time_factor>
<real_time_update_rate>1000</real_time_update_rate>
</physics>
<plugin
name='ignition::gazebo::systems::Physics' filename='ignition-gazebo-physics-system' />
<plugin
name='ignition::gazebo::systems::UserCommands'
filename='ignition-gazebo-user-commands-system' />
<plugin
name='ignition::gazebo::systems::SceneBroadcaster'
filename='ignition-gazebo-scene-broadcaster-system' />
<plugin
name='ignition::gazebo::systems::Contact' filename='ignition-gazebo-contact-system' />
<plugin
name="ignition::gazebo::systems::Sensors" filename="ignition-gazebo-sensors-system">
<render_engine>ogre2</render_engine>
</plugin>
<gravity>{gravity_x} {gravity_y} {gravity_z}</gravity>
<magnetic_field>6e-06
2.3e-05 -4.2e-05</magnetic_field>
<atmosphere type='adiabatic' />
<scene>
<ambient>0.4 0.4 0.4 1</ambient>
<background>0.7 0.7 0.7 1</background>
<shadows>false</shadows>
</scene>
<gui fullscreen="0">
<plugin filename="GzScene3D" name="3D View">
<ignition-gui>
<title>3D View</title>
<property type="bool" key="showTitleBar">false</property>
<property type="string" key="state">docked</property>
</ignition-gui>
<engine>ogre2</engine>
<scene>scene</scene>
<ambient_light>1.0 1.0 1.0</ambient_light>
<background_color>0.4 0.6 1.0</background_color>
<camera_pose>3.3 2.8 2.8 0 0.5 -2.4</camera_pose>
</plugin>
<plugin filename="WorldStats" name="World stats">
<ignition-gui>
<title>World stats</title>
<property type="bool" key="showTitleBar">false</property>
<property type="bool" key="resizable">false</property>
<property type="double" key="height">110</property>
<property type="double" key="width">290</property>
<property type="double" key="z">1</property>
<property type="string" key="state">floating</property>
<anchors target="3D View">
<line own="right" target="right" />
<line own="bottom" target="bottom" />
</anchors>
</ignition-gui>
<sim_time>true</sim_time>
<real_time>true</real_time>
<real_time_factor>true</real_time_factor>
<iterations>true</iterations>
</plugin>
</gui>
<light type="directional" name="sun">
<cast_shadows>true</cast_shadows>
<pose>0 0 10 0 0 0</pose>
<diffuse>0.8 0.8 0.8 1</diffuse>
<specular>0.2 0.2 0.2 1</specular>
<attenuation>
<range>1000</range>
<constant>0.9</constant>
<linear>0.01</linear>
<quadratic>0.001</quadratic>
</attenuation>
<direction>-0.5 0.1 -0.9</direction>
</light>
<model name='ground'>
<static>true</static>
<link name="link">
<collision name="collision">
<geometry>
<plane>
<normal>0 0 1</normal>
</plane>
</geometry>
</collision>
<visual name="visual">
<geometry>
<plane>
<normal>0 0 1</normal>
<size>100 100</size>
</plane>
</geometry>
<material>
<ambient>0.8 0.8 0.8 1</ambient>
<diffuse>0.8 0.8 0.8 1</diffuse>
<specular>0.8 0.8 0.8 1</specular>
</material>
</visual>
</link>
</model>
{models} </world>
</sdf>

View file

@ -0,0 +1,394 @@
from dataclasses import dataclass
import os
from returns.result import Result, Success, Failure
from typing import Optional, Any, List, TypeVar, Callable, Type, cast
from enum import Enum
from repository.file_system import FileSystemRepository
T = TypeVar("T")
EnumT = TypeVar("EnumT", bound=Enum)
def from_float(x: Any) -> float:
assert isinstance(x, (float, int)) and not isinstance(x, bool)
return float(x)
def from_none(x: Any) -> Any:
return x
def from_union(fs, x):
for f in fs:
try:
return f(x)
except:
pass
assert False
def to_float(x: Any) -> float:
assert isinstance(x, float)
return x
def from_str(x: Any) -> str:
assert isinstance(x, str)
return x
def from_int(x: Any) -> int:
assert isinstance(x, int) and not isinstance(x, bool)
return x
def from_list(f: Callable[[Any], T], x: Any) -> List[T]:
assert isinstance(x, list)
return [f(y) for y in x]
def to_class(c: Type[T], x: Any) -> dict:
assert isinstance(x, c)
return cast(Any, x).to_dict()
def to_enum(c: Type[EnumT], x: Any) -> EnumT:
assert isinstance(x, c)
return x.value
@dataclass
class Model:
name: Optional[str] = None
id: Optional[str] = None
path: Optional[str] = None
@staticmethod
def from_dict(obj: Any) -> "Model":
assert isinstance(obj, dict)
name = from_union([from_str, from_none], obj.get("name"))
id = from_union([from_str, from_none], obj.get("id"))
path = from_union([from_str, from_none], obj.get("path"))
return Model(name, id, path)
def to_dict(self) -> dict:
result: dict = {}
if self.name is not None:
result["name"] = from_union([from_str, from_none], self.name)
if self.id is not None:
result["id"] = from_union([from_str, from_none], self.id)
if self.path is not None:
result["path"] = from_union([from_str, from_none], self.path)
return result
@dataclass
class Pose:
x: Optional[float] = None
y: Optional[float] = None
z: Optional[float] = None
roll: Optional[float] = None
pitch: Optional[float] = None
yaw: Optional[float] = None
@staticmethod
def from_dict(obj: Any) -> "Pose":
assert isinstance(obj, dict)
x = from_union([from_float, from_none], obj.get("x"))
y = from_union([from_float, from_none], obj.get("y"))
z = from_union([from_float, from_none], obj.get("z"))
roll = from_union([from_float, from_none], obj.get("roll"))
pitch = from_union([from_float, from_none], obj.get("pitch"))
yaw = from_union([from_float, from_none], obj.get("yaw"))
return Pose(x, y, z, roll, pitch, yaw)
def to_dict(self) -> dict:
result: dict = {}
if self.x is not None:
result["x"] = from_union([to_float, from_none], self.x)
if self.y is not None:
result["y"] = from_union([to_float, from_none], self.y)
if self.z is not None:
result["z"] = from_union([to_float, from_none], self.z)
if self.roll is not None:
result["roll"] = from_union([to_float, from_none], self.roll)
if self.pitch is not None:
result["pitch"] = from_union([to_float, from_none], self.pitch)
if self.yaw is not None:
result["yaw"] = from_union([to_float, from_none], self.yaw)
return result
class TypeEnum(Enum):
ASSET = "asset"
LIGHT = "light"
@dataclass
class Instance:
model_name: Optional[str] = None
model_id: Optional[str] = None
id: Optional[str] = None
pose: Optional[Pose] = None
scale: Optional[int] = None
type: Optional[TypeEnum] = None
parent: Optional[str] = None
light_type: Optional[str] = None
intencity: Optional[int] = None
diffuse: Optional[List[float]] = None
spot_angle: Optional[int] = None
@staticmethod
def from_dict(obj: Any) -> "Instance":
assert isinstance(obj, dict)
model_name = from_union([from_str, from_none], obj.get("model_name"))
model_id = from_union([from_str, from_none], obj.get("model_id"))
id = from_union([from_str, from_none], obj.get("id"))
pose = from_union([Pose.from_dict, from_none], obj.get("pose"))
scale = from_union([from_int, from_none], obj.get("scale"))
type = from_union([TypeEnum, from_none], obj.get("type"))
parent = from_union([from_str, from_none], obj.get("parent"))
light_type = from_union([from_str, from_none], obj.get("light_type"))
intencity = from_union([from_int, from_none], obj.get("intencity"))
diffuse = from_union(
[lambda x: from_list(from_float, x), from_none], obj.get("diffuse")
)
spot_angle = from_union([from_int, from_none], obj.get("spot_angle"))
return Instance(
model_name,
model_id,
id,
pose,
scale,
type,
parent,
light_type,
intencity,
diffuse,
spot_angle,
)
def fromMappingInstanceAtModel(
self, models: List[Model]
) -> "MappingInstanceAtModel":
for el in models:
if el.id == self.model_id:
return MappingInstanceAtModel(instance=self, model=el)
return Failure("not found model at {self.model_id} ")
def to_dict(self) -> dict:
result: dict = {}
if self.model_name is not None:
result["model_name"] = from_union([from_str, from_none], self.model_name)
if self.model_id is not None:
result["model_id"] = from_union([from_str, from_none], self.model_id)
if self.id is not None:
result["id"] = from_union([from_str, from_none], self.id)
if self.pose is not None:
result["pose"] = from_union(
[lambda x: to_class(Pose, x), from_none], self.pose
)
if self.scale is not None:
result["scale"] = from_union([from_int, from_none], self.scale)
if self.type is not None:
result["type"] = from_union(
[lambda x: to_enum(TypeEnum, x), from_none], self.type
)
if self.parent is not None:
result["parent"] = from_union([from_str, from_none], self.parent)
if self.light_type is not None:
result["light_type"] = from_union([from_str, from_none], self.light_type)
if self.intencity is not None:
result["intencity"] = from_union([from_int, from_none], self.intencity)
if self.diffuse is not None:
result["diffuse"] = from_union(
[lambda x: from_list(to_float, x), from_none], self.diffuse
)
if self.spot_angle is not None:
result["spot_angle"] = from_union([from_int, from_none], self.spot_angle)
return result
class BasePose:
def __init__(self, x: float, y: float, z: float, **kwargs):
self.x = x
self.y = y
self.z = z
def toPose(self, sdfXmlMock: str):
return (
sdfXmlMock.replace("{x}", str(self.x))
.replace("{y}", str(self.y))
.replace("{z}", str(self.z))
)
class MappingInstanceAtModel(BasePose):
instance: Instance
model: Model
def __init__(self, instance: Instance, model: Model) -> None:
self.instance = instance
self.model = model
pass
def toSDF(self):
pose = self.instance.pose
match self.instance.type:
case TypeEnum.ASSET:
mock = FileSystemRepository.readFile(
os.path.dirname(os.path.realpath(__file__))
+ "/../mocks/model_include_sdf.xml"
)
# mockPose = self.toPose(mock)
return (
mock.replace("{name}", str(self.model.name))
.replace("{x}", str(pose.x))
.replace("{y}", str(pose.y))
.replace("{z}", str(pose.z))
.replace("{pitch}", str(pose.pitch))
.replace("{yaw}", str(pose.yaw))
.replace("{roll}", str(pose.roll))
.replace("{uri}", str(self.model.path))
)
case TypeEnum.LIGHT:
pathMock = (
os.path.dirname(os.path.realpath(__file__))
+ "/../mocks/light_sdf.xml"
)
return (
FileSystemRepository.readFile(pathMock)
.replace("{x}", str(pose.x))
.replace("{y}", str(pose.y))
.replace("{z}", str(pose.z))
.replace("{pitch}", str(pose.pitch))
.replace("{yaw}", str(pose.yaw))
.replace("{roll}", str(pose.roll))
.replace("{type_light}", str(self.instance.light_type))
.replace("{name_light}", str("132"))
.replace("{r}", self.instance.diffuse[0])
.replace("{g}", self.instance.diffuse[1])
.replace("{b}", self.instance.diffuse[2])
.replace("{a}", self.instance.diffuse[3])
)
@dataclass
class Gravity:
x: Optional[int] = None
y: Optional[int] = None
z: Optional[float] = None
@staticmethod
def from_dict(obj: Any) -> "Gravity":
assert isinstance(obj, dict)
x = from_union([from_int, from_none], obj.get("x"))
y = from_union([from_int, from_none], obj.get("y"))
z = from_union([from_float, from_none], obj.get("z"))
return Gravity(x, y, z)
def to_dict(self) -> dict:
result: dict = {}
if self.x is not None:
result["x"] = from_union([from_int, from_none], self.x)
if self.y is not None:
result["y"] = from_union([from_int, from_none], self.y)
if self.z is not None:
result["z"] = from_union([to_float, from_none], self.z)
return result
@dataclass
class Physics:
engine_name: Optional[str] = None
gravity: Optional[Gravity] = None
@staticmethod
def from_dict(obj: Any) -> "Physics":
assert isinstance(obj, dict)
engine_name = from_union([from_str, from_none], obj.get("engine_name"))
gravity = from_union([Gravity.from_dict, from_none], obj.get("gravity"))
return Physics(engine_name, gravity)
def to_dict(self) -> dict:
result: dict = {}
if self.engine_name is not None:
result["engine_name"] = from_union([from_str, from_none], self.engine_name)
if self.gravity is not None:
result["gravity"] = from_union(
[lambda x: to_class(Gravity, x), from_none], self.gravity
)
return result
def toSDF(self) -> str:
pathMock = os.path.dirname(os.path.realpath(__file__)) + "/../mocks/world.xml"
gravity = self.gravity
return (
FileSystemRepository.readFile(pathMock)
.replace("{gravity_x}", str(gravity.x))
.replace("{gravity_y}", str(gravity.y))
.replace("{gravity_z}", str(gravity.z))
.replace("{engine_type}", str(self.engine_name))
)
@dataclass
class RobossemblerAssets:
models: Optional[List[Model]] = None
instances: Optional[List[Instance]] = None
physics: Optional[Physics] = None
@staticmethod
def from_dict(obj: Any) -> "RobossemblerAssets":
assert isinstance(obj, dict)
models = from_union(
[lambda x: from_list(Model.from_dict, x), from_none], obj.get("models")
)
instances = from_union(
[lambda x: from_list(Instance.from_dict, x), from_none],
obj.get("instances"),
)
physics = from_union([Physics.from_dict, from_none], obj.get("physics"))
return RobossemblerAssets(models, instances, physics)
def to_dict(self) -> dict:
result: dict = {}
if self.models is not None:
result["models"] = from_union(
[lambda x: from_list(lambda x: to_class(Model, x), x), from_none],
self.models,
)
if self.instances is not None:
result["instances"] = from_union(
[lambda x: from_list(lambda x: to_class(Instance, x), x), from_none],
self.instances,
)
if self.physics is not None:
result["physics"] = from_union(
[lambda x: to_class(Physics, x), from_none], self.physics
)
return result
def _getAllAtType(self, type: TypeEnum) -> List[Instance]:
return list(filter(lambda x: x.type == type, self.instances))
def getAllLightInstances(self) -> List[Instance]:
return list(
map(
lambda el: el.fromMappingInstanceAtModel(self.models),
self._getAllAtType(type=TypeEnum.LIGHT),
)
)
def getAllAssetsInstanceAtModel(self) -> List[MappingInstanceAtModel]:
return list(
map(
lambda el: el.fromMappingInstanceAtModel(self.models),
self._getAllAtType(type=TypeEnum.ASSET),
)
)

View file

@ -0,0 +1,25 @@
import json
import os
class FileSystemRepository:
def readJSON(path: str):
return json.loads((open(path)).read())
def writeFile(data, filePath, fileName):
file_to_open = filePath + fileName
f = open(file_to_open, "w", encoding="utf-8", errors="ignore")
f.write(data)
f.close()
def readFile(path: str):
return open(path).read()
def readFilesTypeFolder(pathFolder: str, fileType=".json"):
filesJson = list(
filter(
lambda x: x[-fileType.__len__() :] == fileType, os.listdir(pathFolder)
)
)
return filesJson

View file

@ -0,0 +1,14 @@
import argparse
from usecases.stability_check_usecase import StabilityCheckUseCase
# python3 main.py --aspPath /Users/idontsudo/Desktop/asp-example/
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--aspPath", help="asp folder generation path")
args = parser.parse_args()
StabilityCheckUseCase().call(args.aspPath)
main()

View file

@ -0,0 +1,229 @@
from typing import Any, List, TypeVar, Type, cast, Callable
import numpy as np
import pybullet as p
import time
import pybullet_data
import os
import json
from time import sleep
T = TypeVar("T")
def from_str(x):
assert isinstance(x, str)
return x
def from_float(x: Any) -> float:
assert isinstance(x, (float, int)) and not isinstance(x, bool)
return float(x)
def to_float(x: Any) -> float:
assert isinstance(x, float)
return x
def from_int(x: Any) -> int:
assert isinstance(x, int) and not isinstance(x, bool)
return x
def to_class(c: Type[T], x: Any) -> dict:
assert isinstance(x, c)
return cast(Any, x).to_dict()
def from_list(f: Callable[[Any], T], x: Any) -> List[T]:
assert isinstance(x, list)
return [f(y) for y in x]
class Coords:
x: float
y: float
z: float
def __init__(self, x: float, y: float, z: float) -> None:
self.x = x
self.y = y
self.z = z
@staticmethod
def from_dict(obj: Any) -> "Coords":
assert isinstance(obj, dict)
x = from_float(obj.get("x"))
y = from_float(obj.get("y"))
z = from_float(obj.get("z"))
return Coords(x, y, z)
def to_dict(self) -> dict:
result: dict = {}
result["x"] = to_float(self.x)
result["y"] = to_float(self.y)
result["z"] = to_float(self.z)
return result
class SimulatorStabilityResultModel:
id: str
quaternion: Coords
position: Coords
def __init__(self, id: int, quaternion: Coords, position: Coords) -> None:
self.id = id
self.quaternion = quaternion
self.position = position
@staticmethod
def from_dict(obj: Any) -> "SimulatorStabilityResultModel":
assert isinstance(obj, dict)
id = from_str(obj.get("id"))
quaternion = Coords.from_dict(obj.get("quaternion"))
position = Coords.from_dict(obj.get("position"))
return SimulatorStabilityResultModel(id, quaternion, position)
def to_dict(self) -> dict:
result: dict = {}
result["id"] = from_str(self.id)
result["quaternion"] = to_class(Coords, self.quaternion)
result["position"] = to_class(Coords, self.position)
return result
def SimulatorStabilityModelfromdict(s: Any) -> List[SimulatorStabilityResultModel]:
return from_list(SimulatorStabilityResultModel.from_dict, s)
def SimulatorStabilityModeltodict(x: List[SimulatorStabilityResultModel]) -> Any:
return from_list(lambda x: to_class(SimulatorStabilityResultModel, x), x)
class StabilityCheckUseCase:
def urdfLoader(
self, assembly: list[str], outPath: str, urdfGeneration: dict[str:str]
):
urdfs = []
for assemblyCount in range(len(assembly)):
urdf = urdfGeneration.get(assembly[assemblyCount])
file_to_open = outPath + "/generation/" + str(assemblyCount) + ".urdf"
f = open(file_to_open, "w", encoding="utf-8", errors="ignore")
f.write(urdf)
f.close()
urdfs.append(os.path.abspath(f.name))
return urdfs
def executeSimulation(
self,
assembly: list[str],
outPath: str,
urdfGeneration: dict[str:str],
duration: int,
) -> list["SimulatorStabilityResultModel"]:
p.connect(p.DIRECT)
p.setGravity(0, 0, -10)
p.setAdditionalSearchPath(pybullet_data.getDataPath())
p.loadURDF("plane.urdf")
resultCoords = []
urdfs = self.urdfLoader(
assembly=assembly, urdfGeneration=urdfGeneration, outPath=outPath
)
bulletIds = []
for el in urdfs:
id = p.loadURDF(el)
bulletIds.append(id)
for i in range(duration):
if i + 200 == duration:
inc = 0
for bulletUUID in bulletIds:
pos, rot = p.getBasePositionAndOrientation(bulletUUID)
resultCoords.append(
SimulatorStabilityResultModel(
id=assembly[inc],
quaternion=Coords(x=rot[0], y=rot[1], z=rot[2]),
position=Coords(x=pos[0], y=pos[1], z=pos[2]),
)
)
p.removeBody(bulletUUID)
inc += 1
p.stepSimulation()
time.sleep(1.0 / 240.0)
return resultCoords
def call(self, aspPath: str):
try:
assemblyFolder = aspPath
assemblesStructures = json.loads(
(open(assemblyFolder + "sequences.json")).read()
).get("sequences")
tasks = len(assemblesStructures) * len(assemblesStructures[0])
taskCounter = 0
urdfGeneration = json.loads(
(open(assemblyFolder + "generation/urdf-generation.json")).read()
)
for activeAssemblyNumber in range(len(assemblesStructures)):
pathSaveResultAssemblyFolder = (
aspPath + "stability" + "/" + str(activeAssemblyNumber + 1) + "/"
)
if not os.path.exists(pathSaveResultAssemblyFolder):
os.makedirs(pathSaveResultAssemblyFolder)
for subAssemblyNumber in range(
len(assemblesStructures[activeAssemblyNumber])
):
taskCounter += 1
subAssembly = assemblesStructures[activeAssemblyNumber][
0 : subAssemblyNumber + 1
]
print(subAssembly)
if subAssembly == [
"disk_top",
"disk_middel",
]:
asm = []
for el in subAssembly:
asm.append(el)
resultSimulationStates = self.executeSimulation(
assembly=asm,
outPath=aspPath,
urdfGeneration=urdfGeneration,
duration=1000,
)
pathSaveResultSubAssemblyFolder = (
aspPath
+ "stability"
+ "/"
+ str(activeAssemblyNumber + 1)
+ "/"
+ str(subAssemblyNumber)
+ "/"
)
if not os.path.exists(pathSaveResultSubAssemblyFolder):
os.makedirs(pathSaveResultSubAssemblyFolder)
results = {}
for state in resultSimulationStates:
results[state.id] = state.to_dict()
f = open(
pathSaveResultSubAssemblyFolder
+ "/"
+ "motion_result.json",
"w",
encoding="utf-8",
errors="ignore",
)
f.write(json.dumps(results, ensure_ascii=False, indent=4))
f.close()
percentageOfCompletion = taskCounter / tasks * 100
print("process complete: " + str(percentageOfCompletion) + "%")
except Exception as e:
print(e)