0
0
Fork 0
alakazam/alakazam.py

719 lines
27 KiB
Python
Executable File

#!/bin/python3
import os
import json
import logging
from pathlib import Path
import subprocess
import re
from tabulate import tabulate
import click
import dotenv
from icecream import ic
from jinja2 import Environment, FileSystemLoader
from ruamel.yaml import YAML
from ruamel.yaml.constructor import SafeConstructor
from ruamel.yaml.nodes import ScalarNode
# Path of the combine.yml file located in the same directory as this script
# (symlinks to the script are resolved first).
COMBINE_PATH = os.path.dirname(os.path.realpath(__file__)) + "/combine.yml"
# NOTE(review): presumably the file name of the per-directory pool config;
# merge_pool_configs currently compares against the same literal directly.
CONFIG_FILE_NAME = 'alaka.yaml'
# Module-wide configuration store; reassigned by the `cli` group callback.
CONFIGS = {}
class MySafeConstructor(SafeConstructor):
    """Preserve quotes while parsing yaml files."""

    def construct_yaml_str(self, node):
        """Build a string scalar, re-wrapping it in its original quote character."""
        value = self.construct_scalar(node)
        was_quoted = isinstance(node, ScalarNode) and node.style in ('"', "'")
        if not was_quoted:
            # Plain scalars are returned untouched
            return value
        quote = node.style
        return f'{quote}{value}{quote}'


# Route every YAML string scalar through the quote-preserving constructor.
MySafeConstructor.add_constructor(
    'tag:yaml.org,2002:str', MySafeConstructor.construct_yaml_str)
def read_config(filepath):
    """Load a YAML config file and render it as a Jinja template.

    The file is parsed twice: once as plain YAML to extract its ``GLOBALS``
    mapping, then again after rendering the file as a Jinja template with
    those values exposed as template globals.

    Args:
        filepath: Path of the YAML file; a leading ``~`` is expanded.

    Returns:
        The parsed configuration, or ``{}`` when the file is missing, empty,
        or renders to an empty document.
    """
    filepath = Path(filepath).expanduser()
    if not filepath.exists():
        logging.warning(f"config file {filepath} does not exist")
        return {}
    yaml = YAML(typ='safe', pure=True)  # 'safe' loader in pure Python mode
    yaml.Constructor = MySafeConstructor  # keep quotes of quoted strings
    with open(filepath) as file:
        yaml_config = yaml.load(file)
    if not yaml_config:
        logging.warning(f"config file {filepath} is empty")
        return {}
    # Not named `globals`, to avoid shadowing the builtin.
    template_globals = yaml_config.get('GLOBALS')
    jinja = Environment(
        loader=FileSystemLoader(['/', '.']),
        trim_blocks=True,
        lstrip_blocks=True,
    )
    template = jinja.get_template(filepath.as_posix(), globals=template_globals)
    # Callers iterate over the result, so never hand back None.
    return yaml.load(template.render()) or {}
def get_value(dict, *keys):
    """Get a value from nested dicts by following *keys*.

    Args:
        dict: The outermost mapping (the name shadows the builtin but is kept
            so existing callers keep working).
        *keys: Keys to follow, outermost first.

    Returns:
        The nested value, or ``None`` if one of the keys does not exist or an
        intermediate value cannot be subscripted (e.g. it is ``None``).
    """
    element = dict
    for key in keys:
        try:
            element = element[key]
        except (KeyError, TypeError):
            # TypeError: an intermediate value is None or a scalar
            return None
    return element
def merge_dict(dict1, dict2, reverse_list_order=False):
    """Recursively merge two nested dicts; entries of *dict2* win.

    Dicts present on both sides are merged key by key. Lists present on both
    sides are concatenated (*dict2*'s items first when *reverse_list_order*
    is set) and de-duplicated, keeping first occurrences. Any other value
    from *dict2* replaces the one from *dict1*. *dict1* itself is only
    shallow-copied.
    """
    result = dict1.copy()
    for key, incoming in dict2.items():
        if key not in result:
            result[key] = incoming
            continue
        current = result[key]
        if isinstance(incoming, dict) and isinstance(current, dict):
            result[key] = merge_dict(current, incoming, reverse_list_order)
        elif isinstance(incoming, list) and isinstance(current, list):
            combined = incoming + current if reverse_list_order else current + incoming
            # dict.fromkeys removes duplicates while keeping the order
            result[key] = list(dict.fromkeys(combined))
        else:
            result[key] = incoming
    return result
