alakazam/alakazam.py

330 lines
11 KiB
Python
Executable File

#!/bin/python3
import os
import json
import logging
from pathlib import Path
import subprocess
import click
import dotenv
from jinja2 import Environment, FileSystemLoader
import yaml
def read_config(filepath):
    """Load a YAML config file and render it as a Jinja2 template.

    The file is first parsed as plain YAML to pick up its optional
    ``GLOBALS`` mapping, then rendered as a Jinja2 template with those
    globals, and the rendered text is parsed as YAML again.

    Args:
        filepath: Path to the config file; a leading ``~`` is expanded.

    Returns:
        The parsed configuration. An empty dict is returned when the file
        does not exist, is empty, or renders to an empty document, so callers
        can always use ``.get`` on the result.
    """
    config_path = Path(filepath).expanduser()
    if not config_path.exists():
        logging.warning(f"config file {config_path} does not exist")
        return {}
    with open(config_path) as file:
        yaml_config = yaml.safe_load(file)
    if not yaml_config:
        logging.warning(f"config file {config_path} is empty")
        return {}
    # Named template_globals to avoid shadowing the builtin ``globals``.
    template_globals = yaml_config.get('GLOBALS')
    # The loader searches '/' and '.', presumably so that both absolute and
    # relative config paths can be looked up by their POSIX path.
    jinja = Environment(
        loader=FileSystemLoader(['/', '.']),
        trim_blocks=True,
        lstrip_blocks=True,
    )
    template = jinja.get_template(
        config_path.as_posix(), globals=template_globals)
    return yaml.safe_load(template.render()) or {}
# Instance description read from the working directory; cli() replaces it
# when a --config path is given.
INSTANCE = read_config("config.yml")
# Per-recipe default settings, looked up by recipe name.
DEFAULTS = read_config("~/.abra/defaults.yml")
# Cross-app settings shipped next to this script, looked up by target app and
# then by source app.
_SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
COMBINE = read_config(f"{_SCRIPT_DIR}/combine.yml")
def abra(*args, machine_output=False, ignore_error=False):
    """Run the ``abra`` command line tool and return its standard output.

    Args:
        *args: Arguments passed to ``abra``. Each one is converted to ``str``
            so values loaded from YAML (ints, bools) do not break the call.
        machine_output: Append ``-m`` (unless it is already among the
            arguments) and parse stdout as JSON.
        ignore_error: Do not raise when the command exits non-zero.

    Returns:
        The decoded stdout, or the parsed JSON document when
        ``machine_output`` is set.

    Raises:
        RuntimeError: If the command exits with a non-zero status and
            ``ignore_error`` is false.
    """
    command = ["abra", *(str(arg) for arg in args)]
    if machine_output and "-m" not in command:
        command.append("-m")
    # NOTE(review): the arguments may include secret values (see the
    # "secret insert" call in insert_secret), which end up in the debug log.
    logging.debug(f"run command: {' '.join(command)}")
    process = subprocess.run(command, capture_output=True)
    stdout = process.stdout.decode()
    stderr = process.stderr.decode()
    if stderr:
        logging.debug(stderr)
    if process.returncode and not ignore_error:
        raise RuntimeError(
            f'{" ".join(command)} \n STDOUT: \n {stdout} \n STDERR: {stderr}')
    # Reached on success or when the failure is ignored: surface stderr.
    if stderr:
        logging.error(stderr)
    if machine_output:
        return json.loads(stdout)
    return stdout
def write_env_header(path):
    """Prepend the "do not edit" banner to the env file at *path*.

    The call is idempotent: a file that already starts with the banner is
    left untouched, so repeated runs do not stack banners.

    Args:
        path: Path of an existing env file.
    """
    logging.debug(f'write header to {path}')
    header = """################################################################################
# DO NOT EDIT THIS FILE, IT IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN #
################################################################################
"""
    with open(path, "r+") as file:
        old_content = file.read()
        if old_content.startswith(header):
            return
        # Rewind and rewrite: the new content is always longer than the old
        # one, so no truncation is needed.
        file.seek(0)
        file.write(header + old_content)
def new_app(recipe):
    """Create the abra app config for *recipe* and stamp its env file.

    The server is the instance's ``server`` entry, falling back to its
    ``domain``.

    Raises:
        RuntimeError: If the abra output does not confirm the creation.
    """
    domain = map_subdomain(recipe)
    server = INSTANCE.get("server") or INSTANCE["domain"]
    print(f'create {recipe} config on {server} at {domain}')
    path = get_env_path(recipe)
    out = abra("app", "new", recipe, "-n", "-s", server, "-D", domain)
    if "app has been created" not in out:
        raise RuntimeError(f'App "{recipe}" creation failed')
    write_env_header(path)
    logging.info(f'{recipe} created on {server} at {domain}')
