import argparse import os import json import subprocess import signal import time from ros2cli.node.strategy import NodeStrategy from ros2topic.api import get_topic_names_and_types OUTPUT_FILE = "topics.json" TOPICS_FILTER = ["/parameter_events", "/rosout"] def get_script_args(cfg): args = cfg["command"].split() args.append(cfg["package"]) args.append(cfg["executable"]) return args def get_topics(filename, path): jsonpath = os.path.join(path, filename) with NodeStrategy({}) as node: topic_names_and_types = get_topic_names_and_types(node=node, include_hidden_topics=False) topic_info = [] for (topic_name, topic_types) in topic_names_and_types: if not topic_name in TOPICS_FILTER: topic_info.append({"name": topic_name, "type": topic_types[0]}) print(f"---> number of topics: {len(topic_info)}") j_data = {"topics": topic_info} with open(jsonpath, "w") as fh: json.dump(j_data, fh, indent=2) for topic in topic_info: print(topic["name"]) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--package", required=True, help="Json-string/file with package parameters") parser.add_argument("--path", default="", help="Output path") parser.add_argument("--json", default=OUTPUT_FILE, help="Output file name in json-format") parser.add_argument('--delay', default=5, type=int, help="Delay in seconds") args = parser.parse_args() if args.package[-5:] == ".json": if not os.path.isfile(args.package): print(f"Error: no such file '{args.package}'") exit(-1) with open(args.package, "r") as f: j_data = f.read() else: j_data = args.package try: cfg = json.loads(j_data) except json.JSONDecodeError as e: print(f"JSon error: {e}") exit(-2) cmd = get_script_args(cfg) process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) time.sleep(args.delay) get_topics(args.json, args.path) process.send_signal(signal.SIGINT)