forked from moritz/alakazam
readme
This commit is contained in:
302
alakazam.py
Executable file
302
alakazam.py
Executable file
@ -0,0 +1,302 @@
|
||||
#!/bin/python3
|
||||
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import subprocess
|
||||
|
||||
import click
|
||||
import dotenv
|
||||
from jinja2 import Environment, FileSystemLoader
|
||||
import yaml
|
||||
|
||||
def read_config(filename):
|
||||
with open(filename) as file:
|
||||
DEFAULTS = yaml.safe_load(file)
|
||||
jinja = Environment(loader = FileSystemLoader('.'), trim_blocks=True, lstrip_blocks=True)
|
||||
template = jinja.get_template(filename, globals=DEFAULTS.get('GLOBALS'))
|
||||
return yaml.safe_load(template.render())
|
||||
|
||||
CONFIG = read_config("config.yml")
|
||||
DOMAINS = read_config("domains.yml")
|
||||
DEFAULTS = read_config("defaults.yml")
|
||||
COMBINE = read_config("combine.yml")
|
||||
|
||||
|
||||
def abra(*args, machine_output=False, ignore_error=False):
|
||||
command = ["abra", *args]
|
||||
if machine_output:
|
||||
command.append("-m")
|
||||
logging.debug(f"run command: {' '.join(command)}")
|
||||
process = subprocess.run(command, capture_output=True)
|
||||
if process.stderr:
|
||||
logging.debug(process.stderr.decode())
|
||||
if process.returncode and not ignore_error:
|
||||
raise RuntimeError(f'STDOUT: \n {process.stdout.decode()} \n STDERR: {process.stderr.decode()}')
|
||||
if process.stderr:
|
||||
logging.error(process.stderr.decode())
|
||||
if machine_output:
|
||||
return json.loads(process.stdout.decode())
|
||||
return process.stdout.decode()
|
||||
|
||||
|
||||
def write_env_header(path):
|
||||
logging.debug(f'write header to {path}')
|
||||
header = """################################################################################
|
||||
# DO NOT EDIT THIS FILE, IT IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN #
|
||||
################################################################################
|
||||
|
||||
"""
|
||||
with open(path, "r+") as file:
|
||||
old_content = file.read()
|
||||
file.seek(0)
|
||||
file.write(header + old_content)
|
||||
|
||||
|
||||
def new_app(recipe):
|
||||
domain = map_domain(recipe)
|
||||
path = get_env_path(recipe)
|
||||
out = abra("app", "new", recipe, "-n", "-s",
|
||||
CONFIG["server"], "-D", domain)
|
||||
if not "app has been created" in out:
|
||||
raise RuntimeError(f'App "{recipe}" creation failed')
|
||||
else:
|
||||
write_env_header(path)
|
||||
print(f'{recipe} created on {CONFIG["server"]} at {domain}')
|
||||
|
||||
|
||||
def map_domain(recipe):
|
||||
if subdomain := DOMAINS.get(recipe):
|
||||
domain = subdomain.replace("example.com", CONFIG["domain"])
|
||||
else:
|
||||
domain = f"{recipe}.{CONFIG['domain']}"
|
||||
return domain
|
||||
|
||||
|
||||
def generate_secrets(app):
|
||||
domain = map_domain(app)
|
||||
stored_secrets = abra("app", "secret", "ls", domain).splitlines()
|
||||
if any("false" in line for line in stored_secrets):
|
||||
abra("app", "secret", "generate", "-a", domain)
|
||||
print(f"secrets for {domain} generated")
|
||||
|
||||
|
||||
def get_env_path(recipe):
|
||||
domain = map_domain(recipe)
|
||||
return Path(f"~/.abra/servers/{CONFIG['server']}/{domain}.env").expanduser()
|
||||
|
||||
|
||||
def connect_apps(target_app):
|
||||
path = get_env_path(target_app)
|
||||
target_conf = COMBINE.get(target_app)
|
||||
for source_app in CONFIG["apps"]:
|
||||
if target_conf and (configs := target_conf.get(source_app)):
|
||||
logging.info(f'connect {target_app} with {source_app}')
|
||||
set_configs(target_app, configs)
|
||||
if shared_secrets := configs.get("shared_secrets"):
|
||||
logging.info(
|
||||
f'share secrets between {target_app} and {source_app}')
|
||||
share_secrets(target_app, source_app, shared_secrets)
|
||||
insert_domains(path, source_app)
|
||||
|
||||
|
||||
def set_configs(recipe, configs):
|
||||
path = get_env_path(recipe)
|
||||
if uncomment_keys := configs.get("uncomment"):
|
||||
uncomment(uncomment_keys, path, True)
|
||||
if envs := configs.get("env"):
|
||||
uncomment(envs.keys(), path)
|
||||
for key, value in envs.items():
|
||||
logging.debug(f'set {key}={value} in {path}')
|
||||
dotenv.set_key(path, key, value, quote_mode="never")
|
||||
if secrets := configs.get("secrets"):
|
||||
domain = map_domain(recipe)
|
||||
for secret_name, secret in secrets.items():
|
||||
insert_secret(domain, secret_name, secret)
|
||||
|
||||
|
||||
def share_secrets(target, source, secrets):
|
||||
target_domain = map_domain(target)
|
||||
source_domain = map_domain(source)
|
||||
stored_secrets = abra("app", "secret", "ls", target_domain).splitlines()
|
||||
for source_secret in secrets:
|
||||
target_secret = secrets[source_secret]
|
||||
if not any(target_secret in line and "true" in line for line in stored_secrets):
|
||||
secret = get_generated_secret(source_domain, source_secret)
|
||||
target_secret = secrets[source_secret]
|
||||
insert_secret(target_domain, target_secret, secret)
|
||||
|
||||
|
||||
def insert_secret(domain, secret_name, secret):
|
||||
# TODO parse json
|
||||
stored_secrets = abra("app", "secret", "ls", domain).splitlines()
|
||||
if not any(secret_name in line and "true" in line for line in stored_secrets):
|
||||
abra("app", "secret", "insert", domain, secret_name, "v1", secret)
|
||||
|
||||
|
||||
def get_generated_secret(domain, secret_name):
|
||||
# TODO: use "abra secret get <secret_name>"
|
||||
stored_secrets = abra("app", "secret", "ls", domain).splitlines()
|
||||
if not any(secret_name in line and "true" in line for line in stored_secrets):
|
||||
secret = abra("app", "secret", "generate", domain, secret_name, "v1")
|
||||
secret = secret.splitlines()[3].split("|")[2].strip()
|
||||
else:
|
||||
secret = abra(
|
||||
"app", "run", domain, "app", f"cat /var/run/secrets/{secret_name}"
|
||||
)
|
||||
return secret
|
||||
|
||||
|
||||
def uncomment(keys, path, match_all=False):
|
||||
logging.debug(f'Uncomment {keys} in {path}')
|
||||
with open(path, "r") as file:
|
||||
lines = file.readlines()
|
||||
with open(path, "w") as file:
|
||||
for line in lines:
|
||||
line_match = line.split("=")[0] # Match only keys
|
||||
if match_all:
|
||||
line_match = line
|
||||
if any(key in line_match for key in keys):
|
||||
line = line.lstrip("#").lstrip()
|
||||
file.write(line)
|
||||
|
||||
|
||||
def insert_domains(path, app):
|
||||
domain = map_domain(app)
|
||||
logging.debug(f'replace all {app}.example.com with {domain} in {path}')
|
||||
with open(path, "r") as file:
|
||||
content = file.read()
|
||||
content = content.replace(f"{app}.example.com", domain)
|
||||
with open(path, "w") as file:
|
||||
file.write(content)
|
||||
|
||||
|
||||
def execute_cmds(app):
|
||||
domain = map_domain(app)
|
||||
all_cmds = []
|
||||
if (app_conf := DEFAULTS.get(app)) and (cmds := app_conf.get("execute")):
|
||||
all_cmds += cmds
|
||||
if COMBINE.get(app):
|
||||
for target_app, target_conf in COMBINE[app].items():
|
||||
if target_app in CONFIG["apps"] and (cmds := target_conf.get("execute")):
|
||||
all_cmds += cmds
|
||||
if (app_conf := CONFIG["apps"][app]) and (cmds := app_conf.get("execute")):
|
||||
all_cmds += cmds
|
||||
for cmd in all_cmds:
|
||||
container = cmd.split()[0]
|
||||
cmd = cmd.split()[1]
|
||||
if container == "local":
|
||||
print(abra("app", "cmd", "--local", domain, cmd, ignore_error=True))
|
||||
else:
|
||||
print(abra("app", "cmd", domain, container, cmd, ignore_error=True))
|
||||
|
||||
@click.group()
|
||||
@click.option('-l','--log','loglevel')
|
||||
def cli(loglevel):
|
||||
if loglevel:
|
||||
numeric_level = getattr(logging, loglevel.upper(), None)
|
||||
if not isinstance(numeric_level, int):
|
||||
raise ValueError('Invalid log level: %s' % loglevel)
|
||||
logging.basicConfig(level=numeric_level)
|
||||
|
||||
|
||||
@cli.command()
|
||||
def init_server():
|
||||
""" Initialize the server """
|
||||
new_app("traefik")
|
||||
new_app("backup-bot-two")
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option('-a', '--apps', multiple=True)
|
||||
def setup_apps(apps):
|
||||
""" Configure and connect the apps """
|
||||
if not apps:
|
||||
apps = CONFIG["apps"]
|
||||
for app in apps:
|
||||
new_app(app)
|
||||
if configs := DEFAULTS.get(app):
|
||||
logging.info(f'set defaults for {app}')
|
||||
set_configs(app, configs)
|
||||
if configs := CONFIG["apps"].get(app):
|
||||
logging.info(f'set extra configs for {app}')
|
||||
set_configs(app, configs)
|
||||
for app in apps:
|
||||
connect_apps(app)
|
||||
for app in apps:
|
||||
logging.info(f'generate secrets for {app}')
|
||||
generate_secrets(app)
|
||||
|
||||
def get_deployed_apps():
|
||||
deployed = abra(
|
||||
"app", "ls", "-S", "-s", CONFIG["server"], "-m", machine_output=True
|
||||
)
|
||||
deployed_apps = filter(
|
||||
lambda x: x["status"] == "deployed", deployed[CONFIG["server"]]["apps"]
|
||||
)
|
||||
deployed_domains = [app["domain"] for app in deployed_apps]
|
||||
return deployed_domains
|
||||
|
||||
@cli.command()
|
||||
@click.option('-a', '--apps', multiple=True)
|
||||
def deploy_apps(apps):
|
||||
""" Deploy all the apps """
|
||||
deployed_domains = get_deployed_apps()
|
||||
for app in CONFIG["apps"]:
|
||||
domain = map_domain(app)
|
||||
if (apps and app in apps) or domain not in deployed_domains:
|
||||
logging.info(f'deploy {domain}')
|
||||
print(abra("app", "deploy", "-C", "-n", domain))
|
||||
logging.info(f'execute commands for {domain}')
|
||||
execute_cmds(app)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option('-a', '--apps', multiple=True)
|
||||
def undeploy_apps(apps):
|
||||
""" Undeploy all the apps """
|
||||
deployed_domains = get_deployed_apps()
|
||||
if not apps:
|
||||
apps = CONFIG["apps"]
|
||||
for app in apps:
|
||||
domain = map_domain(app)
|
||||
logging.info(f'undeploy {domain}')
|
||||
if domain in deployed_domains:
|
||||
print(abra("app", "undeploy", "-n", domain))
|
||||
|
||||
@cli.command()
|
||||
@click.option('-a', '--apps', multiple=True)
|
||||
def cmds(apps):
|
||||
""" execute all post deploy cmds """
|
||||
deployed_domains = get_deployed_apps()
|
||||
if not apps:
|
||||
apps = CONFIG["apps"]
|
||||
for app in apps:
|
||||
domain = map_domain(app)
|
||||
if domain in deployed_domains:
|
||||
logging.info(f'execute commands for {domain}')
|
||||
execute_cmds(app)
|
||||
|
||||
@cli.command()
|
||||
def purge_apps():
|
||||
""" Completely remove all the apps """
|
||||
for recipe in CONFIG["apps"]:
|
||||
domain = map_domain(recipe)
|
||||
logging.info(f'purge {domain}')
|
||||
abra("app", "rm", "-n", domain)
|
||||
print(f"{domain} purged")
|
||||
|
||||
@cli.command()
|
||||
def purge_secrets():
|
||||
""" Remove all the apps secrets """
|
||||
for recipe in CONFIG["apps"]:
|
||||
domain = map_domain(recipe)
|
||||
logging.info(f'purge secrets from {domain}')
|
||||
stored_secrets = abra("app", "secret", "ls", domain)
|
||||
if "true" in stored_secrets:
|
||||
abra("app", "secret", "rm", "-a", domain)
|
||||
print(f"secrets removed for {domain}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
cli()
|
||||
Reference in New Issue
Block a user