servo/controller/fw/embed/test/python_test_boot.py

142 lines
4.2 KiB
Python

import can
import sys
import time
from intelhex import IntelHex
from crc import Calculator, Crc16
# Конфигурация
CAN_CHANNEL = 'socketcan'
CAN_INTERFACE = 'can0'
CAN_BITRATE = 1000000
#ch =int(input("Введите id устройства:"))
ch = int(sys.argv[2])
BOOT_CAN_ID = (ch * 16) + 1
DATA_CAN_ID = (ch * 16) + 3
BOOT_CAN_END = (ch * 16) + 2
ACK_CAN_ID = 0x05
#конфиг для crc16 ibm
def debug_print(msg):
print(f"[DEBUG] {msg}")
def calculate_crc16_modbus(data: bytes) -> int:
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def send_firmware(hex_file):
try:
debug_print("Инициализация CAN...")
bus = can.interface.Bus(
channel=CAN_INTERFACE,
bustype=CAN_CHANNEL,
bitrate=CAN_BITRATE
)
debug_print("Чтение HEX-файла...")
ih = IntelHex(hex_file)
binary_data = ih.tobinstr() # Исправлено на tobinstr()
fw_size = len(binary_data)
debug_print(f"Размер прошивки: {fw_size} байт")
# Расчет CRC
debug_print("Расчёт CRC...")
calculator = Calculator(Crc16.IBM)
fw_crc = calculate_crc16_modbus(binary_data)
debug_print(f"CRC: 0x{fw_crc:04X}")
# Отправка START
start_data = bytearray([0x01])
start_data += fw_size.to_bytes(4, 'little')
start_data += fw_crc.to_bytes(2, 'little')
debug_print(f"START: {list(start_data)}")
start_msg = can.Message(
arbitration_id=BOOT_CAN_ID,
data=bytes(start_data),
is_extended_id=False
)
try:
bus.send(start_msg)
except can.CanError as e:
debug_print(f"Ошибка отправки START: {str(e)}")
return
# Ожидание ACK
debug_print("Ожидание ACK...")
ack = wait_for_ack(bus)
if not ack:
debug_print("Таймаут ACK START")
return
debug_print(f"Получен ACK: {list(ack.data)}")
# Отправка данных
packet_size = 8
for i in range(0, len(binary_data), packet_size):
chunk = binary_data[i:i+packet_size]
# Дополнение до 8 байт
if len(chunk) < 8:
chunk += b'\xFF' * (8 - len(chunk))
debug_print(f"Пакет {i//8}: {list(chunk)}")
data_msg = can.Message(
arbitration_id=DATA_CAN_ID,
data=chunk,
is_extended_id=False
)
try:
bus.send(data_msg)
except can.CanError as e:
debug_print(f"Ошибка отправки данных: {str(e)}")
return
ack = wait_for_ack(bus)
if not ack:
debug_print("Таймаут ACK DATA")
return
# Финал
debug_print("Отправка FINISH...")
finish_msg = can.Message(
arbitration_id=BOOT_CAN_END,
data=bytes([0xAA]),
is_extended_id=False
)
bus.send(finish_msg)
ack = wait_for_ack(bus, timeout=3)
if ack and ack.data[0] == 0xAA:
debug_print("Прошивка подтверждена!")
else:
debug_print("Ошибка верификации!")
except Exception as e:
debug_print(f"Критическая ошибка: {str(e)}")
finally:
bus.shutdown()
def wait_for_ack(bus, timeout=1.0):
start_time = time.time()
while time.time() - start_time < timeout:
msg = bus.recv(timeout=0) # Неблокирующий режим
if msg and msg.arbitration_id == ACK_CAN_ID:
return msg
return None
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print("Использование: sudo python3 can_flasher.py firmware.hex")
sys.exit(1)
send_firmware(sys.argv[1])