#!/bin/python3
"""Merge pool and instance YAML configs and drive the ``abra`` CLI with them.

The click group ``cli`` loads and merges all configs into the module-level
``CONFIGS`` mapping (instance -> app -> app config); the sub commands then
create env files, secrets and deployments by shelling out to ``abra``.
"""
import json
import logging
import os
import re
import subprocess
from pathlib import Path

import click
import dotenv
from jinja2 import Environment, FileSystemLoader
from ruamel.yaml import YAML
from ruamel.yaml.constructor import SafeConstructor
from ruamel.yaml.nodes import ScalarNode
from tabulate import tabulate

COMBINE_PATH = os.path.dirname(os.path.realpath(__file__)) + "/combine.yml"
CONFIG_FILE_NAME = 'alaka.yaml'
CONFIGS = {}

# Matches instance file names of the form "<domain>.yml" or "<domain>.yaml".
INSTANCE_FILE_PATTERN = re.compile(
    r"^(?:[A-Za-z0-9](?:[A-Za-z0-9\-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,6}(?:\.yaml|\.yml)$")


class MySafeConstructor(SafeConstructor):
    """Safe YAML constructor that preserves quotes while parsing yaml files."""

    def construct_yaml_str(self, node):
        """Return the scalar value, re-wrapped in its quotes if it was quoted."""
        value = self.construct_scalar(node)
        if isinstance(node, ScalarNode) and node.style in ('"', "'"):
            # If node was quoted, keep quotes in the returned value
            return f'{node.style}{value}{node.style}'
        # Otherwise, return value as is
        return value


MySafeConstructor.add_constructor(
    'tag:yaml.org,2002:str',
    MySafeConstructor.construct_yaml_str)


def read_config(filepath):
    """Load a YAML config file and render it as a jinja template.

    The file is parsed once to obtain its ``GLOBALS`` section, rendered as a
    jinja template with these globals and then parsed again.

    Returns an empty dict (and logs a warning) if the file is missing or empty.
    """
    filepath = Path(filepath).expanduser()
    if not filepath.exists():
        logging.warning(f"config file {filepath} does not exist")
        return {}
    # Set type to 'safe' and use pure Python mode
    yaml = YAML(typ='safe', pure=True)
    yaml.Constructor = MySafeConstructor
    with open(filepath) as file:
        yaml_config = yaml.load(file)
    if not yaml_config:
        logging.warning(f"config file {filepath} is empty")
        return {}
    template_globals = yaml_config.get('GLOBALS')
    jinja = Environment(
        loader=FileSystemLoader(['/', '.']),
        trim_blocks=True,
        lstrip_blocks=True)
    template = jinja.get_template(filepath.as_posix(), globals=template_globals)
    # A template that renders to nothing parses to None; callers expect a dict.
    return yaml.load(template.render()) or {}


def get_value(dict, *keys):
    """Get value from nested dicts, return None if one of the keys does not exist.

    Also returns None when an intermediate value cannot be indexed at all
    (e.g. a section that is present but empty and therefore ``None``).
    """
    # NOTE: the parameter name shadows the builtin but is kept for callers.
    element = dict
    for key in keys:
        try:
            element = element[key]
        except (KeyError, TypeError):
            return None
    return element


def _merge_lists(first, second):
    """Return the union of two lists, keeping first-seen order."""
    merged = []
    for item in first + second:
        # Membership test instead of set(): works for unhashable items too.
        if item not in merged:
            merged.append(item)
    return merged


def merge_dict(dict1, dict2):
    """Merge two nested dicts recursively, the second overwrites the first one.

    Nested dicts are merged recursively, lists are combined without
    duplicates, every other value of ``dict2`` replaces the one of ``dict1``.
    Returns a new dict; ``dict1`` and ``dict2`` are not modified.
    """
    merged_dict = dict1.copy()
    for key, value in dict2.items():
        current = merged_dict.get(key)
        if key in merged_dict and isinstance(value, type({})) and isinstance(current, type({})):
            merged_dict[key] = merge_dict(current, value)
        elif key in merged_dict and isinstance(value, list) and isinstance(current, list):
            merged_dict[key] = _merge_lists(current, value)
        else:
            merged_dict[key] = value
    return merged_dict


