Add support for getting sources

Either from local paths or by calling fetches.
This commit is contained in:
Michal Sojka 2024-09-15 00:16:10 +02:00
parent 6e8ad35fe7
commit f1ab404292
2 changed files with 116 additions and 35 deletions

View file

@ -7,7 +7,8 @@ import os
import argparse
import itertools
import subprocess
from catkin_pkg.package import parse_package_string
from textwrap import dedent, indent
from catkin_pkg.package import parse_package_string, Package
from rosinstall_generator.distro import get_distro
from superflore.PackageMetadata import PackageMetadata
from superflore.exceptions import UnresolvedDependency
@ -16,11 +17,13 @@ from .nix_expression import NixExpression, NixLicense
from superflore.utils import (download_file, get_distro_condition_context,
get_distros, get_pkg_version, info, resolve_dep,
retry_on_exception, warn)
from typing import Dict, Iterable, Set
from typing import Dict, Iterable, Set, reveal_type
from superflore.utils import err
from superflore.utils import ok
from superflore.utils import warn
import urllib.parse
import re
import json
def resolve_dependencies(deps: Iterable[str]) -> Set[str]:
return set(itertools.chain.from_iterable(
@ -49,41 +52,80 @@ def get_dependencies_as_set(pkg, dep_type):
return set([d.name for d in deps[dep_type] if d.evaluated_condition is not False])
def get_output_file_name(pkg, args):
def get_output_file_name(source: str, pkg: Package, args):
if args.output_as_ros_pkg_name:
fn = f"{pkg.name}.nix"
elif args.output_as_nix_pkg_name:
fn = f"{NixPackage.normalize_name(pkg.name)}.nix"
else:
fn = args.output
dir = args.output_dir if args.output_dir is not None else os.path.dirname(source)
return os.path.join(dir, fn)
return os.path.join(args.output_dir, fn)
def generate_overlay(expressions: dict[str, str], args):
with open("overlay.nix", "w") as f:
print("self: super:\n{", file=f)
for pkg in sorted(expressions):
print(f" {pkg} = super.callPackage {expressions[pkg]} {{}};", file=f)
print("}", file=f)
def generate_default(args):
with open("default.nix", "w") as f:
f.write('''{
nix-ros-overlay ? builtins.fetchTarball "https://github.com/lopsided98/nix-ros-overlay/archive/master.tar.gz",
}:
let
applyDistroOverlay =
rosOverlay: rosPackages:
rosPackages
// builtins.mapAttrs (
rosDistro: rosPkgs: if rosPkgs ? overrideScope then rosPkgs.overrideScope rosOverlay else rosPkgs
) rosPackages;
rosDistroOverlays = self: super: {
# Apply the overlay to multiple ROS distributions
rosPackages = applyDistroOverlay (import ./overlay.nix) super.rosPackages;
};
in
import nix-ros-overlay {
overlays = [ rosDistroOverlays ];
}
''')
def ros2nix(args):
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("source", nargs="+", help="Path to package.xml") # TODO or a directory containing package.xml or an ")
parser.add_argument("source", nargs="+", help="Path to package.xml")
group = parser.add_mutually_exclusive_group()
group.add_argument("--output", default="package.nix", help="Output filename")
group.add_argument("--output-as-ros-pkg-name", action="store_true", help="Name output file based on ROS package name, e.g., package_name.nix")
group.add_argument("--output-as-nix-pkg-name", action="store_true", help="Name output file based on Nix package name, e.g., package-name.nix")
group.add_argument("--output-as-ros-pkg-name", action="store_true", help="Name output file based on ROS package name, e.g., package_name.nix. Implies --output-dir=.")
group.add_argument("--output-as-nix-pkg-name", action="store_true", help="Name output file based on Nix package name, e.g., package-name.nix. Implies --output-dir=.")
parser.add_argument("--output-dir", default=".", help="Directory to store output files in")
parser.add_argument("--output-dir", help="Directory to generate output files in (by default, files are stored next to their corresponding package.xml)")
parser.add_argument("--fetch", action="store_true", help="Use fetches like fetchFromGitHub for src attribute. "
"The fetch function and its parameters are determined from the local git work tree."
"sourceRoot is set if needed and not overridden by --source-root.")
parser.add_argument("--distro", default="rolling",
help="ROS distro (used as a context for evaluation of conditions in package.xml and in the name of the Nix expression)")
parser.add_argument("--src-param",
help="Parameter name in arguments of the generated function to be used as a src attribute")
parser.add_argument("--source-root",
help="sourceRoot attribute value in the generated Nix expression. Substring '{package_name}' gets replaced with package name.")
help="Set sourceRoot attribute value in the generated Nix expression. "
"Substring '{package_name}' gets replaced with the package name.")
parser.add_argument("--nixfmt", action="store_true", help="Format the resulting expression with nixfmt")
parser.add_argument("--nixfmt", action="store_true", help="Format the resulting expressions with nixfmt")
parser.add_argument("--copyright-holder")
parser.add_argument("--license", help="License of the generated Nix expression, e.g. 'BSD'")
args = parser.parse_args()
if args.output_dir is None and (args.output_as_nix_pkg_name or args.output_as_ros_pkg_name):
args.output_dir = "."
expressions: dict[str, str] = {}
git_cache = {}
for source in args.source:
try:
with open(source, 'r') as f:
@ -121,9 +163,59 @@ def ros2nix(args):
if args.src_param:
kwargs["src_param"] = args.src_param
kwargs["src_expr"] = args.src_param
elif args.fetch:
srcdir = os.path.dirname(source)
url = subprocess.check_output(
"git config remote.origin.url".split(), cwd=srcdir
).decode().strip()
prefix = subprocess.check_output(
"git rev-parse --show-prefix".split(), cwd=srcdir
).decode().strip()
toplevel = subprocess.check_output(
"git rev-parse --show-toplevel".split(), cwd=srcdir
).decode().strip()
if toplevel in git_cache:
info = git_cache[toplevel]
else:
info = json.loads(
subprocess.check_output(
["nix-prefetch-git", "--quiet", toplevel],
).decode()
)
git_cache[toplevel] = info
match = re.match("https://github.com/(?P<owner>[^/]*)/(?P<repo>.*?)(.git)?$", url)
if match is not None:
kwargs["src_param"] = "fetchFromGitHub";
kwargs["src_expr"] = dedent(f'''
fetchFromGitHub {{
owner = "{match["owner"]}";
repo = "{match["repo"]}";
rev = "{info["rev"]}";
sha256 = "{info["sha256"]}";
}}''').strip()
else:
kwargs["src_param"] = "fetchgit";
kwargs["src_expr"] = dedent(f'''
fetchgit {{
url = "{url}";
rev = "{info["rev"]}";
sha256 = "{info["sha256"]}";
}}''').strip()
if prefix:
#kwargs["src_expr"] = f'''let fullSrc = {kwargs["src_expr"]}; in "${{fullSrc}}/{prefix}"'''
kwargs["source_root"] = f"${{src.name}}/{prefix}";
else:
kwargs["src_url"] = "src_uri", # TODO
kwargs["src_sha256"] = "src_sha256",
if args.output_dir is None:
kwargs["src_expr"] = "./."
else:
kwargs["src_expr"] = f"./{os.path.dirname(source)}"
if args.source_root:
kwargs["source_root"] = args.source_root.replace('{package_name}', pkg.name)
@ -162,14 +254,19 @@ def ros2nix(args):
derivation_text, _ = nixfmt.communicate(input=derivation_text)
try:
output_file_name = get_output_file_name(pkg, args)
output_file_name = get_output_file_name(source, pkg, args)
with open(output_file_name, "w") as recipe_file:
recipe_file.write(derivation_text)
ok(f"Successfully generated derivation for package '{pkg.name}' as '{output_file_name}'.")
expressions[NixPackage.normalize_name(pkg.name)] = output_file_name
except Exception as e:
err("Failed to write derivation to disk!")
raise e
generate_overlay(expressions, args)
generate_default(args)
def main():
import sys
ros2nix(sys.argv[1:])