This commit is contained in:
Valentin Dabstep 2025-06-04 09:57:43 +03:00
commit 6be663c914
60 changed files with 29349 additions and 490 deletions

View file

@ -0,0 +1,141 @@
import can
import sys
import time
from intelhex import IntelHex
# Конфигурация
CAN_CHANNEL = 'socketcan'
CAN_INTERFACE = 'can0'
CAN_BITRATE = 1000000
#ch =int(input("Введите id устройства:"))
ch = int(sys.argv[2])
BOOT_CAN_ID = (ch * 16) + 1
DATA_CAN_ID = (ch * 16) + 3
BOOT_CAN_END = (ch * 16) + 2
ACK_CAN_ID = 0x05
#конфиг для crc16 ibm
def debug_print(msg):
print(f"[DEBUG] {msg}")
def calculate_crc16(data: bytes) -> int:
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def send_firmware(hex_file):
try:
debug_print("Инициализация CAN...")
bus = can.interface.Bus(
channel=CAN_INTERFACE,
bustype=CAN_CHANNEL,
bitrate=CAN_BITRATE
)
debug_print("Чтение HEX-файла...")
ih = IntelHex(hex_file)
binary_data = ih.tobinstr() # Исправлено на tobinstr()
fw_size = len(binary_data)
debug_print(f"Размер прошивки: {fw_size} байт")
# Расчет CRC
debug_print("Расчёт CRC...")
# calculator = Calculator(Crc16.IBM)
fw_crc = calculate_crc16(binary_data)
debug_print(f"CRC: 0x{fw_crc:04X}")
# Отправка START
start_data = bytearray([0x01])
start_data += fw_size.to_bytes(4, 'little')
start_data += fw_crc.to_bytes(2, 'little')
debug_print(f"START: {list(start_data)}")
start_msg = can.Message(
arbitration_id=BOOT_CAN_ID,
data=bytes(start_data),
is_extended_id=False
)
try:
bus.send(start_msg)
except can.CanError as e:
debug_print(f"Ошибка отправки START: {str(e)}")
return
# Ожидание ACK
debug_print("Ожидание ACK...")
ack = wait_for_ack(bus)
if not ack:
debug_print("Таймаут ACK START")
return
debug_print(f"Получен ACK: {list(ack.data)}")
# Отправка данных
packet_size = 8
for i in range(0, len(binary_data), packet_size):
chunk = binary_data[i:i+packet_size]
# Дополнение до 8 байт
if len(chunk) < 8:
chunk += b'\xFF' * (8 - len(chunk))
debug_print(f"Пакет {i//8}: {list(chunk)}")
data_msg = can.Message(
arbitration_id=DATA_CAN_ID,
data=chunk,
is_extended_id=False
)
try:
bus.send(data_msg)
except can.CanError as e:
debug_print(f"Ошибка отправки данных: {str(e)}")
return
ack = wait_for_ack(bus)
if not ack:
debug_print("Таймаут ACK DATA")
return
# Финал
debug_print("Отправка FINISH...")
finish_msg = can.Message(
arbitration_id=BOOT_CAN_END,
data=bytes([0xAA]),
is_extended_id=False
)
bus.send(finish_msg)
ack = wait_for_ack(bus, timeout=1.0)
if ack and ack.data[0] == 0xAA:
debug_print("Прошивка подтверждена!")
else:
debug_print("Ошибка верификации!")
except Exception as e:
debug_print(f"Критическая ошибка: {str(e)}")
finally:
bus.shutdown()
def wait_for_ack(bus, timeout=1.0):
start_time = time.time()
while time.time() - start_time < timeout:
msg = bus.recv(timeout=0.1) # Неблокирующий режим
if msg and msg.arbitration_id == ACK_CAN_ID:
return msg
return None
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print("Использование: sudo python3 can_flasher.py firmware.hex")
sys.exit(1)
send_firmware(sys.argv[1])

View file

