forked from moritz/alakazam
719 lines
27 KiB
Python
Executable File
719 lines
27 KiB
Python
Executable File
#!/bin/python3
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
import subprocess
|
|
import re
|
|
|
|
from tabulate import tabulate
|
|
import click
|
|
import dotenv
|
|
from icecream import ic
|
|
from jinja2 import Environment, FileSystemLoader
|
|
from ruamel.yaml import YAML
|
|
from ruamel.yaml.constructor import SafeConstructor
|
|
from ruamel.yaml.nodes import ScalarNode
|
|
|
|
# Connection definitions (combine.yml) live in the same directory as this script.
COMBINE_PATH = f"{os.path.dirname(os.path.realpath(__file__))}/combine.yml"
# File name of the per-directory pool configuration.
CONFIG_FILE_NAME = 'alaka.yaml'
# Merged instance configurations; filled in by the `cli` group callback.
CONFIGS = {}
|
|
|
|
|
|
"""Preserve quotes while parsing yaml files"""
|
|
class MySafeConstructor(SafeConstructor):
    """Safe YAML constructor that keeps the quotes of quoted string scalars."""

    def construct_yaml_str(self, node):
        """Build the string for ``node``, re-wrapping it in its original quotes.

        Scalars written with single or double quotes are returned with that
        quote character prepended and appended; every other scalar is
        returned exactly as ``construct_scalar`` produced it.
        """
        text = self.construct_scalar(node)
        was_quoted = isinstance(node, ScalarNode) and node.style in ('"', "'")
        if not was_quoted:
            return text
        return f'{node.style}{text}{node.style}'


# Route every YAML string scalar through the quote-preserving constructor.
MySafeConstructor.add_constructor(
    'tag:yaml.org,2002:str', MySafeConstructor.construct_yaml_str)
|
|
|
|
|
|
def read_config(filepath):
    """Load a YAML config file and render it as a Jinja2 template.

    The file is parsed once to obtain its ``GLOBALS`` mapping, which is then
    passed as template globals while the same file is rendered with Jinja2;
    the rendered text is parsed again and returned.

    Args:
        filepath: Path to the YAML file; ``~`` is expanded.

    Returns:
        The parsed configuration, or an empty dict when the file is missing,
        empty, or renders to an empty document.
    """
    filepath = Path(filepath).expanduser()
    if not filepath.exists():
        logging.warning(f"config file {filepath} does not exist")
        return {}
    yaml = YAML(typ='safe', pure=True)  # safe loader, pure Python implementation
    yaml.Constructor = MySafeConstructor
    with open(filepath) as file:
        yaml_config = yaml.load(file)
    if not yaml_config:
        logging.warning(f"config file {filepath} is empty")
        return {}
    # Named template_globals so the builtin globals() is not shadowed.
    template_globals = yaml_config.get('GLOBALS')
    jinja = Environment(
        loader=FileSystemLoader(['/', '.']), trim_blocks=True, lstrip_blocks=True)
    template = jinja.get_template(filepath.as_posix(), globals=template_globals)
    # A template that renders to nothing parses to None; callers iterate the
    # result with .items(), so normalise it to an empty dict.
    return yaml.load(template.render()) or {}
|
|
|
|
|
|
""" Get value from nested dicts, return None if one of the keys does not exists """
|
|
def get_value(dict, *keys):
    """Look up a value in nested containers by following ``keys`` in order.

    Args:
        dict: Outermost mapping to start from. The name shadows the builtin
            but is kept so existing callers keep working.
        *keys: Keys applied one after another.

    Returns:
        The value found, or ``None`` as soon as a key is missing or an
        intermediate value cannot be indexed with the key (for example
        because it is ``None``).
    """
    _element = dict
    for key in keys:
        try:
            _element = _element[key]
        except (KeyError, IndexError, TypeError):
            # TypeError covers None / scalar intermediates such as a YAML key
            # that was left without a value.
            return None
    return _element
|
|
|
|
|
|
def merge_dict(dict1, dict2, reverse_list_order=False):
    """Merge two nested dicts recursively; values from ``dict2`` win.

    Dict values present on both sides are merged recursively. List values
    present on both sides are concatenated (``dict1`` items first, or
    ``dict2`` items first when ``reverse_list_order`` is true) and
    de-duplicated keeping the first occurrence. Any other value from
    ``dict2`` replaces the one from ``dict1``.

    Args:
        dict1: Base mapping; it is shallow-copied, not modified.
        dict2: Mapping whose entries take precedence.
        reverse_list_order: Put ``dict2`` list items before ``dict1`` ones.

    Returns:
        A new merged dict.
    """
    merged_dict = dict1.copy()
    for key, value in dict2.items():
        if key in merged_dict and isinstance(value, dict) and isinstance(merged_dict[key], dict):
            merged_dict[key] = merge_dict(merged_dict[key], value, reverse_list_order)
        elif key in merged_dict and isinstance(value, list) and isinstance(merged_dict[key], list):
            if reverse_list_order:
                merged_list = value + merged_dict[key]
            else:
                merged_list = merged_dict[key] + value
            merged_dict[key] = _unique_in_order(merged_list)
        else:
            merged_dict[key] = value
    return merged_dict