def merge_pool_configs(dir_path):
    """Collect the pool config of every directory below ``dir_path``.

    Each directory inherits the merged config of its parent directory; a
    ``CONFIG_FILE_NAME`` file in the directory is merged on top of it.

    Returns a dict mapping the absolute directory path (str) to its config.
    Directories without any config map to an empty dict.
    """
    dir_path = Path(dir_path).absolute()
    merged_configs = {}
    # os.walk is top-down, so a parent is always processed before its children.
    for root, _, files in os.walk(dir_path):
        parent_config = merged_configs.get(os.path.dirname(root)) or {}
        if CONFIG_FILE_NAME in files:
            config = read_config(os.path.join(root, CONFIG_FILE_NAME))
            # Merge the config with the merged config from the parent dir
            if parent_config:
                merged_configs[root] = merge_dict(parent_config, config)
            else:
                merged_configs[root] = config
        else:
            merged_configs[root] = parent_config
    return merged_configs


def merge_instance_configs(pool_config, instance_domain, instance_config):
    """Merge the pool config into the config of every app of one instance.

    For each app the instance config overwrites the pool config. Every
    resulting app config gets an ``app_domain`` and, unless already set, a
    ``server`` (``GLOBALS.server`` of the pool, else the instance domain).
    """
    pool_config = pool_config or {}
    default_server = get_value(pool_config, 'GLOBALS', 'server') or instance_domain
    merged_config = {}
    for app, app_config in instance_config.items():
        if app == 'GLOBALS':
            # Template globals of the instance file, not an app.
            continue
        pool_app_config = pool_config.get(app)
        if app_config and pool_app_config:
            merged = merge_dict(pool_app_config, app_config)
        else:
            # Copy, so that setting app_domain/server below never leaks into
            # the pool config that is shared between all instances.
            merged = {**(app_config or pool_app_config or {})}
        merged['app_domain'] = map_subdomain(app, instance_domain, merged)
        if not merged.get('server'):
            merged['server'] = default_server
        merged_config[app] = merged
    return merged_config


def map_subdomain(recipe, instance_domain, app_config):
    """Return the domain of an app.

    Uses the configured ``subdomain`` with ``example.com`` replaced by the
    instance domain, or ``<recipe>.<instance_domain>`` if none is configured.
    """
    if subdomain := app_config.get('subdomain'):
        return subdomain.replace("example.com", instance_domain)
    return f"{recipe}.{instance_domain}"


def _pool_config_for(pool_configs, directory):
    """Return the pool config of ``directory``, warn and use {} if unknown."""
    if directory not in pool_configs:
        logging.warning(f"no pool config found for {directory}")
    return pool_configs.get(directory) or {}


def get_merged_instance_configs(pool_path, pool_configs):
    """Load the instance configs below ``pool_path`` merged with the pool configs.

    ``pool_path`` is either a single instance file or a directory that is
    searched recursively for ``<domain>.yml`` / ``<domain>.yaml`` files.

    Returns a dict mapping the instance to its merged app configs.
    """
    pool_path = Path(pool_path).absolute()
    if pool_path.is_file():
        parent_path = os.path.dirname(pool_path)
        instance_config = read_config(pool_path)
        domain = pool_path.name.removesuffix('.yml').removesuffix('.yaml')
        merged_config = merge_instance_configs(
            _pool_config_for(pool_configs, parent_path), domain, instance_config)
        # NOTE(review): keyed by file name here but by domain below - confirm
        # whether this difference is intended.
        return {pool_path.name: merged_config}
    instances = {}
    for root, _, files in os.walk(Path(pool_path)):
        for file in files:
            if not INSTANCE_FILE_PATTERN.match(file):
                continue
            instance_config = read_config(f'{root}/{file}')
            domain = file.removesuffix('.yml').removesuffix('.yaml')
            instances[domain] = merge_instance_configs(
                _pool_config_for(pool_configs, root), domain, instance_config)
    return instances


def merge_connection_configs(configs):
    """Merge the connection configs from ``COMBINE_PATH`` into the instances.

    If an instance contains both the target app and a source app of a
    connection, the connection config is merged below the target app config
    (the app config overwrites the connection config).
    """
    connection_config = read_config(COMBINE_PATH)
    extend_shared_secrets(connection_config)
    merged_configs = configs.copy()
    for instance_config in merged_configs.values():
        for target_app, source_apps in connection_config.items():
            if target_app not in instance_config:
                continue
            for source_app, target_conf in source_apps.items():
                if source_app in instance_config:
                    instance_config[target_app] = merge_dict(
                        target_conf, instance_config[target_app])
    return merged_configs