@ -0,0 +1,70 @@
import can
import time
import sys
# Конфигурация
CAN_INTERFACE = 'can0'
OLD_DEVICE_ID = int(sys.argv[1]) # Текущий ID устройства (по умолчанию)
REG_WRITE = 0x8 # Код команды чтения
REG_ID = 0x55 # Адрес регистра с Firmware Update
def send_can_message(bus, can_id, data):
"""Отправка CAN-сообщения"""
try:
msg = can.Message(
arbitration_id=can_id,
data=data,
is_extended_id=False
)
bus.send(msg)
print(f"[Отправка] CAN ID: 0x{can_id:03X}, Данные: {list(data)}")
return True
except can.CanError as e:
print(f"Ошибка CAN: {e}")
return False
def validate_crc16(data):
"""Расчет CRC16 (MODBUS) для проверки целостности данных"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
# Инициализация CAN-интерфейса
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
# ======= 1. Запрос текущего ID устройства =======
# Формируем CAN ID для чтения: (OLD_DEVICE_ID << 4) | REG_READ
can_id_read = (OLD_DEVICE_ID << 4) | REG_WRITE
# Данные для запроса: [регистр, резервный байт]
data_read = [REG_ID, 0x00]
# Формируем полные данные для расчета CRC:
# - CAN ID разбивается на 2 байта (little-endian)
# - Добавляем данные запроса
full_data_for_crc = list(can_id_read.to_bytes(2, 'little')) + data_read
# Рассчитываем CRC и разбиваем на байты (little-endian)
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, 'little'))
# Собираем итоговый пакет: данные + CRC
packet_read = data_read + crc_bytes
print("Переход в boot режим", packet_read)
send_can_message(bus, can_id_read, packet_read)
bus.shutdown()
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print("Использование: python3 firmware_test.py address")
sys.exit(1)

View file

@ -0,0 +1,103 @@
import can
import time
import struct
# Конфигурация
CAN_INTERFACE = 'can0'
OLD_DEVICE_ID = 0x00 # Текущий ID устройства (по умолчанию)
REG_READ = 0x7 # Код команды чтения
REG_ID = 0x30 # Адрес регистра с REG_PMOTOR_POSPID_Kp устройства
def send_can_message(bus, can_id, data):
"""Отправка CAN-сообщения"""
try:
msg = can.Message(
arbitration_id=can_id,
data=data,
is_extended_id=False
)
bus.send(msg)
print(f"[Отправка] CAN ID: 0x{can_id:03X}, Данные: {list(data)}")
return True
except can.CanError as e:
print(f"Ошибка CAN: {e}")
return False
def receive_response(bus, timeout=1.0):
"""Ожидание ответа от устройства"""
start_time = time.time()
while time.time() - start_time < timeout:
msg = bus.recv(timeout=0.1)
if msg:
print(f"[Прием] CAN ID: 0x{msg.arbitration_id:03X}, Данные: {list(msg.data)}")
return msg
print("[Ошибка] Таймаут")
return None
def validate_crc16(data):
"""Расчет CRC16 (MODBUS) для проверки целостности данных"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
# Инициализация CAN-интерфейса
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
# ======= 1. Запрос текущего ID устройства =======
# Формируем CAN ID для чтения: (OLD_DEVICE_ID << 4) | REG_READ
can_id_read = (OLD_DEVICE_ID << 4) | REG_READ
# Данные для запроса: [регистр, резервный байт]
data_read = [REG_ID, 0x00]
# Формируем полные данные для расчета CRC:
# - CAN ID разбивается на 2 байта (little-endian)
# - Добавляем данные запроса
full_data_for_crc = list(can_id_read.to_bytes(2, 'little')) + data_read
# Рассчитываем CRC и разбиваем на байты (little-endian)
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, 'little'))
# Собираем итоговый пакет: данные + CRC
packet_read = data_read + crc_bytes
print("Запрос на чтение ID:", packet_read)
send_can_message(bus, can_id_read, packet_read)
# ======= 2. Получение и проверка ответа =======
response = receive_response(bus)
if response:
data = response.data
if len(data) < 4:
print("Слишком короткий ответ")
# Проверяем минимальную длину ответа (данные + CRC)
else:
id_bytes = response.arbitration_id.to_bytes(1,byteorder='little')
#buff with id and data without CRC
full_data = list(id_bytes) + list(data[:-2])
print(f"Received full_data: {list(full_data)}")
received_crc = int.from_bytes(data[-2:], byteorder='little')
#calc CRC
calc_crc = validate_crc16(full_data)
print(f"Расчитанный CRC PYTHON : 0x{calc_crc:02X}")
if received_crc == calc_crc:
# Если CRC совпадает, проверяем структуру ответа:
kp_value = struct.unpack('<f', bytes(data[1:5]))[0]
print(f"Текущий Kp устройства: {kp_value:.3f}")
else:
print("Ошибка: CRC не совпадает")
else:
print("Устройство не ответило")
# Завершаем работу с шиной
bus.shutdown()

View file