def _unique_in_order(items):
    """Return ``items`` without duplicates, keeping first occurrences.

    Unlike ``dict.fromkeys`` this also accepts unhashable items (e.g. dicts
    inside a YAML list), which are compared by equality instead.
    """
    seen = set()
    unique = []
    for item in items:
        try:
            if item in seen:
                continue
            seen.add(item)
        except TypeError:
            # Unhashable item: fall back to a linear equality search.
            if item in unique:
                continue
        unique.append(item)
    return unique
|
|
|
|
|
|
def merge_pool_configs(dir_path):
    """Collect the effective pool config for every directory below ``dir_path``.

    Each directory inherits the merged config of its parent directory; a
    config file named ``CONFIG_FILE_NAME`` in the directory is merged on top.

    Args:
        dir_path: Root directory to walk.

    Returns:
        A dict mapping each walked directory path (as yielded by ``os.walk``)
        to its merged config. Directories without any config up the tree map
        to an empty dict rather than ``None``.
    """
    dir_path = Path(dir_path).absolute()
    merged_configs = {}
    # os.walk is top-down by default, so a parent is handled before its children.
    for root, _, files in os.walk(dir_path):
        parent_config = merged_configs.get(os.path.dirname(root)) or {}
        if CONFIG_FILE_NAME in files:
            config = read_config(os.path.join(root, CONFIG_FILE_NAME)) or {}
            # Merge the config with the merged config from the parent dir
            merged_configs[root] = merge_dict(parent_config, config) if parent_config else config
        else:
            merged_configs[root] = parent_config
    return merged_configs
|
|
|
|
def merge_instance_configs(pool_config, instance_domain, instance_config):
    """Combine the pool config with one instance's config, app by app.

    Only apps listed in ``instance_config`` are returned. For each app the
    pool entry is merged with the instance entry (the instance wins), an
    ``app_domain`` is computed with ``map_subdomain`` and a ``server`` is
    filled in when the app does not define one. The ``GLOBALS`` entry is
    used for the default server and is not part of the result.

    Args:
        pool_config: Merged pool configuration; ``None`` is treated as empty.
        instance_domain: Domain of the instance.
        instance_config: Parsed instance file, mapping app name to app config.

    Returns:
        A dict mapping app name to its merged configuration.
    """
    pool_config = pool_config or {}
    # Resolve the default server once, so the result does not depend on the
    # position of GLOBALS inside the instance file.
    instance_globals = instance_config.get('GLOBALS') or {}
    pool_globals = pool_config.get('GLOBALS') or {}
    default_server = (
        instance_globals.get('server') or pool_globals.get('server') or instance_domain)
    merged_config = {}
    for app, app_config in instance_config.items():
        if app == 'GLOBALS':
            continue
        pool_app_config = pool_config.get(app)
        if app_config and pool_app_config:
            merged_app = merge_dict(pool_app_config, app_config)
        elif app_config:
            # Copy so the caller's instance config is not modified below.
            merged_app = app_config.copy()
        elif pool_app_config:
            merged_app = pool_app_config.copy()
        else:
            merged_app = {}
        merged_app['app_domain'] = map_subdomain(app, instance_domain, merged_app)
        if not merged_app.get('server'):
            merged_app['server'] = default_server
        merged_config[app] = merged_app
    return merged_config
|
|
|
|
|
|
def map_subdomain(recipe, instance_domain, app_config):
    """Return the domain under which ``recipe`` runs for an instance.

    When the app config has a ``subdomain``, every ``example.com`` in it is
    replaced by ``instance_domain``; otherwise the domain is the recipe name
    followed by a dot and ``instance_domain``.
    """
    custom = app_config.get('subdomain')
    if not custom:
        return f"{recipe}.{instance_domain}"
    return custom.replace("example.com", instance_domain)