def merge_pool_configs(dir_path):
    """Collect the effective pool config for every directory below *dir_path*.

    Each directory inherits the merged config of its parent directory; a
    config file named ``CONFIG_FILE_NAME`` in the directory is merged on top
    of it.

    Args:
        dir_path: Root directory to walk.

    Returns:
        A dict mapping every walked directory path to its merged config.
        Directories without any applicable config map to ``{}`` rather than
        ``None``, so callers can always call ``.get`` on the value.
    """
    dir_path = Path(dir_path).absolute()
    merged_configs = {}
    # os.walk is top-down: a parent's entry exists before its children's
    for root, _, files in os.walk(dir_path):
        parent_config = merged_configs.get(os.path.dirname(root)) or {}
        if CONFIG_FILE_NAME in files:
            config = read_config(os.path.join(root, CONFIG_FILE_NAME))
            # Merge the config with the merged config from the parent dir
            merged_configs[root] = (
                merge_dict(parent_config, config) if parent_config else config
            )
        else:
            merged_configs[root] = parent_config
    return merged_configs
def merge_instance_configs(pool_config, instance_domain, instance_config):
    """Merge the pool config into each app entry of one instance config.

    For every app in *instance_config* the matching pool entry (if any) is
    used as the base. Each resulting entry gets an ``app_domain`` and, unless
    it already has one, a ``server``. The ``GLOBALS`` entry is removed from
    the result.
    """
    merged_config = {}
    for app, app_config in instance_config.items():
        pool_app_config = pool_config.get(app)
        if app_config and pool_app_config:
            entry = merge_dict(pool_app_config, app_config)
        elif app_config:
            entry = app_config
        elif pool_app_config:
            entry = pool_app_config.copy()
        else:
            entry = {}
        merged_config[app] = entry
        entry['app_domain'] = map_subdomain(app, instance_domain, entry)
        # Server precedence: instance GLOBALS, then pool GLOBALS, then the
        # instance domain itself
        server = (
            get_value(merged_config, 'GLOBALS', 'server')
            or get_value(pool_config, 'GLOBALS', 'server')
            or instance_domain
        )
        if not entry.get('server'):
            entry['server'] = server
    if merged_config.get('GLOBALS'):
        merged_config.pop('GLOBALS')
    return merged_config
def map_subdomain(recipe, instance_domain, app_config):
    """Return the domain of an app.

    A configured ``subdomain`` has its ``example.com`` part replaced by
    *instance_domain*; otherwise the domain is ``<recipe>.<instance_domain>``.
    """
    subdomain = app_config.get('subdomain')
    if not subdomain:
        return f"{recipe}.{instance_domain}"
    return subdomain.replace("example.com", instance_domain)
def get_merged_instance_configs(pool_path, pool_configs):
pool_path = Path(pool_path).absolute()
if pool_path.is_file():
parent_path = os.path.dirname(pool_path)
instance_config = read_config(pool_path)
domain = pool_path.name.removesuffix('.yml').removesuffix('.yaml')
merged_config = merge_instance_configs(pool_configs[parent_path], domain, instance_config)
return {domain: merged_config}
instances = {}
for root, _, files in os.walk(Path(pool_path)):
for file in files:
# This pattern matches for files of the format "<domain>.yml" or "<domain>.yaml"
pattern = r"^(?:[A-Za-z0-9](?:[A-Za-z0-9\-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,6}(?:\.yaml|\.yml)$"
if re.match(pattern, file):
instance_config = read_config(f'{root}/{file}')
domain = file.removesuffix('.yml').removesuffix('.yaml')
merged_config = merge_instance_configs(pool_configs[root], domain, instance_config)
instances[domain] = merged_config
return instances
def merge_connection_configs(configs):
    """Apply the app-connection settings from COMBINE_PATH to all instances.

    Whenever an instance contains both the target app and the source app of a
    connection entry, that entry is used as the base for the target app's
    config; the instance's own values take precedence.
    """
    connection_config = read_config(COMBINE_PATH)
    extend_shared_secrets(connection_config)
    merged_configs = configs.copy()  # shallow: instance dicts are updated in place
    for instance_config in merged_configs.values():
        for target_app, source_apps in connection_config.items():
            for source_app, target_conf in source_apps.items():
                if target_app not in instance_config or source_app not in instance_config:
                    continue
                instance_config[target_app] = merge_dict(
                    target_conf, instance_config[target_app], reverse_list_order=True)
    return merged_configs