def map_subdomain(recipe):
    """Return the domain under which *recipe* is served.

    A ``subdomain`` entry in the instance config wins over one in the
    defaults; in either, ``example.com`` is replaced by the instance domain.
    Without any entry the domain is ``<recipe>.<instance domain>``.
    """
    instance_app = (INSTANCE.get('apps') or {}).get(recipe) or {}
    default_app = DEFAULTS.get(recipe) or {}
    subdomain = instance_app.get('subdomain') or default_app.get('subdomain')
    if subdomain:
        return subdomain.replace("example.com", INSTANCE["domain"])
    return f"{recipe}.{INSTANCE['domain']}"
def generate_secrets(recipe):
    """Generate all secrets of *recipe* if its secret listing contains "false"."""
    domain = map_subdomain(recipe)
    listing = abra("app", "secret", "ls", domain)
    for line in listing.splitlines():
        if "false" in line:
            # One such line is enough: "-a" generates all secrets at once.
            abra("app", "secret", "generate", "-a", domain)
            print(f"secrets for {domain} generated")
            break
def get_env_path(recipe):
    """Return the path of the abra env file for *recipe*.

    The file lives under ``~/.abra/servers/<server>/`` where the server is
    the instance's ``server`` entry, falling back to its ``domain``.
    """
    domain = map_subdomain(recipe)
    server = INSTANCE.get("server") or INSTANCE["domain"]
    env_file = Path(f"~/.abra/servers/{server}/{domain}.env")
    return env_file.expanduser()
def connect_apps(target_app):
    """Apply the COMBINE settings of every instance app to *target_app*.

    For each app of the instance that has an entry under the target's COMBINE
    section, the listed configs are applied and shared secrets are copied.
    The placeholder domain of every instance app is replaced in the target's
    env file.
    """
    # NOTE(review): nesting reconstructed from de-indented source;
    # insert_domains is assumed to run for every source app, not only for
    # those with a COMBINE entry. Confirm against the original file.
    env_path = get_env_path(target_app)
    combine_rules = COMBINE.get(target_app) or {}
    for source_app in INSTANCE["apps"]:
        configs = combine_rules.get(source_app)
        if configs:
            logging.info(f'connect {target_app} with {source_app}')
            set_configs(target_app, configs)
            shared_secrets = configs.get("shared_secrets")
            if shared_secrets:
                logging.info(
                    f'share secrets between {target_app} and {source_app}')
                share_secrets(target_app, source_app, shared_secrets)
        insert_domains(env_path, source_app)
def set_configs(recipe, configs):
    """Apply a config mapping to the env file and secrets of *recipe*.

    Recognised keys of *configs*:
        uncomment: patterns; every env line containing one is uncommented.
        env: mapping of env keys to values written into the env file.
        secrets: mapping of secret names to values inserted via abra.
    """
    env_path = get_env_path(recipe)

    uncomment_keys = configs.get("uncomment")
    if uncomment_keys:
        uncomment(uncomment_keys, env_path, True)

    envs = configs.get("env")
    if envs:
        # Uncomment first so set_key can update the existing line.
        uncomment(envs.keys(), env_path)
        for key, value in envs.items():
            logging.debug(f'set {key}={value} in {env_path}')
            dotenv.set_key(env_path, key, value, quote_mode="never")

    secrets = configs.get("secrets")
    if secrets:
        domain = map_subdomain(recipe)
        for secret_name, secret in secrets.items():
            insert_secret(domain, secret_name, secret)
def share_secrets(target, source, secrets):
    """Copy secrets from the *source* app into the *target* app.

    Args:
        target: Recipe name of the app receiving the secrets.
        source: Recipe name of the app providing the secrets.
        secrets: Mapping of source secret name to target secret name.
    """
    target_domain = map_subdomain(target)
    source_domain = map_subdomain(source)
    stored_secrets = abra("app", "secret", "ls", target_domain).splitlines()
    for source_secret, target_secret in secrets.items():
        already_stored = any(
            target_secret in line and "true" in line
            for line in stored_secrets
        )
        if already_stored:
            continue
        secret = get_generated_secret(source_domain, source_secret)
        insert_secret(target_domain, target_secret, secret)