@ -1,47 +0,0 @@
import can
import struct
import time
def process_can_message(msg):
if msg.dlc == 5: # Check the message length
print(f"Received message with ID: {msg.arbitration_id}")
print(f"Data: {msg.data}")
# The first byte determines the data type (flag)
flag = chr(msg.data[0])
if flag == 'A': # Angle
angle_bytes = msg.data[1:5]
angle = struct.unpack('<f', bytes(angle_bytes))[0]
print(f"Angle: {angle} degrees")
elif flag == 'V': # Velocity
velocity_bytes = msg.data[1:5]
velocity = struct.unpack('<f', bytes(velocity_bytes))[0]
print(f"Velocity: {velocity} rad/s")
elif flag == 'E' and msg.dlc >= 2: # Enable/Disable
enabled = msg.data[1] # Expecting 1 byte (0 or 1)
print(f"Enabled: {bool(enabled)}")
else:
print(f"Unknown flag: {flag}")
else:
print(f"Received message with unexpected length: {msg.dlc}")
def receive_can_messages():
try:
# Connect to the CAN bus
bus = can.interface.Bus(channel='can0', bustype='socketcan')
print("Waiting for messages on the CAN bus...")
while True:
msg = bus.recv()
if msg:
process_can_message(msg)
except KeyboardInterrupt:
print("\nExiting program...")
except Exception as e:
print(f"Error: {e}")
if __name__ == '__main__':
receive_can_messages()

View file

@ -1,37 +0,0 @@
import can
import struct
import time
import argparse
# Function to send the target angle
def send_target_angle(bus, target_angle):
msg = can.Message()
msg.arbitration_id = 1 # Message ID
msg.is_extended_id = False
msg.dlc = 5 # Message length
msg.data = [ord('A')] + list(struct.pack('<f', target_angle)) # 'A' for the command identifier, followed by the angle in float format
try:
bus.send(msg)
print(f"Sent message with target angle: {target_angle} degrees")
print(f"Message data: {msg.data}")
except can.CanError:
print("Message failed to send")
# Main function
def main():
parser = argparse.ArgumentParser(description="Send target angles over CAN bus.")
parser.add_argument("--angle", type=float, required=True, help="Target angle to send over the CAN bus")
args = parser.parse_args()
target_angle = args.angle
# CAN interface setup
bus = can.interface.Bus(channel='can0', bustype='socketcan', bitrate=1000000) # Ensure the bitrate matches the microcontroller settings
print("CAN bus initialized, sending target angles...")
# Loop to send messages
send_target_angle(bus, target_angle)
if __name__ == '__main__':
main()

View file