def extend_shared_secrets(connection_config):
    """Add a layer containing the source app to every ``shared_secrets`` entry.

    Modifies ``connection_config`` in place.
    """
    for source_apps in connection_config.values():
        for source_app, target_conf in source_apps.items():
            if shared_secrets := target_conf.get('shared_secrets'):
                target_conf['shared_secrets'] = {source_app: shared_secrets}


def abra(*args, machine_output=False, ignore_error=False):
    """Run ``abra`` with the given arguments and return its output.

    Args:
        machine_output: append ``-m`` and parse stdout as JSON.
        ignore_error: do not raise on a non-zero return code.

    Returns:
        The decoded stdout, or the parsed JSON if ``machine_output`` is set.

    Raises:
        RuntimeError: if the command fails and ``ignore_error`` is not set.
    """
    command = ["abra", *args]
    if machine_output:
        command.append("-m")
    logging.debug(f"run command: {' '.join(command)}")
    process = subprocess.run(command, capture_output=True)
    stdout = process.stdout.decode()
    stderr = process.stderr.decode()
    if stderr:
        logging.debug(stderr)
    if stdout:
        logging.debug(stdout)
    if process.returncode and not ignore_error:
        raise RuntimeError(
            f'{" ".join(command)} \n STDOUT: \n {stdout} \n STDERR: {stderr}')
    if machine_output:
        return json.loads(stdout)
    return stdout


def write_env_header(path):
    """Prepend the "do not edit" banner to the existing file at ``path``."""
    logging.debug(f'write header to {path}')
    header = """################################################################################
# DO NOT EDIT THIS FILE, IT IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN #
################################################################################

"""
    with open(path, "r+") as file:
        old_content = file.read()
        file.seek(0)
        file.write(header + old_content)


def new_app(recipe, domain, server):
    """Create a fresh env file for an app, replacing an existing one.

    Raises:
        RuntimeError: if abra does not report that the app has been created.
    """
    path = get_env_path(server, domain)
    if path.exists():
        print(f'remove {path}')
        path.unlink()
    logging.info(f'create {recipe} config on {server} at {domain}')
    out = abra("app", "new", recipe, "-n", "-s", server, "-D", domain)
    if "app has been created" not in out:
        raise RuntimeError(f'App "{recipe}" creation failed')
    write_env_header(path)
    logging.info(f'{recipe} created on {server} at {domain}')


def update_configs(path, config):
    """Apply the ``uncomment`` and ``env`` sections of an app config to its env file."""
    if uncomment_keys := config.get("uncomment"):
        uncomment(uncomment_keys, path, True)
    if envs := config.get("env"):
        # Activate the keys first, so that set_key updates the active line.
        uncomment(envs.keys(), path)
        for key, value in envs.items():
            logging.debug(f'set {key}={value} in {path}')
            dotenv.set_key(path, key, value, quote_mode="never")


def generate_all_secrets(domain):
    """Generate all secrets of an app if the secret listing contains "false"."""
    stored_secrets = abra("app", "secret", "ls", domain).splitlines()
    if any("false" in line for line in stored_secrets):
        logging.info(f"Generate all secrets for {domain}")
        abra("app", "secret", "generate", "-a", domain)
        print(f"secrets for {domain} generated")


def get_env_path(server, domain):
    """Return the path of the abra env file of an app."""
    return Path(f"~/.abra/servers/{server}/{domain}.env").expanduser()


# def connect_apps(target_app):
#     path = get_env_path(target_app)
#     target_conf = COMBINE.get(target_app)
#     for source_app in INSTANCE["apps"]:
#         if target_conf and (configs := target_conf.get(source_app)):
#             logging.info(f'connect {target_app} with {source_app}')
#             update_configs(target_app, configs)
#             if shared_secrets := configs.get("shared_secrets"):
#                 logging.info(
#                     f'share secrets between {target_app} and {source_app}')
#                 share_secrets(target_app, source_app, shared_secrets)
#         insert_domains(path, source_app)


