import can import sys import time from intelhex import IntelHex # Конфигурация CAN_CHANNEL = 'socketcan' CAN_INTERFACE = 'can0' CAN_BITRATE = 1000000 #ch =int(input("Введите id устройства:")) ch = int(sys.argv[2]) BOOT_CAN_ID = (ch * 16) + 1 DATA_CAN_ID = (ch * 16) + 3 BOOT_CAN_END = (ch * 16) + 2 ACK_CAN_ID = 0x05 #конфиг для crc16 ibm def debug_print(msg): print(f"[DEBUG] {msg}") def calculate_crc16(data: bytes) -> int: crc = 0xFFFF for byte in data: crc ^= byte for _ in range(8): if crc & 0x0001: crc = (crc >> 1) ^ 0xA001 else: crc >>= 1 return crc def send_firmware(hex_file): try: debug_print("Инициализация CAN...") bus = can.interface.Bus( channel=CAN_INTERFACE, bustype=CAN_CHANNEL, bitrate=CAN_BITRATE ) debug_print("Чтение HEX-файла...") ih = IntelHex(hex_file) binary_data = ih.tobinstr() # Исправлено на tobinstr() fw_size = len(binary_data) debug_print(f"Размер прошивки: {fw_size} байт") # Расчет CRC debug_print("Расчёт CRC...") # calculator = Calculator(Crc16.IBM) fw_crc = calculate_crc16(binary_data) debug_print(f"CRC: 0x{fw_crc:04X}") # Отправка START start_data = bytearray([0x01]) start_data += fw_size.to_bytes(4, 'little') start_data += fw_crc.to_bytes(2, 'little') debug_print(f"START: {list(start_data)}") start_msg = can.Message( arbitration_id=BOOT_CAN_ID, data=bytes(start_data), is_extended_id=False ) try: bus.send(start_msg) except can.CanError as e: debug_print(f"Ошибка отправки START: {str(e)}") return # Ожидание ACK debug_print("Ожидание ACK...") ack = wait_for_ack(bus) if not ack: debug_print("Таймаут ACK START") return debug_print(f"Получен ACK: {list(ack.data)}") # Отправка данных packet_size = 8 for i in range(0, len(binary_data), packet_size): chunk = binary_data[i:i+packet_size] # Дополнение до 8 байт if len(chunk) < 8: chunk += b'\xFF' * (8 - len(chunk)) debug_print(f"Пакет {i//8}: {list(chunk)}") data_msg = can.Message( arbitration_id=DATA_CAN_ID, data=chunk, is_extended_id=False ) try: bus.send(data_msg) except can.CanError as e: debug_print(f"Ошибка отправки данных: {str(e)}") return ack = wait_for_ack(bus) if not ack: debug_print("Таймаут ACK DATA") return # Финал debug_print("Отправка FINISH...") finish_msg = can.Message( arbitration_id=BOOT_CAN_END, data=bytes([0xAA]), is_extended_id=False ) bus.send(finish_msg) ack = wait_for_ack(bus, timeout=1.0) if ack and ack.data[0] == 0xAA: debug_print("Прошивка подтверждена!") else: debug_print("Ошибка верификации!") except Exception as e: debug_print(f"Критическая ошибка: {str(e)}") finally: bus.shutdown() def wait_for_ack(bus, timeout=1.0): start_time = time.time() while time.time() - start_time < timeout: msg = bus.recv(timeout=0.1) # Неблокирующий режим if msg and msg.arbitration_id == ACK_CAN_ID: return msg return None if __name__ == "__main__": import sys if len(sys.argv) != 3: print("Использование: sudo python3 can_flasher.py firmware.hex") sys.exit(1) send_firmware(sys.argv[1])