ADD: DEADBEEF packet and vizualize #8

This commit is contained in:
vanyabeat 2024-01-24 00:34:54 +03:00
parent 578962d9fe
commit 8abc3b0709

View file

@ -1,48 +1,88 @@
import serial import serial
import struct
import threading
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import matplotlib.animation as animation from matplotlib.animation import FuncAnimation
import numpy as np # Configure your serial port settings.
SERIAL_PORT = '/dev/ttyUSB0' # Update to match your serial port.
BAUD_RATE = 115200 # Update to match your settings.
FLOAT_SIZE = 4 # Size of a float in bytes (usually 4 bytes)
# Open the serial port. Replace 'COM3' with your serial port name. # Shared state, updated by the reading thread, read by the plotting function.
serial_port = '/dev/ttyUSB0' # Initialize values with None.
baud_rate = 115200 # Adjust as per your device configuration adc_values = [None, None, None]
ser = serial.Serial(serial_port, baud_rate, timeout=1) angle = None
# Setup the matplotlib figure and axes for polar plot. # Define the thread that will read the serial data.
fig, ax = plt.subplots(subplot_kw={'projection': 'polar'}) def read_serial_data(ser):
line, = ax.plot([], [], 'b-', lw=2) # Line object to update the angle global adc_values, angle
ax.set_ylim(0, 1) # Set the radius range (fixed in this case) try:
while True:
byte = ser.read(1)
if byte == b'\xDE':
next_byte = ser.read(1)
if next_byte == b'\xAD':
data = ser.read(FLOAT_SIZE * 4)
if len(data) == FLOAT_SIZE * 4:
floats = struct.unpack('<4f', data)
packet_end = ser.read(2)
if packet_end == bytes([0xBE, 0xAF]):
# Update our shared state.
adc_values = list(floats[0:3])
angle = floats[3]
except serial.SerialException as e:
print("Serial exception:", e)
except struct.error as e:
print("Unpacking error:", e)
except:
print("Other error or thread exiting.")
# Adjust the orientation so that 0 is up and the angle proceeds clockwise. # Start the serial reader in a separate thread.
ax.set_theta_zero_location('N') thread = threading.Thread(target=read_serial_data, args=(serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1),))
ax.set_theta_direction(-1) thread.daemon = True # Thread will be killed when the main thread exits.
thread.start()
# Initialization function to clear the data. # Set up the Matplotlib figures and axes.
def init(): fig = plt.figure(figsize=(10, 5))
line.set_data([], [])
return (line,)
# Update function that reads from the serial port # Create a normal subplot for the bar graph.
# and updates the plot with the new angle. ax_bar = fig.add_subplot(1, 2, 1)
def update(frame):
line.set_data([], [])
raw_data = ser.readline().strip()
angle = 0
if raw_data:
try:
angle = float(raw_data)
except:
angle = 0
angle_rad = np.deg2rad(angle) # Convert to radians
# Update the plot with the new angle.
line.set_data([angle_rad, angle_rad], [0, 1])
return (line,)
# Create the animation object with a suitable update interval (ms). # Create another subplot for the polar plot by specifying polar=True.
ani = animation.FuncAnimation(fig, update, init_func=init, blit=True, interval=1) ax_polar = fig.add_subplot(1, 2, 2, polar=True)
# Start the plot animation # Set the properties for polar plot.
ax_polar.set_theta_zero_location('N')
ax_polar.set_theta_direction(-1)
# Bar plot axis.
bars = ax_bar.bar(['ADC0', 'ADC1', 'ADC2'], [0, 0, 0])
ax_bar.set_ylim(0, 0.4) # Set the limit for your ADC values.
# Polar plot axis.
ax_polar.set_theta_zero_location('N')
ax_polar.set_theta_direction(-1)
angle_plot, = ax_polar.plot([], [], 'go') # Initial empty plot.
# Update function for the animation.
def update(i):
# Global variables holding the shared state.
global adc_values, angle
# Check if we have received new data.
if all(v is not None for v in adc_values) and angle is not None:
# Update the bar plot.
for bar, value in zip(bars, adc_values):
bar.set_height(value)
# Update the polar plot.
angle_rad = angle / 180.0 * 3.14159 # Convert degrees to radians.
angle_plot.set_data([angle_rad], [1]) # Set angle for the plot, using radial distance of 1.
return bars.patches + [angle_plot]
# Create the animation itself, which will call update function repeatedly.
ani = FuncAnimation(fig, update, blit=True, interval=1) # Update interval is in milliseconds.
# Show the plot.
plt.show() plt.show()
# Closing the serial port after the plot is closed.
ser.close()