103 lines
3.8 KiB
Python
103 lines
3.8 KiB
Python
import can
|
||
import time
|
||
import struct
|
||
# Конфигурация
|
||
CAN_INTERFACE = 'can0'
|
||
OLD_DEVICE_ID = 0x00 # Текущий ID устройства (по умолчанию)
|
||
REG_READ = 0x7 # Код команды чтения
|
||
REG_ID = 0x30 # Адрес регистра с REG_PMOTOR_POSPID_Kp устройства
|
||
|
||
def send_can_message(bus, can_id, data):
|
||
"""Отправка CAN-сообщения"""
|
||
try:
|
||
msg = can.Message(
|
||
arbitration_id=can_id,
|
||
data=data,
|
||
is_extended_id=False
|
||
)
|
||
bus.send(msg)
|
||
print(f"[Отправка] CAN ID: 0x{can_id:03X}, Данные: {list(data)}")
|
||
return True
|
||
except can.CanError as e:
|
||
print(f"Ошибка CAN: {e}")
|
||
return False
|
||
|
||
def receive_response(bus, timeout=1.0):
|
||
"""Ожидание ответа от устройства"""
|
||
start_time = time.time()
|
||
while time.time() - start_time < timeout:
|
||
msg = bus.recv(timeout=0.1)
|
||
if msg:
|
||
print(f"[Прием] CAN ID: 0x{msg.arbitration_id:03X}, Данные: {list(msg.data)}")
|
||
return msg
|
||
print("[Ошибка] Таймаут")
|
||
return None
|
||
|
||
def validate_crc16(data):
|
||
"""Расчет CRC16 (MODBUS) для проверки целостности данных"""
|
||
crc = 0xFFFF
|
||
for byte in data:
|
||
crc ^= byte
|
||
for _ in range(8):
|
||
if crc & 0x0001:
|
||
crc = (crc >> 1) ^ 0xA001
|
||
else:
|
||
crc >>= 1
|
||
return crc
|
||
|
||
# Инициализация CAN-интерфейса
|
||
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
|
||
|
||
# ======= 1. Запрос текущего ID устройства =======
|
||
|
||
# Формируем CAN ID для чтения: (OLD_DEVICE_ID << 4) | REG_READ
|
||
can_id_read = (OLD_DEVICE_ID << 4) | REG_READ
|
||
|
||
# Данные для запроса: [регистр, резервный байт]
|
||
data_read = [REG_ID, 0x00]
|
||
|
||
# Формируем полные данные для расчета CRC:
|
||
# - CAN ID разбивается на 2 байта (little-endian)
|
||
# - Добавляем данные запроса
|
||
full_data_for_crc = list(can_id_read.to_bytes(2, 'little')) + data_read
|
||
|
||
# Рассчитываем CRC и разбиваем на байты (little-endian)
|
||
crc = validate_crc16(full_data_for_crc)
|
||
crc_bytes = list(crc.to_bytes(2, 'little'))
|
||
|
||
# Собираем итоговый пакет: данные + CRC
|
||
packet_read = data_read + crc_bytes
|
||
|
||
print("Запрос на чтение ID:", packet_read)
|
||
send_can_message(bus, can_id_read, packet_read)
|
||
|
||
# ======= 2. Получение и проверка ответа =======
|
||
response = receive_response(bus)
|
||
if response:
|
||
data = response.data
|
||
|
||
if len(data) < 4:
|
||
print("Слишком короткий ответ")
|
||
|
||
# Проверяем минимальную длину ответа (данные + CRC)
|
||
else:
|
||
id_bytes = response.arbitration_id.to_bytes(1,byteorder='little')
|
||
#buff with id and data without CRC
|
||
full_data = list(id_bytes) + list(data[:-2])
|
||
print(f"Received full_data: {list(full_data)}")
|
||
received_crc = int.from_bytes(data[-2:], byteorder='little')
|
||
#calc CRC
|
||
calc_crc = validate_crc16(full_data)
|
||
|
||
print(f"Расчитанный CRC PYTHON : 0x{calc_crc:02X}")
|
||
if received_crc == calc_crc:
|
||
# Если CRC совпадает, проверяем структуру ответа:
|
||
kp_value = struct.unpack('<f', bytes(data[1:5]))[0]
|
||
print(f"Текущий Kp устройства: {kp_value:.3f}")
|
||
else:
|
||
print("Ошибка: CRC не совпадает")
|
||
else:
|
||
print("Устройство не ответило")
|
||
|
||
# Завершаем работу с шиной
|
||
bus.shutdown()
|