0
0
Fork 0
alakazam/alakazam.py

559 lines
20 KiB
Python
Executable File

#!/bin/python3
import os
import json
import logging
from pathlib import Path
import subprocess
from tabulate import tabulate
import click
import dotenv
from jinja2 import Environment, FileSystemLoader
import yaml
import re
COMBINE_PATH = os.path.dirname(os.path.realpath(__file__)) + "/combine.yml"
CONFIG_FILE_NAME = 'alaka.yaml'
CONFIGS = {}
def read_config(filepath):
filepath = Path(filepath).expanduser()
if not filepath.exists():
logging.warning(f"config file {filepath} does not exist")
return {}
with open(filepath) as file:
yaml_config = yaml.safe_load(file)
if not yaml_config:
logging.warning(f"config file {filepath} is empty")
return {}
globals = yaml_config.get('GLOBALS')
jinja = Environment(loader=FileSystemLoader(
['/', '.']), trim_blocks=True, lstrip_blocks=True)
template = jinja.get_template(filepath.as_posix(), globals=globals)
return yaml.safe_load(template.render())
""" Get value from nested dicts, return None if one of the keys does not exists """
def get_value(dict, *keys):
_element = dict
for key in keys:
try:
_element = _element[key]
except KeyError:
return
return _element
def merge_dict(dict1, dict2):
""" Merge two nested dicts recursively, the second overwrites the first one"""
merged_dict = dict1.copy()
for key, value in dict2.items():
if key in merged_dict and isinstance(value, dict) and isinstance(merged_dict[key], dict):
merged_dict[key] = merge_dict(merged_dict[key], value)
elif key in merged_dict and isinstance(value, list) and isinstance(merged_dict[key], list):
merged_dict[key] = list(set(merged_dict[key] + value))
else:
merged_dict[key] = value
return merged_dict
def merge_pool_configs(dir_path):
dir_path = Path(dir_path).absolute()
merged_configs = {}
for root, _, files in os.walk(dir_path):
no_config = True
for file in files:
if file == 'alaka.yaml':
file_path = os.path.join(root, file)
config = read_config(file_path)
# Merge the config with the merged config from the parent dir
if par_config := merged_configs.get(os.path.dirname(root)):
merged_configs[root] = merge_dict(par_config, config)
else:
merged_configs[root] = config
no_config = False
if no_config:
merged_configs[root] = merged_configs.get(os.path.dirname(root))
return merged_configs
def merge_instance_configs(pool_config, instance_domain, instance_config):
merged_config = {}
for app, app_config in instance_config.items():
if not (server:= get_value(pool_config, 'GLOBALS', 'server')):
server = instance_domain
if app_config and pool_config.get(app):
merged_config[app] = merge_dict(pool_config[app], app_config)
elif app_config:
merged_config[app] = app_config
elif pool_config.get(app):
merged_config[app] = pool_config[app]
else:
merged_config[app] = {}
merged_config[app]['app_domain'] = map_subdomain(app, instance_domain, merged_config[app])
if not merged_config[app].get('server'):
merged_config[app]['server'] = server
return merged_config
def map_subdomain(recipe, instance_domain, app_config):
if subdomain:= app_config.get('subdomain'):
domain = subdomain.replace("example.com", instance_domain)
else:
domain = f"{recipe}.{instance_domain}"
return domain
def get_merged_instance_configs(pool_path, pool_configs):
pool_path = Path(pool_path).absolute()
if pool_path.is_file():
parent_path = os.path.dirname(pool_path)
instance_config = read_config(pool_path)
domain = pool_path.name.removesuffix('.yml').removesuffix('.yaml')
merged_config = merge_instance_configs(pool_configs[parent_path], domain, instance_config)
return {pool_path.name: merged_config}
instances = {}
for root, _, files in os.walk(Path(pool_path)):
for file in files:
# This pattern matches for files of the format "<domain>.yml" or "<domain>.yaml"
pattern = r"^(?:[A-Za-z0-9](?:[A-Za-z0-9\-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,6}(?:\.yaml|\.yml)$"
if re.match(pattern, file):
instance_config = read_config(f'{root}/{file}')
domain = file.removesuffix('.yml').removesuffix('.yaml')
merged_config = merge_instance_configs(pool_configs[root], domain, instance_config)
instances[domain] = merged_config
return instances
def merge_connection_configs(configs):
connection_config = read_config(COMBINE_PATH)
extend_shared_secrets(connection_config)
merged_configs = configs.copy()
for _, instance_config in merged_configs.items():
for target_app, source_apps in connection_config.items():
for source_app, target_conf in source_apps.items():
if target_app in instance_config and source_app in instance_config:
instance_config[target_app] = merge_dict(target_conf, instance_config[target_app])
return merged_configs
"""Add a layer containing the source app"""
def extend_shared_secrets(connection_config):
for _, source_apps in connection_config.items():
for source_app, target_conf in source_apps.items():
if shared_secrets:= target_conf.get('shared_secrets'):
target_conf['shared_secrets'] = {source_app: shared_secrets}
def abra(*args, machine_output=False, ignore_error=False):
command = ["abra", *args]
if machine_output:
command.append("-m")
logging.debug(f"run command: {' '.join(command)}")
process = subprocess.run(command, capture_output=True)
if process.stderr:
logging.debug(process.stderr.decode())
if process.stdout:
logging.debug(process.stdout.decode())
if process.returncode and not ignore_error:
breakpoint()
raise RuntimeError(
f'{" ".join(command)} \n STDOUT: \n {process.stdout.decode()} \n STDERR: {process.stderr.decode()}')
if machine_output:
return json.loads(process.stdout.decode())
return process.stdout.decode()
def write_env_header(path):
logging.debug(f'write header to {path}')
header = """################################################################################
# DO NOT EDIT THIS FILE, IT IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN #
################################################################################
"""
with open(path, "r+") as file:
old_content = file.read()
file.seek(0)
file.write(header + old_content)
def new_app(recipe, domain, server):
path = get_env_path(server, domain)
if path.exists():
print(f'remove {path}')
path.unlink()
logging.info(f'create {recipe} config on {server} at {domain}')
out = abra("app", "new", recipe, "-n", "-s", server, "-D", domain)
if not "app has been created" in out:
raise RuntimeError(f'App "{recipe}" creation failed')
else:
write_env_header(path)
logging.info(f'{recipe} created on {server} at {domain}')
def update_configs(path, config):
if uncomment_keys := config.get("uncomment"):
uncomment(uncomment_keys, path, True)
if envs := config.get("env"):
uncomment(envs.keys(), path)
for key, value in envs.items():
logging.debug(f'set {key}={value} in {path}')
dotenv.set_key(path, key, value, quote_mode="never")
def generate_all_secrets(domain):
stored_secrets = abra("app", "secret", "ls", domain).splitlines()
if any("false" in line for line in stored_secrets):
logging.info(f"Generate all secrets for {domain}")
abra("app", "secret", "generate", "-a", domain)
print(f"secrets for {domain} generated")
def get_env_path(server, domain):
return Path(f"~/.abra/servers/{server}/{domain}.env").expanduser()
# def connect_apps(target_app):
# path = get_env_path(target_app)
# target_conf = COMBINE.get(target_app)
# for source_app in INSTANCE["apps"]:
# if target_conf and (configs := target_conf.get(source_app)):
# logging.info(f'connect {target_app} with {source_app}')
# update_configs(target_app, configs)
# if shared_secrets := configs.get("shared_secrets"):
# logging.info(
# f'share secrets between {target_app} and {source_app}')
# share_secrets(target_app, source_app, shared_secrets)
# insert_domains(path, source_app)
def exchange_secrets(app1, instance_config, apps):
#TODO: check this function
app1_config = instance_config[app1]
app1_domain = app1_config['app_domain']
for app2 in apps:
app2_config = instance_config[app2]
app2_domain = app2_config['app_domain']
if app1_shared_secrets := get_value(app1_config, "shared_secrets", app2):
logging.info(f'share secrets between {app1_domain} and {app2_domain}')
share_secrets(app1_domain, app2_domain, app1_shared_secrets)
if app2_shared_secrets := get_value(app2_config, "shared_secrets", app1):
logging.info(f'share secrets between {app1_domain} and {app2_domain}')
share_secrets(app2_domain, app1_domain, app2_shared_secrets)
def str2bool(value):
return value.lower() in ("yes", "true", "t", "1")
def share_secrets(app1_domain, app2_domain, secrets):
app1_stored_secrets = abra("app", "secret", "ls", app1_domain, machine_output=True)
app1_stored_secrets = {x['name']: str2bool(x['created-on-server']) for x in app1_stored_secrets}
app2_stored_secrets = abra("app", "secret", "ls", app2_domain, machine_output=True)
app2_stored_secrets = {x['name']: str2bool(x['created-on-server']) for x in app2_stored_secrets}
for app2_secret in secrets:
app1_secret = secrets[app2_secret]
# TODO: test if both apps have the secret available
try:
app1_secret_is_stored = app1_stored_secrets[app1_secret]
except KeyError:
logging.error(f"{app1_domain} does not contain secret {app1_secret}")
continue
try:
app2_secret_is_stored = app2_stored_secrets[app2_secret]
except KeyError:
logging.error(f"{app2_domain} does not contain secret {app2_secret}")
continue
if app1_secret_is_stored and not app2_secret_is_stored:
secret = get_secret(app1_domain, app1_secret)
insert_secret(app2_domain, app2_secret, secret)
elif app2_secret_is_stored and not app1_secret_is_stored:
secret = get_secret(app2_domain, app2_secret)
insert_secret(app1_domain, app1_secret, secret)
elif not any([app1_secret_is_stored, app2_secret_is_stored]):
secret = generate_secret(app1_domain, app1_secret)
insert_secret(app2_domain, app2_secret, secret)
def get_secret(domain, secret_name):
# TODO: use "abra secret get <secret_name>"
secret = abra("app", "run", domain, "worker", "cat", f"/var/run/secrets/{secret_name}")
return secret
def generate_secret(domain, secret_name):
secret = abra("app", "secret", "generate", domain, secret_name, "v1", machine_output=True)
return secret[0]['value']
def insert_secrets_from_conf(domain, config):
logging.info(f"Insert secrets for {domain}")
if secrets := config.get("secrets"):
for secret_name, secret in secrets.items():
insert_secret(domain, secret_name, secret)
def insert_secret(domain, secret_name, secret):
# TODO parse json
stored_secrets = abra("app", "secret", "ls", domain).splitlines()
if not any(secret_name in line and "true" in line for line in stored_secrets):
abra("app", "secret", "insert", domain, secret_name, "v1", secret)
def uncomment(keys, path, match_all=False):
logging.debug(f'Uncomment {keys} in {path}')
with open(path, "r") as file:
lines = file.readlines()
with open(path, "w") as file:
for line in lines:
line_match = line.split("=")[0] # Match only keys
if match_all:
line_match = line
if any(key in line_match for key in keys):
line = line.lstrip("#").lstrip()
file.write(line)
def exchange_domains(instance_config, apps, path):
for app in apps:
domain = instance_config[app]['app_domain']
insert_domains(path, app, domain)
def insert_domains(path, app, domain):
logging.debug(f'replace all {app}.example.com with {domain} in {path}')
with open(path, "r") as file:
content = file.read()
content = content.replace(f"{app}.example.com", domain)
with open(path, "w") as file:
file.write(content)
def execute_cmds(app_config):
domain = app_config['app_domain']
if not (all_cmds:= app_config.get('execute')):
logging.info(f"No post deploy cmds for {domain}")
return
for cmd in all_cmds:
container = cmd.split()[0]
cmd = cmd.split()[1]
print(f"Run '{cmd}' in {domain}:{container}")
if container == "local":
print(abra("app", "cmd", "--local", domain, cmd, ignore_error=True))
else:
print(abra("app", "cmd", domain, container, cmd, ignore_error=True))
@click.group()
@click.option('-l', '--log', 'loglevel')
@click.option('-p', '--pool_path', 'pool_path')
@click.option('-c', '--config_path', 'config_path', default=".")
def cli(loglevel, pool_path, config_path):
global CONFIGS
#combine_config = read_config(COMBINE_PATH)
pool_configs = merge_pool_configs(config_path)
instance_configs = get_merged_instance_configs(pool_path, pool_configs)
CONFIGS = merge_connection_configs(instance_configs)
if loglevel:
numeric_level = getattr(logging, loglevel.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: %s' % loglevel)
logging.basicConfig(level=numeric_level)
#@cli.command()
#def init_server():
# """ Initialize the server """
# new_app("traefik")
# new_app("backup-bot-two")
@cli.command()
@click.option('-a', '--apps', multiple=True)
def setup(apps):
pass
@cli.command()
@click.option('-a', '--apps', multiple=True)
def config(apps):
""" Configure the apps """
for instance, instance_config in CONFIGS.items():
if not apps:
apps = instance_config.keys()
for app in apps:
if app not in instance_config:
logging.error(f"Could not find any '{app}' configuration for instance {instance}")
exit(1)
app_config = instance_config[app]
domain = app_config['app_domain']
server = app_config["server"]
path = get_env_path(server, domain)
print(f'Setup {app} config on {server} at {domain}')
new_app(app, domain, server)
logging.info(f'set configs for {app} at {instance}')
update_configs(path, app_config)
exchange_domains(instance_config, instance_config.keys(), path)
@cli.command()
@click.option('-a', '--apps', multiple=True)
def secrets(apps):
""" Configure the apps """
for instance, instance_config in CONFIGS.items():
if not apps:
apps = instance_config.keys()
for app in apps:
if app not in instance_config:
logging.error(f"Could not find any '{app}' configuration for instance {instance}")
exit(1)
app_config = instance_config[app]
domain = app_config['app_domain']
print(f"Create secrets for {domain}")
insert_secrets_from_conf(domain, app_config)
exchange_secrets(app, instance_config, apps)
generate_all_secrets(domain)
def get_deployed_apps(apps):
deployed_apps = []
processed_server = []
for _, instance_config in CONFIGS.items():
if not apps:
apps = instance_config.keys()
for app in apps:
server = instance_config[app]['server']
if server in processed_server:
continue
processed_server.append(server)
deployed = abra("app", "ls", "-S", "-s", server, "-m", machine_output=True)
deployed_domains = [app["domain"] for app in deployed[server]["apps"] if app["status"] == "deployed"]
deployed_apps.extend(deployed_domains)
return deployed_apps
@cli.command()
@click.option('-a', '--apps', multiple=True)
@click.option('-r', '--run-cmds', is_flag=True)
def deploy(apps, run_cmds):
""" Deploy all the apps """
deployed_domains = get_deployed_apps(apps)
for _, instance_config in CONFIGS.items():
if not apps:
apps = instance_config.keys()
for app in apps:
app_config = instance_config[app]
domain = app_config['app_domain']
if domain in deployed_domains:
print(f"{domain} is already deployed")
continue
version = app_config.get('version')
if not version:
version = 'latest'
cmd = ["deploy", "-n"]
if version == 'chaos':
cmd.append("-chaos")
if not run_cmds:
cmd.append("--no-converge-checks")
cmd.append(domain)
if version not in ['latest', 'chaos']:
cmd.append(version)
print(f'deploy {domain} with version "{version}"')
print(abra("app", *cmd))
if run_cmds:
logging.info(f'execute commands for {domain}')
execute_cmds(app_config)
@cli.command()
@click.option('-a', '--apps', multiple=True)
def undeploy(apps):
""" Undeploy all the apps """
deployed_domains = get_deployed_apps(apps)
for _, instance_config in CONFIGS.items():
if not apps:
apps = instance_config.keys()
for app in apps:
app_config = instance_config[app]
domain = app_config['app_domain']
if domain not in deployed_domains:
print(f"{domain} is not deployed")
continue
print(f'undeploy {domain}')
print(abra("app", "undeploy", "-n", domain))
@cli.command()
@click.option('-a', '--apps', multiple=True)
def cmds(apps):
""" execute all post deploy cmds """
deployed_domains = get_deployed_apps(apps)
for _, instance_config in CONFIGS.items():
if not apps:
apps = instance_config.keys()
for app in apps:
app_config = instance_config[app]
domain = app_config['app_domain']
if domain not in deployed_domains:
print(f"{domain} is not deployed")
continue
logging.info(f'execute commands for {domain}')
execute_cmds(app_config)
@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge(apps):
""" Completely remove all the apps """
# TODO: check for deployed apps
pool_apps = print_all_apps(apps)
domains = list(zip(*sum(pool_apps.values(), [])))[1]
if input(f"Do you really want to purge these apps? Type YES: ") == "YES":
for domain in domains:
logging.info(f'purge {domain}')
abra("app", "rm", "-n", domain)
print(f"{domain} purged")
@cli.command()
@click.option('-a', '--apps', multiple=True)
def ls(apps):
""" List all the apps """
print_all_apps(apps)
def print_all_apps(apps):
pool_apps = list_apps(apps)
for instance, instance_apps in pool_apps.items():
print(instance)
print(tabulate(instance_apps))
print()
return pool_apps
def list_apps(apps):
pool_apps = {}
for instance, instance_config in CONFIGS.items():
pool_apps[instance] = []
if not apps:
apps = instance_config.keys()
for app in apps:
domain = instance_config[app]['app_domain']
pool_apps[instance].append((app, domain))
return pool_apps
@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge_secrets(apps):
""" Remove all the apps secrets """
# TODO: check for deployed apps
pool_apps = print_all_apps(apps)
domains = list(zip(*sum(pool_apps.values(), [])))[1]
if input(f"Do you really want to purge the secrets for these apps? Type YES: ") == "YES":
for domain in domains:
logging.info(f'purge {domain}')
abra("app","secret" ,"rm", "-a", domain)
print(f"Secrets for {domain} purged")
if __name__ == '__main__':
cli()