def exchange_secrets(app1, instance_config, apps):
    """Share the configured ``shared_secrets`` between ``app1`` and all ``apps``."""
    # TODO: check this function
    app1_config = instance_config[app1]
    app1_domain = app1_config['app_domain']
    for app2 in apps:
        app2_config = instance_config[app2]
        app2_domain = app2_config['app_domain']
        if app1_shared_secrets := get_value(app1_config, "shared_secrets", app2):
            logging.info(f'share secrets between {app1_domain} and {app2_domain}')
            share_secrets(app1_domain, app2_domain, app1_shared_secrets)
        if app2_shared_secrets := get_value(app2_config, "shared_secrets", app1):
            logging.info(f'share secrets between {app1_domain} and {app2_domain}')
            share_secrets(app2_domain, app1_domain, app2_shared_secrets)


def str2bool(value):
    """Return True if ``value`` is one of "yes", "true", "t" or "1" (any case)."""
    return value.lower() in ("yes", "true", "t", "1")


def _stored_secrets(domain):
    """Return a dict secret name -> "is created on the server" for an app."""
    listing = abra("app", "secret", "ls", domain, machine_output=True)
    return {entry['name']: str2bool(entry['created-on-server']) for entry in listing}


def share_secrets(app1_domain, app2_domain, secrets):
    """Make sure both apps hold the same value for every shared secret.

    ``secrets`` maps the secret name of app2 to the secret name of app1. A
    secret stored for only one app is copied to the other one; a secret
    stored for none of them is generated for app1 and inserted into app2.
    Secrets unknown to one of the apps are logged and skipped.
    """
    app1_stored_secrets = _stored_secrets(app1_domain)
    app2_stored_secrets = _stored_secrets(app2_domain)
    for app2_secret, app1_secret in secrets.items():
        # TODO: test if both apps have the secret available
        if app1_secret not in app1_stored_secrets:
            logging.error(f"{app1_domain} does not contain secret {app1_secret}")
            continue
        if app2_secret not in app2_stored_secrets:
            logging.error(f"{app2_domain} does not contain secret {app2_secret}")
            continue
        app1_secret_is_stored = app1_stored_secrets[app1_secret]
        app2_secret_is_stored = app2_stored_secrets[app2_secret]
        if app1_secret_is_stored and not app2_secret_is_stored:
            secret = get_secret(app1_domain, app1_secret)
            insert_secret(app2_domain, app2_secret, secret)
        elif app2_secret_is_stored and not app1_secret_is_stored:
            secret = get_secret(app2_domain, app2_secret)
            insert_secret(app1_domain, app1_secret, secret)
        elif not app1_secret_is_stored and not app2_secret_is_stored:
            secret = generate_secret(app1_domain, app1_secret)
            insert_secret(app2_domain, app2_secret, secret)


def get_secret(domain, secret_name):
    """Read a secret from the ``worker`` service of a running app."""
    # TODO: use "abra secret get <secret_name>"
    return abra("app", "run", domain, "worker", "cat", f"/var/run/secrets/{secret_name}")


def generate_secret(domain, secret_name):
    """Generate version ``v1`` of a secret and return its value."""
    secret = abra("app", "secret", "generate", domain, secret_name, "v1",
                  machine_output=True)
    return secret[0]['value']


def insert_secrets_from_conf(domain, config):
    """Insert every entry of the ``secrets`` section of an app config."""
    logging.info(f"Insert secrets for {domain}")
    if secrets := config.get("secrets"):
        for secret_name, secret in secrets.items():
            insert_secret(domain, secret_name, secret)


def insert_secret(domain, secret_name, secret):
    """Insert a secret as version ``v1`` unless it is already listed as stored."""
    # TODO parse json
    stored_secrets = abra("app", "secret", "ls", domain).splitlines()
    if not any(secret_name in line and "true" in line for line in stored_secrets):
        abra("app", "secret", "insert", domain, secret_name, "v1", secret)


def uncomment(keys, path, match_all=False):
    """Strip the leading ``#`` from every line of ``path`` that contains a key.

    By default only the part before the first ``=`` is searched for the keys,
    with ``match_all`` the whole line is searched.
    """
    logging.debug(f'Uncomment {keys} in {path}')
    with open(path, "r") as file:
        lines = file.readlines()
    with open(path, "w") as file:
        for line in lines:
            # Match only keys, unless the whole line is requested
            line_match = line if match_all else line.split("=")[0]
            if any(key in line_match for key in keys):
                line = line.lstrip("#").lstrip()
            file.write(line)


