This commit is contained in:
Igor Brylev 2025-06-04 17:23:37 +03:00
commit f2204f3cb4
16 changed files with 591 additions and 0 deletions

22
LICENSE Normal file
View file

@ -0,0 +1,22 @@
MIT License
Original Copyright (c) 2023 Philipp Wu
Modified Copyright (c) 2025 Franka Robotics GmbH
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

7
config/gello_config.yaml Normal file
View file

@ -0,0 +1,7 @@
usb-FTDI_USB__-__Serial_Converter_FTA7NPA1-if00-port0:
side: "sample-config"
num_joints: 7
joint_signs: [1, 1, 1, 1, 1, -1, 1]
gripper: true
best_offsets: [6.283, 4.712, 3.142, 3.142, 3.142, 3.142, 4.712]
gripper_range_rad: [2.77856, 3.50931]

View file

View file

@ -0,0 +1,274 @@
import time
from threading import Event, Lock, Thread
from typing import Protocol, Sequence
import numpy as np
from dynamixel_sdk.group_sync_read import GroupSyncRead
from dynamixel_sdk.group_sync_write import GroupSyncWrite
from dynamixel_sdk.packet_handler import PacketHandler
from dynamixel_sdk.port_handler import PortHandler
from dynamixel_sdk.robotis_def import (
COMM_SUCCESS,
DXL_HIBYTE,
DXL_HIWORD,
DXL_LOBYTE,
DXL_LOWORD,
)
# Constants
ADDR_TORQUE_ENABLE = 64
ADDR_GOAL_POSITION = 116
LEN_GOAL_POSITION = 4
ADDR_PRESENT_POSITION = 132
LEN_PRESENT_POSITION = 4
TORQUE_ENABLE = 1
TORQUE_DISABLE = 0
class DynamixelDriverProtocol(Protocol):
def set_joints(self, joint_angles: Sequence[float]):
"""Set the joint angles for the Dynamixel servos.
Args:
joint_angles (Sequence[float]): A list of joint angles.
"""
...
def torque_enabled(self) -> bool:
"""Check if torque is enabled for the Dynamixel servos.
Returns:
bool: True if torque is enabled, False if it is disabled.
"""
...
def set_torque_mode(self, enable: bool):
"""Set the torque mode for the Dynamixel servos.
Args:
enable (bool): True to enable torque, False to disable.
"""
...
def get_joints(self) -> np.ndarray:
"""Get the current joint angles in radians.
Returns:
np.ndarray: An array of joint angles.
"""
...
def close(self):
"""Close the driver."""
class FakeDynamixelDriver(DynamixelDriverProtocol):
def __init__(self, ids: Sequence[int]):
self._ids = ids
self._joint_angles = np.zeros(len(ids), dtype=int)
self._torque_enabled = False
def set_joints(self, joint_angles: Sequence[float]):
if len(joint_angles) != len(self._ids):
raise ValueError(
"The length of joint_angles must match the number of servos"
)
if not self._torque_enabled:
raise RuntimeError("Torque must be enabled to set joint angles")
self._joint_angles = np.array(joint_angles)
def torque_enabled(self) -> bool:
return self._torque_enabled
def set_torque_mode(self, enable: bool):
self._torque_enabled = enable
def get_joints(self) -> np.ndarray:
return self._joint_angles.copy()
def close(self):
pass
class DynamixelDriver(DynamixelDriverProtocol):
def __init__(
self, ids: Sequence[int], port: str = "/dev/ttyUSB0", baudrate: int = 2000000
):
"""Initialize the DynamixelDriver class.
Args:
ids (Sequence[int]): A list of IDs for the Dynamixel servos.
port (str): The USB port to connect to the arm.
baudrate (int): The baudrate for communication.
"""
self._ids = ids
self._joint_angles = None
self._lock = Lock()
# Initialize the port handler, packet handler, and group sync read/write
self._portHandler = PortHandler(port)
self._packetHandler = PacketHandler(2.0)
self._groupSyncRead = GroupSyncRead(
self._portHandler,
self._packetHandler,
ADDR_PRESENT_POSITION,
LEN_PRESENT_POSITION,
)
self._groupSyncWrite = GroupSyncWrite(
self._portHandler,
self._packetHandler,
ADDR_GOAL_POSITION,
LEN_GOAL_POSITION,
)
# Open the port and set the baudrate
if not self._portHandler.openPort():
raise RuntimeError("Failed to open the port")
if not self._portHandler.setBaudRate(baudrate):
raise RuntimeError(f"Failed to change the baudrate, {baudrate}")
# Add parameters for each Dynamixel servo to the group sync read
for dxl_id in self._ids:
if not self._groupSyncRead.addParam(dxl_id):
raise RuntimeError(
f"Failed to add parameter for Dynamixel with ID {dxl_id}"
)
# Disable torque for each Dynamixel servo
self._torque_enabled = False
try:
self.set_torque_mode(self._torque_enabled)
except Exception as e:
print(f"port: {port}, {e}")
self._stop_thread = Event()
self._start_reading_thread()
def set_joints(self, joint_angles: Sequence[float]):
if len(joint_angles) != len(self._ids):
raise ValueError(
"The length of joint_angles must match the number of servos"
)
if not self._torque_enabled:
raise RuntimeError("Torque must be enabled to set joint angles")
for dxl_id, angle in zip(self._ids, joint_angles):
# Convert the angle to the appropriate value for the servo
position_value = int(angle * 2048 / np.pi)
# Allocate goal position value into byte array
param_goal_position = [
DXL_LOBYTE(DXL_LOWORD(position_value)),
DXL_HIBYTE(DXL_LOWORD(position_value)),
DXL_LOBYTE(DXL_HIWORD(position_value)),
DXL_HIBYTE(DXL_HIWORD(position_value)),
]
# Add goal position value to the Syncwrite parameter storage
dxl_addparam_result = self._groupSyncWrite.addParam(
dxl_id, param_goal_position
)
if not dxl_addparam_result:
raise RuntimeError(
f"Failed to set joint angle for Dynamixel with ID {dxl_id}"
)
# Syncwrite goal position
dxl_comm_result = self._groupSyncWrite.txPacket()
if dxl_comm_result != COMM_SUCCESS:
raise RuntimeError("Failed to syncwrite goal position")
# Clear syncwrite parameter storage
self._groupSyncWrite.clearParam()
def torque_enabled(self) -> bool:
return self._torque_enabled
def set_torque_mode(self, enable: bool):
torque_value = TORQUE_ENABLE if enable else TORQUE_DISABLE
with self._lock:
for dxl_id in self._ids:
dxl_comm_result, dxl_error = self._packetHandler.write1ByteTxRx(
self._portHandler, dxl_id, ADDR_TORQUE_ENABLE, torque_value
)
if dxl_comm_result != COMM_SUCCESS or dxl_error != 0:
print(dxl_comm_result)
print(dxl_error)
raise RuntimeError(
f"Failed to set torque mode for Dynamixel with ID {dxl_id}"
)
self._torque_enabled = enable
def _start_reading_thread(self):
self._reading_thread = Thread(target=self._read_joint_angles)
self._reading_thread.daemon = True
self._reading_thread.start()
def _read_joint_angles(self):
# Continuously read joint angles and update the joint_angles array
while not self._stop_thread.is_set():
time.sleep(0.001)
with self._lock:
_joint_angles = np.zeros(len(self._ids), dtype=int)
dxl_comm_result = self._groupSyncRead.txRxPacket()
if dxl_comm_result != COMM_SUCCESS:
print(f"warning, comm failed: {dxl_comm_result}")
continue
for i, dxl_id in enumerate(self._ids):
if self._groupSyncRead.isAvailable(
dxl_id, ADDR_PRESENT_POSITION, LEN_PRESENT_POSITION
):
angle = self._groupSyncRead.getData(
dxl_id, ADDR_PRESENT_POSITION, LEN_PRESENT_POSITION
)
angle = np.int32(np.uint32(angle))
_joint_angles[i] = angle
else:
raise RuntimeError(
f"Failed to get joint angles for Dynamixel with ID {dxl_id}"
)
self._joint_angles = _joint_angles
# self._groupSyncRead.clearParam() # TODO what does this do? should i add it
def get_joints(self) -> np.ndarray:
# Return a copy of the joint_angles array to avoid race conditions
while self._joint_angles is None:
time.sleep(0.1)
# with self._lock:
_j = self._joint_angles.copy()
return _j / 2048.0 * np.pi
def close(self):
self._stop_thread.set()
self._reading_thread.join()
self._portHandler.closePort()
def main():
# Set the port, baudrate, and servo IDs
ids = [1]
# Create a DynamixelDriver instance
try:
driver = DynamixelDriver(ids)
except FileNotFoundError:
driver = DynamixelDriver(ids, port="/dev/cu.usbserial-FT7WBMUB")
# Test setting torque mode
driver.set_torque_mode(True)
driver.set_torque_mode(False)
# Test reading the joint angles
try:
while True:
joint_angles = driver.get_joints()
print(f"Joint angles for IDs {ids}: {joint_angles}")
# print(f"Joint angles for IDs {ids[1]}: {joint_angles[1]}")
except KeyboardInterrupt:
driver.close()
if __name__ == "__main__":
main() # Test the driver