@ -0,0 +1,126 @@
import can
import time
import struct
import sys
# Конфигурация
CAN_INTERFACE = 'can0'
DEVICE_ID = int(sys.argv[1]) # ID ADDR for servo
REG_READ = 0x7 # Код команды чтения
REG_MOTOR_POSPID_Kp = 0x30
REG_MOTOR_POSPID_Ki = 0x31
REG_MOTOR_POSPID_Kd = 0x32
def send_can_message(bus, can_id, data):
"""Отправка CAN-сообщения"""
try:
msg = can.Message(
arbitration_id=can_id,
data=data,
is_extended_id=False
)
bus.send(msg)
print(f"[Отправка] CAN ID: 0x{can_id:03X}, Данные: {list(data)}")
return True
except can.CanError as e:
print(f"Ошибка CAN: {e}")
return False
def validate_crc16(data):
"""Расчет CRC16 (MODBUS)"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def send_read_request(bus, device_id, register):
"""Отправка запроса на чтение регистра"""
can_id = (device_id << 4) | REG_READ
data_part = [register, 0x00]
# Расчет CRC для CAN ID (2 байта) + данные
full_data_for_crc = list(can_id.to_bytes(2, 'little')) + data_part
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, 'little'))
# Формирование итогового пакета
packet = data_part + crc_bytes
send_can_message(bus, can_id, packet)
def receive_pid_response(bus, timeout=1.0):
"""Получение и проверка ответа с PID-значением"""
start_time = time.time()
while time.time() - start_time < timeout:
msg = bus.recv(timeout=0.1)
if msg and msg.arbitration_id == DEVICE_ID:
print(f"[Прием] CAN ID: 0x{msg.arbitration_id:03X}, Данные: {list(msg.data)}")
if len(msg.data) < 8:
print("Ошибка: Слишком короткий ответ")
return None
# Извлечение данных и CRC
data = msg.data
received_crc = int.from_bytes(data[-2:], byteorder='little')
# Подготовка данных для проверки CRC
id_bytes = msg.arbitration_id.to_bytes(1, 'little')
full_data = list(id_bytes) + list(data[:-2])
# Проверка CRC
calc_crc = validate_crc16(full_data)
if calc_crc != received_crc:
print(f"Ошибка CRC: ожидалось 0x{calc_crc:04X}, получено 0x{received_crc:04X}")
return None
# Извлечение float значения
try:
value = struct.unpack('<f', bytes(data[1:5]))[0]
return value
except struct.error:
print("Ошибка распаковки float")
return None
print("Таймаут ожидания ответа")
return None
def main():
"""Основная логика чтения PID-коэффициентов"""
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
try:
# Чтение коэффициентов с задержкой
print("\nЧтение Kp...")
send_read_request(bus, DEVICE_ID, REG_MOTOR_POSPID_Kp)
kp = receive_pid_response(bus)
if kp is not None:
print(f"Текущий Kp: {kp:.3f}")
time.sleep(1)
print("\nЧтение Ki...")
send_read_request(bus, DEVICE_ID, REG_MOTOR_POSPID_Ki)
ki = receive_pid_response(bus)
if ki is not None:
print(f"Текущий Ki: {ki:.3f}")
time.sleep(1)
print("\nЧтение Kd...")
send_read_request(bus, DEVICE_ID, REG_MOTOR_POSPID_Kd)
kd = receive_pid_response(bus)
if kd is not None:
print(f"Текущий Kd: {kd:.3f}")
finally:
bus.shutdown()
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Используйте python3 read_pid.py addr")
sys.exit(1)
main()

View file

@ -0,0 +1,98 @@
import can
import struct
import time
import argparse
# Константы
CAN_INTERFACE = 'can0'
DEVICE_ID = 0x27 # ID ADDR for servo
REG_WRITE = 0x7
REG_POS = 0x72 # MOTOR+ANGLE = 0x72
def validate_crc16(data):
# Calculate CRC16
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def receive_response(bus, timeout=1.0):
"""Ожидание ответа от устройства"""
start_time = time.time()
while time.time() - start_time < timeout:
msg = bus.recv(timeout=0.1)
if msg:
print(f"[Прием] CAN ID: 0x{msg.arbitration_id:03X}, Данные: {list(msg.data)}")
return msg
print("[Ошибка] Таймаут")
return None
def send_target_angle(bus):
# ID and cmd
arbitration_id = (DEVICE_ID << 4) | REG_WRITE
id_bytes = list(arbitration_id.to_bytes(2, byteorder='little'))
# cmd + parametrs
data_write = [REG_POS]
full_data_for_crc = id_bytes + data_write
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, byteorder='little'))
# Full packet
packet = data_write + crc_bytes
msg = can.Message(
arbitration_id=arbitration_id,
is_extended_id=False,
data=packet
)
bus.send(msg)
response = receive_response(bus)
if response:
data = response.data
if len(data) < 4:
print("Слишком короткий ответ")
# Проверяем минимальную длину ответа (данные + CRC)
else:
id_bytes = response.arbitration_id.to_bytes(1,byteorder='little')
#buff with id and data without CRC
full_data = list(id_bytes) + list(data[:-2])
print(f"Received full_data: {list(full_data)}")
received_crc = int.from_bytes(data[-2:], byteorder='little')
#calc CRC
calc_crc = validate_crc16(full_data)
print(f"Расчитанный CRC PYTHON : 0x{calc_crc:02X}")
if received_crc == calc_crc:
# Если CRC совпадает, проверяем структуру ответа:
velocity = struct.unpack('<f', bytes(data[1:5]))[0]
print(f"Угол: {velocity}")
else:
print("Ошибка: CRC не совпадает")
else:
print("Устройство не ответило")
def main():
# Инициализация CAN
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
print("CAN шина инициализирована.")
send_target_angle(bus)
bus.shutdown()
if __name__ == '__main__':
main()

View file

@ -0,0 +1,108 @@
import can
import time
import sys
# Конфигурация
CAN_INTERFACE = 'can0'
OLD_DEVICE_ID = int(sys.argv[1]) # Текущий ID устройства (по умолчанию)
REG_READ = 0x7 # Код команды чтения
REG_ID = 0x01 # Адрес регистра с ID устройства
def send_can_message(bus, can_id, data):
"""Отправка CAN-сообщения"""
try:
msg = can.Message(
arbitration_id=can_id,
data=data,
is_extended_id=False
)
bus.send(msg)
print(f"[Отправка] CAN ID: 0x{can_id:03X}, Данные: {list(data)}")
return True
except can.CanError as e:
print(f"Ошибка CAN: {e}")
return False
def receive_response(bus, timeout=1.0):
"""Ожидание ответа от устройства"""
start_time = time.time()
while time.time() - start_time < timeout:
msg = bus.recv(timeout=0.1)
if msg:
print(f"[Прием] CAN ID: 0x{msg.arbitration_id:03X}, Данные: {list(msg.data)}")
return msg
print("[Ошибка] Таймаут")
return None
def validate_crc16(data):
"""Расчет CRC16 (MODBUS) для проверки целостности данных"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
# Инициализация CAN-интерфейса
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
# ======= 1. Запрос текущего ID устройства =======
# Формируем CAN ID для чтения: (OLD_DEVICE_ID << 4) | REG_READ
can_id_read = (OLD_DEVICE_ID << 4) | REG_READ
# Данные для запроса: [регистр, резервный байт]
data_read = [REG_ID, 0x00]
# Формируем полные данные для расчета CRC:
# - CAN ID разбивается на 2 байта (little-endian)
# - Добавляем данные запроса
full_data_for_crc = list(can_id_read.to_bytes(2, 'little')) + data_read
# Рассчитываем CRC и разбиваем на байты (little-endian)
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, 'little'))
# Собираем итоговый пакет: данные + CRC
packet_read = data_read + crc_bytes
print("Запрос на чтение ID:", packet_read)
send_can_message(bus, can_id_read, packet_read)
# ======= 2. Получение и проверка ответа =======
response = receive_response(bus)
if response:
data = response.data
if len(data) < 4:
print("Слишком короткий ответ")
# Проверяем минимальную длину ответа (данные + CRC)
else:
id_bytes = response.arbitration_id.to_bytes(1,byteorder='little')
#buff with id and data without CRC
full_data = list(id_bytes) + list(data[:-2])
print(f"Received full_data: {list(full_data)}")
received_crc = int.from_bytes(data[-2:], byteorder='little')
#calc CRC
calc_crc = validate_crc16(full_data)
print(f"Расчитанный CRC PYTHON : 0x{calc_crc:02X}")
if received_crc == calc_crc:
# Если CRC совпадает, проверяем структуру ответа:
print(f"Текущий ID устройства: 0x{data[1]:02X}")
else:
print("Ошибка: CRC не совпадает")
else:
print("Устройство не ответило")
# Завершаем работу с шиной
bus.shutdown()
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print("Использование: python3 can_flasher.py address")
sys.exit(1)

