133 lines
3.8 KiB
Python
133 lines
3.8 KiB
Python
from typing import Dict
|
|
|
|
import numpy as np
|
|
|
|
from gello.robots.robot import Robot
|
|
|
|
|
|
class URRobot(Robot):
|
|
"""A class representing a UR robot."""
|
|
|
|
def __init__(self, robot_ip: str = "192.168.1.10", no_gripper: bool = False):
|
|
import rtde_control
|
|
import rtde_receive
|
|
|
|
[print("in ur robot") for _ in range(4)]
|
|
try:
|
|
self.robot = rtde_control.RTDEControlInterface(robot_ip)
|
|
except Exception as e:
|
|
print(e)
|
|
print(robot_ip)
|
|
|
|
self.r_inter = rtde_receive.RTDEReceiveInterface(robot_ip)
|
|
if not no_gripper:
|
|
from gello.robots.robotiq_gripper import RobotiqGripper
|
|
|
|
self.gripper = RobotiqGripper()
|
|
self.gripper.connect(hostname=robot_ip, port=63352)
|
|
print("gripper connected")
|
|
# gripper.activate()
|
|
|
|
[print("connect") for _ in range(4)]
|
|
|
|
self._free_drive = False
|
|
self.robot.endFreedriveMode()
|
|
self._use_gripper = not no_gripper
|
|
|
|
def num_dofs(self) -> int:
|
|
"""Get the number of joints of the robot.
|
|
|
|
Returns:
|
|
int: The number of joints of the robot.
|
|
"""
|
|
if self._use_gripper:
|
|
return 7
|
|
return 6
|
|
|
|
def _get_gripper_pos(self) -> float:
|
|
import time
|
|
|
|
time.sleep(0.01)
|
|
gripper_pos = self.gripper.get_current_position()
|
|
assert 0 <= gripper_pos <= 255, "Gripper position must be between 0 and 255"
|
|
return gripper_pos / 255
|
|
|
|
def get_joint_state(self) -> np.ndarray:
|
|
"""Get the current state of the leader robot.
|
|
|
|
Returns:
|
|
T: The current state of the leader robot.
|
|
"""
|
|
robot_joints = self.r_inter.getActualQ()
|
|
if self._use_gripper:
|
|
gripper_pos = self._get_gripper_pos()
|
|
pos = np.append(robot_joints, gripper_pos)
|
|
else:
|
|
pos = robot_joints
|
|
return pos
|
|
|
|
def command_joint_state(self, joint_state: np.ndarray) -> None:
|
|
"""Command the leader robot to a given state.
|
|
|
|
Args:
|
|
joint_state (np.ndarray): The state to command the leader robot to.
|
|
"""
|
|
velocity = 0.5
|
|
acceleration = 0.5
|
|
dt = 1.0 / 500 # 2ms
|
|
lookahead_time = 0.2
|
|
gain = 100
|
|
|
|
robot_joints = joint_state[:6]
|
|
t_start = self.robot.initPeriod()
|
|
self.robot.servoJ(
|
|
robot_joints, velocity, acceleration, dt, lookahead_time, gain
|
|
)
|
|
if self._use_gripper:
|
|
gripper_pos = joint_state[-1] * 255
|
|
self.gripper.move(gripper_pos, 255, 10)
|
|
self.robot.waitPeriod(t_start)
|
|
|
|
def freedrive_enabled(self) -> bool:
|
|
"""Check if the robot is in freedrive mode.
|
|
|
|
Returns:
|
|
bool: True if the robot is in freedrive mode, False otherwise.
|
|
"""
|
|
return self._free_drive
|
|
|
|
def set_freedrive_mode(self, enable: bool) -> None:
|
|
"""Set the freedrive mode of the robot.
|
|
|
|
Args:
|
|
enable (bool): True to enable freedrive mode, False to disable it.
|
|
"""
|
|
if enable and not self._free_drive:
|
|
self._free_drive = True
|
|
self.robot.freedriveMode()
|
|
elif not enable and self._free_drive:
|
|
self._free_drive = False
|
|
self.robot.endFreedriveMode()
|
|
|
|
def get_observations(self) -> Dict[str, np.ndarray]:
|
|
joints = self.get_joint_state()
|
|
pos_quat = np.zeros(7)
|
|
gripper_pos = np.array([joints[-1]])
|
|
return {
|
|
"joint_positions": joints,
|
|
"joint_velocities": joints,
|
|
"ee_pos_quat": pos_quat,
|
|
"gripper_position": gripper_pos,
|
|
}
|
|
|
|
|
|
def main():
|
|
robot_ip = "192.168.1.11"
|
|
ur = URRobot(robot_ip, no_gripper=True)
|
|
print(ur)
|
|
ur.set_freedrive_mode(True)
|
|
print(ur.get_observations())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|