rename python test file names
This commit is contained in:
parent
10c7da1107
commit
3fd612a56e
13 changed files with 109 additions and 57 deletions
|
@ -1,47 +0,0 @@
|
||||||
import can
|
|
||||||
import struct
|
|
||||||
import time
|
|
||||||
|
|
||||||
def process_can_message(msg):
|
|
||||||
if msg.dlc == 5: # Check the message length
|
|
||||||
print(f"Received message with ID: {msg.arbitration_id}")
|
|
||||||
print(f"Data: {msg.data}")
|
|
||||||
|
|
||||||
# The first byte determines the data type (flag)
|
|
||||||
flag = chr(msg.data[0])
|
|
||||||
|
|
||||||
if flag == 'A': # Angle
|
|
||||||
angle_bytes = msg.data[1:5]
|
|
||||||
angle = struct.unpack('<f', bytes(angle_bytes))[0]
|
|
||||||
print(f"Angle: {angle} degrees")
|
|
||||||
elif flag == 'V': # Velocity
|
|
||||||
velocity_bytes = msg.data[1:5]
|
|
||||||
velocity = struct.unpack('<f', bytes(velocity_bytes))[0]
|
|
||||||
print(f"Velocity: {velocity} rad/s")
|
|
||||||
elif flag == 'E' and msg.dlc >= 2: # Enable/Disable
|
|
||||||
enabled = msg.data[1] # Expecting 1 byte (0 or 1)
|
|
||||||
print(f"Enabled: {bool(enabled)}")
|
|
||||||
else:
|
|
||||||
print(f"Unknown flag: {flag}")
|
|
||||||
else:
|
|
||||||
print(f"Received message with unexpected length: {msg.dlc}")
|
|
||||||
|
|
||||||
def receive_can_messages():
|
|
||||||
try:
|
|
||||||
# Connect to the CAN bus
|
|
||||||
bus = can.interface.Bus(channel='can0', bustype='socketcan')
|
|
||||||
|
|
||||||
print("Waiting for messages on the CAN bus...")
|
|
||||||
|
|
||||||
while True:
|
|
||||||
msg = bus.recv()
|
|
||||||
if msg:
|
|
||||||
process_can_message(msg)
|
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print("\nExiting program...")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error: {e}")
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
receive_can_messages()
|
|
|
@ -4,7 +4,7 @@ import struct
|
||||||
import sys
|
import sys
|
||||||
# Конфигурация
|
# Конфигурация
|
||||||
CAN_INTERFACE = 'can0'
|
CAN_INTERFACE = 'can0'
|
||||||
DEVICE_ID = int(sys.argv[1]) # ID устройства по умолчанию
|
DEVICE_ID = 0x27 # ID ADDR for servo
|
||||||
REG_READ = 0x7 # Код команды чтения
|
REG_READ = 0x7 # Код команды чтения
|
||||||
REG_MOTOR_POSPID_Kp = 0x30
|
REG_MOTOR_POSPID_Kp = 0x30
|
||||||
REG_MOTOR_POSPID_Ki = 0x31
|
REG_MOTOR_POSPID_Ki = 0x31
|
||||||
|
@ -120,7 +120,7 @@ def main():
|
||||||
bus.shutdown()
|
bus.shutdown()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
if len(sys.argv) != 2:
|
# if len(sys.argv) != 2:
|
||||||
print("Используйте python3 read_pid.py addr")
|
# print("Используйте python3 read_pid.py addr")
|
||||||
sys.exit(1)
|
# sys.exit(1)
|
||||||
main()
|
main()
|
98
controller/fw/embed/test/read_angle.py
Normal file
98
controller/fw/embed/test/read_angle.py
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
import can
|
||||||
|
import struct
|
||||||
|
import time
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
# Константы
|
||||||
|
CAN_INTERFACE = 'can0'
|
||||||
|
DEVICE_ID = 0x27 # ID ADDR for servo
|
||||||
|
REG_WRITE = 0x7
|
||||||
|
REG_POS = 0x72 # MOTOR+ANGLE = 0x72
|
||||||
|
|
||||||
|
def validate_crc16(data):
|
||||||
|
# Calculate CRC16
|
||||||
|
crc = 0xFFFF
|
||||||
|
for byte in data:
|
||||||
|
crc ^= byte
|
||||||
|
for _ in range(8):
|
||||||
|
if crc & 0x0001:
|
||||||
|
crc = (crc >> 1) ^ 0xA001
|
||||||
|
else:
|
||||||
|
crc >>= 1
|
||||||
|
return crc
|
||||||
|
|
||||||
|
|
||||||
|
def receive_response(bus, timeout=1.0):
|
||||||
|
"""Ожидание ответа от устройства"""
|
||||||
|
start_time = time.time()
|
||||||
|
while time.time() - start_time < timeout:
|
||||||
|
msg = bus.recv(timeout=0.1)
|
||||||
|
if msg:
|
||||||
|
print(f"[Прием] CAN ID: 0x{msg.arbitration_id:03X}, Данные: {list(msg.data)}")
|
||||||
|
return msg
|
||||||
|
print("[Ошибка] Таймаут")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def send_target_angle(bus):
|
||||||
|
# ID and cmd
|
||||||
|
arbitration_id = (DEVICE_ID << 4) | REG_WRITE
|
||||||
|
id_bytes = list(arbitration_id.to_bytes(2, byteorder='little'))
|
||||||
|
|
||||||
|
# cmd + parametrs
|
||||||
|
data_write = [REG_POS]
|
||||||
|
|
||||||
|
|
||||||
|
full_data_for_crc = id_bytes + data_write
|
||||||
|
crc = validate_crc16(full_data_for_crc)
|
||||||
|
crc_bytes = list(crc.to_bytes(2, byteorder='little'))
|
||||||
|
|
||||||
|
# Full packet
|
||||||
|
packet = data_write + crc_bytes
|
||||||
|
|
||||||
|
|
||||||
|
msg = can.Message(
|
||||||
|
arbitration_id=arbitration_id,
|
||||||
|
is_extended_id=False,
|
||||||
|
data=packet
|
||||||
|
)
|
||||||
|
|
||||||
|
bus.send(msg)
|
||||||
|
response = receive_response(bus)
|
||||||
|
|
||||||
|
|
||||||
|
if response:
|
||||||
|
data = response.data
|
||||||
|
|
||||||
|
if len(data) < 4:
|
||||||
|
print("Слишком короткий ответ")
|
||||||
|
|
||||||
|
# Проверяем минимальную длину ответа (данные + CRC)
|
||||||
|
else:
|
||||||
|
id_bytes = response.arbitration_id.to_bytes(1,byteorder='little')
|
||||||
|
#buff with id and data without CRC
|
||||||
|
full_data = list(id_bytes) + list(data[:-2])
|
||||||
|
print(f"Received full_data: {list(full_data)}")
|
||||||
|
received_crc = int.from_bytes(data[-2:], byteorder='little')
|
||||||
|
#calc CRC
|
||||||
|
calc_crc = validate_crc16(full_data)
|
||||||
|
|
||||||
|
print(f"Расчитанный CRC PYTHON : 0x{calc_crc:02X}")
|
||||||
|
if received_crc == calc_crc:
|
||||||
|
# Если CRC совпадает, проверяем структуру ответа:
|
||||||
|
velocity = struct.unpack('<f', bytes(data[1:5]))[0]
|
||||||
|
print(f"Угол: {velocity}")
|
||||||
|
else:
|
||||||
|
print("Ошибка: CRC не совпадает")
|
||||||
|
else:
|
||||||
|
print("Устройство не ответило")
|
||||||
|
|
||||||
|
def main():
|
||||||
|
# Инициализация CAN
|
||||||
|
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
|
||||||
|
print("CAN шина инициализирована.")
|
||||||
|
|
||||||
|
send_target_angle(bus)
|
||||||
|
|
||||||
|
bus.shutdown()
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
|
@ -1,3 +1,4 @@
|
||||||
|
from can.interface import Bus
|
||||||
import can
|
import can
|
||||||
import struct
|
import struct
|
||||||
import time
|
import time
|
||||||
|
@ -5,7 +6,7 @@ import argparse
|
||||||
|
|
||||||
# Константы
|
# Константы
|
||||||
CAN_INTERFACE = 'can0'
|
CAN_INTERFACE = 'can0'
|
||||||
DEVICE_ID = 0x00 # ID ADDR for servo
|
DEVICE_ID = 0x27 # ID ADDR for servo
|
||||||
REG_WRITE = 0x8
|
REG_WRITE = 0x8
|
||||||
REG_POS = 0x72 # MOTOR+ANGLE = 0x72
|
REG_POS = 0x72 # MOTOR+ANGLE = 0x72
|
||||||
|
|
||||||
|
@ -56,7 +57,7 @@ def main():
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
# Инициализация CAN
|
# Инициализация CAN
|
||||||
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
|
bus = Bus(channel=CAN_INTERFACE, bustype='socketcan')
|
||||||
print("CAN шина инициализирована.")
|
print("CAN шина инициализирована.")
|
||||||
|
|
||||||
send_target_angle(bus, args.angle)
|
send_target_angle(bus, args.angle)
|
|
@ -4,7 +4,7 @@ import struct
|
||||||
import sys
|
import sys
|
||||||
# Конфигурация
|
# Конфигурация
|
||||||
CAN_INTERFACE = 'can0'
|
CAN_INTERFACE = 'can0'
|
||||||
DEVICE_ID = int(sys.argv[1]) # Установка id устройства ввод в десятичной системе счисления
|
DEVICE_ID = 0x27 # ID ADDR for servo
|
||||||
REG_WRITE = 0x8 # Код команды записи
|
REG_WRITE = 0x8 # Код команды записи
|
||||||
REG_MOTOR_POSPID_Kp = 0x30
|
REG_MOTOR_POSPID_Kp = 0x30
|
||||||
REG_MOTOR_POSPID_Ki = 0x31
|
REG_MOTOR_POSPID_Ki = 0x31
|
||||||
|
@ -89,7 +89,7 @@ def main():
|
||||||
bus.shutdown()
|
bus.shutdown()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
if len(sys.argv) != 2:
|
# if len(sys.argv) != 2:
|
||||||
print("Используйте python3 pid_set.py addr")
|
# print("Используйте python3 pid_set.py addr")
|
||||||
sys.exit(1)
|
# sys.exit(1)
|
||||||
main()
|
main()
|
Loading…
Add table
Add a link
Reference in a new issue