106 lines
3.7 KiB
Python
106 lines
3.7 KiB
Python
import can
|
||
import time
|
||
|
||
# Конфигурация CAN интерфейса
|
||
CAN_INTERFACE = 'can0'
|
||
OLD_DEVICE_ID = 0x70 # Текущий ID устройства
|
||
NEW_DEVICE_ID = 0x45 # Новый ID устройства
|
||
REG_WRITE = 0x8 # Команда записи (msg_ch)
|
||
REG_READ = 0x7 # Команда чтения (msg_ch)
|
||
REG_ID = 0x1 # Регистр для работы с ID
|
||
|
||
def calculate_crc16_modbus(data: bytes) -> int:
|
||
crc = 0xFFFF
|
||
for byte in data:
|
||
crc ^= byte
|
||
for _ in range(8):
|
||
if crc & 0x0001:
|
||
crc = (crc >> 1) ^ 0xA001
|
||
else:
|
||
crc >>= 1
|
||
return crc
|
||
|
||
def send_can_message(bus, can_id, data):
|
||
"""Отправка CAN-сообщения с CRC"""
|
||
# Добавляем CRC к данным
|
||
data_bytes = bytes(data)
|
||
crc = calculate_crc16_modbus(data_bytes)
|
||
full_data = list(data) + [0x00] * (8 - len(data))
|
||
full_data[6] = (crc >> 8) & 0xFF
|
||
full_data[7] = crc & 0xFF
|
||
|
||
try:
|
||
msg = can.Message(
|
||
arbitration_id=can_id,
|
||
data=full_data,
|
||
is_extended_id=False
|
||
)
|
||
bus.send(msg)
|
||
print(f"[Отправка] CAN ID: 0x{can_id:03X}, Данные: {full_data}")
|
||
return True
|
||
except can.CanError as e:
|
||
print(f"Ошибка CAN: {e}")
|
||
return False
|
||
|
||
def receive_response(bus, timeout=2.0):
|
||
"""Ожидание ответа с проверкой CRC"""
|
||
start_time = time.time()
|
||
while time.time() - start_time < timeout:
|
||
msg = bus.recv(timeout=0.1)
|
||
if msg:
|
||
print(f"[Прием] CAN ID: 0x{msg.arbitration_id:03X}, Данные: {list(msg.data)}")
|
||
|
||
# Проверяем CRC
|
||
if len(msg.data) < 2:
|
||
print("Ошибка: Слишком короткое сообщение")
|
||
continue
|
||
|
||
received_crc = (msg.data[-2] << 8) | msg.data[-1]
|
||
calculated_crc = calculate_crc16_modbus(bytes(msg.data[:-2]))
|
||
|
||
if received_crc != calculated_crc:
|
||
print(f"Ошибка CRC! Получено: 0x{received_crc:04X}, Ожидалось: 0x{calculated_crc:04X}")
|
||
continue
|
||
|
||
return msg.data[:-2] # Возвращаем данные без CRC
|
||
|
||
print("[Ошибка] Таймаут ожидания ответа")
|
||
return None
|
||
|
||
# Инициализация CAN шины
|
||
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
|
||
|
||
# Формирование и отправка команды изменения ID
|
||
write_can_id = (OLD_DEVICE_ID << 4) | REG_WRITE
|
||
write_data = [REG_ID, NEW_DEVICE_ID]
|
||
|
||
print("\n--- Отправка команды изменения ID ---")
|
||
if not send_can_message(bus, write_can_id, write_data):
|
||
print("Ошибка отправки!")
|
||
bus.shutdown()
|
||
exit(1)
|
||
|
||
# Ожидание обработки команды
|
||
time.sleep(0.5)
|
||
|
||
# Запрос подтверждения нового ID
|
||
read_can_id = (NEW_DEVICE_ID << 4) | REG_READ
|
||
read_data = [REG_ID, 0x00]
|
||
|
||
print("\n--- Проверка нового ID ---")
|
||
if not send_can_message(bus, read_can_id, read_data):
|
||
print("Ошибка отправки запроса!")
|
||
bus.shutdown()
|
||
exit(1)
|
||
|
||
# Получение ответа
|
||
response = receive_response(bus)
|
||
if response:
|
||
if len(response) >= 2 and response[0] == ord('I') and response[1] == NEW_DEVICE_ID:
|
||
print(f"\nУСПЕХ! ID устройства изменен на 0x{NEW_DEVICE_ID:02X}")
|
||
else:
|
||
print(f"\nОШИБКА! Некорректный ответ: {response}")
|
||
else:
|
||
print("\nУстройство не ответило")
|
||
|
||
bus.shutdown()
|