#!/bin/python3 import os import json import logging from pathlib import Path import subprocess import re from tabulate import tabulate import click import dotenv from icecream import ic from jinja2 import Environment, FileSystemLoader from ruamel.yaml import YAML from ruamel.yaml.constructor import SafeConstructor from ruamel.yaml.nodes import ScalarNode COMBINE_PATH = os.path.dirname(os.path.realpath(__file__)) + "/combine.yml" CONFIG_FILE_NAME = 'alaka.yaml' CONFIGS = {} """Preserve quotes while parsing yaml files""" class MySafeConstructor(SafeConstructor): def construct_yaml_str(self, node): value = self.construct_scalar(node) if isinstance(node, ScalarNode) and (node.style == '"' or node.style == "'"): # If node was quoted, keep quotes in the returned value return '{}{}{}'.format(node.style, value, node.style) # Otherwise, return value as is return value MySafeConstructor.add_constructor( 'tag:yaml.org,2002:str', MySafeConstructor.construct_yaml_str) def read_config(filepath): filepath = Path(filepath).expanduser() if not filepath.exists(): logging.warning(f"config file {filepath} does not exist") return {} yaml = YAML(typ='safe', pure=True) # Set type to 'safe' and use pure Python mode yaml.Constructor = MySafeConstructor with open(filepath) as file: yaml_config = yaml.load(file) if not yaml_config: logging.warning(f"config file {filepath} is empty") return {} globals = yaml_config.get('GLOBALS') jinja = Environment(loader=FileSystemLoader( ['/', '.']), trim_blocks=True, lstrip_blocks=True) template = jinja.get_template(filepath.as_posix(), globals=globals) return yaml.load(template.render()) """ Get value from nested dicts, return None if one of the keys does not exists """ def get_value(dict, *keys): _element = dict for key in keys: try: _element = _element[key] except KeyError: return return _element def merge_dict(dict1, dict2, reverse_list_order=False): """ Merge two nested dicts recursively, the second overwrites the first one""" merged_dict = dict1.copy() for key, value in dict2.items(): if key in merged_dict and isinstance(value, dict) and isinstance(merged_dict[key], dict): merged_dict[key] = merge_dict(merged_dict[key], value, reverse_list_order) elif key in merged_dict and isinstance(value, list) and isinstance(merged_dict[key], list): if reverse_list_order: merged_list = value + merged_dict[key] else: merged_list = merged_dict[key] + value unique_list = list(dict.fromkeys(merged_list)) # remove duplicates merged_dict[key] = unique_list else: merged_dict[key] = value return merged_dict def merge_pool_configs(dir_path): dir_path = Path(dir_path).absolute() merged_configs = {} for root, _, files in os.walk(dir_path): no_config = True for file in files: if file == 'alaka.yaml': file_path = os.path.join(root, file) config = read_config(file_path) # Merge the config with the merged config from the parent dir if par_config := merged_configs.get(os.path.dirname(root)): merged_configs[root] = merge_dict(par_config, config) else: merged_configs[root] = config no_config = False if no_config: merged_configs[root] = merged_configs.get(os.path.dirname(root)) return merged_configs def merge_instance_configs(pool_config, instance_domain, instance_config): merged_config = {} for app, app_config in instance_config.items(): if app_config and pool_config.get(app): merged_config[app] = merge_dict(pool_config[app], app_config) elif app_config: merged_config[app] = app_config elif pool_config.get(app): merged_config[app] = pool_config[app].copy() else: merged_config[app] = {} merged_config[app]['app_domain'] = map_subdomain(app, instance_domain, merged_config[app]) if not ((server:= get_value(merged_config, 'GLOBALS', 'server')) or (server:= get_value(pool_config, 'GLOBALS', 'server'))): server = instance_domain if not merged_config[app].get('server'): merged_config[app]['server'] = server if merged_config.get('GLOBALS'): merged_config.pop('GLOBALS') return merged_config def map_subdomain(recipe, instance_domain, app_config): if subdomain:= app_config.get('subdomain'): domain = subdomain.replace("example.com", instance_domain) else: domain = f"{recipe}.{instance_domain}" return domain def get_merged_instance_configs(pool_path, pool_configs): pool_path = Path(pool_path).absolute() if pool_path.is_file(): parent_path = os.path.dirname(pool_path) instance_config = read_config(pool_path) domain = pool_path.name.removesuffix('.yml').removesuffix('.yaml') merged_config = merge_instance_configs(pool_configs[parent_path], domain, instance_config) return {domain: merged_config} instances = {} for root, _, files in os.walk(Path(pool_path)): for file in files: # This pattern matches for files of the format ".yml" or ".yaml" pattern = r"^(?:[A-Za-z0-9](?:[A-Za-z0-9\-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,6}(?:\.yaml|\.yml)$" if re.match(pattern, file): instance_config = read_config(f'{root}/{file}') domain = file.removesuffix('.yml').removesuffix('.yaml') merged_config = merge_instance_configs(pool_configs[root], domain, instance_config) instances[domain] = merged_config return instances def merge_connection_configs(configs): connection_config = read_config(COMBINE_PATH) extend_shared_secrets(connection_config) merged_configs = configs.copy() for _, instance_config in merged_configs.items(): for target_app, source_apps in connection_config.items(): for source_app, target_conf in source_apps.items(): if target_app in instance_config and source_app in instance_config: instance_config[target_app] = merge_dict(target_conf, instance_config[target_app], reverse_list_order=True) return merged_configs """Add a layer containing the source app""" def extend_shared_secrets(connection_config): for _, source_apps in connection_config.items(): for source_app, target_conf in source_apps.items(): if shared_secrets:= target_conf.get('shared_secrets'): target_conf['shared_secrets'] = {source_app: shared_secrets} def abra(*args, machine_output=False, ignore_error=False): command = ["abra", *args] if machine_output: command.append("-m") logging.debug(f"run command: {' '.join(command)}") process = subprocess.run(command, capture_output=True) if process.stderr: logging.warning(process.stderr.decode()) if process.stdout: logging.debug(process.stdout.decode()) if process.returncode and not ignore_error: #breakpoint() raise RuntimeError( f'{" ".join(command)} \n STDOUT: \n {process.stdout.decode()} \n STDERR: {process.stderr.decode()}') if machine_output: return json.loads(process.stdout.decode()) return process.stdout.decode() def write_env_header(path): logging.debug(f'write header to {path}') header = """################################################################################ # DO NOT EDIT THIS FILE, IT IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN # ################################################################################ """ with open(path, "r+") as file: old_content = file.read() file.seek(0) file.write(header + old_content) def new_app(recipe, domain, server, version): path = get_env_path(server, domain) if path.exists(): print(f'remove {path}') path.unlink() logging.info(f'create {recipe} config on {server} at {domain}') out = abra("app", "new", recipe, "-n", "-s", server, "-D", domain, version) if not "app has been created" in out: raise RuntimeError(f'App "{recipe}" creation failed') else: write_env_header(path) logging.info(f'{recipe} created on {server} at {domain}') def update_configs(path, config): if uncomment_keys := config.get("uncomment"): uncomment(uncomment_keys, path, True) if comment_keys := config.get("comment"): comment(comment_keys, path, True) if envs := config.get("env"): uncomment(envs.keys(), path) for key, value in envs.items(): logging.debug(f'set {key}={value} in {path}') dotenv.set_key(path, key, value, quote_mode="never") def generate_all_secrets(domain): stored_secrets = abra("app", "secret", "ls", domain).splitlines() if any("false" in line for line in stored_secrets): logging.info(f"Generate all secrets for {domain}") generated_secrets = abra("app", "secret", "generate", "-a", domain) print(f"secrets for {domain} generated") print(generated_secrets) def get_env_path(server, domain): return Path(f"~/.abra/servers/{server}/{domain}.env").expanduser() # def connect_apps(target_app): # path = get_env_path(target_app) # target_conf = COMBINE.get(target_app) # for source_app in INSTANCE["apps"]: # if target_conf and (configs := target_conf.get(source_app)): # logging.info(f'connect {target_app} with {source_app}') # update_configs(target_app, configs) # if shared_secrets := configs.get("shared_secrets"): # logging.info( # f'share secrets between {target_app} and {source_app}') # share_secrets(target_app, source_app, shared_secrets) # insert_domains(path, source_app) def exchange_secrets(app1, instance_config, apps): #TODO: check this function app1_config = instance_config[app1] app1_domain = app1_config['app_domain'] for app2 in apps: app2_config = instance_config[app2] app2_domain = app2_config['app_domain'] if app1_shared_secrets := get_value(app1_config, "shared_secrets", app2): logging.info(f'share secrets between {app1_domain} and {app2_domain}') share_secrets(app1_domain, app2_domain, app1_shared_secrets) if app2_shared_secrets := get_value(app2_config, "shared_secrets", app1): logging.info(f'share secrets between {app1_domain} and {app2_domain}') share_secrets(app2_domain, app1_domain, app2_shared_secrets) def str2bool(value): return value.lower() in ("yes", "true", "t", "1") def share_secrets(app1_domain, app2_domain, secrets): app1_stored_secrets = abra("app", "secret", "ls", "-m", "-C", app1_domain, machine_output=True) app1_stored_secrets = {x['name']: str2bool(x['created-on-server']) for x in app1_stored_secrets} app2_stored_secrets = abra("app", "secret", "ls", "-m", "-C", app2_domain, machine_output=True) app2_stored_secrets = {x['name']: str2bool(x['created-on-server']) for x in app2_stored_secrets} for app2_secret in secrets: app1_secret = secrets[app2_secret] # TODO: test if both apps have the secret available try: app1_secret_is_stored = app1_stored_secrets[app1_secret] except KeyError: logging.error(f"{app1_domain} does not contain secret {app1_secret}") continue try: app2_secret_is_stored = app2_stored_secrets[app2_secret] except KeyError: logging.error(f"{app2_domain} does not contain secret {app2_secret}") continue if app1_secret_is_stored and not app2_secret_is_stored: secret = get_secret(app1_domain, app1_secret) insert_secret(app2_domain, app2_secret, secret) elif app2_secret_is_stored and not app1_secret_is_stored: secret = get_secret(app2_domain, app2_secret) insert_secret(app1_domain, app1_secret, secret) elif not any([app1_secret_is_stored, app2_secret_is_stored]): secret = generate_secret(app1_domain, app1_secret) insert_secret(app2_domain, app2_secret, secret) def get_secret(domain, secret_name): # TODO: use "abra secret get " secret = abra("app", "run", domain, "worker", "cat", f"/var/run/secrets/{secret_name}") return secret def generate_secret(domain, secret_name): secret = abra("app", "secret", "generate", domain, secret_name, "v1", machine_output=True) return secret[0]['value'] def insert_secrets_from_conf(domain, config): logging.info(f"Insert secrets for {domain}") if secrets := config.get("secrets"): for secret_name, secret in secrets.items(): insert_secret(domain, secret_name, secret) def unqote_strings(s): if s.startswith('"') and s.endswith('"'): return s[1:-1] elif s.startswith("'") and s.endswith("'"): return s[1:-1] else: return s def insert_secret(domain, secret_name, secret): # TODO parse json stored_secrets = abra("app", "secret", "ls", "-C", domain).splitlines() # Fix extra quotes around secrets secret = unqote_strings(secret) if not any(secret_name in line and "true" in line for line in stored_secrets): logging.info(f"Insert secret {secret_name}: {secret} into {domain}") abra("app", "secret", "insert", domain, secret_name, "v1", secret) def uncomment(keys, path, match_all=False): #TODO: fix variablennamen vs inline regex (siehe backupbot) logging.debug(f'Uncomment {keys} in {path}') with open(path, "r") as file: lines = file.readlines() with open(path, "w") as file: for line in lines: line_match = line.split("=")[0] # Match only keys if match_all: line_match = line if ('=' in line) and any(key in line_match for key in keys): line = line.lstrip("#").lstrip() file.write(line) def comment(keys, path, match_all=False): logging.debug(f'Comment {keys} in {path}') with open(path, "r") as file: lines = file.readlines() with open(path, "w") as file: for line in lines: line_match = line.split("=")[0] # Match only keys if match_all: line_match = line if any(key in line_match for key in keys): line = line.lstrip("#").lstrip() line = f"#{line}" file.write(line) def exchange_domains(instance, instance_config, path): for app in instance_config: old_app_domain = f'{app}.example.com' new_app_domain = instance_config[app]['app_domain'] replace_domains(path, old_app_domain, new_app_domain) replace_domains(path, 'example.com', instance) def replace_domains(path, old_domain, new_domain): logging.debug(f'replace all {old_domain} with {new_domain} in {path}') with open(path, "r") as file: content = file.read() content = content.replace(f"{old_domain}", new_domain) with open(path, "w") as file: file.write(content) def list_commands(app_config): domain = app_config['app_domain'] if not (all_cmds:= app_config.get('execute')): logging.info(f"No post deploy cmds for {domain}") return for cmd in all_cmds: container = cmd.split()[0] cmd = cmd.split()[1:] print(f"{domain}:{container} --> '{cmd}'") def execute_cmds(app_config): domain = app_config['app_domain'] if not (all_cmds:= app_config.get('execute')): logging.info(f"No post deploy cmds for {domain}") return for cmd in all_cmds: container = cmd.split()[0] cmd = cmd.split()[1:] print(f"Run '{cmd}' in {domain}:{container}") if container == "local": print(abra("app", "cmd", "--local", domain, *cmd, ignore_error=True)) else: print(abra("app", "cmd", domain, container, *cmd, ignore_error=True)) @click.group() @click.option('-l', '--log', 'loglevel') @click.option('-p', '--pool_path', 'pool_path') @click.option('-c', '--config_path', 'config_path', default=".") def cli(loglevel, pool_path, config_path): global CONFIGS pool_configs = merge_pool_configs(config_path) if not Path(pool_path).exists(): logging.error(f"{pool_path} does not exists! Are you in the correct directory?") exit(1) instance_configs = get_merged_instance_configs(pool_path, pool_configs) CONFIGS = merge_connection_configs(instance_configs) if loglevel: numeric_level = getattr(logging, loglevel.upper(), None) if not isinstance(numeric_level, int): raise ValueError('Invalid log level: %s' % loglevel) logging.basicConfig(level=numeric_level) @cli.command() @click.option('-a', '--apps', multiple=True) def setup(apps): pass @cli.command() @click.option('-a', '--apps', multiple=True) def config(apps): """ Configure the apps """ for instance, instance_config in CONFIGS.items(): if apps: selected_apps = [] for app in apps: if app in instance_config.keys(): selected_apps.append(app) else: logging.warning(f' App config \'{app}\' not found for {instance}!') continue else: selected_apps = instance_config.keys() for app in selected_apps: app_config = instance_config[app] domain = app_config['app_domain'] server = app_config["server"] path = get_env_path(server, domain) version = app_config.get('version') print(f'Setup {app} ({version}) config on {server} at {domain}') if version in ['chaos', None]: version = '' new_app(app, domain, server, version) logging.info(f'set configs for {app} at {instance}') update_configs(path, app_config) exchange_domains(instance, instance_config, path) @cli.command() @click.option('-a', '--apps', multiple=True) def secrets(apps): """ Configure the apps """ for instance, instance_config in CONFIGS.items(): instance_apps = instance_config.keys() if apps: selected_apps = [app for app in apps if app in instance_config.keys()] else: selected_apps = instance_config.keys() for app in selected_apps: app_config = instance_config[app] domain = app_config['app_domain'] print(f"Create secrets for {domain}") insert_secrets_from_conf(domain, app_config) exchange_secrets(app, instance_config, instance_apps) generate_all_secrets(domain) def get_deployed_apps(apps): deployed_apps = {} processed_server = [] for _, instance_config in CONFIGS.items(): if apps: selected_apps = [app for app in apps if app in instance_config.keys()] else: selected_apps = instance_config.keys() for app in selected_apps: server = instance_config[app]['server'] if server in processed_server: continue processed_server.append(server) deployed = abra("app", "ls", "-S", "-s", server, "-m", machine_output=True) deployed_app_versions = {app["appName"]: app["version"] for app in deployed[server]["apps"] if app["status"] == "deployed"} deployed_apps.update(deployed_app_versions) return deployed_apps @cli.command() @click.option('-a', '--apps', multiple=True) @click.option('-r', '--run-cmds', is_flag=True) @click.option('-f', '--force', is_flag=True) @click.option('-c', '--converge-checks', is_flag=True) def deploy(apps, run_cmds, force, converge_checks): """ Deploy all the apps """ deployed_domains = get_deployed_apps(apps) for _, instance_config in CONFIGS.items(): if apps: selected_apps = [app for app in apps if app in instance_config.keys()] else: selected_apps = instance_config.keys() for app in selected_apps: app_config = instance_config[app] domain = app_config['app_domain'] if domain in deployed_domains and not force: print(f"{domain} is already deployed") continue version = app_config.get('version') if not version: version = 'latest' cmd = ["deploy", "-n"] if version == 'chaos': cmd.append("--chaos") if force: cmd.append("--force") if not run_cmds and not converge_checks: cmd.append("--no-converge-checks") cmd.append(domain) if version not in ['latest', 'chaos']: cmd.append(version) print(f'deploy {domain} with version "{version}"') print(abra("app", *cmd)) if run_cmds: logging.info(f'execute commands for {domain}') execute_cmds(app_config) @cli.command() @click.option('-a', '--apps', multiple=True) @click.option('-r', '--run-cmds', is_flag=True) @click.option('-d', '--dry-run', is_flag=True) def upgrade(apps, run_cmds, dry_run): """ Deploy all the apps """ deployed_domains = get_deployed_apps(apps) for _, instance_config in CONFIGS.items(): if apps: selected_apps = [app for app in apps if app in instance_config.keys()] else: selected_apps = instance_config.keys() for app in selected_apps: app_config = instance_config[app] domain = app_config['app_domain'] if domain not in deployed_domains: print(f"{domain} is not deployed") continue version = app_config.get('version') if not version: version = 'latest' cmd = ["upgrade", "-n"] if version == 'chaos': cmd.append("--chaos") if not run_cmds: cmd.append("--no-converge-checks") cmd.append(domain) if version not in ['latest', 'chaos']: cmd.append(version) deployed_version = deployed_domains[domain] if version == deployed_version: print(f"{domain} is already at version {version}") continue print(f'upgrade {domain} from version {deployed_version} to version "{version}"') if not dry_run: print(abra("app", *cmd)) if run_cmds: logging.info(f'execute commands for {domain}') execute_cmds(app_config) @cli.command() @click.option('-a', '--apps', multiple=True) def undeploy(apps): """ Undeploy all the apps """ deployed_domains = get_deployed_apps(apps) for _, instance_config in CONFIGS.items(): if apps: selected_apps = [app for app in apps if app in instance_config.keys()] else: selected_apps = instance_config.keys() for app in selected_apps: app_config = instance_config[app] domain = app_config['app_domain'] if domain not in deployed_domains: print(f"{domain} is not deployed") continue print(f'undeploy {domain}') print(abra("app", "undeploy", "-n", domain)) @cli.command() @click.option('-a', '--apps', multiple=True) def cmds(apps): """ execute all post deploy cmds """ deployed_domains = get_deployed_apps(apps) for _, instance_config in CONFIGS.items(): if apps: selected_apps = [app for app in apps if app in instance_config.keys()] else: selected_apps = instance_config.keys() for app in selected_apps: app_config = instance_config[app] domain = app_config['app_domain'] if domain not in deployed_domains: print(f"{domain} is not deployed") continue logging.info(f'execute commands for {domain}') execute_cmds(app_config) @cli.command() @click.option('-a', '--apps', multiple=True) def list_cmds(apps): """ execute all post deploy cmds """ for _, instance_config in CONFIGS.items(): if apps: selected_apps = [app for app in apps if app in instance_config.keys()] else: selected_apps = instance_config.keys() for app in selected_apps: app_config = instance_config[app] list_commands(app_config) @cli.command() @click.option('-a', '--apps', multiple=True) def purge(apps): """ Completely remove all the apps """ # TODO: check for deployed apps pool_apps = print_all_apps(apps) domains = list(zip(*sum(pool_apps.values(), [])))[1] if input(f"Do you really want to purge these apps? Type YES: ") == "YES": for domain in domains: logging.info(f'purge {domain}') abra("app", "rm", "-n", domain) print(f"{domain} purged") @cli.command() @click.option('-a', '--apps', multiple=True) def ls(apps): """ List all the apps """ print_all_apps(apps) def print_all_apps(apps): pool_apps = list_apps(apps) for instance, instance_apps in pool_apps.items(): print(instance) print(tabulate(instance_apps)) print() return pool_apps def list_apps(apps): pool_apps = {} for instance, instance_config in CONFIGS.items(): instance_app_domains = [] if apps: selected_apps = [app for app in apps if app in instance_config.keys()] else: selected_apps = instance_config.keys() for app in selected_apps: if app in instance_config: domain = instance_config[app]['app_domain'] instance_app_domains.append((app, domain)) if instance_app_domains: pool_apps[instance] = instance_app_domains return pool_apps @cli.command() @click.option('-a', '--apps', multiple=True) def purge_secrets(apps): """ Remove all the apps secrets """ # TODO: check for deployed apps pool_apps = print_all_apps(apps) domains = list(zip(*sum(pool_apps.values(), [])))[1] if input(f"Do you really want to purge the secrets for these apps? Type YES: ") == "YES": for domain in domains: logging.info(f'purge {domain}') abra("app","secret" ,"rm", "-a", domain) print(f"Secrets for {domain} purged") @cli.command() @click.option('-a', '--apps', multiple=True) def purge_volumes(apps): """ Remove all the apps secrets """ # TODO: check for deployed apps pool_apps = print_all_apps(apps) domains = list(zip(*sum(pool_apps.values(), [])))[1] if input(f"Do you really want to purge the volumes for these apps? Type YES: ") == "YES": for domain in domains: logging.info(f'purge {domain}') abra("app","volume" ,"rm", "-n", domain) print(f"Volumes for {domain} purged") if __name__ == '__main__': cli()