|
|
|
|
|
|
def get_merged_instance_configs(pool_path, pool_configs):
    """Read instance files and merge each with its directory's pool config.

    Args:
        pool_path: Either a single instance file or a directory that is
            walked recursively for instance files. An instance file is named
            after its domain with a ``.yml`` or ``.yaml`` suffix.
        pool_configs: Mapping of directory path to merged pool config, as
            returned by ``merge_pool_configs``. A directory that is missing
            from the mapping (or mapped to ``None``) is treated as having an
            empty pool config.

    Returns:
        A dict mapping instance domain to its merged configuration.
    """
    pool_path = Path(pool_path).absolute()
    if pool_path.is_file():
        domain = pool_path.name.removesuffix('.yml').removesuffix('.yaml')
        pool_config = pool_configs.get(os.path.dirname(pool_path)) or {}
        instance_config = read_config(pool_path) or {}
        return {domain: merge_instance_configs(pool_config, domain, instance_config)}
    # Matches file names made of a domain name followed by .yml or .yaml;
    # compiled once instead of once per file.
    instance_file = re.compile(
        r"^(?:[A-Za-z0-9](?:[A-Za-z0-9\-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,6}(?:\.yaml|\.yml)$")
    instances = {}
    for root, _, files in os.walk(pool_path):
        pool_config = pool_configs.get(root) or {}
        for file in files:
            if not instance_file.match(file):
                continue
            instance_config = read_config(os.path.join(root, file)) or {}
            domain = file.removesuffix('.yml').removesuffix('.yaml')
            instances[domain] = merge_instance_configs(pool_config, domain, instance_config)
    return instances
|
|
|
|
|
|
def merge_connection_configs(configs):
    """Apply the connection definitions from ``COMBINE_PATH`` to every instance.

    For each (target app, source app) pair in the connection file where both
    apps exist in an instance, the connection config is merged into the
    target app's config; the instance's own values take precedence and list
    items from the instance come first.

    Returns a shallow copy of ``configs``; the per-instance dicts are the
    same objects and are updated in place.
    """
    connection_config = read_config(COMBINE_PATH)
    extend_shared_secrets(connection_config)
    merged_configs = configs.copy()
    for instance_config in merged_configs.values():
        for target_app, source_apps in connection_config.items():
            for source_app, target_conf in source_apps.items():
                if target_app not in instance_config or source_app not in instance_config:
                    continue
                instance_config[target_app] = merge_dict(
                    target_conf, instance_config[target_app], reverse_list_order=True)
    return merged_configs
|
|
|
|
|
|
"""Add a layer containing the source app"""
|
|
def extend_shared_secrets(connection_config):
    """Nest each ``shared_secrets`` entry under the name of its source app.

    The connection config is modified in place: a non-empty
    ``shared_secrets`` value of a source app's config is replaced by a dict
    with the source app name as the only key and the old value as its value.
    """
    for source_apps in connection_config.values():
        for source_app, target_conf in source_apps.items():
            shared_secrets = target_conf.get('shared_secrets')
            if shared_secrets:
                target_conf['shared_secrets'] = {source_app: shared_secrets}
|
|
|
|
|
|
def abra(*args, machine_output=False, ignore_error=False):
    """Run the ``abra`` command line tool and return its standard output.

    Args:
        *args: Arguments passed to ``abra``.
        machine_output: Append ``-m`` and parse the output as JSON.
        ignore_error: Do not raise when the process exits non-zero.

    Returns:
        The decoded stdout, or the parsed JSON when ``machine_output`` is set.

    Raises:
        RuntimeError: The process exited non-zero and ``ignore_error`` is false.
    """
    command = ["abra", *args]
    if machine_output:
        command.append("-m")
    command_line = ' '.join(command)
    logging.debug(f"run command: {command_line}")
    process = subprocess.run(command, capture_output=True)
    # stderr is surfaced as a warning even when the command succeeds.
    if process.stderr:
        logging.warning(process.stderr.decode())
    if process.stdout:
        logging.debug(process.stdout.decode())
    failed = process.returncode and not ignore_error
    if failed:
        raise RuntimeError(
            f'{command_line} \n STDOUT: \n {process.stdout.decode()} \n STDERR: {process.stderr.decode()}')
    output = process.stdout.decode()
    return json.loads(output) if machine_output else output
|
|
|
|
|
|
def write_env_header(path):
    """Prepend the "do not edit" banner to the file at ``path``.

    The file must already exist; its previous content is kept unchanged
    after the banner.
    """
    logging.debug(f'write header to {path}')
    header = """################################################################################
# DO NOT EDIT THIS FILE, IT IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN #
################################################################################

"""
    with open(path, "r+") as env_file:
        previous = env_file.read()
        # Rewind and overwrite from the start; the new content is longer than
        # the old one, so nothing stale is left behind.
        env_file.seek(0)
        env_file.write(header)
        env_file.write(previous)
