fix: typo in header name and change buf0 flags and fix tests

This commit is contained in:
Ilya Uraev 2025-04-19 13:42:10 +03:00
parent ffd778ab69
commit 54c8296622
5 changed files with 149 additions and 61 deletions

View file

@ -2,29 +2,68 @@ import can
import struct
import time
def process_can_message(msg):
if msg.dlc == 5: # Check the message length
print(f"Received message with ID: {msg.arbitration_id}")
print(f"Data: {msg.data}")
# The first byte determines the data type (flag)
flag = chr(msg.data[0])
if flag == 'A': # Angle
angle_bytes = msg.data[1:5]
angle = struct.unpack('<f', bytes(angle_bytes))[0]
print(f"Angle: {angle} degrees")
elif flag == 'V': # Velocity
velocity_bytes = msg.data[1:5]
velocity = struct.unpack('<f', bytes(velocity_bytes))[0]
print(f"Velocity: {velocity} rad/s")
elif flag == 'E' and msg.dlc >= 2: # Enable/Disable
enabled = msg.data[1] # Expecting 1 byte (0 or 1)
print(f"Enabled: {bool(enabled)}")
else:
print(f"Unknown flag: {flag}")
else:
print(f"Received message with unexpected length: {msg.dlc}")
REG_READ = 0x7
DEVICE_ID = 0x00 # Старый ID устройства
REG_POS = 0x72 # Предположим, что команда записи позиции — 0x2
CAN_INTERFACE = 'can0'
def validate_crc16(data):
"""Функция расчета CRC16 (MODBUS)"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def process_can_message(bus):
# ID и команда
arbitration_id = (DEVICE_ID << 4) | REG_READ
id_bytes = list(arbitration_id.to_bytes(2, byteorder='little'))
# Команда и параметры
data_read = [REG_POS] + list(struct.pack('<f', 0))
# Для CRC — весь пакет: id + команда + параметры
full_data_for_crc = id_bytes + data_read
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, byteorder='little'))
# Итоговый пакет
packet = data_read + crc_bytes
# Создание и отправка сообщения
msg = can.Message(
arbitration_id=arbitration_id,
is_extended_id=False,
data=packet
)
try:
bus.send(msg)
except can.CanError:
print("Ошибка отправки сообщения")
def receive_response(bus, timeout=2.0):
"""Ожидание ответа"""
start_time = time.time()
while time.time() - start_time < timeout:
msg = bus.recv(timeout=timeout)
data_bytes = msg.data[1:5] # берём первые 4 байта
print(data_bytes)
angle = struct.unpack('<f', bytes(data_bytes))[0] # '<f' = little-endian float
# angle = int.from_bytes(data_bytes, byteorder='little', signed=True) # или 'big'
if msg:
print(f"[Прием] Angle: 0x{msg.arbitration_id:03X}, Данные: {angle}, RAW: {bytes(msg.data)}")
return msg
print("[Ошибка] Таймаут")
return None
def receive_can_messages():
try:
@ -34,9 +73,26 @@ def receive_can_messages():
print("Waiting for messages on the CAN bus...")
while True:
msg = bus.recv()
if msg:
process_can_message(msg)
process_can_message(bus)
response = receive_response(bus)
if response:
data = response.data
if len(data) < 4:
print("Ответ слишком короткий")
else:
received_crc = int.from_bytes(data[-2:], byteorder='little')
print("Полученный CRC: ", received_crc)
# Расчет CRC по всему пакету без CRC
calc_crc = validate_crc16(data[:-2])
if received_crc == calc_crc:
if data[0] == REG_POS:
print("\nУСПЕХ!")
else:
print(f"Некорректный ответ: {list(data)}")
else:
print("CRC не совпадает, данные повреждены.")
else:
print("Нет ответа от устройства.")
except KeyboardInterrupt:
print("\nExiting program...")