def extend_shared_secrets(connection_config):
    """Add a layer containing the source app.

    Every non-empty ``shared_secrets`` value is replaced in place by
    ``{source_app: <original value>}``.
    """
    for source_apps in connection_config.values():
        for source_app, target_conf in source_apps.items():
            shared_secrets = target_conf.get('shared_secrets')
            if shared_secrets:
                target_conf['shared_secrets'] = {source_app: shared_secrets}
def abra(*args, machine_output=False, ignore_error=False):
    """Run the ``abra`` command line tool and return its standard output.

    Args:
        *args: Arguments passed to ``abra``.
        machine_output: Request machine readable output (``-m``) and parse
            stdout as JSON.
        ignore_error: Do not raise when the command exits non-zero.

    Returns:
        The decoded stdout, or the parsed JSON when *machine_output* is set.

    Raises:
        RuntimeError: The command exited non-zero and *ignore_error* is unset.
    """
    command = ["abra", *args]
    # Some callers pass "-m" themselves; do not append it a second time.
    if machine_output and "-m" not in command:
        command.append("-m")
    logging.debug(f"run command: {' '.join(command)}")
    process = subprocess.run(command, capture_output=True)
    stdout = process.stdout.decode()
    stderr = process.stderr.decode()
    if stderr:
        logging.warning(stderr)
    if stdout:
        logging.debug(stdout)
    if process.returncode and not ignore_error:
        raise RuntimeError(
            f'{" ".join(command)} \n STDOUT: \n {stdout} \n STDERR: {stderr}')
    if machine_output:
        return json.loads(stdout)
    return stdout
def write_env_header(path):
    """Prepend a "do not edit" banner to the existing file at *path*."""
    logging.debug(f'write header to {path}')
    banner_lines = (
        "################################################################################",
        "# DO NOT EDIT THIS FILE, IT IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN #",
        "################################################################################",
    )
    header = "\n".join(banner_lines) + "\n"
    with open(path, "r+") as file:
        previous = file.read()
        # Rewind so the banner ends up in front of the previous content
        file.seek(0)
        file.write(header + previous)
def new_app(recipe, domain, server):
    """Create a fresh abra app config, replacing an existing env file.

    Raises:
        RuntimeError: abra did not report that the app has been created.
    """
    env_path = get_env_path(server, domain)
    if env_path.exists():
        print(f'remove {env_path}')
        env_path.unlink()
    logging.info(f'create {recipe} config on {server} at {domain}')
    output = abra("app", "new", recipe, "-n", "-s", server, "-D", domain)
    if "app has been created" not in output:
        raise RuntimeError(f'App "{recipe}" creation failed')
    write_env_header(env_path)
    logging.info(f'{recipe} created on {server} at {domain}')
def update_configs(path, config):
    """Apply the ``uncomment``, ``comment`` and ``env`` settings to an env file."""
    to_uncomment = config.get("uncomment")
    if to_uncomment:
        uncomment(to_uncomment, path, True)
    to_comment = config.get("comment")
    if to_comment:
        comment(to_comment, path, True)
    envs = config.get("env")
    if not envs:
        return
    # Activate the variables first, then write their values
    uncomment(envs.keys(), path)
    for key, value in envs.items():
        logging.debug(f'set {key}={value} in {path}')
        dotenv.set_key(path, key, value, quote_mode="never")
