abra/bin/app-json.py

214 lines
6.1 KiB
Python
Executable File

#!/usr/bin/env python3
# Usage: ./app-json.py
#
# Gather metadata from Co-op Cloud apps in $ABRA_DIR/apps (default
# ~/.abra/apps), and format it as JSON so that it can be hosted here:
# https://apps.coopcloud.tech
from json import dump
from os import chdir, getcwd, listdir
from os.path import basename
from re import findall, search
from subprocess import DEVNULL
from requests import get
from abralib import (
CLONES_PATH,
REPOS_TO_SKIP,
YQ_PATH,
_run_cmd,
clone_all_apps,
get_repos_json,
log,
)
def get_published_apps_json():
"""Retrieve already published apps json."""
url = "https://apps.coopcloud.tech"
log.info(f"Retrieving {url}")
try:
return get(url, timeout=5).json()
except Exception as exception:
log.error(f"Failed to retrieve {url}, saw {str(exception)}")
return {}
def generate_apps_json(repos_json):
"""Generate the abra-apps.json application versions file."""
apps_json = {}
cached_apps_json = get_published_apps_json()
for app in listdir(CLONES_PATH):
if app in REPOS_TO_SKIP:
log.info(f"Skipping {app}")
continue
repo_details = next(filter(lambda x: x["name"] == app, repos_json), {})
app_path = f"{CLONES_PATH}/{app}"
chdir(app_path)
metadata = get_app_metadata(app_path)
name = metadata.pop("name", app)
log.info(f"Processing {app}")
apps_json[app] = {
"name": name,
"category": metadata.get("category", ""),
"repository": repo_details.get("clone_url", ""),
"default_branch": repo_details.get("default_branch", ""),
"description": repo_details.get("description", ""),
"website": repo_details.get("website", ""),
"features": metadata,
"versions": get_app_versions(app_path, cached_apps_json),
"icon": repo_details.get("avatar_url", ""),
}
return apps_json
def get_app_metadata(app_path):
"""Parse metadata from app repo README files."""
metadata = {}
chdir(app_path)
try:
with open(f"{app_path}/README.md", "r") as handle:
log.info(f"{app_path}/README.md")
contents = handle.read()
except Exception:
log.info(f"No {app_path}/README.md discovered, moving on")
return {}
try:
for match in findall(r"\*\*.*", contents):
title = search(r"(?<=\*\*).*(?=\*\*)", match).group().lower()
if title == "image":
value = {
"image": search(r"(?<=`).*(?=`)", match).group(),
"url": search(r"(?<=\().*(?=\))", match).group(),
"rating": match.split(",")[1].strip(),
"source": match.split(",")[-1].replace("*", "").strip(),
}
elif title == "status":
value = {"❶💚": 1, "❷💛": 2, "❸🍎": 3, "❹💣": 4, "?": 5, "": 5}[
match.split(":")[-1].replace("*", "").strip()
]
else:
value = match.split(":")[-1].replace("*", "").strip()
metadata[title] = value
metadata["name"] = findall(r"^# (.*)", contents)[0]
except (IndexError, AttributeError):
log.info(f"Can't parse {app_path}/README.md")
return {}
finally:
_run_cmd("git checkout HEAD")
log.info(f"Parsed {metadata}")
return metadata
def get_app_versions(app_path, cached_apps_json):
versions = {}
chdir(app_path)
tags = _run_cmd("git tag --list").split()
if not tags:
log.info("No tags discovered, moving on")
return {}
initial_branch = _run_cmd("git rev-parse --abbrev-ref HEAD")
app_name = basename(app_path)
try:
existing_tags = cached_apps_json[app_name]["versions"].keys()
except KeyError:
existing_tags = []
for tag in tags:
_run_cmd(f"git checkout {tag}", stderr=DEVNULL)
services_cmd = f"{YQ_PATH} e '.services | keys | .[]' compose*.yml"
services = _run_cmd(services_cmd, shell=True).split()
parsed_services = []
service_versions = {}
for service in services:
if service in ("null", "---"):
continue
if (
tag in existing_tags
and service in cached_apps_json[app_name]["versions"][tag]
):
log.info(f"Skipping {tag} because we've already processed it")
existing_versions = cached_apps_json[app_name]["versions"][tag][service]
service_versions[service] = existing_versions
_run_cmd(f"git checkout {initial_branch}")
continue
if service in parsed_services:
log.info(f"Skipped {service}, we've already parsed it locally")
continue
services_cmd = f"{YQ_PATH} e '.services.{service}.image' compose*.yml"
images = _run_cmd(services_cmd, shell=True).split()
for image in images:
if image in ("null", "---"):
continue
images_cmd = f"skopeo inspect docker://{image} | jq '.Digest'"
output = _run_cmd(images_cmd, shell=True)
service_version_info = {
"image": image.split(":")[0],
"tag": image.split(":")[-1],
"digest": output.split(":")[-1][:8],
}
log.info(f"Parsed {service_version_info}")
service_versions[service] = service_version_info
parsed_services.append(service)
versions[tag] = service_versions
_run_cmd(f"git checkout {initial_branch}")
return versions
def main():
"""Run the script."""
repos_json = get_repos_json()
clone_all_apps(repos_json)
target = f"{getcwd()}/apps.json"
with open(target, "w", encoding="utf-8") as handle:
dump(
generate_apps_json(repos_json),
handle,
ensure_ascii=False,
indent=4,
sort_keys=True,
)
log.info(f"Successfully generated {target}")
main()