View file

@ -0,0 +1,129 @@
import os
import glob
from typing import Tuple
import rclpy
from rclpy.node import Node
import numpy as np
from .driver import DynamixelDriver
from sensor_msgs.msg import JointState
from std_msgs.msg import Float32
import yaml
from ament_index_python.packages import get_package_share_directory
class GelloPublisher(Node):
def __init__(self):
super().__init__("gello_publisher")
default_com_port = self.determine_default_com_port()
self.declare_parameter("com_port", default_com_port)
self.com_port = self.get_parameter("com_port").get_parameter_value().string_value
self.port = self.com_port.split("/")[-1]
"""The port that GELLO is connected to."""
config_path = os.path.join(
get_package_share_directory("franka_gello_state_publisher"),
"config",
"gello_config.yaml",
)
self.get_values_from_config(config_path)
self.robot_joint_publisher = self.create_publisher(JointState, "/gello/joint_states", 10)
self.gripper_joint_publisher = self.create_publisher(
Float32, "/gripper_client/target_gripper_width_percent", 10
)
self.timer = self.create_timer(1 / 25, self.publish_joint_jog)
self.joint_names = [
"fr3_joint1",
"fr3_joint2",
"fr3_joint3",
"fr3_joint4",
"fr3_joint5",
"fr3_joint6",
"fr3_joint7",
]
def determine_default_com_port(self) -> str:
matches = glob.glob("/dev/serial/by-id/usb-FTDI_USB__-__Serial_Converter*")
if matches:
self.get_logger().info(f"Auto-detected com_ports: {matches}")
return matches[0]
else:
self.get_logger().warn("No com_ports detected. Please specify the com_port manually.")
return "INVALID_COM_PORT"
def get_values_from_config(self, config_file: str):
with open(config_file, "r") as file:
config = yaml.safe_load(file)
self.num_robot_joints: int = config[self.port]["num_joints"]
"""The number of joints in the robot."""
self.joint_signs: Tuple[float, ...] = config[self.port]["joint_signs"]
"""Depending on how the motor is mounted on the Gello, its rotation direction can be reversed."""
self.gripper: bool = config[self.port]["gripper"]
"""Whether or not the gripper is attached."""
joint_ids = list(range(1, self.num_joints))
self.driver = DynamixelDriver(joint_ids, port=self.com_port, baudrate=2000000)
"""The driver for the Dynamixel motors."""
self.best_offsets = np.array(config[self.port]["best_offsets"])
"""The best offsets for the joints."""
self.gripper_range_rad: Tuple[float, float] = config[self.port]["gripper_range_rad"]
"""The range of the gripper in radians."""
self.__post_init__()
def __post_init__(self):
assert len(self.joint_signs) == self.num_robot_joints
for idx, j in enumerate(self.joint_signs):
assert j == -1 or j == 1, f"Joint idx: {idx} should be -1 or 1, but got {j}."
@property
def num_joints(self) -> int:
extra_joints = 1 if self.gripper else 0
return self.num_robot_joints + extra_joints
def publish_joint_jog(self):
current_joints = self.driver.get_joints()
current_robot_joints = current_joints[: self.num_robot_joints]
current_joints_corrected = (current_robot_joints - self.best_offsets) * self.joint_signs
robot_joint_states = JointState()
robot_joint_states.header.stamp = self.get_clock().now().to_msg()
robot_joint_states.name = self.joint_names
robot_joint_states.header.frame_id = "fr3_link0"
robot_joint_states.position = [float(joint) for joint in current_joints_corrected]
gripper_joint_states = Float32()
if self.gripper:
gripper_position = current_joints[-1]
gripper_joint_states.data = self.gripper_readout_to_percent(gripper_position)
else:
gripper_joint_states.position = 0.0
self.robot_joint_publisher.publish(robot_joint_states)
self.gripper_joint_publisher.publish(gripper_joint_states)
def gripper_readout_to_percent(self, gripper_position: float) -> float:
gripper_percent = (gripper_position - self.gripper_range_rad[0]) / (
self.gripper_range_rad[1] - self.gripper_range_rad[0]
)
return max(0.0, min(1.0, gripper_percent))
def main(args=None):
rclpy.init(args=args)
gello_publisher = GelloPublisher()
rclpy.spin(gello_publisher)
gello_publisher.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

