diff --git a/ros2/src/franka_gello_state_publisher/config/gello_config.yaml b/ros2/src/franka_gello_state_publisher/config/gello_config.yaml index 71e413b..1203ee1 100644 --- a/ros2/src/franka_gello_state_publisher/config/gello_config.yaml +++ b/ros2/src/franka_gello_state_publisher/config/gello_config.yaml @@ -1,4 +1,4 @@ -usb-FTDI_USB__-__Serial_Converter_FT94EVW4-if00-port0: +usb-FTDI_USB__-__Serial_Converter_FTA7NPA1-if00-port0: side: "sample-config" num_joints: 7 joint_signs: [1, 1, 1, 1, 1, -1, 1] diff --git a/ros2/src/franka_gello_state_publisher/franka_gello_state_publisher/driver.py b/ros2/src/franka_gello_state_publisher/franka_gello_state_publisher/driver.py index e69de29..89325c7 100644 --- a/ros2/src/franka_gello_state_publisher/franka_gello_state_publisher/driver.py +++ b/ros2/src/franka_gello_state_publisher/franka_gello_state_publisher/driver.py @@ -0,0 +1,274 @@ +import time +from threading import Event, Lock, Thread +from typing import Protocol, Sequence + +import numpy as np +from dynamixel_sdk.group_sync_read import GroupSyncRead +from dynamixel_sdk.group_sync_write import GroupSyncWrite +from dynamixel_sdk.packet_handler import PacketHandler +from dynamixel_sdk.port_handler import PortHandler +from dynamixel_sdk.robotis_def import ( + COMM_SUCCESS, + DXL_HIBYTE, + DXL_HIWORD, + DXL_LOBYTE, + DXL_LOWORD, +) + +# Constants +ADDR_TORQUE_ENABLE = 64 +ADDR_GOAL_POSITION = 116 +LEN_GOAL_POSITION = 4 +ADDR_PRESENT_POSITION = 132 +LEN_PRESENT_POSITION = 4 +TORQUE_ENABLE = 1 +TORQUE_DISABLE = 0 + + +class DynamixelDriverProtocol(Protocol): + def set_joints(self, joint_angles: Sequence[float]): + """Set the joint angles for the Dynamixel servos. + + Args: + joint_angles (Sequence[float]): A list of joint angles. + """ + ... + + def torque_enabled(self) -> bool: + """Check if torque is enabled for the Dynamixel servos. + + Returns: + bool: True if torque is enabled, False if it is disabled. + """ + ... + + def set_torque_mode(self, enable: bool): + """Set the torque mode for the Dynamixel servos. + + Args: + enable (bool): True to enable torque, False to disable. + """ + ... + + def get_joints(self) -> np.ndarray: + """Get the current joint angles in radians. + + Returns: + np.ndarray: An array of joint angles. + """ + ... + + def close(self): + """Close the driver.""" + + +class FakeDynamixelDriver(DynamixelDriverProtocol): + def __init__(self, ids: Sequence[int]): + self._ids = ids + self._joint_angles = np.zeros(len(ids), dtype=int) + self._torque_enabled = False + + def set_joints(self, joint_angles: Sequence[float]): + if len(joint_angles) != len(self._ids): + raise ValueError( + "The length of joint_angles must match the number of servos" + ) + if not self._torque_enabled: + raise RuntimeError("Torque must be enabled to set joint angles") + self._joint_angles = np.array(joint_angles) + + def torque_enabled(self) -> bool: + return self._torque_enabled + + def set_torque_mode(self, enable: bool): + self._torque_enabled = enable + + def get_joints(self) -> np.ndarray: + return self._joint_angles.copy() + + def close(self): + pass + + +class DynamixelDriver(DynamixelDriverProtocol): + def __init__( + self, ids: Sequence[int], port: str = "/dev/ttyUSB0", baudrate: int = 57600 + ): + """Initialize the DynamixelDriver class. + + Args: + ids (Sequence[int]): A list of IDs for the Dynamixel servos. + port (str): The USB port to connect to the arm. + baudrate (int): The baudrate for communication. + """ + self._ids = ids + self._joint_angles = None + self._lock = Lock() + + # Initialize the port handler, packet handler, and group sync read/write + self._portHandler = PortHandler(port) + self._packetHandler = PacketHandler(2.0) + self._groupSyncRead = GroupSyncRead( + self._portHandler, + self._packetHandler, + ADDR_PRESENT_POSITION, + LEN_PRESENT_POSITION, + ) + self._groupSyncWrite = GroupSyncWrite( + self._portHandler, + self._packetHandler, + ADDR_GOAL_POSITION, + LEN_GOAL_POSITION, + ) + + # Open the port and set the baudrate + if not self._portHandler.openPort(): + raise RuntimeError("Failed to open the port") + + if not self._portHandler.setBaudRate(baudrate): + raise RuntimeError(f"Failed to change the baudrate, {baudrate}") + + # Add parameters for each Dynamixel servo to the group sync read + for dxl_id in self._ids: + if not self._groupSyncRead.addParam(dxl_id): + raise RuntimeError( + f"Failed to add parameter for Dynamixel with ID {dxl_id}" + ) + + # Disable torque for each Dynamixel servo + self._torque_enabled = False + try: + self.set_torque_mode(self._torque_enabled) + except Exception as e: + print(f"port: {port}, {e}") + + self._stop_thread = Event() + self._start_reading_thread() + + def set_joints(self, joint_angles: Sequence[float]): + if len(joint_angles) != len(self._ids): + raise ValueError( + "The length of joint_angles must match the number of servos" + ) + if not self._torque_enabled: + raise RuntimeError("Torque must be enabled to set joint angles") + + for dxl_id, angle in zip(self._ids, joint_angles): + # Convert the angle to the appropriate value for the servo + position_value = int(angle * 2048 / np.pi) + + # Allocate goal position value into byte array + param_goal_position = [ + DXL_LOBYTE(DXL_LOWORD(position_value)), + DXL_HIBYTE(DXL_LOWORD(position_value)), + DXL_LOBYTE(DXL_HIWORD(position_value)), + DXL_HIBYTE(DXL_HIWORD(position_value)), + ] + + # Add goal position value to the Syncwrite parameter storage + dxl_addparam_result = self._groupSyncWrite.addParam( + dxl_id, param_goal_position + ) + if not dxl_addparam_result: + raise RuntimeError( + f"Failed to set joint angle for Dynamixel with ID {dxl_id}" + ) + + # Syncwrite goal position + dxl_comm_result = self._groupSyncWrite.txPacket() + if dxl_comm_result != COMM_SUCCESS: + raise RuntimeError("Failed to syncwrite goal position") + + # Clear syncwrite parameter storage + self._groupSyncWrite.clearParam() + + def torque_enabled(self) -> bool: + return self._torque_enabled + + def set_torque_mode(self, enable: bool): + torque_value = TORQUE_ENABLE if enable else TORQUE_DISABLE + with self._lock: + for dxl_id in self._ids: + dxl_comm_result, dxl_error = self._packetHandler.write1ByteTxRx( + self._portHandler, dxl_id, ADDR_TORQUE_ENABLE, torque_value + ) + if dxl_comm_result != COMM_SUCCESS or dxl_error != 0: + print(dxl_comm_result) + print(dxl_error) + raise RuntimeError( + f"Failed to set torque mode for Dynamixel with ID {dxl_id}" + ) + + self._torque_enabled = enable + + def _start_reading_thread(self): + self._reading_thread = Thread(target=self._read_joint_angles) + self._reading_thread.daemon = True + self._reading_thread.start() + + def _read_joint_angles(self): + # Continuously read joint angles and update the joint_angles array + while not self._stop_thread.is_set(): + time.sleep(0.001) + with self._lock: + _joint_angles = np.zeros(len(self._ids), dtype=int) + dxl_comm_result = self._groupSyncRead.txRxPacket() + if dxl_comm_result != COMM_SUCCESS: + print(f"warning, comm failed: {dxl_comm_result}") + continue + for i, dxl_id in enumerate(self._ids): + if self._groupSyncRead.isAvailable( + dxl_id, ADDR_PRESENT_POSITION, LEN_PRESENT_POSITION + ): + angle = self._groupSyncRead.getData( + dxl_id, ADDR_PRESENT_POSITION, LEN_PRESENT_POSITION + ) + angle = np.int32(np.uint32(angle)) + _joint_angles[i] = angle + else: + raise RuntimeError( + f"Failed to get joint angles for Dynamixel with ID {dxl_id}" + ) + self._joint_angles = _joint_angles + # self._groupSyncRead.clearParam() # TODO what does this do? should i add it + + def get_joints(self) -> np.ndarray: + # Return a copy of the joint_angles array to avoid race conditions + while self._joint_angles is None: + time.sleep(0.1) + # with self._lock: + _j = self._joint_angles.copy() + return _j / 2048.0 * np.pi + + def close(self): + self._stop_thread.set() + self._reading_thread.join() + self._portHandler.closePort() + + +def main(): + # Set the port, baudrate, and servo IDs + ids = [1] + + # Create a DynamixelDriver instance + try: + driver = DynamixelDriver(ids) + except FileNotFoundError: + driver = DynamixelDriver(ids, port="/dev/cu.usbserial-FT7WBMUB") + + # Test setting torque mode + driver.set_torque_mode(True) + driver.set_torque_mode(False) + + # Test reading the joint angles + try: + while True: + joint_angles = driver.get_joints() + print(f"Joint angles for IDs {ids}: {joint_angles}") + # print(f"Joint angles for IDs {ids[1]}: {joint_angles[1]}") + except KeyboardInterrupt: + driver.close() + + +if __name__ == "__main__": + main() # Test the driver diff --git a/ros2/src/franka_gello_state_publisher/franka_gello_state_publisher/gello_publisher.py b/ros2/src/franka_gello_state_publisher/franka_gello_state_publisher/gello_publisher.py index 6c7d9f3..1a5b2ce 100644 --- a/ros2/src/franka_gello_state_publisher/franka_gello_state_publisher/gello_publisher.py +++ b/ros2/src/franka_gello_state_publisher/franka_gello_state_publisher/gello_publisher.py @@ -72,7 +72,7 @@ class GelloPublisher(Node): """Whether or not the gripper is attached.""" joint_ids = list(range(1, self.num_joints + 1)) - self.driver = DynamixelDriver(joint_ids, port=self.com_port, baudrate=57600) + self.driver = DynamixelDriver(joint_ids, port=self.com_port, baudrate=2000000) """The driver for the Dynamixel motors.""" self.best_offsets = np.array(config[self.port]["best_offsets"])