|
|
|
|
|
|
def new_app(recipe, domain, server):
    """Create a fresh abra app config for ``recipe`` at ``domain`` on ``server``.

    An existing env file for the domain is removed first. After abra reports
    success the generated env file gets the "do not edit" header.

    Raises:
        RuntimeError: abra's output does not confirm that the app was created.
    """
    path = get_env_path(server, domain)
    if path.exists():
        print(f'remove {path}')
        path.unlink()
    logging.info(f'create {recipe} config on {server} at {domain}')
    out = abra("app", "new", recipe, "-n", "-s", server, "-D", domain)
    if "app has been created" not in out:
        raise RuntimeError(f'App "{recipe}" creation failed')
    write_env_header(path)
    logging.info(f'{recipe} created on {server} at {domain}')
|
|
|
|
|
|
def update_configs(path, config):
    """Apply an app config's ``uncomment``, ``comment`` and ``env`` sections to an env file.

    ``uncomment`` and ``comment`` keys are matched against whole lines. The
    ``env`` keys are uncommented (matched against the key part of a line
    only) and then set to their configured values without quoting.
    """
    uncomment_keys = config.get("uncomment")
    if uncomment_keys:
        uncomment(uncomment_keys, path, True)
    comment_keys = config.get("comment")
    if comment_keys:
        comment(comment_keys, path, True)
    envs = config.get("env")
    if not envs:
        return
    uncomment(envs.keys(), path)
    for key, value in envs.items():
        logging.debug(f'set {key}={value} in {path}')
        dotenv.set_key(path, key, value, quote_mode="never")
|
|
|
|
|
|
def generate_all_secrets(domain):
    """Generate all secrets for ``domain`` if the secret listing contains "false".

    The listing is the text output of ``abra app secret ls``; any line
    containing ``false`` triggers ``abra app secret generate -a``, whose
    output is printed.
    """
    stored_secrets = abra("app", "secret", "ls", domain).splitlines()
    if not any("false" in line for line in stored_secrets):
        return
    logging.info(f"Generate all secrets for {domain}")
    generated_secrets = abra("app", "secret", "generate", "-a", domain)
    print(f"secrets for {domain} generated")
    print(generated_secrets)
|
|
|
|
|
|
def get_env_path(server, domain):
    """Return the path of the abra env file for ``domain`` on ``server``."""
    env_file = Path(f"~/.abra/servers/{server}/{domain}.env")
    return env_file.expanduser()
|
|
|
|
|
|
# def connect_apps(target_app):
|
|
# path = get_env_path(target_app)
|
|
# target_conf = COMBINE.get(target_app)
|
|
# for source_app in INSTANCE["apps"]:
|
|
# if target_conf and (configs := target_conf.get(source_app)):
|
|
# logging.info(f'connect {target_app} with {source_app}')
|
|
# update_configs(target_app, configs)
|
|
# if shared_secrets := configs.get("shared_secrets"):
|
|
# logging.info(
|
|
# f'share secrets between {target_app} and {source_app}')
|
|
# share_secrets(target_app, source_app, shared_secrets)
|
|
# insert_domains(path, source_app)
|
|
|
|
|
|
def exchange_secrets(app1, instance_config, apps):
    """Share secrets between ``app1`` and every app in ``apps``.

    For each other app, the ``shared_secrets`` declared by ``app1`` for that
    app and those declared by that app for ``app1`` are passed to
    ``share_secrets``.
    """
    #TODO: check this function
    own_config = instance_config[app1]
    app1_domain = own_config['app_domain']
    for app2 in apps:
        other_config = instance_config[app2]
        app2_domain = other_config['app_domain']
        outgoing = get_value(own_config, "shared_secrets", app2)
        if outgoing:
            logging.info(f'share secrets between {app1_domain} and {app2_domain}')
            share_secrets(app1_domain, app2_domain, outgoing)
        incoming = get_value(other_config, "shared_secrets", app1)
        if incoming:
            logging.info(f'share secrets between {app1_domain} and {app2_domain}')
            share_secrets(app2_domain, app1_domain, incoming)
|
|
|
|
|
|
def str2bool(value):
    """Return True when ``value`` is, ignoring case, one of yes/true/t/1."""
    truthy = {"yes", "true", "t", "1"}
    return value.lower() in truthy
