abra-bash/bin/app-json.py

215 lines
6.1 KiB
Python
Executable File

#!/usr/bin/env python3
# Usage: ./app-json.py
#
# Gather metadata from Co-op Cloud apps in $ABRA_DIR/apps (default
# ~/.abra/apps), and format it as JSON so that it can be hosted here:
# https://apps.coopcloud.tech
from json import dump
from os import chdir, listdir
from os.path import basename
from re import findall, search
from subprocess import DEVNULL
from requests import get
from abralib import (
CLONES_PATH,
REPOS_TO_SKIP,
SCRIPT_PATH,
YQ_PATH,
_run_cmd,
clone_all_apps,
get_repos_json,
log,
)
def get_published_apps_json():
"""Retrieve already published apps json."""
url = "https://apps.coopcloud.tech"
log.info(f"Retrieving {url}")
try:
return get(url, timeout=5).json()
except Exception as exception:
log.error(f"Failed to retrieve {url}, saw {str(exception)}")
return {}
def generate_apps_json(repos_json):
"""Generate the abra-apps.json application versions file."""
apps_json = {}
cached_apps_json = get_published_apps_json()
for app in listdir(CLONES_PATH):
if app in REPOS_TO_SKIP:
log.info(f"Skipping {app}")
continue
repo_details = next(filter(lambda x: x["name"] == app, repos_json), {})
app_path = f"{CLONES_PATH}/{app}"
chdir(app_path)
metadata = get_app_metadata(app_path)
name = metadata.pop("name", app)
log.info(f"Processing {app}")
apps_json[app] = {
"name": name,
"category": metadata.get("category", ""),
"repository": repo_details.get("clone_url", ""),
"default_branch": repo_details.get("default_branch", ""),
"description": repo_details.get("description", ""),
"website": repo_details.get("website", ""),
"features": metadata,
"versions": get_app_versions(app_path, cached_apps_json),
"icon": repo_details.get("avatar_url", ""),
}
return apps_json
def get_app_metadata(app_path):
"""Parse metadata from app repo README files."""
metadata = {}
chdir(app_path)
try:
with open(f"{app_path}/README.md", "r") as handle:
log.info(f"{app_path}/README.md")
contents = handle.read()
except Exception:
log.info(f"No {app_path}/README.md discovered, moving on")
return {}
try:
for match in findall(r"\*\*.*", contents):
title = search(r"(?<=\*\*).*(?=\*\*)", match).group().lower()
if title == "image":
value = {
"image": search(r"(?<=`).*(?=`)", match).group(),
"url": search(r"(?<=\().*(?=\))", match).group(),
"rating": match.split(",")[1].strip(),
"source": match.split(",")[-1].replace("*", "").strip(),
}
elif title == "status":
value = {"❶💚": 1, "❷💛": 2, "❸🍎": 3, "❹💣": 4, "?": 5, "": 5}[
match.split(":")[-1].replace("*", "").strip()
]
else:
value = match.split(":")[-1].replace("*", "").strip()
metadata[title] = value
metadata["name"] = findall(r"^# (.*)", contents)[0]
except (IndexError, AttributeError):
log.info(f"Can't parse {app_path}/README.md")
return {}
finally:
_run_cmd("git checkout HEAD")
log.info(f"Parsed {metadata}")
return metadata
def get_app_versions(app_path, cached_apps_json):
versions = {}
chdir(app_path)
tags = _run_cmd("git tag --list").split()
if not tags:
log.info("No tags discovered, moving on")
return {}
initial_branch = _run_cmd("git rev-parse --abbrev-ref HEAD")
app_name = basename(app_path)
try:
existing_tags = cached_apps_json[app_name]["versions"].keys()
except KeyError:
existing_tags = []
for tag in tags:
_run_cmd(f"git checkout {tag}", stderr=DEVNULL)
services_cmd = f"{YQ_PATH} e '.services | keys | .[]' compose*.yml"
services = _run_cmd(services_cmd, shell=True).split()
parsed_services = []
service_versions = {}
for service in services:
if service in ("null", "---"):
continue
if (
tag in existing_tags
and service in cached_apps_json[app_name]["versions"][tag]
):
log.info(f"Skipping {tag} because we've already processed it")
existing_versions = cached_apps_json[app_name]["versions"][tag][service]
service_versions[service] = existing_versions
_run_cmd(f"git checkout {initial_branch}")
continue
if service in parsed_services:
log.info(f"Skipped {service}, we've already parsed it locally")
continue
services_cmd = f"{YQ_PATH} e '.services.{service}.image' compose*.yml"
images = _run_cmd(services_cmd, shell=True).split()
for image in images:
if image in ("null", "---"):
continue
images_cmd = f"skopeo inspect docker://{image} | jq '.Digest'"
output = _run_cmd(images_cmd, shell=True)
service_version_info = {
"image": image.split(":")[0],
"tag": image.split(":")[-1],
"digest": output.split(":")[-1][:8],
}
log.info(f"Parsed {service_version_info}")
service_versions[service] = service_version_info
parsed_services.append(service)
versions[tag] = service_versions
_run_cmd(f"git checkout {initial_branch}")
return versions
def main():
"""Run the script."""
repos_json = get_repos_json()
clone_all_apps(repos_json)
target = f"{SCRIPT_PATH}/../deploy/apps.coopcloud.tech/apps.json"
with open(target, "w", encoding="utf-8") as handle:
dump(
generate_apps_json(repos_json),
handle,
ensure_ascii=False,
indent=4,
sort_keys=True,
)
log.info(f"Successfully generated {target}")
main()