add queue service for tasks

This commit is contained in:
shalenikol 2025-03-28 21:36:11 +03:00
parent 70dca46f26
commit b4d4b8133d
5 changed files with 1172 additions and 1 deletions

View file

@ -1,6 +1,6 @@
obj_id,device_type,device_name,supply,freq,napor,mass,power,temp_range,date,serial_num,ta_range,pump_num,i_ob,i_sum,ip,engine_num
Object ID,Тип устройства,Наименование,"Q - подача, м³/ч",n - частота оборотов,"H - напор, м",m - масса,P2 - мощность,T - диапазон температур,Дата производства,Заводской номер,Диапазон ta в градусах цельсия,Кол-во насосов,Iоб,Iсум,IP,Кол-во электродвигателей
1,Насос общепромышленный,,50,1450 об/мин,30,200.0 кг,15.0 кВт,-10...+80°C,01-15-23,SN12345,,,,,,
1,Насос общепромышленный,"КММ-ХА 80-50-200б/2/18,5-Е-55Т/BBQV-HC-У3",50,1450 об/мин,30,200.0 кг,15.0 кВт,-10...+80°C,01-15-23,SN12345,,,,,,
2,Насос взрывозащищённый (Ex),"КММ-ХА 80-50-200б/2/18,5-Е-55Т/BBQV-HC-У3",30 м³/ч,1450 об/мин,25,180.0 кг,10.0 кВт,-20...+60°C,02-10-23,SN12346,-40...+85°C,,,,,
3,Насосная установка,,100 м³/ч,,50,500.0 кг,30.0 кВт,,03-01-23,SN12347,2,,55,4,
4,Шкаф управления,,,,,,,,04-05-23,SN12348,,,30.0A,50.0A,IP54,3

Can't render this file because it has a wrong number of fields in line 5.

View file

@ -0,0 +1,38 @@
[
{
"obj_id": 1,
"place_name": "bunker_1",
"place_aruco": 21,
"place_pos": "{pose}",
"graver_template": "template_obj_1.svg",
"dimensions": "{X,Y,Z}",
"skills": ["sid", "..."]
},
{
"obj_id": 2,
"place_name": "bunker_2",
"place_aruco": 22,
"place_pos": "{pose}",
"graver_template": "{file}",
"dimensions": "{X,Y,Z}",
"skills": ["sid", "..."]
},
{
"obj_id": 3,
"place_name": "bunker_3",
"place_aruco": 23,
"place_pos": "{pose}",
"graver_template": "{file}",
"dimensions": "{X,Y,Z}",
"skills": ["sid", "..."]
},
{
"obj_id": 4,
"place_name": "bunker_4",
"place_aruco": 24,
"place_pos": "{pose}",
"graver_template": "{file}",
"dimensions": "{X,Y,Z}",
"skills": ["sid", "..."]
}
]

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 177 KiB

View file

@ -1,5 +1,6 @@
install(PROGRAMS
grasping_service.py
get_key_pose_frame.py
queue_srv.py
DESTINATION lib/${PROJECT_NAME}
)

View file