|
|
|
|
def share_secrets(app1_domain, app2_domain, secrets):
    """Make paired secrets of two apps hold the same value.

    ``secrets`` maps a secret name of app2 to the matching secret name of
    app1. A pair is skipped with an error log when either app does not list
    its secret. Otherwise a secret stored on only one side is copied to the
    other, and when neither side has it stored, it is generated for app1 and
    inserted into app2. Pairs stored on both sides are left alone.
    """
    def stored_on_server(domain):
        # Map secret name -> whether abra reports it as created on the server.
        listing = abra("app", "secret", "ls", "-m", "-C", domain, machine_output=True)
        return {entry['name']: str2bool(entry['created-on-server']) for entry in listing}

    app1_stored_secrets = stored_on_server(app1_domain)
    app2_stored_secrets = stored_on_server(app2_domain)
    for app2_secret, app1_secret in secrets.items():
        # TODO: test if both apps have the secret available
        if app1_secret not in app1_stored_secrets:
            logging.error(f"{app1_domain} does not contain secret {app1_secret}")
            continue
        if app2_secret not in app2_stored_secrets:
            logging.error(f"{app2_domain} does not contain secret {app2_secret}")
            continue
        app1_has = app1_stored_secrets[app1_secret]
        app2_has = app2_stored_secrets[app2_secret]
        if app1_has and not app2_has:
            secret = get_secret(app1_domain, app1_secret)
            insert_secret(app2_domain, app2_secret, secret)
        elif app2_has and not app1_has:
            secret = get_secret(app2_domain, app2_secret)
            insert_secret(app1_domain, app1_secret, secret)
        elif not (app1_has or app2_has):
            secret = generate_secret(app1_domain, app1_secret)
            insert_secret(app2_domain, app2_secret, secret)
|
|
|
|
|
|
def get_secret(domain, secret_name):
    """Read a secret by running ``cat`` on its file in the app's ``worker`` service."""
    # TODO: use "abra secret get SECRET_NAME"
    secret_file = f"/var/run/secrets/{secret_name}"
    return abra("app", "run", domain, "worker", "cat", secret_file)
|
|
|
|
def generate_secret(domain, secret_name):
    """Generate version ``v1`` of a secret with abra and return its value.

    The value is read from the ``value`` field of the first entry in abra's
    machine-readable output.
    """
    generated = abra("app", "secret", "generate", domain, secret_name, "v1", machine_output=True)
    first_entry = generated[0]
    return first_entry['value']
|
|
|
|
|
|
def insert_secrets_from_conf(domain, config):
    """Insert every entry of the config's ``secrets`` section into ``domain``."""
    logging.info(f"Insert secrets for {domain}")
    secrets = config.get("secrets")
    if not secrets:
        return
    for secret_name, secret in secrets.items():
        insert_secret(domain, secret_name, secret)
|
|
|
|
|
|
def unqote_strings(s):
    """Strip one pair of matching surrounding quotes from ``s``.

    Args:
        s: The string to unquote.

    Returns:
        ``s`` without its first and last character when it is at least two
        characters long and both are the same quote character (``"`` or
        ``'``); otherwise ``s`` unchanged. A lone quote character is
        returned as is instead of collapsing to an empty string.
    """
    for quote in ('"', "'"):
        # len >= 2: a single quote character both starts and ends with itself.
        if len(s) >= 2 and s.startswith(quote) and s.endswith(quote):
            return s[1:-1]
    return s
|
|
|
|
|
|
def insert_secret(domain, secret_name, secret):
    """Insert ``secret`` as version ``v1`` of ``secret_name`` unless already stored.

    A secret counts as stored when a line of ``abra app secret ls -C``
    contains both the secret name and ``true``. Surrounding quotes are
    stripped from the value before it is inserted.

    Args:
        domain: App domain the secret belongs to.
        secret_name: Name of the secret.
        secret: Secret value, possibly still wrapped in quotes.
    """
    # TODO parse json
    stored_secrets = abra("app", "secret", "ls", "-C", domain).splitlines()
    # Fix extra quotes around secrets
    secret = unqote_strings(secret)
    already_stored = any(
        secret_name in line and "true" in line for line in stored_secrets)
    if not already_stored:
        # The secret value is deliberately kept out of this log message.
        # NOTE(review): abra() still logs the full command line at debug level.
        logging.info(f"Insert secret {secret_name} into {domain}")
        abra("app", "secret", "insert", domain, secret_name, "v1", secret)
|
|
|
|
|
|
def uncomment(keys, path, match_all=False):
    """Remove the leading ``#`` from assignment lines that mention one of ``keys``.

    Only lines containing ``=`` are considered. A key matches when it is a
    substring of the part before the first ``=``, or of the whole line when
    ``match_all`` is true. Matching lines lose their leading ``#``
    characters and the whitespace that follows them.
    """
    #TODO: fix variable names vs. inline regex (see backupbot)
    logging.debug(f'Uncomment {keys} in {path}')
    with open(path, "r") as file:
        lines = file.readlines()
    rewritten = []
    for line in lines:
        haystack = line if match_all else line.split("=")[0]
        if '=' in line and any(key in haystack for key in keys):
            line = line.lstrip("#").lstrip()
        rewritten.append(line)
    with open(path, "w") as file:
        file.writelines(rewritten)