View file

@ -0,0 +1,67 @@
from can.interface import Bus
import can
import struct
import time
import argparse
# Константы
CAN_INTERFACE = 'can0'
DEVICE_ID = 0x27 # ID ADDR for servo
REG_WRITE = 0x8
REG_POS = 0x72 # MOTOR+ANGLE = 0x72
def validate_crc16(data):
# Calculate CRC16
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def send_target_angle(bus, target_angle):
# ID and cmd
arbitration_id = (DEVICE_ID << 4) | REG_WRITE
id_bytes = list(arbitration_id.to_bytes(2, byteorder='little'))
# cmd + parametrs
data_write = [REG_POS] + list(struct.pack('<f', target_angle))
full_data_for_crc = id_bytes + data_write
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, byteorder='little'))
# Full packet
packet = data_write + crc_bytes
msg = can.Message(
arbitration_id=arbitration_id,
is_extended_id=False,
data=packet
)
try:
bus.send(msg)
print(f"[Отправка] CAN ID: 0x{arbitration_id:03X}, Угол: {target_angle} rad, Данные: {list(msg.data)}")
except can.CanError:
print("Ошибка отправки сообщения")
def main():
parser = argparse.ArgumentParser(description="Отправка угла позиции по CAN.")
parser.add_argument("--angle", type=float, required=True, help="Угол (в градусах)")
args = parser.parse_args()
# Инициализация CAN
bus = Bus(channel=CAN_INTERFACE, bustype='socketcan')
print("CAN шина инициализирована.")
send_target_angle(bus, args.angle)
bus.shutdown()
if __name__ == '__main__':
main()

View file