@ -0,0 +1,277 @@
#!/usr/bin/env python3
import os
from typing import Dict, Tuple
import rclpy
import json
from ament_index_python.packages import get_package_share_directory
from rbs_utils_interfaces.srv import AddTasks, TaskFromQueue
from rclpy.node import Node
from rclpy.service import Service
import cairosvg
import xml.etree.ElementTree as ET
FILE_QUEUE = "tasks_queue.json"
FILE_QCFG = "obj_cfg.json" # obj_id должны идти последовательно, начиная с 1
FILE_TEMP_LIST = "temp.txt"
KEY_QUEUE = "Queue"
KEY_TASK_INDEX = "TaskId"
KEY_OBJECT_INDEX = "obj_id"
KEY_GRAVER_TMPL = "graver_template"
KEY_GRAVER_PNG = "graver_png"
KEY_GRAVER_SVG = "graver_svg"
KEY_CSV_TASK = "csv"
XMLTAG_FOR_REPLACE = "text"
FACTOR_mm_to_pixel = 10 # mm to pixel conversion factor
def get_tags_without_namespace(element):
"""
Получаем имя тега без пространства имен
"""
return element.tag.split('}')[-1] if '}' in element.tag else element.tag
class QueueService(Node):
__TASK_NO = 0
__TASK_TAKE_ON = 1
__TASK_COMPLETED = 2
def __init__(self) -> None:
super().__init__("queue_service")
self._cstate = self.__TASK_NO
self.sz_queue = 0 # size of queue
self.task_last_index = 0
self.TaskDict: Dict = {} # dictionary with parameters of the current task
self.cfgdir = os.path.join(get_package_share_directory("rbs_mill_assist"), "config")
self.workdir = os.path.join(self.cfgdir, "queue")
self.fn_queue = os.path.join(self.workdir, FILE_QUEUE) # file with data of queue
self.fn_cfg = os.path.join(self.cfgdir, FILE_QCFG) # file with data of objects config
self.fn_tmp = os.path.join(self.workdir, FILE_TEMP_LIST) # file with list of temp files
os.makedirs(self.workdir, exist_ok=True)
if os.path.isfile(self.fn_queue):
self.init_queue()
else:
with open(self.fn_queue, "w") as fh:
json.dump({KEY_QUEUE:[]}, fh)
self.srv_add: Service = self.create_service(AddTasks, "queue/add_tasks", self.add_tasks)
self.srv_get: Service = self.create_service(AddTasks, "queue/get_task", self.get_task)
self.srv_takeon: Service = self.create_service(TaskFromQueue, "queue/takeon_task", self.takeon_task)
self.srv_completed: Service = self.create_service(TaskFromQueue, "queue/task_completed", self.task_completed)
self.get_logger().info(f"'queue_service' loaded")
def get_task(self, request: TaskFromQueue.Request, response: TaskFromQueue.Response) -> TaskFromQueue.Response:
"""
Get json-string with parameters of the current task
"""
response.ok = False
if self._cstate == self.__TASK_TAKE_ON: # есть текущее задание
response.task = json.dumps(self.TaskDict, ensure_ascii=False)
response.ok = True
return response
def task_completed(self, request: TaskFromQueue.Request, response: TaskFromQueue.Response) -> TaskFromQueue.Response:
"""
Сomplete the current task
"""
response.ok = False
if self._cstate == self.__TASK_TAKE_ON: # есть текущее задание
# добавим файлы для удаления, которые больше не нужны
with open(self.fn_tmp, "a") as fh:
fh.write(self.TaskDict[KEY_GRAVER_PNG]+"\n")
fh.write(self.TaskDict[KEY_GRAVER_SVG]+"\n")
response.task = json.dumps(self.TaskDict, ensure_ascii=False)
self.TaskDict = {}
self._cstate = self.__TASK_COMPLETED
response.ok = True
return response
def takeon_task(self, request: TaskFromQueue.Request, response: TaskFromQueue.Response) -> TaskFromQueue.Response:
"""
Take a task from the queue for execution
"""
response.ok = False
if self.sz_queue and not self._cstate == self.__TASK_TAKE_ON: # очередь не пуста и нет текущего задания
with open(self.fn_queue, "r") as fh:
jdata = json.load(fh)
self.TaskDict = jdata[KEY_QUEUE].pop(0)
self.sz_queue = len(jdata)
with open(self.fn_queue, "w") as fh:
json.dump(jdata, fh, ensure_ascii=False, indent=2)
self._cstate = self.__TASK_TAKE_ON
response.ok = True
response.task = json.dumps(self.TaskDict, ensure_ascii=False)
self.get_logger().info(f"Task has been accepted for execution (TaskId={self.TaskDict['TaskId']})")
return response
def delete_tmp(self, fn_tmp:str) -> None:
"""
Delete temporary files from list in text file
"""
if os.path.isfile(fn_tmp):
with open(fn_tmp, "r+") as fh:
paths = fh.readlines()
fh.truncate(0) # Очищаем файл
# Удаляем пробелы и символы новой строки
paths = [path.strip() for path in paths]
# Удаляем файлы
for path in paths:
if os.path.isfile(path):
try:
os.remove(path)
except Exception as e:
self.get_logger().info(f"Ошибка при удалении файла {path}: {e}")
def init_queue(self) -> None:
"""
Init of tasks queue.
"""
# delete temporary files
self.delete_tmp(self.fn_tmp)
with open(self.fn_queue, "r") as fh:
jdata = json.load(fh)
queue = jdata[KEY_QUEUE]
self.sz_queue = len(queue)
if self.sz_queue > 0:
self.task_last_index = queue[self.sz_queue-1][KEY_TASK_INDEX]
def update_svg(self, svg_f:str, csv:Dict, task_id:int) -> Tuple[str, int, int]:
"""
Return updating SVG-file
"""
# Загружаем и парсим SVG как XML-файл
tree = ET.parse(svg_f)
root = tree.getroot()
# Размеры результирующего изображения
width = int(''.join(filter(str.isdigit, root.get("width")))) * FACTOR_mm_to_pixel
height = int(''.join(filter(str.isdigit, root.get("height")))) * FACTOR_mm_to_pixel
# Проходим по всем элементам в дереве
for elem in root.iter():
tag = get_tags_without_namespace(elem)
if tag == XMLTAG_FOR_REPLACE:
id = elem.get("id")
if id in csv: # поле есть в словаре
n = 0
for el_text in elem.iter(): # ищем двойные поля
tspan = get_tags_without_namespace(el_text)
if tspan == "tspan":
n += 1
if n == 1:
el_1 = el_text
else:
el_2 = el_text
val = str(csv[id]) # значение из словаря
if n > 1: # двойное поле
src_str = el_1.text # значение из шаблона (определяет максимальное кол-во символов в первой строке поля)
src_len = len(src_str)
# if src_len: # в случае, если шаблон заполнен
l = src_len+1 if src_len else len(val)/2 + 1
i_div = val[:l].rfind('-') # индекс для разделения строки
i_div = l-1 if i_div < 2 else i_div+1 # если нет '-'
el_1.text = val[:i_div]
el_2.text = val[i_div:]
self.get_logger().info(f"{id}: 1) '{el_1.text}' 2) '{el_2.text}'")
elif n == 1: #for el_text in elem.iter():
pre = el_1.text
el_1.text = val # Заменяем текст элемента на значение из словаря
self.get_logger().info(f"{id}: before: '{pre}' after '{el_1.text}'")
# Сохраняем изменённое дерево в новый SVG-файл
svg_f = os.path.join(self.workdir, f"tmpl_{task_id}.svg")
tree.write(svg_f, encoding='utf-8', xml_declaration=True)
return svg_f, width, height
def set_taskdata(self, src_d: Dict) -> Dict:
"""
Setting up task data, filling in templates
"""
task_id = src_d[KEY_TASK_INDEX]
svg = src_d[KEY_GRAVER_TMPL]
svg_file = os.path.join(self.cfgdir, svg)
png = ""
if os.path.isfile(svg_file):
# Обработка SVG
svg_file, w, h = self.update_svg(svg_file, src_d[KEY_CSV_TASK], task_id)
# Конвертация SVG в PNG
png = os.path.join(self.workdir, f"tmpl_{task_id}.png")
# cairosvg.svg2png(url=svg_file, write_to=png)
cairosvg.svg2png(url=svg_file, write_to=png, output_width=w, output_height=h)
else:
svg_file = ""
src_d[KEY_GRAVER_PNG] = png
src_d[KEY_GRAVER_SVG] = svg_file
return src_d
def add_tasks(self, request: AddTasks.Request, response: AddTasks.Response) -> AddTasks.Response:
"""
Adding a task list to the queue
"""
fn = request.tasks_csv
if not os.path.isfile(fn):
response.ok = False
self.get_logger().error(f"No such task file: {fn}")
return response
with open(self.fn_cfg, "r") as fh:
cfg = json.load(fh)
sz_cfg = len(cfg)
with open(self.fn_queue, "r") as fh:
queue_data = json.load(fh)
queue = queue_data[KEY_QUEUE]
import pandas as pd
# Считываем CSV-файл
data = pd.read_csv(fn, delimiter=',', skiprows=0, encoding="utf-8", keep_default_na=False)
# # Получаем количество строк
# num_rows = data.shape[0]
for index, row in data.iterrows():
if index == 0: # строка с наименованиями
continue
rd = row.to_dict()
icfg = int(rd[KEY_OBJECT_INDEX]) - 1
if icfg < sz_cfg and icfg >= 0:
self.task_last_index += 1
dict_res = self.set_taskdata({KEY_TASK_INDEX: self.task_last_index} | cfg[icfg] | {KEY_CSV_TASK: rd})
if dict_res[KEY_GRAVER_PNG]: # проверка на valid task
queue += [dict_res]
self.get_logger().info(f"Индекс: {index}, Данные: {dict_res}")
with open(self.fn_queue, "w") as fh:
# json.dump({KEY_QUEUE: queue}, fh, ensure_ascii=False)
json.dump({KEY_QUEUE: queue}, fh, ensure_ascii=False, indent=2)
self.sz_queue = len(queue)
response.ok = True
return response
def main():
rclpy.init()
executor = rclpy.executors.SingleThreadedExecutor()
i_node = QueueService()
executor.add_node(i_node)
try:
executor.spin()
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
i_node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()