|
|
|
|
|
|
def comment(keys, path, match_all=False):
    """Comment out every line of the file that mentions one of ``keys``.

    A key matches when it is a substring of the part before the first ``=``,
    or of the whole line when ``match_all`` is true. Matching lines are
    normalised to a single leading ``#`` directly followed by the content.
    """
    logging.debug(f'Comment {keys} in {path}')
    with open(path, "r") as file:
        lines = file.readlines()
    rewritten = []
    for line in lines:
        haystack = line if match_all else line.split("=")[0]
        if any(key in haystack for key in keys):
            # Strip existing markers first so lines are never double-commented.
            line = "#" + line.lstrip("#").lstrip()
        rewritten.append(line)
    with open(path, "w") as file:
        file.writelines(rewritten)
|
|
|
|
|
|
def exchange_domains(instance, instance_config, path):
    """Replace the ``example.com`` placeholder domains in the file at ``path``.

    Each app's placeholder (app name followed by ``.example.com``) is
    replaced by that app's ``app_domain`` first; any remaining
    ``example.com`` is then replaced by the instance domain.
    """
    for app, app_config in instance_config.items():
        replace_domains(path, f'{app}.example.com', app_config['app_domain'])
    # Must run last: it would otherwise destroy the per-app placeholders.
    replace_domains(path, 'example.com', instance)
|
|
|
|
|
|
|
|
def replace_domains(path, old_domain, new_domain):
    """Replace every occurrence of ``old_domain`` with ``new_domain`` in a file."""
    logging.debug(f'replace all {old_domain} with {new_domain} in {path}')
    with open(path, "r") as file:
        original = file.read()
    updated = original.replace(f"{old_domain}", new_domain)
    with open(path, "w") as file:
        file.write(updated)
|
|
|
|
|
|
def list_commands(app_config):
    """Print the post deploy commands configured under ``execute`` for an app.

    Each command's first word is the container, the remaining words are the
    command, which is printed as a list.
    """
    domain = app_config['app_domain']
    all_cmds = app_config.get('execute')
    if not all_cmds:
        logging.info(f"No post deploy cmds for {domain}")
        return
    for entry in all_cmds:
        words = entry.split()
        container = words[0]
        cmd = words[1:]
        print(f"{domain}:{container} --> '{cmd}'")
|
|
|
|
|
|
def execute_cmds(app_config):
    """Run the post deploy commands configured under ``execute`` for an app.

    Each command's first word names the container (or ``local`` for a local
    abra command); the remaining words are passed to ``abra app cmd``.
    Failures of individual commands are ignored and the output is printed.
    """
    domain = app_config['app_domain']
    all_cmds = app_config.get('execute')
    if not all_cmds:
        logging.info(f"No post deploy cmds for {domain}")
        return
    for entry in all_cmds:
        words = entry.split()
        container, arguments = words[0], words[1:]
        print(f"Run '{arguments}' in {domain}:{container}")
        target = ("--local", domain) if container == "local" else (domain, container)
        print(abra("app", "cmd", *target, *arguments, ignore_error=True))
|
|
|
|
|
|
@click.group()
@click.option('-l', '--log', 'loglevel')
@click.option('-p', '--pool_path', 'pool_path')
@click.option('-c', '--config_path', 'config_path', default=".")
def cli(loglevel, pool_path, config_path):
    """Manage the apps of a config pool through the abra CLI."""
    global CONFIGS
    # Configure logging before anything else: the first logging.* call
    # installs a default handler, after which basicConfig() has no effect.
    if loglevel:
        numeric_level = getattr(logging, loglevel.upper(), None)
        if not isinstance(numeric_level, int):
            raise ValueError('Invalid log level: %s' % loglevel)
        logging.basicConfig(level=numeric_level)
    # Without this check Path(None) would fail with an unhelpful TypeError.
    if pool_path is None:
        raise click.UsageError("Missing option '-p' / '--pool_path'.")
    if not Path(pool_path).exists():
        logging.error(f"{pool_path} does not exists! Are you in the correct directory?")
        raise SystemExit(1)
    pool_configs = merge_pool_configs(config_path)
    instance_configs = get_merged_instance_configs(pool_path, pool_configs)
    # Subcommands read the merged configuration from the module-level CONFIGS.
    CONFIGS = merge_connection_configs(instance_configs)