def generate_all_secrets(domain):
    """Generate all secrets of *domain* if the secret listing contains "false"."""
    stored_secrets = abra("app", "secret", "ls", domain).splitlines()
    if not any("false" in line for line in stored_secrets):
        return
    logging.info(f"Generate all secrets for {domain}")
    generated_secrets = abra("app", "secret", "generate", "-a", domain)
    print(f"secrets for {domain} generated")
    print(generated_secrets)
def get_env_path(server, domain):
    """Return the path of the abra env file of *domain* on *server*."""
    env_file = f"~/.abra/servers/{server}/{domain}.env"
    return Path(env_file).expanduser()
# def connect_apps(target_app):
# path = get_env_path(target_app)
# target_conf = COMBINE.get(target_app)
# for source_app in INSTANCE["apps"]:
# if target_conf and (configs := target_conf.get(source_app)):
# logging.info(f'connect {target_app} with {source_app}')
# update_configs(target_app, configs)
# if shared_secrets := configs.get("shared_secrets"):
# logging.info(
# f'share secrets between {target_app} and {source_app}')
# share_secrets(target_app, source_app, shared_secrets)
# insert_domains(path, source_app)
def exchange_secrets(app1, instance_config, apps):
    """Share the configured ``shared_secrets`` between *app1* and each of *apps*."""
    #TODO: check this function
    app1_config = instance_config[app1]
    app1_domain = app1_config['app_domain']
    for app2 in apps:
        app2_config = instance_config[app2]
        app2_domain = app2_config['app_domain']
        # Secrets app1 declares for app2 ...
        from_app1 = get_value(app1_config, "shared_secrets", app2)
        if from_app1:
            logging.info(f'share secrets between {app1_domain} and {app2_domain}')
            share_secrets(app1_domain, app2_domain, from_app1)
        # ... and the ones app2 declares for app1
        from_app2 = get_value(app2_config, "shared_secrets", app1)
        if from_app2:
            logging.info(f'share secrets between {app1_domain} and {app2_domain}')
            share_secrets(app2_domain, app1_domain, from_app2)
def str2bool(value):
    """Return True if *value* is one of yes/true/t/1 (case-insensitive)."""
    truthy = ("yes", "true", "t", "1")
    return value.lower() in truthy
def share_secrets(app1_domain, app2_domain, secrets):
    """Make each pair of secrets hold the same value in both apps.

    *secrets* maps a secret name of app2 to the matching secret name of app1.
    If only one side is stored on its server it is copied to the other; if
    neither is stored, the secret is generated for app1 and inserted into
    app2. Pairs with a name unknown to either app are logged and skipped.
    """
    def stored_on_server(domain):
        # Map secret name -> whether it has been created on the server
        listing = abra("app", "secret", "ls", "-m", "-C", domain, machine_output=True)
        return {entry['name']: str2bool(entry['created-on-server']) for entry in listing}

    app1_stored_secrets = stored_on_server(app1_domain)
    app2_stored_secrets = stored_on_server(app2_domain)
    for app2_secret, app1_secret in secrets.items():
        # TODO: test if both apps have the secret available
        if app1_secret not in app1_stored_secrets:
            logging.error(f"{app1_domain} does not contain secret {app1_secret}")
            continue
        if app2_secret not in app2_stored_secrets:
            logging.error(f"{app2_domain} does not contain secret {app2_secret}")
            continue
        app1_has = app1_stored_secrets[app1_secret]
        app2_has = app2_stored_secrets[app2_secret]
        if app1_has and not app2_has:
            secret = get_secret(app1_domain, app1_secret)
            insert_secret(app2_domain, app2_secret, secret)
        elif app2_has and not app1_has:
            secret = get_secret(app2_domain, app2_secret)
            insert_secret(app1_domain, app1_secret, secret)
        elif not (app1_has or app2_has):
            secret = generate_secret(app1_domain, app1_secret)
            insert_secret(app2_domain, app2_secret, secret)
