Добавил тест для записи и одновременно чтения ID устройства по CAN

This commit is contained in:
lulko 2025-02-28 10:08:55 +03:00
parent dc445c11b7
commit 119bc4f091

View file

@ -0,0 +1,73 @@
import can
import time
def send_write_read_requests():
try:
bus = can.interface.Bus(channel='can0', bustype='socketcan')
# Конфигурация сообщений (ЗАПОЛНИТЕ ВАШИ ЗНАЧЕНИЯ)
write_msg = {
'arbitration_id': 0x01, # CAN ID для записи
'data': [0xA0, 0x69, 0x00, 0x00], # Данные для записи (4 байта)
'description': "Установка id устройства"
}
read_msg = {
'arbitration_id': 0x01, # CAN ID для чтения
'data': [0x99], # Команда запроса данных
'description': "Запрос id устройства",
'response_id': 0x69, # Ожидаемый ID ответа
'timeout': 1.0 # Таймаут ожидания ответа (сек)
}
# 1. Отправка команды записи
print("Отправка команды записи...")
msg = can.Message(
arbitration_id=write_msg['arbitration_id'],
data=write_msg['data'],
is_extended_id=False
)
bus.send(msg)
print(f"Запись: ID={hex(msg.arbitration_id)}, Данные={list(msg.data)}")
# Ждем обработки команды устройством
time.sleep(1.5)
# 2. Отправка запроса чтения и ожидание ответа
print("\nОтправка запроса чтения...")
msg = can.Message(
arbitration_id=read_msg['arbitration_id'],
data=read_msg['data'],
is_extended_id=False
)
bus.send(msg)
print(f"Чтение: ID={hex(msg.arbitration_id)}, Команда={list(msg.data)}")
# Ожидаем ответа
start_time = time.time()
response_received = False
print("\nОжидание ответа...")
while (time.time() - start_time) < read_msg['timeout']:
response = bus.recv(timeout=0.1)
if response and response.arbitration_id == read_msg['response_id']:
print(f"\nПолучен ответ: ID={hex(response.arbitration_id)}")
print(f"Данные: {list(response.data)}")
print(f"Длина: {response.dlc} байт")
response_received = True
break
if not response_received:
print("\nОшибка: ответ не получен в течение заданного времени")
except KeyboardInterrupt:
print("\nПрерывание пользователем")
except Exception as e:
print(f"Ошибка: {str(e)}")
finally:
bus.shutdown()
print("\nCAN соединение закрыто")
if __name__ == '__main__':
send_write_read_requests()