@ -0,0 +1,124 @@
import can
import time
import sys
# Конфигурация
CAN_INTERFACE = 'can0'
OLD_DEVICE_ID = int(sys.argv[1])
NEW_DEVICE_ID = int(sys.argv[2])
REG_WRITE = 0x8
REG_READ = 0x7
REG_ID = 0x1
def send_can_message(bus, can_id, data):
"""Отправка CAN-сообщения"""
try:
msg = can.Message(
arbitration_id=can_id,
data=data,
is_extended_id=False
)
bus.send(msg)
print(f"[Отправка] CAN ID: 0x{can_id:03X}, Данные: {list(data)}")
return True
except can.CanError as e:
print(f"Ошибка CAN: {e}")
return False
def receive_response(bus, timeout=1.0):
"""Ожидание ответа"""
start_time = time.time()
while time.time() - start_time < timeout:
msg = bus.recv(timeout=0.1)
if msg:
print(f"[Прием] CAN ID: 0x{msg.arbitration_id:03X}, Данные: {list(msg.data)}")
return msg
print("[Ошибка] Таймаут")
return None
def validate_crc16(data):
"""Функция расчета CRC16 (MODBUS)"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
# Инициализация
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
# ======= 1. Отправляем команду изменить ID =======
# Весь буфер: id + команда + параметры
OLD_WITH_REG = (OLD_DEVICE_ID << 4) | REG_WRITE
id_bytes = list(OLD_WITH_REG.to_bytes(2, byteorder='little'))
# Важные части сообщения: address (id), команда, параметры
data_write = [REG_ID, NEW_DEVICE_ID] # команда изменить ID
# Полностью собираем массив для CRC (включая id и команду)
full_data_for_crc = id_bytes + data_write
# Расчет CRC по всему пакету
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, byteorder='little'))
# Итоговый пакет: команда + параметры + CRC
packet_write = data_write + crc_bytes
print("Отправляем: команда изменить ID + CRC:", packet_write)
# Отправляем с `OLD_DEVICE_ID` в качестве адреса
send_can_message(bus, (OLD_DEVICE_ID << 4) | REG_WRITE, packet_write)
time.sleep(1.0)
# ======= 2. Запрашиваем текущий ID (используем новый адрес) =======
# Теперь для запроса используем **уже новый id**
NEW_WITH_REG = (NEW_DEVICE_ID << 4) | REG_READ
current_id_bytes = list(NEW_WITH_REG.to_bytes(2, byteorder='little'))
data_read = [REG_ID, 0x00]
full_data_for_crc = current_id_bytes + data_read
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, byteorder='little'))
packet_read = data_read + crc_bytes
print("Запрос на чтение ID + CRC (после смены):", packet_read)
send_can_message(bus, (NEW_DEVICE_ID << 4) | REG_READ, packet_read)
# ======= 3. Получение и проверка ответа =======
response = receive_response(bus)
if response:
data = response.data
if len(data) < 4:
print("Ответ слишком короткий")
else:
id_bytes = response.arbitration_id.to_bytes(1,byteorder='little')
#buff with id and data without CRC
full_data = list(id_bytes) + list(data[:-2])
print(f"Received full_data: {list(full_data)}")
received_crc = int.from_bytes(data[-2:], byteorder='little')
#calc CRC
calc_crc = validate_crc16(full_data)
if received_crc == calc_crc:
if data[0] == ord('I') and data[1] == NEW_DEVICE_ID:
print(f"\nУСПЕХ! ID устройства изменен на 0x{NEW_DEVICE_ID:02X}")
else:
print(f"Некорректный ответ: {list(data)}")
else:
print("CRC не совпадает, данные повреждены.")
else:
print("Нет ответа от устройства.")
bus.shutdown()
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print("Использование: python3 can_flasher.py old_addr new addr")
sys.exit(1)

View file

@ -0,0 +1,78 @@
import subprocess
import os
import sys
def flash_hex_with_stlink(hex_file_path):
if not os.path.isfile(hex_file_path):
print(f"❌ Файл не найден: {hex_file_path}")
return False
command = [
"st-flash",
"--format", "ihex",
"write",
hex_file_path
]
try:
print(f"⚡️ Прошиваем {hex_file_path} через ST-Link...")
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=30
)
print("▬▬▬ STDOUT ▬▬▬")
print(result.stdout)
print("▬▬▬ STDERR ▬▬▬")
print(result.stderr)
if result.returncode == 0:
print("✅ Прошивка успешно завершена!")
# Добавленный блок сброса
try:
print("🔄 Выполняем сброс устройства...")
reset_result = subprocess.run(
["st-info", "--reset"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=10
)
if reset_result.returncode == 0:
print("♻️ Устройство успешно сброшено!")
else:
print(f"⚠️ Ошибка (код: {reset_result.returncode})")
print("▬▬▬ STDERR сброса ▬▬▬")
print(reset_result.stderr)
except Exception as e:
print(f"⚠️ Ошибка при сбросе: {str(e)}")
return True
else:
print(f"❌ Ошибка прошивки (код: {result.returncode})")
return False
except FileNotFoundError:
print("❌ st-flash не найден! Установите stlink-tools.")
return False
except subprocess.TimeoutExpired:
print("❌ Таймаут операции! Проверьте подключение ST-Link.")
return False
except Exception as e:
print(f"❌ Неизвестная ошибка: {str(e)}")
return False
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Использование: python stlink_flash.py <firmware.hex>")
sys.exit(1)
if flash_hex_with_stlink(sys.argv[1]):
sys.exit(0)
else:
sys.exit(1)

View file

@ -0,0 +1,100 @@
import subprocess
import os
import sys
import time
def flash_hex_with_stlink(hex_file_path, component_name):
if not os.path.isfile(hex_file_path):
print(f"❌ Файл {component_name} не найден: {hex_file_path}")
return False
command = [
"st-flash",
"--format", "ihex",
"write",
hex_file_path
]
try:
print(f"⚡️ Прошиваем {component_name} ({hex_file_path}) через ST-Link...")
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=30
)
print("▬▬▬ STDOUT ▬▬▬")
print(result.stdout)
print("▬▬▬ STDERR ▬▬▬")
print(result.stderr)
if result.returncode == 0:
print(f"{component_name} успешно прошит!")
return True
else:
print(f"❌ Ошибка прошивки {component_name} (код: {result.returncode})")
return False
except FileNotFoundError:
print("❌ st-flash не найден! Установите stlink-tools.")
return False
except subprocess.TimeoutExpired:
print(f"❌ Таймаут операции при прошивке {component_name}! Проверьте подключение ST-Link.")
return False
except Exception as e:
print(f"❌ Неизвестная ошибка при прошивке {component_name}: {str(e)}")
return False
def reset_device():
try:
print("🔄 Выполняем сброс(перезагрузку) устройства...")
reset_result = subprocess.run(
["st-info", "--reset"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=10
)
if reset_result.returncode == 0:
print("♻️ Устройство успешно сброшено!")
return True
else:
print(f"⚠️ Ошибка при сбросе (код: {reset_result.returncode})")
print("▬▬▬ STDERR сброса ▬▬▬")
print(reset_result.stderr)
return False
except Exception as e:
print(f"⚠️ Ошибка при сбросе: {str(e)}")
return False
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Использование: python stlink_flash.py <bootloader.hex> <application.hex>")
print("Пример: python stlink_flash.py bootloader.hex firmware.hex")
sys.exit(1)
bootloader_path = sys.argv[1]
app_path = sys.argv[2]
# Прошиваем сначала бутлоадер
if not flash_hex_with_stlink(bootloader_path, "Bootloader"):
print("\n💥 Ошибка прошивки бутлоадера!")
sys.exit(1)
# Сбрасываем устройство после прошивки бутлоадера
reset_device()
time.sleep(1) # Короткая пауза
# Прошиваем основное приложение
if not flash_hex_with_stlink(app_path, "Application"):
print("\n💥 Ошибка прошивки основного приложения!")
sys.exit(1)
# Финальный сброс устройства
reset_device()
print("\n🎉 Все компоненты успешно прошиты!")
sys.exit(0)

View file

@ -0,0 +1,95 @@
import can
import time
import struct
import sys
# Конфигурация
CAN_INTERFACE = 'can0'
DEVICE_ID = int(sys.argv[1]) # ID ADDR for servo
REG_WRITE = 0x8 # Код команды записи
REG_MOTOR_POSPID_Kp = 0x30
REG_MOTOR_POSPID_Ki = 0x31
REG_MOTOR_POSPID_Kd = 0x32
def send_can_message(bus, can_id, data):
"""Отправка CAN-сообщения"""
try:
msg = can.Message(
arbitration_id=can_id,
data=data,
is_extended_id=False
)
bus.send(msg)
print(f"[Отправка] CAN ID: 0x{can_id:03X}, Данные: {list(data)}")
return True
except can.CanError as e:
print(f"Ошибка CAN: {e}")
return False
def validate_crc16(data):
"""Расчет CRC16 (MODBUS) для проверки целостности данных"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def send_pid_value(bus, device_id, reg, value):
"""Отправка коэффициента PID на устройство"""
# Формируем CAN ID для записи: (device_id << 4) | REG_WRITE
can_id_write = (device_id << 4) | REG_WRITE
# Упаковываем значение в байты (little-endian)
float_bytes = struct.pack('<f', value)
# Формируем часть данных (регистр + значение)
data_part = [reg] + list(float_bytes)
# Полные данные для расчета CRC: CAN ID + данные
full_data_for_crc = list(can_id_write.to_bytes(2, 'little')) + data_part
# Рассчитываем CRC и разбиваем на байты (little-endian)
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, 'little'))
# Собираем итоговый пакет данных
can_data = data_part + crc_bytes
# Отправляем сообщение
send_can_message(bus, can_id_write, can_data)
def main():
# Запрос коэффициентов у пользователя
try:
p = float(input("Введите коэффициент P: "))
i = float(input("Введите коэффициент I: "))
d = float(input("Введите коэффициент D: "))
except ValueError:
print("Ошибка: Введите числовые значения.")
return
# Инициализация CAN-интерфейса
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
try:
# Отправка коэффициентов с задержкой
send_pid_value(bus, DEVICE_ID, REG_MOTOR_POSPID_Kp, p)
time.sleep(1)
send_pid_value(bus, DEVICE_ID, REG_MOTOR_POSPID_Ki, i)
time.sleep(1)
send_pid_value(bus, DEVICE_ID, REG_MOTOR_POSPID_Kd, d)
finally:
# Завершение работы с шиной
bus.shutdown()
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Используйте python3 pid_set.py addr")
sys.exit(1)
main()