def get_secret(domain, secret_name):
    """Read a secret from the ``worker`` service of the app at *domain*."""
    # TODO: use "abra secret get <secret_name>"
    secret_file = f"/var/run/secrets/{secret_name}"
    return abra("app", "run", domain, "worker", "cat", secret_file)
def generate_secret(domain, secret_name):
    """Generate version ``v1`` of a secret and return the generated value."""
    generated = abra(
        "app", "secret", "generate", domain, secret_name, "v1",
        machine_output=True,
    )
    first_entry = generated[0]
    return first_entry['value']
def insert_secrets_from_conf(domain, config):
    """Insert every entry of the config's ``secrets`` mapping into *domain*."""
    logging.info(f"Insert secrets for {domain}")
    secrets = config.get("secrets")
    if not secrets:
        return
    for secret_name, secret in secrets.items():
        insert_secret(domain, secret_name, secret)
def unqote_strings(s):
    """Strip one pair of matching surrounding quotes from *s*.

    Args:
        s: The string to unquote.

    Returns:
        *s* without its first and last character when both are the same quote
        character (``"`` or ``'``); otherwise *s* unchanged. A string that
        consists of a single quote character is not treated as quoted.
    """
    # len >= 2: a lone quote is both prefix and suffix but is not a pair
    if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"):
        return s[1:-1]
    return s
def insert_secret(domain, secret_name, secret):
    """Insert a secret as version ``v1`` unless it is already stored.

    Args:
        domain: Domain of the app that receives the secret.
        secret_name: Name of the secret.
        secret: The secret value; one pair of surrounding quotes is removed.
    """
    # TODO parse json
    stored_secrets = abra("app", "secret", "ls", "-C", domain).splitlines()
    # Fix extra quotes around secrets
    secret = unqote_strings(secret)
    already_stored = any(
        secret_name in line and "true" in line for line in stored_secrets)
    if not already_stored:
        # The secret value is deliberately kept out of the log message
        logging.info(f"Insert secret {secret_name} into {domain}")
        abra("app", "secret", "insert", domain, secret_name, "v1", secret)
def uncomment(keys, path, match_all=False):
    """Remove the leading ``#`` from assignment lines that contain a key.

    A key is searched in the part before the first ``=``, or in the whole
    line when *match_all* is set. Lines without ``=`` are never changed.
    """
    #TODO: fix variable names vs. inline regex (see backupbot)
    logging.debug(f'Uncomment {keys} in {path}')
    with open(path, "r") as file:
        lines = file.readlines()
    updated = []
    for line in lines:
        # Match only keys unless the whole line should be searched
        haystack = line if match_all else line.split("=")[0]
        if '=' in line and any(key in haystack for key in keys):
            line = line.lstrip("#").lstrip()
        updated.append(line)
    with open(path, "w") as file:
        file.writelines(updated)
def comment(keys, path, match_all=False):
    """Prefix every line that contains a key with exactly one ``#``.

    A key is searched in the part before the first ``=``, or in the whole
    line when *match_all* is set.
    """
    logging.debug(f'Comment {keys} in {path}')
    with open(path, "r") as file:
        lines = file.readlines()
    updated = []
    for line in lines:
        # Match only keys unless the whole line should be searched
        haystack = line if match_all else line.split("=")[0]
        if any(key in haystack for key in keys):
            # Strip existing markers first so lines are not commented twice
            line = "#" + line.lstrip("#").lstrip()
        updated.append(line)
    with open(path, "w") as file:
        file.writelines(updated)
def exchange_domains(instance, instance_config, path):
    """Replace the ``example.com`` placeholder domains in the file at *path*.

    Each ``<app>.example.com`` becomes that app's ``app_domain``; afterwards
    every remaining ``example.com`` becomes *instance*.
    """
    for app, app_config in instance_config.items():
        replace_domains(path, f'{app}.example.com', app_config['app_domain'])
    replace_domains(path, 'example.com', instance)