|
|
|
|
|
|
#@cli.command()
|
|
#def init_server():
|
|
# """ Initialize the server """
|
|
# new_app("traefik")
|
|
# new_app("backup-bot-two")
|
|
|
|
@cli.command()
@click.option('-a', '--apps', multiple=True)
def setup(apps):
    # Placeholder subcommand: accepts -a/--apps but does nothing yet.
    pass
|
|
|
|
|
|
@cli.command()
@click.option('-a', '--apps', multiple=True)
def config(apps):
    """ Configure the apps """
    for instance, instance_config in CONFIGS.items():
        if apps:
            # Every requested app must exist in this instance; abort otherwise.
            selected_apps = []
            for app in apps:
                if app not in instance_config:
                    logging.error(f' App config \'{app}\' not found for {instance}!')
                    exit(1)
                selected_apps.append(app)
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            server = app_config["server"]
            path = get_env_path(server, domain)
            print(f'Setup {app} config on {server} at {domain}')
            # Recreate the env file, then apply the configured changes to it.
            new_app(app, domain, server)
            logging.info(f'set configs for {app} at {instance}')
            update_configs(path, app_config)
            exchange_domains(instance, instance_config, path)
|
|
|
|
|
|
@cli.command()
@click.option('-a', '--apps', multiple=True)
def secrets(apps):
    """ Insert, share and generate the secrets of the apps """
    for instance, instance_config in CONFIGS.items():
        instance_apps = instance_config.keys()
        if apps:
            selected_apps = [app for app in apps if app in instance_config]
        else:
            selected_apps = instance_apps
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            print(f"Create secrets for {domain}")
            # Order matters: configured values first, then values shared with
            # the other apps of the instance, and finally generate the rest.
            insert_secrets_from_conf(domain, app_config)
            exchange_secrets(app, instance_config, instance_apps)
            generate_all_secrets(domain)
