From a2fc185c1e36427fc56a43561a70b9fcb2ca47e5 Mon Sep 17 00:00:00 2001 From: lulko Date: Mon, 14 Apr 2025 21:33:36 +0300 Subject: [PATCH] Add test id --- include/flash.h | 2 +- src/main.cpp | 6 ++ test/python_test_id.py | 155 +++++++++++++++++++++++++---------------- 3 files changed, 101 insertions(+), 62 deletions(-) diff --git a/include/flash.h b/include/flash.h index f413e22..eb9682b 100644 --- a/include/flash.h +++ b/include/flash.h @@ -66,7 +66,7 @@ void write_param(uint8_t param_id, uint8_t val); FLASH_RECORD* load_params(); void compact_page(); void flash_read(uint32_t addr,FLASH_RECORD* ptr); -bool validate_crc(FLASH_RECORD* crc); +bool validate_crc(FLASH_RECORD* crc,uint32_t length); void flash_write(uint32_t addr, FLASH_RECORD* record); #endif /* FLASH_H_ */ diff --git a/src/main.cpp b/src/main.cpp index 5239d48..29ff725 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -32,6 +32,8 @@ uint32_t SectorError; volatile uint32_t msg_id; volatile uint16_t id_x; volatile uint8_t msg_ch; +volatile uint8_t crc_h; +volatile uint8_t crc_l; volatile float kt = 0.1; //for torgue calculation @@ -155,6 +157,8 @@ void send_id() { CAN_TX_msg.id = flash_rec->value; CAN_TX_msg.buf[0] = 'I'; memcpy(&CAN_TX_msg.buf[1], &(flash_rec->value), sizeof(uint8_t)); + memcpy(&CAN_TX_msg.buf[6], (uint8_t*)&crc_h, sizeof(uint8_t)); + memcpy(&CAN_TX_msg.buf[7], (uint8_t*)&crc_l, sizeof(uint8_t)); Can.write(CAN_TX_msg); } @@ -187,6 +191,8 @@ void send_data() { void listen_can(const CAN_message_t &msg) { msg_id = msg.id; + crc_h = msg.buf[6]; + crc_l = msg.buf[7]; /* 0x691 69 - адрес устройства 1 - что делать дальше с данными */ diff --git a/test/python_test_id.py b/test/python_test_id.py index 8a8797d..e313966 100644 --- a/test/python_test_id.py +++ b/test/python_test_id.py @@ -1,73 +1,106 @@ import can import time -def send_write_read_requests(): +# Конфигурация CAN интерфейса +CAN_INTERFACE = 'can0' +OLD_DEVICE_ID = 0x70 # Текущий ID устройства +NEW_DEVICE_ID = 0x45 # Новый ID устройства +REG_WRITE = 0x8 # Команда записи (msg_ch) +REG_READ = 0x7 # Команда чтения (msg_ch) +REG_ID = 0x1 # Регистр для работы с ID + +def calculate_crc16_modbus(data: bytes) -> int: + crc = 0xFFFF + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x0001: + crc = (crc >> 1) ^ 0xA001 + else: + crc >>= 1 + return crc + +def send_can_message(bus, can_id, data): + """Отправка CAN-сообщения с CRC""" + # Добавляем CRC к данным + data_bytes = bytes(data) + crc = calculate_crc16_modbus(data_bytes) + full_data = list(data) + [0x00] * (8 - len(data)) + full_data[6] = (crc >> 8) & 0xFF + full_data[7] = crc & 0xFF + try: - bus = can.interface.Bus(channel='can0', bustype='socketcan') - - # Конфигурация сообщений (ЗАПОЛНИТЕ ВАШИ ЗНАЧЕНИЯ) - write_msg = { - 'arbitration_id': 0x01, # CAN ID для записи - 'data': [0x27, 0xA0, 0xFF, 0x00], # Данные для записи (4 байта) - 'description': "Установка id устройства" - } - - read_msg = { - 'arbitration_id': 0x01, # CAN ID для чтения - 'data': [0xFF,0x99], # Адрес новый + команда запроса данных - 'description': "Запрос id устройства", - 'response_id': 0xFF, # Ожидаемый ID ответа - 'timeout': 1.0 # Таймаут ожидания ответа (сек) - } - - # 1. Отправка команды записи - print("Отправка команды записи...") msg = can.Message( - arbitration_id=write_msg['arbitration_id'], - data=write_msg['data'], + arbitration_id=can_id, + data=full_data, is_extended_id=False ) bus.send(msg) - print(f"Запись: ID={hex(msg.arbitration_id)}, Данные={list(msg.data)}") - - # Ждем обработки команды устройством - time.sleep(2.0) + print(f"[Отправка] CAN ID: 0x{can_id:03X}, Данные: {full_data}") + return True + except can.CanError as e: + print(f"Ошибка CAN: {e}") + return False - # 2. Отправка запроса чтения и ожидание ответа - print("\nОтправка запроса чтения...") - msg = can.Message( - arbitration_id=read_msg['arbitration_id'], - data=read_msg['data'], - is_extended_id=False - ) - bus.send(msg) - print(f"Чтение: ID={hex(msg.arbitration_id)}, Команда={list(msg.data)}") - - # Ожидаем ответа - start_time = time.time() - response_received = False - - print("\nОжидание ответа...") - while (time.time() - start_time) < read_msg['timeout']: - response = bus.recv(timeout=0.1) +def receive_response(bus, timeout=2.0): + """Ожидание ответа с проверкой CRC""" + start_time = time.time() + while time.time() - start_time < timeout: + msg = bus.recv(timeout=0.1) + if msg: + print(f"[Прием] CAN ID: 0x{msg.arbitration_id:03X}, Данные: {list(msg.data)}") - if response and response.arbitration_id == read_msg['response_id']: - print(f"\nПолучен ответ: ID={hex(response.arbitration_id)}") - print(f"Данные: {list(response.data)}") - print(f"Длина: {response.dlc} байт") - response_received = True - break - - if not response_received: - print("\nОшибка: ответ не получен в течение заданного времени") + # Проверяем CRC + if len(msg.data) < 2: + print("Ошибка: Слишком короткое сообщение") + continue + + received_crc = (msg.data[-2] << 8) | msg.data[-1] + calculated_crc = calculate_crc16_modbus(bytes(msg.data[:-2])) + + if received_crc != calculated_crc: + print(f"Ошибка CRC! Получено: 0x{received_crc:04X}, Ожидалось: 0x{calculated_crc:04X}") + continue + + return msg.data[:-2] # Возвращаем данные без CRC + + print("[Ошибка] Таймаут ожидания ответа") + return None - except KeyboardInterrupt: - print("\nПрерывание пользователем") - except Exception as e: - print(f"Ошибка: {str(e)}") - finally: - bus.shutdown() - print("\nCAN соединение закрыто") +# Инициализация CAN шины +bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan') -if __name__ == '__main__': - send_write_read_requests() +# Формирование и отправка команды изменения ID +write_can_id = (OLD_DEVICE_ID << 4) | REG_WRITE +write_data = [REG_ID, NEW_DEVICE_ID] + +print("\n--- Отправка команды изменения ID ---") +if not send_can_message(bus, write_can_id, write_data): + print("Ошибка отправки!") + bus.shutdown() + exit(1) + +# Ожидание обработки команды +time.sleep(0.5) + +# Запрос подтверждения нового ID +read_can_id = (NEW_DEVICE_ID << 4) | REG_READ +read_data = [REG_ID, 0x00] + +print("\n--- Проверка нового ID ---") +if not send_can_message(bus, read_can_id, read_data): + print("Ошибка отправки запроса!") + bus.shutdown() + exit(1) + +# Получение ответа +response = receive_response(bus) +if response: + if len(response) >= 2 and response[0] == ord('I') and response[1] == NEW_DEVICE_ID: + print(f"\nУСПЕХ! ID устройства изменен на 0x{NEW_DEVICE_ID:02X}") + else: + print(f"\nОШИБКА! Некорректный ответ: {response}") +else: + print("\nУстройство не ответило") + +bus.shutdown()