def replace_domains(path, old_domain, new_domain):
    """Replace every occurrence of *old_domain* in the file at *path*."""
    logging.debug(f'replace all {old_domain} with {new_domain} in {path}')
    with open(path, "r") as source:
        original = source.read()
    updated = original.replace(f"{old_domain}", new_domain)
    with open(path, "w") as target:
        target.write(updated)
def list_commands(app_config):
    """Print the post deploy commands configured under ``execute``.

    The first word of each entry is the container, the rest is the command.
    """
    domain = app_config['app_domain']
    all_cmds = app_config.get('execute')
    if not all_cmds:
        logging.info(f"No post deploy cmds for {domain}")
        return
    for entry in all_cmds:
        words = entry.split()
        container = words[0]
        cmd = words[1:]
        print(f"{domain}:{container} --> '{cmd}'")
def execute_cmds(app_config):
    """Run the post deploy commands configured under ``execute``.

    The first word of each entry names the container; the special name
    ``local`` runs the command with abra's ``--local`` flag instead. Command
    failures are ignored.
    """
    domain = app_config['app_domain']
    all_cmds = app_config.get('execute')
    if not all_cmds:
        logging.info(f"No post deploy cmds for {domain}")
        return
    for entry in all_cmds:
        words = entry.split()
        container = words[0]
        cmd = words[1:]
        print(f"Run '{cmd}' in {domain}:{container}")
        if container == "local":
            target = ("--local", domain)
        else:
            target = (domain, container)
        print(abra("app", "cmd", *target, *cmd, ignore_error=True))
@click.group()
@click.option('-l', '--log', 'loglevel')
@click.option('-p', '--pool_path', 'pool_path')
@click.option('-c', '--config_path', 'config_path', default=".")
def cli(loglevel, pool_path, config_path):
    # Group callback: loads and merges all configs into the module-level
    # CONFIGS before any subcommand runs.
    #
    # loglevel:    name of a logging level (e.g. "debug"); optional
    # pool_path:   instance file or directory containing instance files
    # config_path: root directory searched for pool configs
    #
    # Raises ValueError for an unknown log level; exits with status 1 when
    # the pool path is missing or does not exist.
    global CONFIGS
    # Configure logging before anything can log: the first module-level
    # logging call installs a default handler, after which basicConfig()
    # would silently do nothing and --log would be ignored.
    if loglevel:
        numeric_level = getattr(logging, loglevel.upper(), None)
        if not isinstance(numeric_level, int):
            raise ValueError('Invalid log level: %s' % loglevel)
        logging.basicConfig(level=numeric_level)
    if not pool_path:
        logging.error("No pool path given, use -p/--pool_path")
        raise SystemExit(1)
    if not Path(pool_path).exists():
        logging.error(f"{pool_path} does not exists! Are you in the correct directory?")
        raise SystemExit(1)
    pool_configs = merge_pool_configs(config_path)
    instance_configs = get_merged_instance_configs(pool_path, pool_configs)
    CONFIGS = merge_connection_configs(instance_configs)
#@cli.command()
#def init_server():
# """ Initialize the server """
# new_app("traefik")
# new_app("backup-bot-two")
@cli.command()
@click.option('-a', '--apps', multiple=True)
def setup(apps):
    # Placeholder command: accepts the -a/--apps selection but does nothing.
    pass
@cli.command()
@click.option('-a', '--apps', multiple=True)
def config(apps):
    """ Configure the apps """
    # For every instance: recreate the env file of each selected app, apply
    # the configured settings and replace the placeholder domains. Exits with
    # status 1 if a requested app is missing from an instance.
    for instance, instance_config in CONFIGS.items():
        if apps:
            missing = next((app for app in apps if app not in instance_config), None)
            if missing is not None:
                logging.error(f' App config \'{missing}\' not found for {instance}!')
                exit(1)
            selected_apps = list(apps)
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            server = app_config["server"]
            env_path = get_env_path(server, domain)
            print(f'Setup {app} config on {server} at {domain}')
            new_app(app, domain, server)
            logging.info(f'set configs for {app} at {instance}')
            update_configs(env_path, app_config)
            exchange_domains(instance, instance_config, env_path)
