68 lines
2.1 KiB
Python
68 lines
2.1 KiB
Python
import argparse
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import signal
|
|
import time
|
|
|
|
from ros2cli.node.strategy import NodeStrategy
|
|
from ros2topic.api import get_topic_names_and_types
|
|
|
|
OUTPUT_FILE = "topics.json"
|
|
TOPICS_FILTER = ["/parameter_events", "/rosout"]
|
|
|
|
def get_script_args(cfg):
|
|
args = cfg["command"].split()
|
|
args.append(cfg["package"])
|
|
args.append(cfg["executable"])
|
|
return args
|
|
|
|
def get_topics(filename, path):
|
|
jsonpath = os.path.join(path, filename)
|
|
|
|
with NodeStrategy({}) as node:
|
|
topic_names_and_types = get_topic_names_and_types(node=node, include_hidden_topics=False)
|
|
|
|
topic_info = []
|
|
for (topic_name, topic_types) in topic_names_and_types:
|
|
if not topic_name in TOPICS_FILTER:
|
|
topic_info.append({"name": topic_name, "type": topic_types[0]})
|
|
|
|
print(f"---> number of topics: {len(topic_info)}")
|
|
|
|
j_data = {"topics": topic_info}
|
|
with open(jsonpath, "w") as fh:
|
|
json.dump(j_data, fh, indent=2)
|
|
|
|
for topic in topic_info:
|
|
print(topic["name"])
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--package", required=True, help="Json-string/file with package parameters")
|
|
parser.add_argument("--path", default="", help="Output path")
|
|
parser.add_argument("--json", default=OUTPUT_FILE, help="Output file name in json-format")
|
|
parser.add_argument('--delay', default=5, type=int, help="Delay in seconds")
|
|
args = parser.parse_args()
|
|
|
|
if args.package[-5:] == ".json":
|
|
if not os.path.isfile(args.package):
|
|
print(f"Error: no such file '{args.package}'")
|
|
exit(-1)
|
|
with open(args.package, "r") as f:
|
|
j_data = f.read()
|
|
else:
|
|
j_data = args.package
|
|
try:
|
|
cfg = json.loads(j_data)
|
|
except json.JSONDecodeError as e:
|
|
print(f"JSon error: {e}")
|
|
exit(-2)
|
|
|
|
cmd = get_script_args(cfg)
|
|
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
|
|
time.sleep(args.delay)
|
|
get_topics(args.json, args.path)
|
|
|
|
process.send_signal(signal.SIGINT)
|