def insert_secret(domain, secret_name, secret):
    """Insert *secret* as version ``v1`` unless the listing shows it stored."""
    # TODO parse json
    listing = abra("app", "secret", "ls", domain).splitlines()
    for line in listing:
        # A line naming the secret together with "true" is taken as "stored".
        if secret_name in line and "true" in line:
            return
    abra("app", "secret", "insert", domain, secret_name, "v1", secret)
def get_generated_secret(domain, secret_name):
    """Return the value of *secret_name* for the app at *domain*.

    If the secret listing shows it as stored, the value is read from the
    running ``app`` container; otherwise the secret is generated and the
    value is taken from the generate output.
    """
    # TODO: use "abra secret get <secret_name>"
    listing = abra("app", "secret", "ls", domain).splitlines()
    is_stored = any(
        secret_name in line and "true" in line for line in listing)
    if is_stored:
        return abra(
            "app", "run", domain, "app", f"cat /var/run/secrets/{secret_name}"
        )
    generated = abra("app", "secret", "generate", domain, secret_name, "v1")
    # Fourth output line, third "|"-separated field; presumably the value
    # column of the table abra prints (verify against abra's output).
    return generated.splitlines()[3].split("|")[2].strip()
def uncomment(keys, path, match_all=False):
    """Remove the leading ``#`` from matching lines of the file at *path*.

    Args:
        keys: Iterable of strings to look for.
        path: Path of the env file, rewritten in place.
        match_all: When true, a line matches if it contains any of *keys*
            anywhere. When false, a line matches only if it is an assignment
            (``KEY=...``) whose key, ignoring leading ``#`` and surrounding
            whitespace, equals one of *keys*. Exact comparison keeps keys that
            merely share a prefix (``SMTP_HOST`` / ``SMTP_HOST_BACKUP``) and
            prose comments mentioning a key commented out.
    """
    logging.debug(f'Uncomment {keys} in {path}')
    with open(path, "r") as file:
        lines = file.readlines()
    with open(path, "w") as file:
        for line in lines:
            if match_all:
                matched = any(key in line for key in keys)
            else:
                name, separator, _ = line.partition("=")
                env_key = name.strip().lstrip("#").strip()
                matched = bool(separator) and env_key in keys
            if matched:
                # Also drop indentation in front of the comment marker.
                line = line.lstrip().lstrip("#").lstrip()
            file.write(line)
def insert_domains(path, app):
    """Replace ``<app>.example.com`` in the file at *path* with the app's domain."""
    domain = map_subdomain(app)
    logging.debug(f'replace all {app}.example.com with {domain} in {path}')
    env_file = Path(path)
    content = env_file.read_text()
    env_file.write_text(content.replace(f"{app}.example.com", domain))
def execute_cmds(app):
    """Run the configured ``execute`` commands of *app* through abra.

    Commands are collected, in this order, from the app's DEFAULTS entry, from
    the COMBINE entries of *app* whose partner app is part of the instance,
    and from the app's instance config. Each command is a string of the form
    ``"<container> <command> [args...]"``; the container ``local`` runs the
    command with ``--local``. All tokens after the container are passed on to
    ``abra app cmd``. Command failures are ignored and the output is printed.
    """
    domain = map_subdomain(app)
    all_cmds = []
    if (app_conf := DEFAULTS.get(app)) and (cmds := app_conf.get("execute")):
        all_cmds += cmds
    for target_app, target_conf in (COMBINE.get(app) or {}).items():
        if target_app in INSTANCE["apps"] and (cmds := target_conf.get("execute")):
            all_cmds += cmds
    # .get: the app may be absent from the instance config (or have no body).
    if (app_conf := INSTANCE["apps"].get(app)) and (cmds := app_conf.get("execute")):
        all_cmds += cmds
    for cmd in all_cmds:
        tokens = cmd.split()
        if len(tokens) < 2:
            logging.warning(f'skip malformed command "{cmd}" for {domain}')
            continue
        container, *cmd_args = tokens
        if container == "local":
            print(abra("app", "cmd", "--local", domain, *cmd_args, ignore_error=True))
        else:
            print(abra("app", "cmd", domain, container, *cmd_args, ignore_error=True))
