commit f2204f3cb47ed703d0cc7e897ebd15d7352bff22 Author: Igor Brylev Date: Wed Jun 4 17:23:37 2025 +0300 init diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..54aac19 --- /dev/null +++ b/LICENSE @@ -0,0 +1,22 @@ +MIT License + +Original Copyright (c) 2023 Philipp Wu +Modified Copyright (c) 2025 Franka Robotics GmbH + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/config/gello_config.yaml b/config/gello_config.yaml new file mode 100644 index 0000000..1203ee1 --- /dev/null +++ b/config/gello_config.yaml @@ -0,0 +1,7 @@ +usb-FTDI_USB__-__Serial_Converter_FTA7NPA1-if00-port0: + side: "sample-config" + num_joints: 7 + joint_signs: [1, 1, 1, 1, 1, -1, 1] + gripper: true + best_offsets: [6.283, 4.712, 3.142, 3.142, 3.142, 3.142, 4.712] + gripper_range_rad: [2.77856, 3.50931] \ No newline at end of file diff --git a/franka_gello_state_publisher/__init__.py b/franka_gello_state_publisher/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/franka_gello_state_publisher/__pycache__/__init__.cpython-312.pyc b/franka_gello_state_publisher/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..bce0c22 Binary files /dev/null and b/franka_gello_state_publisher/__pycache__/__init__.cpython-312.pyc differ diff --git a/franka_gello_state_publisher/__pycache__/driver.cpython-312.pyc b/franka_gello_state_publisher/__pycache__/driver.cpython-312.pyc new file mode 100644 index 0000000..8a6369e Binary files /dev/null and b/franka_gello_state_publisher/__pycache__/driver.cpython-312.pyc differ diff --git a/franka_gello_state_publisher/__pycache__/gello_publisher.cpython-312.pyc b/franka_gello_state_publisher/__pycache__/gello_publisher.cpython-312.pyc new file mode 100644 index 0000000..e745e64 Binary files /dev/null and b/franka_gello_state_publisher/__pycache__/gello_publisher.cpython-312.pyc differ diff --git a/franka_gello_state_publisher/driver.py b/franka_gello_state_publisher/driver.py new file mode 100644 index 0000000..3b6b282 --- /dev/null +++ b/franka_gello_state_publisher/driver.py @@ -0,0 +1,274 @@ +import time +from threading import Event, Lock, Thread +from typing import Protocol, Sequence + +import numpy as np +from dynamixel_sdk.group_sync_read import GroupSyncRead +from dynamixel_sdk.group_sync_write import GroupSyncWrite +from dynamixel_sdk.packet_handler import PacketHandler +from dynamixel_sdk.port_handler import PortHandler +from dynamixel_sdk.robotis_def import ( + COMM_SUCCESS, + DXL_HIBYTE, + DXL_HIWORD, + DXL_LOBYTE, + DXL_LOWORD, +) + +# Constants +ADDR_TORQUE_ENABLE = 64 +ADDR_GOAL_POSITION = 116 +LEN_GOAL_POSITION = 4 +ADDR_PRESENT_POSITION = 132 +LEN_PRESENT_POSITION = 4 +TORQUE_ENABLE = 1 +TORQUE_DISABLE = 0 + + +class DynamixelDriverProtocol(Protocol): + def set_joints(self, joint_angles: Sequence[float]): + """Set the joint angles for the Dynamixel servos. + + Args: + joint_angles (Sequence[float]): A list of joint angles. + """ + ... + + def torque_enabled(self) -> bool: + """Check if torque is enabled for the Dynamixel servos. + + Returns: + bool: True if torque is enabled, False if it is disabled. + """ + ... + + def set_torque_mode(self, enable: bool): + """Set the torque mode for the Dynamixel servos. + + Args: + enable (bool): True to enable torque, False to disable. + """ + ... + + def get_joints(self) -> np.ndarray: + """Get the current joint angles in radians. + + Returns: + np.ndarray: An array of joint angles. + """ + ... + + def close(self): + """Close the driver.""" + + +class FakeDynamixelDriver(DynamixelDriverProtocol): + def __init__(self, ids: Sequence[int]): + self._ids = ids + self._joint_angles = np.zeros(len(ids), dtype=int) + self._torque_enabled = False + + def set_joints(self, joint_angles: Sequence[float]): + if len(joint_angles) != len(self._ids): + raise ValueError( + "The length of joint_angles must match the number of servos" + ) + if not self._torque_enabled: + raise RuntimeError("Torque must be enabled to set joint angles") + self._joint_angles = np.array(joint_angles) + + def torque_enabled(self) -> bool: + return self._torque_enabled + + def set_torque_mode(self, enable: bool): + self._torque_enabled = enable + + def get_joints(self) -> np.ndarray: + return self._joint_angles.copy() + + def close(self): + pass + + +class DynamixelDriver(DynamixelDriverProtocol): + def __init__( + self, ids: Sequence[int], port: str = "/dev/ttyUSB0", baudrate: int = 2000000 + ): + """Initialize the DynamixelDriver class. + + Args: + ids (Sequence[int]): A list of IDs for the Dynamixel servos. + port (str): The USB port to connect to the arm. + baudrate (int): The baudrate for communication. + """ + self._ids = ids + self._joint_angles = None + self._lock = Lock() + + # Initialize the port handler, packet handler, and group sync read/write + self._portHandler = PortHandler(port) + self._packetHandler = PacketHandler(2.0) + self._groupSyncRead = GroupSyncRead( + self._portHandler, + self._packetHandler, + ADDR_PRESENT_POSITION, + LEN_PRESENT_POSITION, + ) + self._groupSyncWrite = GroupSyncWrite( + self._portHandler, + self._packetHandler, + ADDR_GOAL_POSITION, + LEN_GOAL_POSITION, + ) + + # Open the port and set the baudrate + if not self._portHandler.openPort(): + raise RuntimeError("Failed to open the port") + + if not self._portHandler.setBaudRate(baudrate): + raise RuntimeError(f"Failed to change the baudrate, {baudrate}") + + # Add parameters for each Dynamixel servo to the group sync read + for dxl_id in self._ids: + if not self._groupSyncRead.addParam(dxl_id): + raise RuntimeError( + f"Failed to add parameter for Dynamixel with ID {dxl_id}" + ) + + # Disable torque for each Dynamixel servo + self._torque_enabled = False + try: + self.set_torque_mode(self._torque_enabled) + except Exception as e: + print(f"port: {port}, {e}") + + self._stop_thread = Event() + self._start_reading_thread() + + def set_joints(self, joint_angles: Sequence[float]): + if len(joint_angles) != len(self._ids): + raise ValueError( + "The length of joint_angles must match the number of servos" + ) + if not self._torque_enabled: + raise RuntimeError("Torque must be enabled to set joint angles") + + for dxl_id, angle in zip(self._ids, joint_angles): + # Convert the angle to the appropriate value for the servo + position_value = int(angle * 2048 / np.pi) + + # Allocate goal position value into byte array + param_goal_position = [ + DXL_LOBYTE(DXL_LOWORD(position_value)), + DXL_HIBYTE(DXL_LOWORD(position_value)), + DXL_LOBYTE(DXL_HIWORD(position_value)), + DXL_HIBYTE(DXL_HIWORD(position_value)), + ] + + # Add goal position value to the Syncwrite parameter storage + dxl_addparam_result = self._groupSyncWrite.addParam( + dxl_id, param_goal_position + ) + if not dxl_addparam_result: + raise RuntimeError( + f"Failed to set joint angle for Dynamixel with ID {dxl_id}" + ) + + # Syncwrite goal position + dxl_comm_result = self._groupSyncWrite.txPacket() + if dxl_comm_result != COMM_SUCCESS: + raise RuntimeError("Failed to syncwrite goal position") + + # Clear syncwrite parameter storage + self._groupSyncWrite.clearParam() + + def torque_enabled(self) -> bool: + return self._torque_enabled + + def set_torque_mode(self, enable: bool): + torque_value = TORQUE_ENABLE if enable else TORQUE_DISABLE + with self._lock: + for dxl_id in self._ids: + dxl_comm_result, dxl_error = self._packetHandler.write1ByteTxRx( + self._portHandler, dxl_id, ADDR_TORQUE_ENABLE, torque_value + ) + if dxl_comm_result != COMM_SUCCESS or dxl_error != 0: + print(dxl_comm_result) + print(dxl_error) + raise RuntimeError( + f"Failed to set torque mode for Dynamixel with ID {dxl_id}" + ) + + self._torque_enabled = enable + + def _start_reading_thread(self): + self._reading_thread = Thread(target=self._read_joint_angles) + self._reading_thread.daemon = True + self._reading_thread.start() + + def _read_joint_angles(self): + # Continuously read joint angles and update the joint_angles array + while not self._stop_thread.is_set(): + time.sleep(0.001) + with self._lock: + _joint_angles = np.zeros(len(self._ids), dtype=int) + dxl_comm_result = self._groupSyncRead.txRxPacket() + if dxl_comm_result != COMM_SUCCESS: + print(f"warning, comm failed: {dxl_comm_result}") + continue + for i, dxl_id in enumerate(self._ids): + if self._groupSyncRead.isAvailable( + dxl_id, ADDR_PRESENT_POSITION, LEN_PRESENT_POSITION + ): + angle = self._groupSyncRead.getData( + dxl_id, ADDR_PRESENT_POSITION, LEN_PRESENT_POSITION + ) + angle = np.int32(np.uint32(angle)) + _joint_angles[i] = angle + else: + raise RuntimeError( + f"Failed to get joint angles for Dynamixel with ID {dxl_id}" + ) + self._joint_angles = _joint_angles + # self._groupSyncRead.clearParam() # TODO what does this do? should i add it + + def get_joints(self) -> np.ndarray: + # Return a copy of the joint_angles array to avoid race conditions + while self._joint_angles is None: + time.sleep(0.1) + # with self._lock: + _j = self._joint_angles.copy() + return _j / 2048.0 * np.pi + + def close(self): + self._stop_thread.set() + self._reading_thread.join() + self._portHandler.closePort() + + +def main(): + # Set the port, baudrate, and servo IDs + ids = [1] + + # Create a DynamixelDriver instance + try: + driver = DynamixelDriver(ids) + except FileNotFoundError: + driver = DynamixelDriver(ids, port="/dev/cu.usbserial-FT7WBMUB") + + # Test setting torque mode + driver.set_torque_mode(True) + driver.set_torque_mode(False) + + # Test reading the joint angles + try: + while True: + joint_angles = driver.get_joints() + print(f"Joint angles for IDs {ids}: {joint_angles}") + # print(f"Joint angles for IDs {ids[1]}: {joint_angles[1]}") + except KeyboardInterrupt: + driver.close() + + +if __name__ == "__main__": + main() # Test the driver diff --git a/franka_gello_state_publisher/gello_publisher.py b/franka_gello_state_publisher/gello_publisher.py new file mode 100644 index 0000000..c1a9e57 --- /dev/null +++ b/franka_gello_state_publisher/gello_publisher.py @@ -0,0 +1,129 @@ +import os +import glob +from typing import Tuple +import rclpy +from rclpy.node import Node +import numpy as np + +from .driver import DynamixelDriver +from sensor_msgs.msg import JointState +from std_msgs.msg import Float32 +import yaml +from ament_index_python.packages import get_package_share_directory + + +class GelloPublisher(Node): + def __init__(self): + super().__init__("gello_publisher") + + default_com_port = self.determine_default_com_port() + self.declare_parameter("com_port", default_com_port) + self.com_port = self.get_parameter("com_port").get_parameter_value().string_value + self.port = self.com_port.split("/")[-1] + """The port that GELLO is connected to.""" + + config_path = os.path.join( + get_package_share_directory("franka_gello_state_publisher"), + "config", + "gello_config.yaml", + ) + self.get_values_from_config(config_path) + + self.robot_joint_publisher = self.create_publisher(JointState, "/gello/joint_states", 10) + self.gripper_joint_publisher = self.create_publisher( + Float32, "/gripper_client/target_gripper_width_percent", 10 + ) + + self.timer = self.create_timer(1 / 25, self.publish_joint_jog) + + self.joint_names = [ + "fr3_joint1", + "fr3_joint2", + "fr3_joint3", + "fr3_joint4", + "fr3_joint5", + "fr3_joint6", + "fr3_joint7", + ] + + def determine_default_com_port(self) -> str: + matches = glob.glob("/dev/serial/by-id/usb-FTDI_USB__-__Serial_Converter*") + if matches: + self.get_logger().info(f"Auto-detected com_ports: {matches}") + return matches[0] + else: + self.get_logger().warn("No com_ports detected. Please specify the com_port manually.") + return "INVALID_COM_PORT" + + def get_values_from_config(self, config_file: str): + with open(config_file, "r") as file: + config = yaml.safe_load(file) + + self.num_robot_joints: int = config[self.port]["num_joints"] + """The number of joints in the robot.""" + + self.joint_signs: Tuple[float, ...] = config[self.port]["joint_signs"] + """Depending on how the motor is mounted on the Gello, its rotation direction can be reversed.""" + + self.gripper: bool = config[self.port]["gripper"] + """Whether or not the gripper is attached.""" + + joint_ids = list(range(1, self.num_joints)) + self.driver = DynamixelDriver(joint_ids, port=self.com_port, baudrate=2000000) + """The driver for the Dynamixel motors.""" + + self.best_offsets = np.array(config[self.port]["best_offsets"]) + """The best offsets for the joints.""" + + self.gripper_range_rad: Tuple[float, float] = config[self.port]["gripper_range_rad"] + """The range of the gripper in radians.""" + + self.__post_init__() + + def __post_init__(self): + assert len(self.joint_signs) == self.num_robot_joints + for idx, j in enumerate(self.joint_signs): + assert j == -1 or j == 1, f"Joint idx: {idx} should be -1 or 1, but got {j}." + + @property + def num_joints(self) -> int: + extra_joints = 1 if self.gripper else 0 + return self.num_robot_joints + extra_joints + + def publish_joint_jog(self): + current_joints = self.driver.get_joints() + current_robot_joints = current_joints[: self.num_robot_joints] + current_joints_corrected = (current_robot_joints - self.best_offsets) * self.joint_signs + + robot_joint_states = JointState() + robot_joint_states.header.stamp = self.get_clock().now().to_msg() + robot_joint_states.name = self.joint_names + robot_joint_states.header.frame_id = "fr3_link0" + robot_joint_states.position = [float(joint) for joint in current_joints_corrected] + + gripper_joint_states = Float32() + if self.gripper: + gripper_position = current_joints[-1] + gripper_joint_states.data = self.gripper_readout_to_percent(gripper_position) + else: + gripper_joint_states.position = 0.0 + self.robot_joint_publisher.publish(robot_joint_states) + self.gripper_joint_publisher.publish(gripper_joint_states) + + def gripper_readout_to_percent(self, gripper_position: float) -> float: + gripper_percent = (gripper_position - self.gripper_range_rad[0]) / ( + self.gripper_range_rad[1] - self.gripper_range_rad[0] + ) + return max(0.0, min(1.0, gripper_percent)) + + +def main(args=None): + rclpy.init(args=args) + gello_publisher = GelloPublisher() + rclpy.spin(gello_publisher) + gello_publisher.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/launch/main.launch.py b/launch/main.launch.py new file mode 100755 index 0000000..9d6c8d2 --- /dev/null +++ b/launch/main.launch.py @@ -0,0 +1,22 @@ +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch_ros.actions import Node +from launch.substitutions import LaunchConfiguration + + +def generate_launch_description(): + args = [] + args.append( + DeclareLaunchArgument(name="com_port", default_value=None, description="Default COM port") + ) + nodes = [ + Node( + package="franka_gello_state_publisher", + executable="gello_publisher", + name="gello_publisher", + output="screen", + parameters=[{"com_port": LaunchConfiguration("com_port")}], + ), + ] + + return LaunchDescription(args + nodes) diff --git a/package.xml b/package.xml new file mode 100644 index 0000000..43cdf8a --- /dev/null +++ b/package.xml @@ -0,0 +1,24 @@ + + + + franka_gello_state_publisher + 0.1.0 + Publisher of joint states of gello + Franka Robotics GmbH + MIT + + rclpy + std_msgs + sensor_msgs + control_msgs + geometry_msgs + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/resource/franka_gello_state_publisher b/resource/franka_gello_state_publisher new file mode 100644 index 0000000..e69de29 diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..0b65838 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/franka_gello_state_publisher +[install] +install_scripts=$base/lib/franka_gello_state_publisher diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..ae7f096 --- /dev/null +++ b/setup.py @@ -0,0 +1,26 @@ +from setuptools import find_packages, setup +import glob + +package_name = "franka_gello_state_publisher" + +setup( + name=package_name, + version="0.1.0", + packages=find_packages(exclude=["test"]), + data_files=[ + ("share/ament_index/resource_index/packages", ["resource/" + package_name]), + ("share/" + package_name, ["package.xml"]), + ("share/" + package_name + "/config", glob.glob("config/*.yaml")), + ("share/" + package_name + "/launch", ["launch/main.launch.py"]), + ], + install_requires=["setuptools", "dynamixel_sdk"], + zip_safe=True, + maintainer="Franka Robotics GmbH", + maintainer_email="support@franka.de", + description="Publishes the state of the GELLO teleoperation device.", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": ["gello_publisher = franka_gello_state_publisher.gello_publisher:main"], + }, +) diff --git a/test/test_copyright.py b/test/test_copyright.py new file mode 100644 index 0000000..95f0381 --- /dev/null +++ b/test/test_copyright.py @@ -0,0 +1,25 @@ +# Copyright 2015 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_copyright.main import main +import pytest + + +# Remove the `skip` decorator once the source file(s) have a copyright header +@pytest.mark.skip(reason="No copyright header has been placed in the generated source file.") +@pytest.mark.copyright +@pytest.mark.linter +def test_copyright(): + rc = main(argv=[".", "test"]) + assert rc == 0, "Found errors" diff --git a/test/test_flake8.py b/test/test_flake8.py new file mode 100644 index 0000000..75c3f19 --- /dev/null +++ b/test/test_flake8.py @@ -0,0 +1,33 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_flake8.main import main_with_errors +import pytest +from pathlib import Path + + +@pytest.mark.flake8 +@pytest.mark.linter +def test_flake8(): + config_path = Path(__file__).resolve().parents[3] / ".flake8" + excluded_file = Path(__file__).resolve().parents[1] / "franka_gello_state_publisher/driver.py" + print(excluded_file) + rc, errors = main_with_errors( + argv=["--config", str(config_path), "--exclude", str(excluded_file)] + ) + assert rc == 0, "Found %d code style errors / warnings:\n" % len(errors) + "\n".join(errors) + + +if __name__ == "__main__": + test_flake8() diff --git a/test/test_pep257.py b/test/test_pep257.py new file mode 100644 index 0000000..647b3c3 --- /dev/null +++ b/test/test_pep257.py @@ -0,0 +1,25 @@ +# Copyright 2015 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_pep257.main import main +import pytest +from pathlib import Path + + +@pytest.mark.linter +@pytest.mark.pep257 +def test_pep257(): + excluded_file = Path(__file__).resolve().parents[1] / "franka_gello_state_publisher/driver.py" + rc = main(argv=[".", "test", "--exclude", str(excluded_file)]) + assert rc == 0, "Found code style errors / warnings"