View file

@ -0,0 +1,122 @@
import can
import time
import struct
# Конфигурация
CAN_INTERFACE = 'can0'
DEVICE_ID = 0x00
SET_PID_P = 3.6
REG_WRITE = 0x8
REG_READ = 0x7
REG_ID = 0x30 #REG_MOTOR_POSPID_Kp
PID_P = 0x01
def send_can_message(bus, can_id, data):
"""Отправка CAN-сообщения"""
try:
msg = can.Message(
arbitration_id=can_id,
data=data,
is_extended_id=False
)
bus.send(msg)
print(f"[Отправка] CAN ID: 0x{can_id:03X}, Данные: {list(data)}")
return True
except can.CanError as e:
print(f"Ошибка CAN: {e}")
return False
def receive_response(bus, timeout=1.0):
print("Ожидание ответа")
start_time = time.time()
while time.time() - start_time < timeout:
msg = bus.recv(timeout=0.1)
if msg:
print(f"[Прием] CAN ID: 0x{msg.arbitration_id:03X}, Данные: {list(msg.data)}")
return msg
print("[Ошибка] Таймаут")
return None
def validate_crc16(data):
"""Функция расчета CRC16 (MODBUS)"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
# Инициализация
bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
# Перевод float -> hex -> int
result = (struct.unpack('<I',struct.pack('<f', float(SET_PID_P)))[0])
result_bytes = result.to_bytes(4, byteorder='little')
# ======= 1. Отправляем команду изменить ID =======
# Весь буфер: id + команда + параметры
OLD_WITH_REG = (DEVICE_ID << 4) | REG_WRITE
id_bytes = list(OLD_WITH_REG.to_bytes(2, byteorder='little'))
# Важные части сообщения: address (id), команда, параметры
data_write = [REG_ID] + list(result_bytes) # команда изменить PID_P
# Полностью собираем массив для CRC (включая id и команду)
full_data_for_crc = id_bytes + data_write
# Расчет CRC по всему пакету
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, byteorder='little'))
# Итоговый пакет: команда + параметры + CRC
packet_write = data_write + crc_bytes
print("Отправляем: команда изменить PID_p + CRC:", packet_write)
# Отправляем с `OLD_DEVICE_ID` в качестве адреса
send_can_message(bus, (DEVICE_ID << 4) | REG_WRITE, packet_write)
time.sleep(1.0)
# ======= 2. Запрашиваем текущий ID (используем новый адрес) =======
# Теперь для запроса используем **уже новый id**
NEW_WITH_REG = (DEVICE_ID << 4) | REG_READ
current_id_bytes = list(NEW_WITH_REG.to_bytes(2, byteorder='little'))
data_read = [REG_ID, 0x00]
full_data_for_crc = current_id_bytes + data_read
crc = validate_crc16(full_data_for_crc)
crc_bytes = list(crc.to_bytes(2, byteorder='little'))
packet_read = data_read + crc_bytes
print("Запрос на чтение ID + CRC (после смены):", packet_read)
send_can_message(bus, (DEVICE_ID << 4) | REG_READ, packet_read)
# ======= 3. Получение и проверка ответа =======
response = receive_response(bus)
if response:
data = response.data
if len(data) < 4:
print("Ответ слишком короткий")
else:
id_bytes = response.arbitration_id.to_bytes(1,byteorder='little')
#buff with id and data without CRC
full_data = list(id_bytes) + list(data[:-2])
print(f"Received full_data: {list(full_data)}")
received_crc = int.from_bytes(data[-2:], byteorder='little')
#calc CRC
calc_crc = validate_crc16(full_data)
if received_crc == calc_crc:
if data[0] == int(REG_ID):
kp_val = struct.unpack('<f', bytes(data[1:5]))[0]
print(f"\nУСПЕХ! PID_P = {kp_val:.3f}")
else:
print(f"Некорректный ответ: {list(data)}")
else:
print("CRC не совпадает, данные повреждены.")
else:
print("Нет ответа от устройства.")
bus.shutdown()