def exchange_domains(instance_config, apps, path):
    """Replace the example domain of every app in ``apps`` inside ``path``."""
    for app in apps:
        domain = instance_config[app]['app_domain']
        insert_domains(path, app, domain)


def insert_domains(path, app, domain):
    """Replace all ``<app>.example.com`` occurrences in ``path`` with ``domain``."""
    logging.debug(f'replace all {app}.example.com with {domain} in {path}')
    with open(path, "r") as file:
        content = file.read()
    content = content.replace(f"{app}.example.com", domain)
    with open(path, "w") as file:
        file.write(content)


def execute_cmds(app_config):
    """Run the ``execute`` commands of an app config via ``abra app cmd``.

    Every entry has the form ``<container> <cmd>``; the container ``local``
    runs the command locally. Errors of the commands are ignored.
    """
    domain = app_config['app_domain']
    if not (all_cmds := app_config.get('execute')):
        logging.info(f"No post deploy cmds for {domain}")
        return
    for entry in all_cmds:
        parts = entry.split()
        if len(parts) < 2:
            logging.error(f"Invalid command '{entry}' for {domain}")
            continue
        # NOTE(review): tokens after the second one are dropped, as before.
        container, cmd = parts[0], parts[1]
        print(f"Run '{cmd}' in {domain}:{container}")
        if container == "local":
            print(abra("app", "cmd", "--local", domain, cmd, ignore_error=True))
        else:
            print(abra("app", "cmd", domain, container, cmd, ignore_error=True))


def _select_apps(instance, instance_config, apps):
    """Return the apps to process for one instance.

    Without an explicit selection all apps of the instance are returned.
    Exits with status 1 if a selected app is not configured for the instance.
    """
    if not apps:
        return list(instance_config)
    for app in apps:
        if app not in instance_config:
            logging.error(f"Could not find any '{app}' configuration for instance {instance}")
            raise SystemExit(1)
    return list(apps)


def _collect_domains(pool_apps):
    """Return the domains of all apps listed by ``list_apps``."""
    return [domain
            for instance_apps in pool_apps.values()
            for _, domain in instance_apps]


@click.group()
@click.option('-l', '--log', 'loglevel')
@click.option('-p', '--pool_path', 'pool_path', required=True)
@click.option('-c', '--config_path', 'config_path', default=".")
def cli(loglevel, pool_path, config_path):
    global CONFIGS
    # Configure logging before anything logs: the first logging call would
    # otherwise install the default handler and basicConfig() would be a no-op.
    if loglevel:
        numeric_level = getattr(logging, loglevel.upper(), None)
        if not isinstance(numeric_level, int):
            raise ValueError('Invalid log level: %s' % loglevel)
        logging.basicConfig(level=numeric_level)
    #combine_config = read_config(COMBINE_PATH)
    pool_configs = merge_pool_configs(config_path)
    instance_configs = get_merged_instance_configs(pool_path, pool_configs)
    CONFIGS = merge_connection_configs(instance_configs)


#@cli.command()
#def init_server():
#    """ Initialize the server """
#    new_app("traefik")
#    new_app("backup-bot-two")


@cli.command()
@click.option('-a', '--apps', multiple=True)
def setup(apps):
    # Placeholder: currently does nothing.
    pass


@cli.command()
@click.option('-a', '--apps', multiple=True)
def config(apps):
    """ Configure the apps """
    for instance, instance_config in CONFIGS.items():
        for app in _select_apps(instance, instance_config, apps):
            app_config = instance_config[app]
            domain = app_config['app_domain']
            server = app_config["server"]
            path = get_env_path(server, domain)
            print(f'Setup {app} config on {server} at {domain}')
            new_app(app, domain, server)
            logging.info(f'set configs for {app} at {instance}')
            update_configs(path, app_config)
            exchange_domains(instance_config, instance_config.keys(), path)


@cli.command()
@click.option('-a', '--apps', multiple=True)
def secrets(apps):
    """ Create the secrets of the apps """
    for instance, instance_config in CONFIGS.items():
        selected_apps = _select_apps(instance, instance_config, apps)
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            print(f"Create secrets for {domain}")
            insert_secrets_from_conf(domain, app_config)
            exchange_secrets(app, instance_config, selected_apps)
            generate_all_secrets(domain)


