From b4d4b8133d41d05f6ef567cde958166e815593e1 Mon Sep 17 00:00:00 2001 From: shalenikol Date: Fri, 28 Mar 2025 21:36:11 +0300 Subject: [PATCH] add queue service for tasks --- docs/tasks.csv | 2 +- rbs_mill_assist/config/obj_cfg.json | 38 + rbs_mill_assist/config/template_obj_1.svg | 855 ++++++++++++++++++++++ rbs_mill_assist/scripts/CMakeLists.txt | 1 + rbs_mill_assist/scripts/queue_srv.py | 277 +++++++ 5 files changed, 1172 insertions(+), 1 deletion(-) create mode 100644 rbs_mill_assist/config/obj_cfg.json create mode 100644 rbs_mill_assist/config/template_obj_1.svg create mode 100644 rbs_mill_assist/scripts/queue_srv.py diff --git a/docs/tasks.csv b/docs/tasks.csv index 6d28063..1c7ad06 100644 --- a/docs/tasks.csv +++ b/docs/tasks.csv @@ -1,6 +1,6 @@ obj_id,device_type,device_name,supply,freq,napor,mass,power,temp_range,date,serial_num,ta_range,pump_num,i_ob,i_sum,ip,engine_num Object ID,Тип устройства,Наименование,"Q - подача, м³/ч",n - частота оборотов,"H - напор, м",m - масса,P2 - мощность,T - диапазон температур,Дата производства,Заводской номер,Диапазон ta в градусах цельсия,Кол-во насосов,Iоб,Iсум,IP,Кол-во электродвигателей -1,Насос общепромышленный,,50,1450 об/мин,30,200.0 кг,15.0 кВт,-10...+80°C,01-15-23,SN12345,,,,,, +1,Насос общепромышленный,"КММ-ХА 80-50-200б/2/18,5-Е-55Т/BBQV-HC-У3",50,1450 об/мин,30,200.0 кг,15.0 кВт,-10...+80°C,01-15-23,SN12345,,,,,, 2,Насос взрывозащищённый (Ex),"КММ-ХА 80-50-200б/2/18,5-Е-55Т/BBQV-HC-У3",30 м³/ч,1450 об/мин,25,180.0 кг,10.0 кВт,-20...+60°C,02-10-23,SN12346,-40...+85°C,,,,, 3,Насосная установка,,100 м³/ч,,50,500.0 кг,30.0 кВт,,03-01-23,SN12347,2,,55,4, 4,Шкаф управления,,,,,,,,04-05-23,SN12348,,,30.0A,50.0A,IP54,3 diff --git a/rbs_mill_assist/config/obj_cfg.json b/rbs_mill_assist/config/obj_cfg.json new file mode 100644 index 0000000..d96dc33 --- /dev/null +++ b/rbs_mill_assist/config/obj_cfg.json @@ -0,0 +1,38 @@ +[ + { + "obj_id": 1, + "place_name": "bunker_1", + "place_aruco": 21, + "place_pos": "{pose}", + "graver_template": "template_obj_1.svg", + "dimensions": "{X,Y,Z}", + "skills": ["sid", "..."] + }, + { + "obj_id": 2, + "place_name": "bunker_2", + "place_aruco": 22, + "place_pos": "{pose}", + "graver_template": "{file}", + "dimensions": "{X,Y,Z}", + "skills": ["sid", "..."] + }, + { + "obj_id": 3, + "place_name": "bunker_3", + "place_aruco": 23, + "place_pos": "{pose}", + "graver_template": "{file}", + "dimensions": "{X,Y,Z}", + "skills": ["sid", "..."] + }, + { + "obj_id": 4, + "place_name": "bunker_4", + "place_aruco": 24, + "place_pos": "{pose}", + "graver_template": "{file}", + "dimensions": "{X,Y,Z}", + "skills": ["sid", "..."] + } +] \ No newline at end of file diff --git a/rbs_mill_assist/config/template_obj_1.svg b/rbs_mill_assist/config/template_obj_1.svg new file mode 100644 index 0000000..6c72a8a --- /dev/null +++ b/rbs_mill_assist/config/template_obj_1.svg @@ -0,0 +1,855 @@ + +image/svg+xml1КММ-ХА 80-50-200б/2/18,5-E-55T/BBQV-PC-У32,3SN21216121624.12.21251,2954 diff --git a/rbs_mill_assist/scripts/CMakeLists.txt b/rbs_mill_assist/scripts/CMakeLists.txt index 73a80a0..0f33c0a 100644 --- a/rbs_mill_assist/scripts/CMakeLists.txt +++ b/rbs_mill_assist/scripts/CMakeLists.txt @@ -1,5 +1,6 @@ install(PROGRAMS grasping_service.py get_key_pose_frame.py + queue_srv.py DESTINATION lib/${PROJECT_NAME} ) diff --git a/rbs_mill_assist/scripts/queue_srv.py b/rbs_mill_assist/scripts/queue_srv.py new file mode 100644 index 0000000..aed9f72 --- /dev/null +++ b/rbs_mill_assist/scripts/queue_srv.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python3 + +import os +from typing import Dict, Tuple + +import rclpy +import json +from ament_index_python.packages import get_package_share_directory +from rbs_utils_interfaces.srv import AddTasks, TaskFromQueue +from rclpy.node import Node +from rclpy.service import Service + +import cairosvg +import xml.etree.ElementTree as ET + +FILE_QUEUE = "tasks_queue.json" +FILE_QCFG = "obj_cfg.json" # obj_id должны идти последовательно, начиная с 1 +FILE_TEMP_LIST = "temp.txt" + +KEY_QUEUE = "Queue" +KEY_TASK_INDEX = "TaskId" +KEY_OBJECT_INDEX = "obj_id" +KEY_GRAVER_TMPL = "graver_template" +KEY_GRAVER_PNG = "graver_png" +KEY_GRAVER_SVG = "graver_svg" +KEY_CSV_TASK = "csv" + +XMLTAG_FOR_REPLACE = "text" + +FACTOR_mm_to_pixel = 10 # mm to pixel conversion factor + +def get_tags_without_namespace(element): + """ + Получаем имя тега без пространства имен + """ + return element.tag.split('}')[-1] if '}' in element.tag else element.tag + +class QueueService(Node): + __TASK_NO = 0 + __TASK_TAKE_ON = 1 + __TASK_COMPLETED = 2 + + def __init__(self) -> None: + super().__init__("queue_service") + self._cstate = self.__TASK_NO + self.sz_queue = 0 # size of queue + self.task_last_index = 0 + self.TaskDict: Dict = {} # dictionary with parameters of the current task + self.cfgdir = os.path.join(get_package_share_directory("rbs_mill_assist"), "config") + self.workdir = os.path.join(self.cfgdir, "queue") + self.fn_queue = os.path.join(self.workdir, FILE_QUEUE) # file with data of queue + self.fn_cfg = os.path.join(self.cfgdir, FILE_QCFG) # file with data of objects config + self.fn_tmp = os.path.join(self.workdir, FILE_TEMP_LIST) # file with list of temp files + + os.makedirs(self.workdir, exist_ok=True) + if os.path.isfile(self.fn_queue): + self.init_queue() + else: + with open(self.fn_queue, "w") as fh: + json.dump({KEY_QUEUE:[]}, fh) + + self.srv_add: Service = self.create_service(AddTasks, "queue/add_tasks", self.add_tasks) + self.srv_get: Service = self.create_service(AddTasks, "queue/get_task", self.get_task) + self.srv_takeon: Service = self.create_service(TaskFromQueue, "queue/takeon_task", self.takeon_task) + self.srv_completed: Service = self.create_service(TaskFromQueue, "queue/task_completed", self.task_completed) + self.get_logger().info(f"'queue_service' loaded") + + def get_task(self, request: TaskFromQueue.Request, response: TaskFromQueue.Response) -> TaskFromQueue.Response: + """ + Get json-string with parameters of the current task + """ + response.ok = False + if self._cstate == self.__TASK_TAKE_ON: # есть текущее задание + response.task = json.dumps(self.TaskDict, ensure_ascii=False) + response.ok = True + return response + + def task_completed(self, request: TaskFromQueue.Request, response: TaskFromQueue.Response) -> TaskFromQueue.Response: + """ + Сomplete the current task + """ + response.ok = False + if self._cstate == self.__TASK_TAKE_ON: # есть текущее задание + # добавим файлы для удаления, которые больше не нужны + with open(self.fn_tmp, "a") as fh: + fh.write(self.TaskDict[KEY_GRAVER_PNG]+"\n") + fh.write(self.TaskDict[KEY_GRAVER_SVG]+"\n") + + response.task = json.dumps(self.TaskDict, ensure_ascii=False) + self.TaskDict = {} + self._cstate = self.__TASK_COMPLETED + response.ok = True + return response + + def takeon_task(self, request: TaskFromQueue.Request, response: TaskFromQueue.Response) -> TaskFromQueue.Response: + """ + Take a task from the queue for execution + """ + response.ok = False + if self.sz_queue and not self._cstate == self.__TASK_TAKE_ON: # очередь не пуста и нет текущего задания + with open(self.fn_queue, "r") as fh: + jdata = json.load(fh) + + self.TaskDict = jdata[KEY_QUEUE].pop(0) + self.sz_queue = len(jdata) + + with open(self.fn_queue, "w") as fh: + json.dump(jdata, fh, ensure_ascii=False, indent=2) + + self._cstate = self.__TASK_TAKE_ON + response.ok = True + response.task = json.dumps(self.TaskDict, ensure_ascii=False) + self.get_logger().info(f"Task has been accepted for execution (TaskId={self.TaskDict['TaskId']})") + return response + + def delete_tmp(self, fn_tmp:str) -> None: + """ + Delete temporary files from list in text file + """ + if os.path.isfile(fn_tmp): + with open(fn_tmp, "r+") as fh: + paths = fh.readlines() + fh.truncate(0) # Очищаем файл + + # Удаляем пробелы и символы новой строки + paths = [path.strip() for path in paths] + # Удаляем файлы + for path in paths: + if os.path.isfile(path): + try: + os.remove(path) + except Exception as e: + self.get_logger().info(f"Ошибка при удалении файла {path}: {e}") + + def init_queue(self) -> None: + """ + Init of tasks queue. + """ + # delete temporary files + self.delete_tmp(self.fn_tmp) + + with open(self.fn_queue, "r") as fh: + jdata = json.load(fh) + queue = jdata[KEY_QUEUE] + self.sz_queue = len(queue) + if self.sz_queue > 0: + self.task_last_index = queue[self.sz_queue-1][KEY_TASK_INDEX] + + def update_svg(self, svg_f:str, csv:Dict, task_id:int) -> Tuple[str, int, int]: + """ + Return updating SVG-file + """ + # Загружаем и парсим SVG как XML-файл + tree = ET.parse(svg_f) + root = tree.getroot() + # Размеры результирующего изображения + width = int(''.join(filter(str.isdigit, root.get("width")))) * FACTOR_mm_to_pixel + height = int(''.join(filter(str.isdigit, root.get("height")))) * FACTOR_mm_to_pixel + + # Проходим по всем элементам в дереве + for elem in root.iter(): + tag = get_tags_without_namespace(elem) + if tag == XMLTAG_FOR_REPLACE: + id = elem.get("id") + if id in csv: # поле есть в словаре + n = 0 + for el_text in elem.iter(): # ищем двойные поля + tspan = get_tags_without_namespace(el_text) + if tspan == "tspan": + n += 1 + if n == 1: + el_1 = el_text + else: + el_2 = el_text + + val = str(csv[id]) # значение из словаря + if n > 1: # двойное поле + src_str = el_1.text # значение из шаблона (определяет максимальное кол-во символов в первой строке поля) + src_len = len(src_str) + # if src_len: # в случае, если шаблон заполнен + l = src_len+1 if src_len else len(val)/2 + 1 + i_div = val[:l].rfind('-') # индекс для разделения строки + i_div = l-1 if i_div < 2 else i_div+1 # если нет '-' + el_1.text = val[:i_div] + el_2.text = val[i_div:] + self.get_logger().info(f"{id}: 1) '{el_1.text}' 2) '{el_2.text}'") + + elif n == 1: #for el_text in elem.iter(): + pre = el_1.text + el_1.text = val # Заменяем текст элемента на значение из словаря + self.get_logger().info(f"{id}: before: '{pre}' after '{el_1.text}'") + + # Сохраняем изменённое дерево в новый SVG-файл + svg_f = os.path.join(self.workdir, f"tmpl_{task_id}.svg") + tree.write(svg_f, encoding='utf-8', xml_declaration=True) + + return svg_f, width, height + + def set_taskdata(self, src_d: Dict) -> Dict: + """ + Setting up task data, filling in templates + """ + task_id = src_d[KEY_TASK_INDEX] + svg = src_d[KEY_GRAVER_TMPL] + svg_file = os.path.join(self.cfgdir, svg) + png = "" + if os.path.isfile(svg_file): + # Обработка SVG + svg_file, w, h = self.update_svg(svg_file, src_d[KEY_CSV_TASK], task_id) + # Конвертация SVG в PNG + png = os.path.join(self.workdir, f"tmpl_{task_id}.png") + # cairosvg.svg2png(url=svg_file, write_to=png) + cairosvg.svg2png(url=svg_file, write_to=png, output_width=w, output_height=h) + else: + svg_file = "" + + src_d[KEY_GRAVER_PNG] = png + src_d[KEY_GRAVER_SVG] = svg_file + return src_d + + def add_tasks(self, request: AddTasks.Request, response: AddTasks.Response) -> AddTasks.Response: + """ + Adding a task list to the queue + """ + fn = request.tasks_csv + if not os.path.isfile(fn): + response.ok = False + self.get_logger().error(f"No such task file: {fn}") + return response + + with open(self.fn_cfg, "r") as fh: + cfg = json.load(fh) + sz_cfg = len(cfg) + + with open(self.fn_queue, "r") as fh: + queue_data = json.load(fh) + queue = queue_data[KEY_QUEUE] + + import pandas as pd + # Считываем CSV-файл + data = pd.read_csv(fn, delimiter=',', skiprows=0, encoding="utf-8", keep_default_na=False) + # # Получаем количество строк + # num_rows = data.shape[0] + for index, row in data.iterrows(): + if index == 0: # строка с наименованиями + continue + rd = row.to_dict() + icfg = int(rd[KEY_OBJECT_INDEX]) - 1 + if icfg < sz_cfg and icfg >= 0: + self.task_last_index += 1 + dict_res = self.set_taskdata({KEY_TASK_INDEX: self.task_last_index} | cfg[icfg] | {KEY_CSV_TASK: rd}) + if dict_res[KEY_GRAVER_PNG]: # проверка на valid task + queue += [dict_res] + self.get_logger().info(f"Индекс: {index}, Данные: {dict_res}") + + with open(self.fn_queue, "w") as fh: + # json.dump({KEY_QUEUE: queue}, fh, ensure_ascii=False) + json.dump({KEY_QUEUE: queue}, fh, ensure_ascii=False, indent=2) + + self.sz_queue = len(queue) + + response.ok = True + return response + +def main(): + rclpy.init() + executor = rclpy.executors.SingleThreadedExecutor() + i_node = QueueService() + executor.add_node(i_node) + try: + executor.spin() + except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): + i_node.destroy_node() + rclpy.shutdown() + +if __name__ == "__main__": + main()