# Root command group: configures logging and optionally swaps the instance
# config before any subcommand runs. Documented with comments rather than a
# docstring because click would show a docstring as help text.
@click.group()
@click.option('-l', '--log', 'loglevel')
@click.option('-c', '--config', 'config')
def cli(loglevel, config):
    global INSTANCE
    if loglevel:
        # Resolve names such as "debug" to the numeric logging level.
        numeric_level = getattr(logging, loglevel.upper(), None)
        if not isinstance(numeric_level, int):
            raise ValueError('Invalid log level: %s' % loglevel)
        logging.basicConfig(level=numeric_level)
    if config:
        # Replace the config that was loaded from ./config.yml at import time.
        INSTANCE = read_config(config)
@cli.command()
def init_server():
    """ Initialize the server """
    # Base recipes created on every server, in this order.
    for recipe in ("traefik", "backup-bot-two"):
        new_app(recipe)
@cli.command()
@click.option('-a', '--apps', multiple=True)
def setup_apps(apps):
    """ Configure and connect the apps """
    # Without -a, every app of the instance config is set up.
    selected = apps or INSTANCE["apps"]

    # Pass 1: create each app, then layer defaults and instance settings.
    for app in selected:
        new_app(app)
        defaults = DEFAULTS.get(app)
        if defaults:
            logging.info(f'set defaults for {app}')
            set_configs(app, defaults)
        overrides = INSTANCE["apps"].get(app)
        if overrides:
            logging.info(f'set extra configs for {app}')
            set_configs(app, overrides)

    # Pass 2: wire the apps together once all env files exist.
    for app in selected:
        print(f'create connections for {app}')
        connect_apps(app)

    # Pass 3: generate whatever secrets are still missing.
    for app in selected:
        logging.info(f'generate secrets for {app}')
        generate_secrets(app)
def get_deployed_apps():
    """Return the domains of all apps abra reports as deployed on the server.

    The server is the instance's ``server`` entry, falling back to its
    ``domain``.
    """
    server = INSTANCE.get("server") or INSTANCE["domain"]
    # machine_output=True makes abra() append "-m"; passing the flag here as
    # well would send it twice.
    deployed = abra("app", "ls", "-S", "-s", server, machine_output=True)
    return [
        app["domain"]
        for app in deployed[server]["apps"]
        if app["status"] == "deployed"
    ]
@cli.command()
@click.option('-a', '--apps', multiple=True)
def deploy_apps(apps):
    """ Deploy all the apps """
    deployed_domains = get_deployed_apps()
    for app in INSTANCE["apps"]:
        domain = map_subdomain(app)
        # Apps named with -a are deployed regardless of their status; all
        # others only when they are not deployed yet.
        explicitly_requested = app in apps
        if not explicitly_requested and domain in deployed_domains:
            continue
        print(f'deploy {domain}')
        print(abra("app", "deploy", "-C", "-n", domain))
        logging.info(f'execute commands for {domain}')
        execute_cmds(app)
@cli.command()
@click.option('-a', '--apps', multiple=True)
def undeploy_apps(apps):
    """ Undeploy all the apps """
    deployed_domains = get_deployed_apps()
    # Without -a, every app of the instance config is considered.
    for app in apps or INSTANCE["apps"]:
        domain = map_subdomain(app)
        print(f'undeploy {domain}')
        if domain not in deployed_domains:
            continue
        print(abra("app", "undeploy", "-n", domain))
@cli.command()
@click.option('-a', '--apps', multiple=True)
def cmds(apps):
    """ execute all post deploy cmds """
    deployed_domains = get_deployed_apps()
    # Without -a, every app of the instance config is considered.
    for app in apps or INSTANCE["apps"]:
        domain = map_subdomain(app)
        # Commands are only run for apps that are currently deployed.
        if domain not in deployed_domains:
            continue
        logging.info(f'execute commands for {domain}')
        execute_cmds(app)
@cli.command()
def purge_apps():
    """ Completely remove all the apps """
    # map() is lazy, so each domain is resolved right before its removal.
    for domain in map(map_subdomain, INSTANCE["apps"]):
        logging.info(f'purge {domain}')
        abra("app", "rm", "-n", domain)
        print(f"{domain} purged")
@cli.command()
def purge_secrets():
    """ Remove all the apps secrets """
    for recipe in INSTANCE["apps"]:
        domain = map_subdomain(recipe)
        logging.info(f'purge secrets from {domain}')
        listing = abra("app", "secret", "ls", domain)
        # Nothing to remove unless the listing contains "true".
        if "true" not in listing:
            continue
        abra("app", "secret", "rm", "-a", domain)
        print(f"secrets removed for {domain}")
# Run the click command group only when executed as a script.
if __name__ == '__main__':
    cli()