@cli.command()
@click.option('-a', '--apps', multiple=True)
def secrets(apps):
    """ Insert, exchange and generate the secrets of the apps """
    # For each selected app: insert the secrets given in the config, share
    # secrets with the other apps of the instance, then generate the rest.
    for instance_config in CONFIGS.values():
        instance_apps = instance_config.keys()
        if apps:
            selected_apps = [app for app in apps if app in instance_config]
        else:
            selected_apps = instance_apps
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            print(f"Create secrets for {domain}")
            insert_secrets_from_conf(domain, app_config)
            exchange_secrets(app, instance_config, instance_apps)
            generate_all_secrets(domain)
def get_deployed_apps(apps):
    """Return ``{appName: version}`` of all apps with status "deployed".

    Every server used by the selected apps is queried once via abra.
    """
    deployed_apps = {}
    seen_servers = set()
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [app for app in apps if app in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            server = instance_config[app]['server']
            if server in seen_servers:
                continue
            seen_servers.add(server)
            listing = abra("app", "ls", "-S", "-s", server, "-m", machine_output=True)
            for entry in listing[server]["apps"]:
                if entry["status"] == "deployed":
                    deployed_apps[entry["appName"]] = entry["version"]
    return deployed_apps
@cli.command()
@click.option('-a', '--apps', multiple=True)
@click.option('-r', '--run-cmds', is_flag=True)
@click.option('-f', '--force', is_flag=True)
def deploy(apps, run_cmds, force):
    """ Deploy all the apps """
    # Apps that are already deployed are skipped unless --force is given.
    # With --run-cmds the post deploy commands run after each deployment.
    deployed_domains = get_deployed_apps(apps)
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [app for app in apps if app in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            if not force and domain in deployed_domains:
                print(f"{domain} is already deployed")
                continue
            version = app_config.get('version') or 'latest'
            cmd = ["deploy", "-n"]
            if version == 'chaos':
                cmd.append("--chaos")
            if force:
                cmd.append("--force")
            if not run_cmds:
                cmd.append("--no-converge-checks")
            cmd.append(domain)
            # Only an explicit version is passed on to abra
            if version not in ('latest', 'chaos'):
                cmd.append(version)
            print(f'deploy {domain} with version "{version}"')
            print(abra("app", *cmd))
            if run_cmds:
                logging.info(f'execute commands for {domain}')
                execute_cmds(app_config)
@cli.command()
@click.option('-a', '--apps', multiple=True)
@click.option('-r', '--run-cmds', is_flag=True)
def upgrade(apps, run_cmds):
    """ Upgrade all the deployed apps """
    # Apps that are not deployed, or already at the configured version, are
    # skipped. With --run-cmds the post deploy commands run afterwards.
    deployed_domains = get_deployed_apps(apps)
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [app for app in apps if app in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            if domain not in deployed_domains:
                print(f"{domain} is not deployed")
                continue
            version = app_config.get('version') or 'latest'
            deployed_version = deployed_domains[domain]
            if version == deployed_version:
                print(f"{domain} is already at version {version}")
                continue
            cmd = ["upgrade", "-n"]
            if version == 'chaos':
                cmd.append("--chaos")
            if not run_cmds:
                cmd.append("--no-converge-checks")
            cmd.append(domain)
            # Only an explicit version is passed on to abra
            if version not in ('latest', 'chaos'):
                cmd.append(version)
            print(f'upgrade {domain} from version {deployed_version} to version "{version}"')
            # NOTE(review): the abra call is disabled, so this command only
            # reports what it would upgrade — confirm before re-enabling.
            #print(abra("app", *cmd))
            if run_cmds:
                logging.info(f'execute commands for {domain}')
                execute_cmds(app_config)
@cli.command()
@click.option('-a', '--apps', multiple=True)
def undeploy(apps):
    """ Undeploy all the apps """
    # Apps that are not currently deployed are reported and skipped.
    deployed_domains = get_deployed_apps(apps)
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [app for app in apps if app in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            domain = instance_config[app]['app_domain']
            if domain in deployed_domains:
                print(f'undeploy {domain}')
                print(abra("app", "undeploy", "-n", domain))
            else:
                print(f"{domain} is not deployed")
@cli.command()
@click.option('-a', '--apps', multiple=True)
def cmds(apps):
    """ execute all post deploy cmds """
    # Only apps that are currently deployed get their commands executed.
    deployed_domains = get_deployed_apps(apps)
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [app for app in apps if app in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            if domain in deployed_domains:
                logging.info(f'execute commands for {domain}')
                execute_cmds(app_config)
            else:
                print(f"{domain} is not deployed")
@cli.command()
@click.option('-a', '--apps', multiple=True)
def list_cmds(apps):
    """ List all post deploy cmds """
    # Prints the configured commands of the selected apps without running them.
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [app for app in apps if app in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            list_commands(instance_config[app])
@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge(apps):
    """ Completely remove all the apps """
    # TODO: check for deployed apps
    pool_apps = print_all_apps(apps)
    # pool_apps maps each instance to a list of (app, domain) tuples
    domains = [
        domain
        for instance_apps in pool_apps.values()
        for _, domain in instance_apps
    ]
    if not domains:
        # Nothing selected; previously this crashed with an IndexError
        print("No apps found")
        return
    if input("Do you really want to purge these apps? Type YES: ") == "YES":
        for domain in domains:
            logging.info(f'purge {domain}')
            abra("app", "rm", "-n", domain)
            print(f"{domain} purged")
@cli.command()
@click.option('-a', '--apps', multiple=True)
def ls(apps):
    """ List all the apps """
    # Delegates to print_all_apps; its return value is not needed here.
    print_all_apps(apps)
def print_all_apps(apps):
    """Print a table of the selected apps per instance and return the mapping.

    The returned dict is the result of ``list_apps(apps)``.
    """
    pool_apps = list_apps(apps)
    for instance in pool_apps:
        print(instance)
        print(tabulate(pool_apps[instance]))
        print()
    return pool_apps
def list_apps(apps):
    """Return ``{instance: [(app, app_domain), ...]}`` for the selected apps.

    With an empty *apps* every app of an instance is selected. Instances
    without any selected app are left out.
    """
    pool_apps = {}
    for instance, instance_config in CONFIGS.items():
        candidates = apps if apps else instance_config.keys()
        app_domains = [
            (app, instance_config[app]['app_domain'])
            for app in candidates
            if app in instance_config
        ]
        if app_domains:
            pool_apps[instance] = app_domains
    return pool_apps
@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge_secrets(apps):
    """ Remove all the apps secrets """
    # TODO: check for deployed apps
    pool_apps = print_all_apps(apps)
    # pool_apps maps each instance to a list of (app, domain) tuples
    domains = [
        domain
        for instance_apps in pool_apps.values()
        for _, domain in instance_apps
    ]
    if not domains:
        # Nothing selected; previously this crashed with an IndexError
        print("No apps found")
        return
    if input("Do you really want to purge the secrets for these apps? Type YES: ") == "YES":
        for domain in domains:
            logging.info(f'purge {domain}')
            abra("app", "secret", "rm", "-a", domain)
            print(f"Secrets for {domain} purged")
@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge_volumes(apps):
    """ Remove all the apps volumes """
    # TODO: check for deployed apps
    pool_apps = print_all_apps(apps)
    # pool_apps maps each instance to a list of (app, domain) tuples
    domains = [
        domain
        for instance_apps in pool_apps.values()
        for _, domain in instance_apps
    ]
    if not domains:
        # Nothing selected; previously this crashed with an IndexError
        print("No apps found")
        return
    if input("Do you really want to purge the volumes for these apps? Type YES: ") == "YES":
        for domain in domains:
            logging.info(f'purge {domain}')
            abra("app", "volume", "rm", "-n", domain)
            print(f"Volumes for {domain} purged")
# Run the click command group only when executed as a script.
if __name__ == '__main__':
    cli()