|
|
|
|
|
|
def get_deployed_apps(apps):
    """Return the deployed apps on all servers used by the selected apps.

    Each server is queried once with ``abra app ls``. The result maps the
    ``appName`` of every entry whose status is ``deployed`` to its
    ``version``.
    """
    deployed_apps = {}
    seen_servers = set()
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [name for name in apps if name in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            server = instance_config[app]['server']
            if server in seen_servers:
                continue
            seen_servers.add(server)
            deployed = abra("app", "ls", "-S", "-s", server, "-m", machine_output=True)
            for entry in deployed[server]["apps"]:
                if entry["status"] == "deployed":
                    deployed_apps[entry["appName"]] = entry["version"]
    return deployed_apps
|
|
|
|
|
|
@cli.command()
@click.option('-a', '--apps', multiple=True)
@click.option('-r', '--run-cmds', is_flag=True)
@click.option('-f', '--force', is_flag=True)
def deploy(apps, run_cmds, force):
    """ Deploy all the apps """
    deployed_domains = get_deployed_apps(apps)
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [name for name in apps if name in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            if domain in deployed_domains and not force:
                print(f"{domain} is already deployed")
                continue
            version = app_config.get('version') or 'latest'
            # Flags first, then the domain, then an explicit version if any.
            cmd = ["deploy", "-n"]
            if version == 'chaos':
                cmd.append("--chaos")
            if force:
                cmd.append("--force")
            if not run_cmds:
                cmd.append("--no-converge-checks")
            cmd.append(domain)
            if version not in ['latest', 'chaos']:
                cmd.append(version)
            print(f'deploy {domain} with version "{version}"')
            print(abra("app", *cmd))
            if run_cmds:
                logging.info(f'execute commands for {domain}')
                execute_cmds(app_config)
|
|
|
|
|
|
@cli.command()
@click.option('-a', '--apps', multiple=True)
@click.option('-r', '--run-cmds', is_flag=True)
def upgrade(apps, run_cmds):
    """ Upgrade all the deployed apps """
    deployed_domains = get_deployed_apps(apps)
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [name for name in apps if name in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            if domain not in deployed_domains:
                print(f"{domain} is not deployed")
                continue
            version = app_config.get('version') or 'latest'
            # Flags first, then the domain, then an explicit version if any.
            cmd = ["upgrade", "-n"]
            if version == 'chaos':
                cmd.append("--chaos")
            if not run_cmds:
                cmd.append("--no-converge-checks")
            cmd.append(domain)
            if version not in ['latest', 'chaos']:
                cmd.append(version)
            deployed_version = deployed_domains[domain]
            if version == deployed_version:
                print(f"{domain} is already at version {version}")
                continue
            print(f'upgrade {domain} from version {deployed_version} to version "{version}"')
            # NOTE(review): the abra call below is disabled, so this command
            # currently only reports the upgrade (and, with -r, runs the post
            # deploy commands). Confirm whether that is intended.
            #print(abra("app", *cmd))
            if run_cmds:
                logging.info(f'execute commands for {domain}')
                execute_cmds(app_config)
|
|
|
|
|
|
@cli.command()
@click.option('-a', '--apps', multiple=True)
def undeploy(apps):
    """ Undeploy all the apps """
    deployed_domains = get_deployed_apps(apps)
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [name for name in apps if name in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            domain = instance_config[app]['app_domain']
            # Apps that are not running are reported and skipped.
            if domain not in deployed_domains:
                print(f"{domain} is not deployed")
                continue
            print(f'undeploy {domain}')
            print(abra("app", "undeploy", "-n", domain))
|
|
|
|
|
|
@cli.command()
@click.option('-a', '--apps', multiple=True)
def cmds(apps):
    """ execute all post deploy cmds """
    deployed_domains = get_deployed_apps(apps)
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [name for name in apps if name in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            # Commands can only run against apps that are deployed.
            if domain not in deployed_domains:
                print(f"{domain} is not deployed")
                continue
            logging.info(f'execute commands for {domain}')
            execute_cmds(app_config)
|
|
|
|
@cli.command()
@click.option('-a', '--apps', multiple=True)
def list_cmds(apps):
    """ List all post deploy cmds without executing them """
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [name for name in apps if name in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            list_commands(instance_config[app])
|
|
|
|
|
|
@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge(apps):
    """ Completely remove all the apps """
    # TODO: check for deployed apps
    pool_apps = print_all_apps(apps)
    # Flatten the per-instance (app, domain) pairs. Unlike indexing the
    # result of zip(*...), this also works when no app matched at all.
    domains = [domain for instance_apps in pool_apps.values() for _, domain in instance_apps]
    if not domains:
        print("No apps found")
        return
    if input("Do you really want to purge these apps? Type YES: ") == "YES":
        for domain in domains:
            logging.info(f'purge {domain}')
            abra("app", "rm", "-n", domain)
            print(f"{domain} purged")
|
|
|
|
|
|
@cli.command()
@click.option('-a', '--apps', multiple=True)
def ls(apps):
    """ List all the apps """
    # The returned mapping is not needed here; only the printed tables matter.
    print_all_apps(apps)
|
|
|
|
|
|
def print_all_apps(apps):
    """Print one table of (app, domain) rows per instance and return the data.

    Returns the mapping produced by ``list_apps``.
    """
    pool_apps = list_apps(apps)
    for instance in pool_apps:
        print(instance)
        print(tabulate(pool_apps[instance]))
        print()
    return pool_apps
|
|
|
|
def list_apps(apps):
    """Map each instance to the (app, domain) pairs of its selected apps.

    With ``apps`` given only those apps are considered; otherwise all apps
    of the instance. Instances without any matching app are left out.
    """
    pool_apps = {}
    for instance, instance_config in CONFIGS.items():
        candidates = apps if apps else instance_config.keys()
        instance_app_domains = [
            (app, instance_config[app]['app_domain'])
            for app in candidates
            if app in instance_config
        ]
        if instance_app_domains:
            pool_apps[instance] = instance_app_domains
    return pool_apps
|
|
|
|
|
|
@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge_secrets(apps):
    """ Remove all the apps secrets """
    # TODO: check for deployed apps
    pool_apps = print_all_apps(apps)
    # Flatten the per-instance (app, domain) pairs. Unlike indexing the
    # result of zip(*...), this also works when no app matched at all.
    domains = [domain for instance_apps in pool_apps.values() for _, domain in instance_apps]
    if not domains:
        print("No apps found")
        return
    if input("Do you really want to purge the secrets for these apps? Type YES: ") == "YES":
        for domain in domains:
            logging.info(f'purge {domain}')
            abra("app", "secret", "rm", "-a", domain)
            print(f"Secrets for {domain} purged")
|
|
|
|
@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge_volumes(apps):
    """ Remove all the apps volumes """
    # TODO: check for deployed apps
    pool_apps = print_all_apps(apps)
    # Flatten the per-instance (app, domain) pairs. Unlike indexing the
    # result of zip(*...), this also works when no app matched at all.
    domains = [domain for instance_apps in pool_apps.values() for _, domain in instance_apps]
    if not domains:
        print("No apps found")
        return
    if input("Do you really want to purge the volumes for these apps? Type YES: ") == "YES":
        for domain in domains:
            logging.info(f'purge {domain}')
            abra("app", "volume", "rm", "-n", domain)
            print(f"Volumes for {domain} purged")
|
|
|
|
|
|
# Run the click command group only when executed as a script.
if __name__ == "__main__":
    cli()
|