Merge branch 'shalenikol_18' into 'dev'

Shalenikol 18

See merge request robossembler/robossembler-cnc-graver-assist!8
This commit is contained in:
shalenikol 2025-03-24 15:42:01 +00:00
commit 8d73b0451d
3 changed files with 178 additions and 0 deletions

View file

@ -0,0 +1,38 @@
[
{
"obj_id": 1,
"place_name": "bunker_1",
"place_aruco": 21,
"place_pos": "{pose}",
"graver_template": "{file}",
"dimensions": "{X,Y,Z}",
"skills": ["sid", "..."]
},
{
"obj_id": 2,
"place_name": "bunker_2",
"place_aruco": 22,
"place_pos": "{pose}",
"graver_template": "{file}",
"dimensions": "{X,Y,Z}",
"skills": ["sid", "..."]
},
{
"obj_id": 3,
"place_name": "bunker_3",
"place_aruco": 23,
"place_pos": "{pose}",
"graver_template": "{file}",
"dimensions": "{X,Y,Z}",
"skills": ["sid", "..."]
},
{
"obj_id": 4,
"place_name": "bunker_4",
"place_aruco": 24,
"place_pos": "{pose}",
"graver_template": "{file}",
"dimensions": "{X,Y,Z}",
"skills": ["sid", "..."]
}
]

View file

@ -1,5 +1,6 @@
install(PROGRAMS
grasping_service.py
get_key_pose_frame.py
queue_srv.py
DESTINATION lib/${PROJECT_NAME}
)

View file

@ -0,0 +1,139 @@
#!/usr/bin/env python3
import os
from typing import Dict
import rclpy
import json
# import yaml
from ament_index_python.packages import get_package_share_directory
from rbs_utils_interfaces.srv import AddTasks
from rclpy.node import Node
from rclpy.service import Service
FILE_QUEUE = "tasks_queue.json"
FILE_QCFG = "obj_cfg.json" # obj_id должны идти последовательно, начиная с 1
FILE_TEMP_LIST = "temp.txt"
KEY_QUEUE = "Queue"
KEY_TASK_INDEX = "TaskId"
KEY_OBJECT_INDEX = "obj_id"
class QueueService(Node):
def __init__(self) -> None:
super().__init__("queue_service")
self.sz_queue = 0 # size of queue
self.task_last_index = 0
self.cfgdir = os.path.join(get_package_share_directory("rbs_mill_assist"), "config")
self.workdir = os.path.join(self.cfgdir, "queue")
self.fn_queue = os.path.join(self.workdir, FILE_QUEUE) # file with data of queue
self.fn_cfg = os.path.join(self.cfgdir, FILE_QCFG) # file with data of objects config
self.fn_tmp = os.path.join(self.workdir, FILE_TEMP_LIST) # file with list of temp files
os.makedirs(self.workdir, exist_ok=True)
if os.path.isfile(self.fn_queue):
self.init_queue()
else:
with open(self.fn_queue, "w") as fh:
json.dump({KEY_QUEUE:[]}, fh)
self.srv_add: Service = self.create_service(AddTasks, "add_tasks", self.add_tasks)
self.get_logger().info(f"'queue_service' loaded")
def delete_tmp(self, fn_tmp:str) -> None:
"""
Delete temporary files from list in text file
"""
if os.path.isfile(fn_tmp):
with open(fn_tmp, "r+") as fh:
paths = fh.readlines()
fh.truncate(0) # Очищаем файл
# Удаляем пробелы и символы новой строки
paths = [path.strip() for path in paths]
# Удаляем файлы
for path in paths:
if os.path.isfile(path):
try:
os.remove(path)
except Exception as e:
self.get_logger().info(f"Ошибка при удалении файла {path}: {e}")
def init_queue(self) -> None:
"""
Init of tasks queue.
"""
# delete temporary files
self.delete_tmp(self.fn_tmp)
with open(self.fn_queue, "r") as fh:
jdata = json.load(fh)
queue = jdata[KEY_QUEUE]
self.sz_queue = len(queue)
if self.sz_queue > 0:
self.task_last_index = queue[self.sz_queue-1][KEY_TASK_INDEX]
def set_taskdata(self, src_d: Dict) -> Dict:
"""
Setting up task data, filling in templates
"""
# TODO
return src_d
def add_tasks(self, request: AddTasks.Request, response: AddTasks.Response) -> AddTasks.Response:
"""
Adding a task list to the queue
"""
fn = request.tasks_csv
if not os.path.isfile(fn):
response.ok = False
self.get_logger().error(f"No such task file: {fn}")
return response
with open(self.fn_cfg, "r") as fh:
cfg = json.load(fh)
sz_cfg = len(cfg)
with open(self.fn_queue, "r") as fh:
queue_data = json.load(fh)
queue = queue_data[KEY_QUEUE]
import pandas as pd
# Считываем CSV-файл
data = pd.read_csv(fn, delimiter=',', skiprows=0, encoding="utf-8")
# # Получаем количество строк
# num_rows = data.shape[0]
for index, row in data.iterrows():
if index == 0: # строка с наименованиями
continue
rd = row.to_dict()
icfg = int(rd[KEY_OBJECT_INDEX]) - 1
if icfg < sz_cfg and icfg >= 0:
self.task_last_index += 1
dict_res = {KEY_TASK_INDEX: self.task_last_index} | cfg[icfg] | rd
queue += [dict_res]
self.get_logger().info(f"Индекс: {index}, Данные: {dict_res}")
with open(self.fn_queue, "w") as fh:
# json.dump({KEY_QUEUE: queue}, fh)
json.dump({KEY_QUEUE: queue}, fh, indent=2, ensure_ascii=False)
self.sz_queue = len(queue)
response.ok = True
return response
def main():
rclpy.init()
executor = rclpy.executors.SingleThreadedExecutor()
i_node = QueueService()
executor.add_node(i_node)
try:
executor.spin()
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
i_node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()