22
launch/main.launch.py Executable file
View file

@ -0,0 +1,22 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
def generate_launch_description():
args = []
args.append(
DeclareLaunchArgument(name="com_port", default_value=None, description="Default COM port")
)
nodes = [
Node(
package="franka_gello_state_publisher",
executable="gello_publisher",
name="gello_publisher",
output="screen",
parameters=[{"com_port": LaunchConfiguration("com_port")}],
),
]
return LaunchDescription(args + nodes)

24
package.xml Normal file
View file

@ -0,0 +1,24 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>franka_gello_state_publisher</name>
<version>0.1.0</version>
<description>Publisher of joint states of gello</description>
<maintainer email="support@franka.de">Franka Robotics GmbH</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>control_msgs</depend>
<depend>geometry_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View file

4
setup.cfg Normal file
View file

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/franka_gello_state_publisher
[install]
install_scripts=$base/lib/franka_gello_state_publisher

26
setup.py Normal file
View file

@ -0,0 +1,26 @@
from setuptools import find_packages, setup
import glob
package_name = "franka_gello_state_publisher"
setup(
name=package_name,
version="0.1.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
("share/" + package_name + "/config", glob.glob("config/*.yaml")),
("share/" + package_name + "/launch", ["launch/main.launch.py"]),
],
install_requires=["setuptools", "dynamixel_sdk"],
zip_safe=True,
maintainer="Franka Robotics GmbH",
maintainer_email="support@franka.de",
description="Publishes the state of the GELLO teleoperation device.",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": ["gello_publisher = franka_gello_state_publisher.gello_publisher:main"],
},
)

25
test/test_copyright.py Normal file
View file

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason="No copyright header has been placed in the generated source file.")
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=[".", "test"])
assert rc == 0, "Found errors"

33
test/test_flake8.py Normal file
View file

@ -0,0 +1,33 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
from pathlib import Path
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
config_path = Path(__file__).resolve().parents[3] / ".flake8"
excluded_file = Path(__file__).resolve().parents[1] / "franka_gello_state_publisher/driver.py"
print(excluded_file)
rc, errors = main_with_errors(
argv=["--config", str(config_path), "--exclude", str(excluded_file)]
)
assert rc == 0, "Found %d code style errors / warnings:\n" % len(errors) + "\n".join(errors)
if __name__ == "__main__":
test_flake8()

25
test/test_pep257.py Normal file
View file

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
from pathlib import Path
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
excluded_file = Path(__file__).resolve().parents[1] / "franka_gello_state_publisher/driver.py"
rc = main(argv=[".", "test", "--exclude", str(excluded_file)])
assert rc == 0, "Found code style errors / warnings"