servo/controller/fw/embed/test/read_angle.py

98 lines
2.8 KiB
Python

import can
import struct
import time
import argparse
# Константы
CAN_INTERFACE = 'can0'
DEVICE_ID = 0x27 # ID ADDR for servo
REG_WRITE = 0x7
REG_POS = 0x72 # MOTOR+ANGLE = 0x72
def validate_crc16(data):
# Calculate CRC16
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def receive_response(bus, timeout=1.0):
"""Ожидание ответа от устройства"""
start_time = time.time()
while time.time() - start_time < timeout:
msg = bus.recv(timeout=0.1)
if msg:
print(f"[Прием] CAN ID: 0x{msg.arbitration_id:03X}, Данные: {list(msg.data)}")
return msg
print("[Ошибка] Таймаут")
return None
def send_target_angle(bus):
# ID and cmd
arbitration_id = (DEVICE_ID << 4) | REG_WRITE
id_bytes = list(arbitration_id.to_bytes(2, byteorder='little'))
# cmd + parametrs
data_write = [REG_POS]
full_data_for_crc = id_bytes + data_write
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, byteorder='little'))
# Full packet
packet = data_write + crc_bytes
msg = can.Message(
arbitration_id=arbitration_id,
is_extended_id=False,
data=packet
)
bus.send(msg)
response = receive_response(bus)
if response:
data = response.data
if len(data) < 4:
print("Слишком короткий ответ")
# Проверяем минимальную длину ответа (данные + CRC)
else:
id_bytes = response.arbitration_id.to_bytes(1,byteorder='little')
#buff with id and data without CRC
full_data = list(id_bytes) + list(data[:-2])
print(f"Received full_data: {list(full_data)}")
received_crc = int.from_bytes(data[-2:], byteorder='little')
#calc CRC
calc_crc = validate_crc16(full_data)
print(f"Расчитанный CRC PYTHON : 0x{calc_crc:02X}")
if received_crc == calc_crc:
# Если CRC совпадает, проверяем структуру ответа:
velocity = struct.unpack('<f', bytes(data[1:5]))[0]
print(f"Угол: {velocity}")
else:
print("Ошибка: CRC не совпадает")
else:
print("Устройство не ответило")
def main():
# Инициализация CAN
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
print("CAN шина инициализирована.")
send_target_angle(bus)
bus.shutdown()
if __name__ == '__main__':
main()