diff --git a/gello/agents/gello_agent.py b/gello/agents/gello_agent.py index f2b0c61..65bff3a 100644 --- a/gello/agents/gello_agent.py +++ b/gello/agents/gello_agent.py @@ -147,3 +147,9 @@ class GelloAgent(Agent): dyna_joints = self._robot.get_joint_state() gripper = dyna_joints[-1] return dyna_joints[:-1], gripper + + +class FakeGelloAgent(Agent): + + def __init__(self): + self.gripper_state = 0.0 diff --git a/gello/dynamixel/driver.py b/gello/dynamixel/driver.py index 3e0dfd5..63e95c2 100644 --- a/gello/dynamixel/driver.py +++ b/gello/dynamixel/driver.py @@ -26,6 +26,7 @@ TORQUE_DISABLE = 0 class DynamixelDriverProtocol(Protocol): + def set_joints(self, joint_angles: Sequence[float]): """Set the joint angles for the Dynamixel servos. @@ -63,6 +64,7 @@ class DynamixelDriverProtocol(Protocol): class FakeDynamixelDriver(DynamixelDriverProtocol): + def __init__(self, ids: Sequence[int]): self._ids = ids self._joint_angles = np.zeros(len(ids), dtype=int) @@ -71,8 +73,7 @@ class FakeDynamixelDriver(DynamixelDriverProtocol): def set_joints(self, joint_angles: Sequence[float]): if len(joint_angles) != len(self._ids): raise ValueError( - "The length of joint_angles must match the number of servos" - ) + "The length of joint_angles must match the number of servos") if not self._torque_enabled: raise RuntimeError("Torque must be enabled to set joint angles") self._joint_angles = np.array(joint_angles) @@ -91,9 +92,11 @@ class FakeDynamixelDriver(DynamixelDriverProtocol): class DynamixelDriver(DynamixelDriverProtocol): - def __init__( - self, ids: Sequence[int], port: str = "/dev/ttyUSB0", baudrate: int = 57600 - ): + + def __init__(self, + ids: Sequence[int], + port: str = "/dev/ttyUSB0", + baudrate: int = 57600): """Initialize the DynamixelDriver class. Args: @@ -132,8 +135,7 @@ class DynamixelDriver(DynamixelDriverProtocol): for dxl_id in self._ids: if not self._groupSyncRead.addParam(dxl_id): raise RuntimeError( - f"Failed to add parameter for Dynamixel with ID {dxl_id}" - ) + f"Failed to add parameter for Dynamixel with ID {dxl_id}") # Disable torque for each Dynamixel servo self._torque_enabled = False @@ -148,22 +150,21 @@ class DynamixelDriver(DynamixelDriverProtocol): def move_single_joint(self, dxl_id, joint_angle: float): if not self._torque_enabled: raise RuntimeError("Torque must be enabled to set joint angles") - + result, error = self._packetHandler.write4ByteTxRx( - self._portHandler, dxl_id, ADDR_GOAL_POSITION, int(joint_angle * 2048 / np.pi) - ) - + self._portHandler, dxl_id, ADDR_GOAL_POSITION, + int(joint_angle * 2048 / np.pi)) + if result != COMM_SUCCESS: - print(f"move_single_joint: {dxl_id}, {joint_angle}, {result}, {error}") + print( + f"move_single_joint: {dxl_id}, {joint_angle}, {result}, {error}" + ) # raise RuntimeError(f"Failed to write goal position for Dynamixel with ID {dxl_id}") - - def move_joints(self, joint_angles: Sequence[float]): if len(joint_angles) != len(self._ids): raise ValueError( - "The length of joint_angles must match the number of servos" - ) + "The length of joint_angles must match the number of servos") if not self._torque_enabled: raise RuntimeError("Torque must be enabled to set joint angles") # Write goal position @@ -171,12 +172,10 @@ class DynamixelDriver(DynamixelDriverProtocol): for dxl_id, angle in zip(self._ids, joint_angles): self.move_single_joint(dxl_id, angle) - def set_joints(self, joint_angles: Sequence[float]): if len(joint_angles) != len(self._ids): raise ValueError( - "The length of joint_angles must match the number of servos" - ) + "The length of joint_angles must match the number of servos") if not self._torque_enabled: raise RuntimeError("Torque must be enabled to set joint angles") @@ -194,12 +193,10 @@ class DynamixelDriver(DynamixelDriverProtocol): # Add goal position value to the Syncwrite parameter storage dxl_addparam_result = self._groupSyncWrite.addParam( - dxl_id, param_goal_position - ) + dxl_id, param_goal_position) if not dxl_addparam_result: raise RuntimeError( - f"Failed to set joint angle for Dynamixel with ID {dxl_id}" - ) + f"Failed to set joint angle for Dynamixel with ID {dxl_id}") # Syncwrite goal position dxl_comm_result = self._groupSyncWrite.txPacket() @@ -217,8 +214,7 @@ class DynamixelDriver(DynamixelDriverProtocol): with self._lock: for dxl_id in self._ids: dxl_comm_result, dxl_error = self._packetHandler.write1ByteTxRx( - self._portHandler, dxl_id, ADDR_TORQUE_ENABLE, torque_value - ) + self._portHandler, dxl_id, ADDR_TORQUE_ENABLE, torque_value) if dxl_comm_result != COMM_SUCCESS or dxl_error != 0: print(dxl_comm_result) print(dxl_error) @@ -241,15 +237,14 @@ class DynamixelDriver(DynamixelDriverProtocol): _joint_angles = np.zeros(len(self._ids), dtype=int) dxl_comm_result = self._groupSyncRead.txRxPacket() if dxl_comm_result != COMM_SUCCESS: - print(f"warning, comm failed: {dxl_comm_result}") + # print(f"warning, comm failed: {dxl_comm_result}") continue for i, dxl_id in enumerate(self._ids): - if self._groupSyncRead.isAvailable( - dxl_id, ADDR_PRESENT_POSITION, LEN_PRESENT_POSITION - ): + if self._groupSyncRead.isAvailable(dxl_id, + ADDR_PRESENT_POSITION, + LEN_PRESENT_POSITION): angle = self._groupSyncRead.getData( - dxl_id, ADDR_PRESENT_POSITION, LEN_PRESENT_POSITION - ) + dxl_id, ADDR_PRESENT_POSITION, LEN_PRESENT_POSITION) angle = np.int32(np.uint32(angle)) _joint_angles[i] = angle else: @@ -275,7 +270,7 @@ class DynamixelDriver(DynamixelDriverProtocol): def main(): # Set the port, baudrate, and servo IDs - ids = [1,2,3,4,5,6,7] + ids = [1, 2, 3, 4, 5, 6, 7] # Create a DynamixelDriver instance try: @@ -289,7 +284,7 @@ def main(): # Test setting torque mode driver.set_torque_mode(True) # driver.set_torque_mode(False) - + # driver.move_single_joint(1, 4) # driver.move_single_joint(2, 0.2) # driver.move_single_joint(3, 2) @@ -298,7 +293,7 @@ def main(): # driver.move_single_joint(6, 2.5) # driver.move_single_joint(7, 0.25) - init_angles = np.array([4,0,2,2.5,1.5,3,3.5]) + init_angles = np.array([4, 0, 2, 2.5, 1.5, 3, 3.5]) driver.move_joints(init_angles) # Test reading the joint angles