import can import time import sys # Конфигурация CAN_INTERFACE = 'can0' OLD_DEVICE_ID = int(sys.argv[1]) # Текущий ID устройства REG_READ = 0x7 # Код команды чтения REG_ID = 0x01 # Адрес регистра с ID устройства def flush_can_buffer(bus, duration=0.3): """Очистка входного буфера CAN""" start_time = time.time() flushed_count = 0 while time.time() - start_time < duration: msg = bus.recv(timeout=0) if msg: flushed_count += 1 print(f"Очищено сообщений из буфера: {flushed_count}") def send_can_message(bus, can_id, data): """Отправка CAN-сообщения""" try: msg = can.Message( arbitration_id=can_id, data=data, is_extended_id=False ) bus.send(msg) print(f"[Отправка] CAN ID: 0x{can_id:03X}, Данные: {list(data)}") return True except can.CanError as e: print(f"Ошибка CAN: {e}") return False def receive_response(bus, timeout=1.0): """Ожидание ответа от устройства (сохраняем вашу оригинальную логику)""" start_time = time.time() while time.time() - start_time < timeout: msg = bus.recv(timeout=0.1) if msg: print(f"[Прием] CAN ID: 0x{msg.arbitration_id:03X}, Данные: {list(msg.data)}") return msg print("[Ошибка] Таймаут приема") return None def validate_crc16(data): """Расчет CRC16 (MODBUS) для проверки целостности данных""" crc = 0xFFFF for byte in data: crc ^= byte for _ in range(8): if crc & 0x0001: crc = (crc >> 1) ^ 0xA001 else: crc >>= 1 return crc def main(): # Инициализация CAN-интерфейса try: bus = can.interface.Bus( channel=CAN_INTERFACE, bustype='socketcan', bitrate=1000000 # Совпадает с устройством ) except Exception as e: print(f"Ошибка инициализации CAN: {e}") sys.exit(1) # ======= 1. Подготовка запроса ======= can_id_read = (OLD_DEVICE_ID << 4) | REG_READ data_read = [REG_ID, 0x00] # Формируем полные данные для расчета CRC: full_data_for_crc = list(can_id_read.to_bytes(2, 'little')) + data_read # Рассчитываем CRC crc = validate_crc16(full_data_for_crc) crc_bytes = list(crc.to_bytes(2, 'little')) # Собираем итоговый пакет packet_read = data_read + crc_bytes # ======= 2. Отправка запроса с повторами ======= max_retries = 3 response = None for attempt in range(max_retries): print(f"\nПопытка {attempt+1}/{max_retries}") # Очистка буфера перед отправкой flush_can_buffer(bus, 0.3) # Отправка запроса print(f"Отправка запроса на чтение ID: {packet_read}") if not send_can_message(bus, can_id_read, packet_read): print("Ошибка отправки, повтор...") time.sleep(0.2) continue # Ожидание ответа response = receive_response(bus, timeout=0.5) if response: break print("Ответ не получен, повтор...") time.sleep(0.2) # ======= 3. Обработка ответа ======= if not response: print("Устройство не ответило после всех попыток") bus.shutdown() sys.exit(1) data = response.data if len(data) < 4: print("Слишком короткий ответ") bus.shutdown() sys.exit(1) # Проверяем минимальную длину ответа (данные + CRC) id_bytes = response.arbitration_id.to_bytes(1, byteorder='little') full_data = list(id_bytes) + list(data[:-2]) print(f"Полные данные для CRC: {full_data}") received_crc = int.from_bytes(data[-2:], byteorder='little') calc_crc = validate_crc16(full_data) print(f"Расчитанный CRC: 0x{calc_crc:04X}, Полученный CRC: 0x{received_crc:04X}") if received_crc == calc_crc: print(f"Текущий ID устройства: 0x{data[1]:02X}") else: print("Ошибка: CRC не совпадает") # Завершаем работу с шиной bus.shutdown() if __name__ == "__main__": if len(sys.argv) != 2: print("Использование: python3 can_flasher.py <адрес_устройства>") print("Пример: python3 can_flasher.py 1") sys.exit(1) main()