Add Python test scripts for CAN communication

- Created `python_can.py`:
  - Implements a CAN message receiver that processes angle, velocity, and enable/disable flags.
- Added `python_enable_motor.py`:
  - Sends enable/disable commands to the motor via CAN.
- Added `python_send_angle.py`:
  - Sends target angle commands over the CAN bus.
- Added `python_send_velocity.py`:
  - Sends target velocity commands over the CAN bus.
- Configured all scripts to use `python-can` library with `socketcan` interface for CAN communication.
This commit is contained in:
Ilya Uraev 2025-01-18 18:34:05 +03:00
parent 1534b854fc
commit 51e8fac95a
4 changed files with 162 additions and 0 deletions

View file

@ -0,0 +1,47 @@
import can
import struct
import time
def process_can_message(msg):
if msg.dlc == 5: # Check the message length
print(f"Received message with ID: {msg.arbitration_id}")
print(f"Data: {msg.data}")
# The first byte determines the data type (flag)
flag = chr(msg.data[0])
if flag == 'A': # Angle
angle_bytes = msg.data[1:5]
angle = struct.unpack('<f', bytes(angle_bytes))[0]
print(f"Angle: {angle} degrees")
elif flag == 'V': # Velocity
velocity_bytes = msg.data[1:5]
velocity = struct.unpack('<f', bytes(velocity_bytes))[0]
print(f"Velocity: {velocity} rad/s")
elif flag == 'E' and msg.dlc >= 2: # Enable/Disable
enabled = msg.data[1] # Expecting 1 byte (0 or 1)
print(f"Enabled: {bool(enabled)}")
else:
print(f"Unknown flag: {flag}")
else:
print(f"Received message with unexpected length: {msg.dlc}")
def receive_can_messages():
try:
# Connect to the CAN bus
bus = can.interface.Bus(channel='can0', bustype='socketcan')
print("Waiting for messages on the CAN bus...")
while True:
msg = bus.recv()
if msg:
process_can_message(msg)
except KeyboardInterrupt:
print("\nExiting program...")
except Exception as e:
print(f"Error: {e}")
if __name__ == '__main__':
receive_can_messages()

View file

@ -0,0 +1,47 @@
import can
import sys
# Function to send the motor enable/disable command
def send_motor_enable(bus, enable):
"""
Sends a command to enable or disable the motor.
:param bus: The CAN bus
:param enable: 1 to enable the motor, 0 to disable it
"""
msg = can.Message()
msg.arbitration_id = 1 # Message ID
msg.is_extended_id = False
msg.dlc = 2 # Message length (flag + 1 byte of data)
msg.data = [ord('E'), enable] # 'E' for the command, followed by 0 or 1
try:
bus.send(msg)
state = "enabled" if enable else "disabled"
print(f"Sent message to {state} motor")
print(f"Message data: {msg.data}")
except can.CanError:
print("Message failed to send")
sys.exit(1) # Exit the program on failure
def main():
# CAN interface setup
try:
bus = can.interface.Bus(channel='can0', bustype='socketcan', bitrate=1000000) # Ensure the bitrate matches the microcontroller settings
print("CAN bus initialized.")
# Ensure the state is passed via arguments
if len(sys.argv) != 2 or sys.argv[1] not in ['0', '1']:
print("Usage: python3 script_name.py <0|1>")
print("0 - Disable motor, 1 - Enable motor")
sys.exit(1)
enable = int(sys.argv[1])
send_motor_enable(bus, enable)
except Exception as e:
print(f"Error initializing CAN bus: {e}")
sys.exit(1)
if __name__ == '__main__':
main()

View file

@ -0,0 +1,34 @@
import can
import struct
import time
# Function to send the target angle
def send_target_angle(bus, target_angle):
msg = can.Message()
msg.arbitration_id = 1 # Message ID
msg.is_extended_id = False
msg.dlc = 5 # Message length
msg.data = [ord('A')] + list(struct.pack('<f', target_angle)) # 'A' for the command identifier, followed by the angle in float format
try:
bus.send(msg)
print(f"Sent message with target angle: {target_angle} degrees")
print(f"Message data: {msg.data}")
except can.CanError:
print("Message failed to send")
def main():
# CAN interface setup
bus = can.interface.Bus(channel='can0', bustype='socketcan', bitrate=1000000) # Ensure the bitrate matches the microcontroller settings
print("CAN bus initialized, sending target angles...")
# Target angle
target_angle = 0.0
# Loop to send messages
while True:
send_target_angle(bus, target_angle)
time.sleep(1) # Delay before sending the next message
if __name__ == '__main__':
main()

View file

@ -0,0 +1,34 @@
import can
import struct
import time
# Function to send the target speed
def send_target_speed(bus, target_speed):
msg = can.Message()
msg.arbitration_id = 1 # Message ID
msg.is_extended_id = False
msg.dlc = 5 # Message length
msg.data = [ord('V')] + list(struct.pack('<f', target_speed)) # 'V' for the command identifier, followed by the speed in float format
try:
bus.send(msg)
print(f"Sent message with target speed: {target_speed} m/s")
print(f"Message data: {msg.data}")
except can.CanError:
print("Message failed to send")
def main():
# CAN interface setup
bus = can.interface.Bus(channel='can0', bustype='socketcan', bitrate=1000000) # Ensure the bitrate matches the microcontroller settings
print("CAN bus initialized, sending target speeds...")
# Target speed
target_speed = 0.0 # in meters per second
# Loop to send messages
while True:
send_target_speed(bus, target_speed)
time.sleep(1) # Delay before sending the next message
if __name__ == '__main__':
main()