def get_deployed_apps(apps):
    """Return the domains of all deployed apps on the servers of the selected apps."""
    deployed_apps = []
    processed_server = set()
    for instance, instance_config in CONFIGS.items():
        for app in _select_apps(instance, instance_config, apps):
            server = instance_config[app]['server']
            if server in processed_server:
                continue
            processed_server.add(server)
            # machine_output=True already appends "-m"
            deployed = abra("app", "ls", "-S", "-s", server, machine_output=True)
            deployed_apps.extend(
                entry["domain"]
                for entry in deployed[server]["apps"]
                if entry["status"] == "deployed")
    return deployed_apps


@cli.command()
@click.option('-a', '--apps', multiple=True)
@click.option('-r', '--run-cmds', is_flag=True)
def deploy(apps, run_cmds):
    """ Deploy all the apps """
    deployed_domains = get_deployed_apps(apps)
    for instance, instance_config in CONFIGS.items():
        for app in _select_apps(instance, instance_config, apps):
            app_config = instance_config[app]
            domain = app_config['app_domain']
            if domain in deployed_domains:
                print(f"{domain} is already deployed")
                continue
            version = app_config.get('version') or 'latest'
            cmd = ["deploy", "-n"]
            if version == 'chaos':
                cmd.append("-chaos")
            if not run_cmds:
                cmd.append("--no-converge-checks")
            cmd.append(domain)
            if version not in ['latest', 'chaos']:
                cmd.append(version)
            print(f'deploy {domain} with version "{version}"')
            print(abra("app", *cmd))
            if run_cmds:
                logging.info(f'execute commands for {domain}')
                execute_cmds(app_config)


@cli.command()
@click.option('-a', '--apps', multiple=True)
def undeploy(apps):
    """ Undeploy all the apps """
    deployed_domains = get_deployed_apps(apps)
    for instance, instance_config in CONFIGS.items():
        for app in _select_apps(instance, instance_config, apps):
            domain = instance_config[app]['app_domain']
            if domain not in deployed_domains:
                print(f"{domain} is not deployed")
                continue
            print(f'undeploy {domain}')
            print(abra("app", "undeploy", "-n", domain))


@cli.command()
@click.option('-a', '--apps', multiple=True)
def cmds(apps):
    """ execute all post deploy cmds """
    deployed_domains = get_deployed_apps(apps)
    for instance, instance_config in CONFIGS.items():
        for app in _select_apps(instance, instance_config, apps):
            app_config = instance_config[app]
            domain = app_config['app_domain']
            if domain not in deployed_domains:
                print(f"{domain} is not deployed")
                continue
            logging.info(f'execute commands for {domain}')
            execute_cmds(app_config)


@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge(apps):
    """ Completely remove all the apps """
    # TODO: check for deployed apps
    pool_apps = print_all_apps(apps)
    domains = _collect_domains(pool_apps)
    if not domains:
        logging.info('no apps found, nothing to purge')
        return
    if input("Do you really want to purge these apps? Type YES: ") == "YES":
        for domain in domains:
            logging.info(f'purge {domain}')
            abra("app", "rm", "-n", domain)
            print(f"{domain} purged")


@cli.command()
@click.option('-a', '--apps', multiple=True)
def ls(apps):
    """ List all the apps """
    print_all_apps(apps)


def print_all_apps(apps):
    """Print a table of the selected apps per instance and return the listing."""
    pool_apps = list_apps(apps)
    for instance, instance_apps in pool_apps.items():
        print(instance)
        print(tabulate(instance_apps))
        print()
    return pool_apps


def list_apps(apps):
    """Return a dict instance -> list of ``(app, domain)`` for the selected apps."""
    pool_apps = {}
    for instance, instance_config in CONFIGS.items():
        pool_apps[instance] = [
            (app, instance_config[app]['app_domain'])
            for app in _select_apps(instance, instance_config, apps)]
    return pool_apps


@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge_secrets(apps):
    """ Remove all the apps secrets """
    # TODO: check for deployed apps
    pool_apps = print_all_apps(apps)
    domains = _collect_domains(pool_apps)
    if not domains:
        logging.info('no apps found, nothing to purge')
        return
    if input("Do you really want to purge the secrets for these apps? Type YES: ") == "YES":
        for domain in domains:
            logging.info(f'purge {domain}')
            abra("app", "secret", "rm", "-a", domain)
            print(f"Secrets for {domain} purged")


if __name__ == '__main__':
    cli()