alakazam/alakazam.py

1147 lines
48 KiB
Python
Executable File

#!/bin/python3
import copy
import json
import logging
import os
import re
import subprocess
from pathlib import Path
from typing import List, Dict, Any, Optional, Union

import click
import dotenv
from icecream import ic
from jinja2 import Template
from ruamel.yaml import YAML
from ruamel.yaml.constructor import SafeConstructor
from ruamel.yaml.nodes import ScalarNode
from tabulate import tabulate
# Absolute path of the 'combine.yml' file that sits next to this script
# (symlinks to the script are resolved first).
COMBINE_PATH = os.path.dirname(os.path.realpath(__file__)) + "/combine.yml"
# Module-level store for the merged instance configurations; it starts empty
# and is reassigned by cli() before any sub-command runs.
CONFIGS = {}
class MySafeConstructor(SafeConstructor):
    """
    Safe YAML constructor that keeps the quotes of quoted string scalars.
    """

    def construct_yaml_str(self, node):
        """
        Build the Python string for a YAML string node.

        Scalars written with single or double quotes are returned wrapped in
        that same quote character; every other node is returned unchanged.
        """
        text = self.construct_scalar(node)
        was_quoted = isinstance(node, ScalarNode) and node.style in ('"', "'")
        if not was_quoted:
            return text
        # Re-attach the quote character the author used in the YAML source.
        return f"{node.style}{text}{node.style}"


# Route every plain YAML string through the quote-preserving constructor.
MySafeConstructor.add_constructor(
    'tag:yaml.org,2002:str',
    MySafeConstructor.construct_yaml_str)
def read_config(configpath: str) -> Dict[str, Any]:
    """
    Load a YAML configuration file and return its content as a dictionary.

    The file is parsed with a safe, pure-Python ruamel.yaml loader that uses
    MySafeConstructor, so quoted strings keep their surrounding quotes.
    No Jinja2 rendering happens here; template placeholders are returned
    verbatim and are substituted later by substitute_jinja_variable().

    Args:
        configpath (str): Path to the YAML file; a leading '~' is expanded.
    Returns:
        dict: The parsed configuration, or an empty dict (with a logged
        warning) if the file does not exist or is empty.
    """
    filepath = Path(configpath).expanduser()
    if not filepath.exists():
        logging.warning(f"config file {filepath} does not exist")
        return {}
    yaml = YAML(typ='safe', pure=True)  # safe loader, pure Python mode
    yaml.Constructor = MySafeConstructor
    # Read as UTF-8 explicitly instead of relying on the locale default.
    with open(filepath, encoding="utf-8") as file:
        yaml_config = yaml.load(file)
    if not yaml_config:
        logging.warning(f"config file {filepath} is empty")
        return {}
    return yaml_config
def get_value(dictionary: Dict[Any, Any], *keys: str) -> Optional[Any]:
    """
    Retrieve a nested value from a dictionary by following a series of keys.

    Args:
        dictionary (dict): The dictionary from which to extract the value.
        *keys: The keys that define the path to the desired value.
    Returns:
        The value found at the given path, or None if the path is not present.
        With no keys the dictionary itself is returned.
    """
    _element = dictionary
    for key in keys:
        try:
            _element = _element[key]
        except (KeyError, IndexError, TypeError):
            # TypeError: an intermediate value is not subscriptable by this
            # key (e.g. None or a plain string), so the path does not exist.
            return None
    return _element
def merge_dict(dict1: Dict[Any, Any], dict2: Dict[Any, Any], reverse_list_order: bool = False) -> Dict[Any, Any]:
    """
    Recursively merge two dictionaries into a new, independent dictionary.

    Where keys overlap, nested dicts are merged recursively, lists are
    concatenated with duplicates removed (first occurrence wins), and any
    other value from dict2 overwrites the one from dict1.
    The result shares no mutable objects with either input, so it can be
    modified in place without affecting dict1 or dict2.

    Args:
        dict1 (dict): The base dictionary.
        dict2 (dict): The dictionary whose values take precedence.
        reverse_list_order (bool): If True, list entries from dict2 are placed
            before the entries from dict1; otherwise after them.
    Returns:
        dict: A new dictionary containing the merged key-value pairs.
    """
    def _unique(items):
        # Order-preserving de-duplication; falls back to equality comparison
        # when entries are unhashable (e.g. a list of dicts).
        try:
            return list(dict.fromkeys(items))
        except TypeError:
            unique_items = []
            for item in items:
                if item not in unique_items:
                    unique_items.append(item)
            return unique_items

    merged_dict = {}
    for key, old_value in dict1.items():
        if key not in dict2:
            merged_dict[key] = copy.deepcopy(old_value)
            continue
        new_value = dict2[key]
        if isinstance(new_value, dict) and isinstance(old_value, dict):
            merged_dict[key] = merge_dict(old_value, new_value, reverse_list_order)
        elif isinstance(new_value, list) and isinstance(old_value, list):
            if reverse_list_order:
                merged_list = new_value + old_value
            else:
                merged_list = old_value + new_value
            merged_dict[key] = copy.deepcopy(_unique(merged_list))
        else:
            merged_dict[key] = copy.deepcopy(new_value)
    # Keys that only exist in dict2 are appended in dict2's order.
    for key, new_value in dict2.items():
        if key not in dict1:
            merged_dict[key] = copy.deepcopy(new_value)
    return merged_dict
def merge_all_group_configs(config_path: str) -> Dict[str, Dict[str, Any]]:
    """
    Walk a directory tree and merge all 'alaka.yml' / 'alaka-*.yml' files into one configuration per directory.

    Each directory inherits the merged configuration of its parent directory;
    its own files are merged on top of that, so lower levels override higher ones.
    Files within one directory are applied in sorted file name order, which makes
    the result independent of the directory listing order.

    Args:
        config_path (str): Root directory of the hierarchical configuration.
    Returns:
        dict: Maps the absolute path of every directory below (and including)
        config_path to its merged configuration. Directories without any
        inherited or own configuration map to an empty dict.
    """
    dir_path = Path(config_path).absolute()
    merged_configs = {}
    # os.walk is top-down, so a parent is always processed before its children.
    for root, _, files in os.walk(dir_path):
        inherited = merged_configs.get(os.path.dirname(root)) or {}
        dir_config = None
        for file in sorted(files):
            if not re.match(r'^alaka(-.*)?\.ya?ml$', file):
                continue
            config = read_config(os.path.join(root, file))
            # The first file is merged onto the inherited config, later files
            # onto what this directory has accumulated so far.
            base = inherited if dir_config is None else dir_config
            dir_config = merge_dict(base, config)
        # Without own config files the directory simply inherits (never None).
        merged_configs[root] = inherited if dir_config is None else dir_config
    return merged_configs
def substitute_jinja_variable(jinja_dict, subs_dict) -> None:
    """
    Render all Jinja2 template values inside a (nested) dictionary in place.

    Nested dictionaries and lists are traversed recursively. Every scalar whose
    string form contains both '{{' and '}}' is replaced by its rendered string;
    all other values are left untouched.

    Args:
        jinja_dict (dict): The dictionary which may contain jinja template variables.
        subs_dict (dict): The variables available to the templates. None is treated as an empty dict.
    """
    variables = subs_dict or {}

    def _render(value):
        if isinstance(value, dict):
            substitute_jinja_variable(value, variables)
            return value
        if isinstance(value, list):
            # Render list entries individually so the list stays a list
            # instead of being replaced by the rendered repr of the list.
            value[:] = [_render(item) for item in value]
            return value
        text = str(value)
        if "{{" in text and "}}" in text:
            return Template(text).render(variables)
        return value

    # Only existing keys are reassigned, so iterating while updating is safe.
    for key in jinja_dict:
        jinja_dict[key] = _render(jinja_dict[key])
def merge_instance_configs(group_config: Dict[str, Any], instance_domain: str, instance_config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Merge the configuration of one instance with the inherited group configuration.

    Only apps listed in instance_config end up in the result. For each of them the
    group defaults are merged with the instance settings (instance wins), the
    'app_domain' is derived via map_subdomain() and a 'server' is filled in if missing.
    The default server is GLOBALS.server of the instance, then GLOBALS.server of the
    group, then the instance domain - independent of where GLOBALS appears in the file.
    Finally all Jinja2 placeholders are rendered with the GLOBALS variables and the
    GLOBALS entry is removed from the result.

    Args:
        group_config (dict): The group configuration from higher hierarchy levels (None is treated as empty).
        instance_domain (str): The domain associated with the instance.
        instance_config (dict): The configuration dictionary specific to the instance.
    Returns:
        dict: The merged per-app configuration of the instance. It shares no
        mutable objects with group_config or instance_config.
    """
    group_config = group_config or {}
    group_globals = group_config.get('GLOBALS') or {}
    instance_globals = instance_config.get('GLOBALS') or {}
    default_server = (instance_globals.get('server')
                      or group_globals.get('server')
                      or instance_domain)

    merged_config = {}
    for app, app_config in instance_config.items():
        group_app_config = group_config.get(app)
        if app_config and group_app_config:
            merged_app = merge_dict(group_app_config, app_config)
        elif app_config:
            merged_app = app_config
        elif group_app_config:
            merged_app = group_app_config
        else:
            merged_app = {}
        # Deep copy: the template rendering below works in place and must not
        # leak rendered values back into the shared group configuration.
        merged_app = copy.deepcopy(merged_app)
        merged_app['app_domain'] = map_subdomain(app, instance_domain, merged_app)
        if not merged_app.get('server'):
            merged_app['server'] = default_server
        merged_config[app] = merged_app

    # GLOBALS went through the loop like an app, so the merged entry (if the
    # instance defines one) also carries 'app_domain' and 'server'.
    global_vars = merged_config.get('GLOBALS') or group_globals
    substitute_jinja_variable(merged_config, global_vars)
    merged_config.pop('GLOBALS', None)
    return merged_config
def map_subdomain(recipe: str, instance_domain: str, app_config: Dict[str, Any]) -> str:
    """
    Determine the fully qualified domain name of an app.

    If the app configuration defines a 'subdomain', the placeholder
    'example.com' inside it is replaced by the instance domain. Otherwise the
    domain is built as '<recipe>.<instance_domain>'.

    Args:
        recipe (str): The name of the app recipe.
        instance_domain (str): The domain associated with the instance.
        app_config (dict): Configuration of the app, optionally with a 'subdomain' entry.
    Returns:
        str: The fully qualified domain name for the app.
    """
    configured = app_config.get('subdomain')
    if not configured:
        return f"{recipe}.{instance_domain}"
    return configured.replace("example.com", instance_domain)
def get_merged_instance_configs(config_path: str, group_configs: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """
    Read the instance YAML file(s) below config_path and merge each with its group configuration.

    config_path may be a single instance file or a directory that is searched
    recursively for files named '<domain>.yml' / '<domain>.yaml'. The group
    configuration of an instance is looked up in group_configs by the absolute
    path of the directory that contains the instance file.

    Args:
        config_path (str): Path to an instance file or to a directory containing instance files.
        group_configs (dict): Merged group configurations keyed by absolute directory path.
    Returns:
        dict: Maps each instance domain to its merged configuration.
    """
    target = Path(config_path).absolute()
    if target.is_file():
        group_dir = os.path.dirname(target)
        own_config = read_config(str(target))
        domain = target.name.removesuffix('.yml').removesuffix('.yaml')
        return {domain: merge_instance_configs(group_configs[group_dir], domain, own_config)}

    # This pattern matches for files of the format "<domain>.yml" or "<domain>.yaml"
    domain_file = re.compile(
        r"^(?:[A-Za-z0-9](?:[A-Za-z0-9\-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,6}(?:\.yaml|\.yml)$")
    instances = {}
    for root, _, files in os.walk(target):
        for file in files:
            if not domain_file.match(file):
                continue
            own_config = read_config(f'{root}/{file}')
            domain = file.removesuffix('.yml').removesuffix('.yaml')
            instances[domain] = merge_instance_configs(group_configs[root], domain, own_config)
    return instances
def merge_connection_configs(configs: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extend the instance configurations with the inter-app connection settings from 'combine.yml'.

    'combine.yml' maps target_app -> source_app -> settings. Whenever both apps are
    part of an instance, the settings are merged into the target app's configuration
    as defaults: values already set for the target app take precedence and list
    entries from the instance are placed before the ones from 'combine.yml'.

    Args:
        configs (dict): The instance configurations, keyed by instance.
    Returns:
        dict: New instance configurations including the connection settings.
        The per-instance dicts passed in are not modified.
    """
    connection_config = read_config(COMBINE_PATH)
    extend_shared_secrets(connection_config)
    merged_configs = {}
    for instance, instance_config in configs.items():
        # Work on a copy of the per-instance mapping so the caller's data stays untouched.
        merged_instance = dict(instance_config)
        for target_app, source_apps in connection_config.items():
            for source_app, target_conf in source_apps.items():
                if target_app in merged_instance and source_app in merged_instance:
                    merged_instance[target_app] = merge_dict(
                        target_conf, merged_instance[target_app], reverse_list_order=True)
        merged_configs[instance] = merged_instance
    return merged_configs
def extend_shared_secrets(connection_config: Dict[str, Any]) -> None:
    """
    Nest every 'shared_secrets' entry of the connection configuration under the name of its source app.

    The configuration is modified in place: a 'shared_secrets' value found at
    target_app -> source_app becomes {source_app: <value>}.

    Args:
        connection_config (dict): Connection configurations, structured as target_app -> source_app -> settings.
    """
    for source_apps in connection_config.values():
        for source_app, target_conf in source_apps.items():
            shared = target_conf.get('shared_secrets')
            if shared:
                target_conf['shared_secrets'] = {source_app: shared}
def abra(*args: str, machine_output: bool = False, ignore_error: bool = False) -> Union[str, Dict]:
    """
    Run the 'abra' command line tool with the given arguments and return its output.

    Standard error output is logged as a warning, standard output at debug level.

    Args:
        *args: The 'abra' sub-command and its parameters.
        machine_output (bool): If True, '-m' is appended (unless already passed) and the output is parsed as JSON.
        ignore_error (bool): If True, a non-zero return code does not raise.
    Returns:
        str or dict: The parsed JSON output if machine_output is True, otherwise the raw output as a string.
    Raises:
        RuntimeError: If 'abra' exits with a non-zero return code and ignore_error is False.
            The message contains the command line, stdout and stderr.
    """
    command = ["abra", *args]
    # Some callers already pass '-m' themselves; do not send the flag twice.
    if machine_output and "-m" not in command:
        command.append("-m")
    logging.debug(f"run command: {' '.join(command)}")
    process = subprocess.run(command, capture_output=True)
    stdout = process.stdout.decode()
    stderr = process.stderr.decode()
    if stderr:
        logging.warning(stderr)
    if stdout:
        logging.debug(stdout)
    if process.returncode and not ignore_error:
        raise RuntimeError(
            f'{" ".join(command)} \n STDOUT: \n {stdout} \n STDERR: {stderr}')
    if machine_output:
        return json.loads(stdout)
    return stdout
def write_env_header(path: str) -> None:
    """
    Prepend a "do not edit" banner to an existing .env file.

    The banner tells readers that the file is generated automatically and that
    manual changes will be overwritten.

    Args:
        path (str): Path to the existing .env file that receives the header.
    """
    logging.debug(f'write header to {path}')
    header = """################################################################################
# DO NOT EDIT THIS FILE, IT IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN #
################################################################################
"""
    # Read the current content first, then rewrite the file with the banner on top.
    with open(path, "r") as env_file:
        existing = env_file.read()
    with open(path, "w") as env_file:
        env_file.write(header + existing)
def new_app(recipe: str, domain: str, server: str, version: str) -> None:
    """
    Create a fresh app .env file via 'abra app new'.

    An already existing .env file for the app is deleted first. After a
    successful creation the "do not edit" header is written to the new file.

    Args:
        recipe (str): The recipe name for the application.
        domain (str): The domain under which the app will be deployed.
        server (str): The server on which the app will be deployed.
        version (str): The recipe version to use. 'chaos' or None selects '--chaos'.
    Raises:
        RuntimeError: If the output of 'abra' does not report that the app has been created.
    """
    env_file = get_env_path(server, domain)
    if env_file.exists():
        print(f'remove {env_file}')
        env_file.unlink()
    logging.info(f'create {recipe} config on {server} at {domain}')
    version_arg = '--chaos' if version in ('chaos', None) else version
    output = abra("app", "new", recipe, version_arg, "-n", "-s", server, "-D", domain)
    if "app has been created" not in output:
        raise RuntimeError(f'App "{recipe}" creation failed')
    write_env_header(env_file)
    logging.info(f'{recipe} created on {server} at {domain}')
def update_configs(path: str, config: Dict[str, Any]) -> None:
    """
    Apply an app configuration to its .env file.

    Three optional entries of config are processed in this order:
    'uncomment' (lines to uncomment, matched against the whole line),
    'comment' (lines to comment out, matched against the whole line) and
    'env' (variables to set; their lines are uncommented first and the values
    are written unquoted).

    Args:
        path (str): Path to the .env configuration file to be updated.
        config (dict): The app configuration.
    """
    to_uncomment = config.get("uncomment")
    if to_uncomment:
        uncomment(to_uncomment, path, True)
    to_comment = config.get("comment")
    if to_comment:
        comment(to_comment, path, True)
    env_values = config.get("env")
    if not env_values:
        return
    uncomment(env_values.keys(), path)
    for name, setting in env_values.items():
        logging.debug(f'set {name}={setting} in {path}')
        dotenv.set_key(path, name, setting, quote_mode="never")
def generate_all_secrets(domain: str) -> None:
    """
    Generate all secrets of an app via 'abra' if its secret listing reports missing ones.

    The generation is triggered when any line of the 'abra app secret ls' output
    contains "false". The generated secrets are printed to the console.

    Args:
        domain (str): The domain of the app for which secrets need to be generated.
    """
    listing = abra("app", "secret", "ls", domain).splitlines()
    if not any("false" in entry for entry in listing):
        return
    logging.info(f"Generate all secrets for {domain}")
    generated = abra("app", "secret", "generate", "-a", domain)
    print(f"secrets for {domain} generated")
    print(generated)
def get_env_path(server: str, domain: str) -> Path:
    """
    Return the location of an app's .env file inside the abra directory of the current user.

    Args:
        server (str): The server the app belongs to.
        domain (str): The domain of the app.
    Returns:
        Path: '~/.abra/servers/<server>/<domain>.env' with '~' expanded.
    """
    return Path(f"~/.abra/servers/{server}/{domain}.env").expanduser()
def exchange_secrets(app1: str, instance_config: Dict[str, Any], apps: List[str]) -> None:
    """
    Synchronise the shared secrets between one app and the other apps of the same instance.

    For every app in apps both directions are checked: the 'shared_secrets'
    entry of app1 for that app, and the 'shared_secrets' entry of that app for app1.
    Each entry found is handed to share_secrets().

    Args:
        app1 (str): The app whose shared secrets are exchanged.
        instance_config (dict): Configuration of the instance including all apps.
        apps (list): The apps of the instance to exchange secrets with.
    """
    own_config = instance_config[app1]
    own_domain = own_config['app_domain']
    for app2 in apps:
        other_config = instance_config[app2]
        other_domain = other_config['app_domain']
        outgoing = get_value(own_config, "shared_secrets", app2)
        if outgoing:
            logging.info(f'share secrets between {own_domain} and {other_domain}')
            share_secrets(own_domain, other_domain, outgoing)
        incoming = get_value(other_config, "shared_secrets", app1)
        if incoming:
            logging.info(f'share secrets between {own_domain} and {other_domain}')
            share_secrets(other_domain, own_domain, incoming)
def str2bool(value: Any) -> bool:
    """
    Interpret a value as a boolean.

    Args:
        value: Usually a string; other values (e.g. a JSON boolean) are
            converted with str() first.
    Returns:
        bool: True if the lower-cased text is one of "yes", "true", "t" or "1", otherwise False.
    """
    return str(value).lower() in ("yes", "true", "t", "1")
def share_secrets(app1_domain: str, app2_domain: str, secrets: Dict[str, str]) -> None:
    """
    Make sure two apps hold the same value for each of their shared secrets.

    For every pair of secret names the stored state on both apps is compared:
    if only one app has the secret, it is read there and inserted into the other;
    if neither has it, it is generated for app1 and inserted into app2.
    Pairs with a secret name unknown to one of the apps are logged as errors and skipped.

    Args:
        app1_domain (str): The domain of the first application.
        app2_domain (str): The domain of the second application.
        secrets (dict): Maps secret names in app2 to their corresponding names in app1.
    """
    def _stored_state(domain):
        # Maps secret name -> whether the secret is already created on the server.
        listing = abra("app", "secret", "ls", "-m", "-C", domain, machine_output=True)
        return {entry['name']: str2bool(entry['created-on-server']) for entry in listing}

    app1_state = _stored_state(app1_domain)
    app2_state = _stored_state(app2_domain)
    for app2_secret, app1_secret in secrets.items():
        # TODO: test if both apps have the secret available
        if app1_secret not in app1_state:
            logging.error(f"{app1_domain} does not contain secret {app1_secret}")
            continue
        if app2_secret not in app2_state:
            logging.error(f"{app2_domain} does not contain secret {app2_secret}")
            continue
        on_app1 = app1_state[app1_secret]
        on_app2 = app2_state[app2_secret]
        if on_app1 and not on_app2:
            insert_secret(app2_domain, app2_secret, get_secret(app1_domain, app1_secret))
        elif on_app2 and not on_app1:
            insert_secret(app1_domain, app1_secret, get_secret(app2_domain, app2_secret))
        elif not (on_app1 or on_app2):
            insert_secret(app2_domain, app2_secret, generate_secret(app1_domain, app1_secret))
def get_secret(domain: str, secret_name: str) -> str:
    """
    Read the value of a secret from a running app.

    The secret file '/var/run/secrets/<secret_name>' is printed with 'cat'
    through 'abra app run' in the app's 'worker' service.

    Args:
        domain (str): The app domain from which the secret is to be retrieved.
        secret_name (str): The name of the secret to retrieve.
    Returns:
        str: The output of the command, i.e. the content of the secret file.
    """
    secret_file = f"/var/run/secrets/{secret_name}"
    return abra("app", "run", domain, "worker", "cat", secret_file)
def generate_secret(domain: str, secret_name: str) -> str:
    """
    Generate version 'v1' of a single secret for an app via 'abra' and return its value.

    Args:
        domain (str): The app domain for which the secret is to be generated.
        secret_name (str): The name of the secret to generate.
    Returns:
        str: The 'value' field of the first entry in the machine-readable output.
    """
    generated = abra("app", "secret", "generate", domain, secret_name, "v1", machine_output=True)
    first_entry = generated[0]
    return first_entry['value']
def insert_secrets_from_conf(domain: str, config: Dict[str, Any]) -> None:
    """
    Insert all secrets listed under 'secrets' in an app configuration into the app.

    Args:
        domain (str): The app domain into which the secrets are to be inserted.
        config (dict): The app configuration; its optional 'secrets' entry maps secret names to values.
    """
    logging.info(f"Insert secrets for {domain}")
    configured = config.get("secrets")
    if not configured:
        return
    for secret_name, secret in configured.items():
        insert_secret(domain, secret_name, secret)
def unquote_strings(s: str) -> str:
    """
    Remove one pair of matching surrounding single or double quotes from a string.

    Args:
        s (str): The string from which quotes are to be removed.
    Returns:
        str: The string without its surrounding quotes, or the unchanged string
        if it is not enclosed in a matching pair of quotes.
    """
    # At least two characters are required: a lone quote character is not a quoted string.
    if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"):
        return s[1:-1]
    return s
def insert_secret(domain: str, secret_name: str, secret: str) -> None:
    """
    Insert version 'v1' of a secret into an app unless the secret is already created on the server.

    The stored secrets are read from the machine-readable output of
    'abra app secret ls' and compared by their exact name.

    Args:
        domain (str): The app domain into which the secret is inserted.
        secret_name (str): The name of the secret to insert.
        secret (str): The value of the secret; surrounding quotes are removed before inserting.
    """
    stored_secrets = abra("app", "secret", "ls", "-C", domain, machine_output=True)
    # Fix extra quotes around secrets
    secret = unquote_strings(secret)
    already_stored = any(
        entry['name'] == secret_name and str2bool(entry['created-on-server'])
        for entry in stored_secrets)
    if already_stored:
        return
    # The secret value itself is deliberately kept out of the log.
    logging.info(f"Insert secret {secret_name} into {domain}")
    abra("app", "secret", "insert", domain, secret_name, "v1", secret)
def uncomment(keys: List[str], path: str, match_all: bool = False) -> None:
    """
    Uncomment the assignment lines of a configuration file that are selected by keys.

    Only lines containing '=' are considered. If match_all is True, a line is
    selected when any key occurs anywhere in the line. Otherwise the variable name
    in front of the first '=' (ignoring leading '#' and whitespace) has to be equal
    to one of the keys, so setting 'DOMAIN' does not touch 'EXTRA_DOMAINS'.

    Args:
        keys (list of str): The keys selecting the lines to be uncommented.
        path (str): Path to the file where lines will be uncommented.
        match_all (bool): Whether to search the keys in the entire line instead of comparing the variable name.
    """
    logging.debug(f'Uncomment {keys} in {path}')
    wanted = list(keys)
    with open(path, "r") as file:
        lines = file.readlines()
    updated_lines = []
    for line in lines:
        if '=' in line:
            if match_all:
                selected = any(key in line for key in wanted)
            else:
                variable = line.split("=", 1)[0].lstrip("# \t").rstrip()
                selected = variable in wanted
            if selected:
                line = line.lstrip("#").lstrip()
        updated_lines.append(line)
    with open(path, "w") as file:
        file.writelines(updated_lines)
def comment(keys: List[str], path: str, match_all: bool = False) -> None:
    """
    Comment out the lines of a configuration file that contain one of the given keys.

    If match_all is True, the keys are searched in the entire line, otherwise only
    in the part in front of the first '='. Selected lines end up with exactly one
    leading '#', also if they were commented before.

    Args:
        keys (list of str): The keys selecting the lines to be commented.
        path (str): Path to the file where lines will be commented.
        match_all (bool): Whether to search the keys in the entire line or just in front of the first '='.
    """
    logging.debug(f'Comment {keys} in {path}')
    with open(path, "r") as file:
        lines = file.readlines()

    def _commented(line):
        searched = line if match_all else line.split("=")[0]
        if not any(key in searched for key in keys):
            return line
        # Normalise an existing comment prefix before adding a single '#'.
        return "#" + line.lstrip("#").lstrip()

    with open(path, "w") as file:
        for line in lines:
            file.write(_commented(line))
def exchange_domains(instance_domain: str, instance_config: Dict[str, Any], path: str) -> None:
    """
    Replace all placeholder domains in a .env file by the real domains of the instance.

    First every '<app>.example.com' is replaced by the 'app_domain' of that app,
    afterwards every remaining 'example.com' by the instance domain.

    Args:
        instance_domain (str): The domain of the instance.
        instance_config (dict): Configuration of the instance; every app entry provides its 'app_domain'.
        path (str): File path of the .env configuration file to be updated.
    """
    # Replace all app specific subdomains
    for app, app_config in instance_config.items():
        replace_domains(path, f'{app}.example.com', app_config['app_domain'])
    # Replace all instance domains
    replace_domains(path, 'example.com', instance_domain)
def replace_domains(path: str, old_domain: str, new_domain: str) -> None:
    """
    Replace every occurrence of a domain in a configuration file by another domain.

    Args:
        path (str): The file path of the .env configuration file where domains will be replaced.
        old_domain (str): The domain name to be replaced.
        new_domain (str): The domain name to insert instead.
    """
    logging.debug(f'replace all {old_domain} with {new_domain} in {path}')
    with open(path, "r") as source:
        original = source.read()
    updated = original.replace(f"{old_domain}", new_domain)
    with open(path, "w") as target:
        target.write(updated)
def list_commands(app_config: Dict[str, Any]) -> None:
    """
    Print the post-deployment commands configured for an app.

    Each entry of the 'execute' list is split on whitespace: the first word is
    the container, the remaining words are the command (printed as a list).

    Args:
        app_config (dict): Configuration of the app, optionally with an 'execute' list.
    """
    domain = app_config['app_domain']
    configured = app_config.get('execute')
    if not configured:
        logging.info(f"No post deploy cmds for {domain}")
        return
    for entry in configured:
        words = entry.split()
        container = words[0]
        arguments = words[1:]
        print(f"{domain}:{container} --> '{arguments}'")
def execute_cmds(app_config: Dict[str, Any]) -> None:
    """
    Run the post-deployment commands configured for an app via 'abra app cmd'.

    Each entry of the 'execute' list is split on whitespace: the first word is
    the container, the remaining words are the command. The container name
    'local' runs the command with '--local' instead of inside a container.
    Failing commands do not raise; the output of every command is printed.

    Args:
        app_config (dict): Configuration of the app, optionally with an 'execute' list.
    """
    domain = app_config['app_domain']
    configured = app_config.get('execute')
    if not configured:
        logging.info(f"No post deploy cmds for {domain}")
        return
    for entry in configured:
        words = entry.split()
        container = words[0]
        arguments = words[1:]
        print(f"Run '{arguments}' in {domain}:{container}")
        if container == "local":
            output = abra("app", "cmd", "--local", domain, *arguments, ignore_error=True)
        else:
            output = abra("app", "cmd", domain, container, *arguments, ignore_error=True)
        print(output)
@click.group()
@click.option('-l', '--log', 'loglevel')
@click.option('-g', '--group_path', 'group_path')
@click.option('-c', '--config_path', 'config_path', default=".")
def cli(loglevel: str, group_path: str, config_path: str) -> None:
    """
    Entry point of the Alakazam command line interface.

    Configures logging, loads and merges all configuration files and stores the
    result in the module-level CONFIGS for the sub-commands.

    Args:
        loglevel (str): Desired logging level ("debug", "info", "warning", "error", "critical").
        group_path (str): Path to the instance file or directory containing the desired configuration group.
        config_path (str): Path to the root directory containing configuration files.
    Raises:
        ValueError: If loglevel is not a valid logging level name.
        SystemExit: With exit code 1 if group_path is missing or does not exist.
    """
    global CONFIGS
    # Logging has to be configured before anything is logged: the first
    # logging call installs a default handler and a later basicConfig()
    # would then be silently ignored.
    if loglevel:
        numeric_level = getattr(logging, loglevel.upper(), None)
        if not isinstance(numeric_level, int):
            raise ValueError('Invalid log level: %s' % loglevel)
        logging.basicConfig(level=numeric_level)
    if not group_path or not Path(group_path).exists():
        logging.error(f"{group_path} does not exists! Are you in the correct directory?")
        raise SystemExit(1)
    all_group_configs = merge_all_group_configs(config_path)
    instance_configs = get_merged_instance_configs(group_path, all_group_configs)
    CONFIGS = merge_connection_configs(instance_configs)
@cli.command()
@click.option('-a', '--apps', multiple=True)
def show_config(apps: List[str]) -> None:
    """
    Print the merged configuration of all instances as JSON, optionally limited to the given apps.
    """
    if apps:
        shown = {}
        for instance, app_configs in CONFIGS.items():
            shown[instance] = {
                app: app_config
                for app, app_config in app_configs.items()
                if app in apps
            }
    else:
        shown = CONFIGS
    print(json.dumps(shown, indent=2))
@cli.command()
@click.option('-a', '--apps', multiple=True)
def config(apps: List[str]) -> None:
    """
    Generate the .env configuration files for the apps of all instances.

    For every selected app a fresh .env file is created, the configured
    settings are applied and the placeholder domains are replaced.
    Requested apps that are not configured for an instance are skipped with a warning.

    Args:
        apps (list): The app names to process; all apps of each instance if empty.
    """
    for instance, instance_config in CONFIGS.items():
        if not apps:
            selected_apps = instance_config.keys()
        else:
            selected_apps = []
            for requested in apps:
                if requested not in instance_config:
                    logging.warning(f" App config '{requested}' not found for {instance}!")
                    continue
                selected_apps.append(requested)
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            server = app_config["server"]
            env_path = get_env_path(server, domain)
            version = app_config.get('version')
            print(f'Setup {app} ({version}) config on {server} at {domain}')
            new_app(app, domain, server, version)
            logging.info(f'set configs for {app} at {instance}')
            update_configs(env_path, app_config)
            exchange_domains(instance, instance_config, env_path)
@cli.command()
@click.option('-a', '--apps', multiple=True)
def secrets(apps: List[str]) -> None:
    """
    Create the secrets for the apps of all instances.

    For every selected app the secrets from the configuration are inserted,
    shared secrets are synchronised with all apps of the instance and finally
    all remaining secrets are generated.

    Args:
        apps (list): The app names to process; all apps of each instance if empty.
    """
    for instance_config in CONFIGS.values():
        instance_apps = instance_config.keys()
        if apps:
            selected_apps = [name for name in apps if name in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            print(f"Create secrets for {domain}")
            insert_secrets_from_conf(domain, app_config)
            exchange_secrets(app, instance_config, instance_apps)
            generate_all_secrets(domain)
def get_deployed_apps(apps: List[str]) -> Dict[str, Any]:
    """
    Collects the deployed apps and their versions from all servers used by the selected apps.
    Each server referenced by a selected app is queried once through 'abra app ls'. Every app that abra reports with the status "deployed" on such a server is included in the result, whether or not it was selected.
    Args:
        apps (list): Names of the apps whose servers should be queried. If empty, the servers of all apps are queried.
    Returns:
        dict: Maps the 'appName' reported by abra to its deployed 'version'.
    """
    deployed_apps = {}
    seen_servers = set()
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [name for name in apps if name in instance_config]
        else:
            selected_apps = instance_config.keys()
        for name in selected_apps:
            server = instance_config[name]['server']
            if server in seen_servers:
                # One listing per server is enough; it already covers all of its apps.
                continue
            seen_servers.add(server)
            listing = abra("app", "ls", "-S", "-s", server, "-m", machine_output=True)
            for entry in listing[server]["apps"]:
                if entry["status"] == "deployed":
                    deployed_apps[entry["appName"]] = entry["version"]
    return deployed_apps
@cli.command()
@click.option('-a', '--apps', multiple=True)
@click.option('-r', '--run-cmds', is_flag=True)
@click.option('-f', '--force', is_flag=True)
@click.option('-c', '--converge-checks', is_flag=True)
def deploy(apps: List[str], run_cmds: bool, force: bool, converge_checks: bool) -> None:
    """
    Deploys the selected apps with 'abra app deploy' after an interactive confirmation.
    Apps that are already deployed are skipped unless force is set. The version is taken from the app configuration; a missing version is treated as 'latest' and 'chaos' triggers a chaos deployment.
    Args:
        apps (list): Names of the apps to deploy. If empty, all configured apps are deployed.
        run_cmds (bool): If set, the post-deployment commands of each app are executed after its deployment.
        force (bool): If set, apps are redeployed even if they are already deployed.
        converge_checks (bool): If neither this flag nor run_cmds is set, '--no-converge-checks' is passed to abra.
    Returns:
        None
    """
    print_all_apps(apps)
    confirmation = input("Do you really want to deploy these apps? Type YES: ")
    if confirmation != "YES":
        return

    deployed_domains = get_deployed_apps(apps)
    for instance_config in CONFIGS.values():
        selected_apps = [app for app in apps if app in instance_config] if apps else instance_config.keys()
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            if not force and domain in deployed_domains:
                print(f"{domain} is already deployed")
                continue

            version = app_config.get('version') or 'latest'
            flags = []
            if version == 'chaos':
                flags.append("--chaos")
            if force:
                flags.append("--force")
            if not (run_cmds or converge_checks):
                flags.append("--no-converge-checks")
            # An explicit version is only passed when it is a concrete one.
            target = [domain] if version in ('latest', 'chaos') else [domain, version]

            print(f'deploy {domain} with version "{version}"')
            print(abra("app", "deploy", "-n", *flags, *target))
            if run_cmds:
                logging.info(f'execute commands for {domain}')
                execute_cmds(app_config)
@cli.command()
@click.option('-a', '--apps', multiple=True)
@click.option('-r', '--run-cmds', is_flag=True)
@click.option('-d', '--dry-run', is_flag=True)
def upgrade(apps: List[str], run_cmds: bool, dry_run: bool) -> None:
    """
    Upgrades the selected apps via the 'abra' command-line interface.
    The deployment status of the apps is checked first and upgrades are only planned for deployed apps whose configured version differs from the deployed one. The target version is taken from the configuration; if none is configured, the latest available version is used. The planned upgrades are printed and, unless this is a dry run, executed after an interactive confirmation.
    Args:
        apps (list): Names of the apps to upgrade. If empty, all configured apps are considered.
        run_cmds (bool): If True, post-upgrade commands are executed; otherwise '--no-converge-checks' is passed to abra.
        dry_run (bool): If True, the planned upgrades are only printed and nothing is changed.
    Returns:
        None: Outputs the results of the upgrade process to the console.
    """
    deployed_domains = get_deployed_apps(apps)
    upgrade_cmds = []
    for _, instance_config in CONFIGS.items():
        if apps:
            selected_apps = [app for app in apps if app in instance_config]
        else:
            selected_apps = instance_config.keys()
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            if domain not in deployed_domains:
                print(f"{domain} is not deployed")
                continue
            version = app_config.get('version') or 'latest'
            deployed_version = deployed_domains[domain]
            if version == deployed_version:
                print(f"{domain} is already at version {version}")
                continue
            cmd = ["upgrade", "-n"]
            if version == 'chaos':
                cmd.append("--chaos")
            if not run_cmds:
                cmd.append("--no-converge-checks")
            cmd.append(domain)
            if version not in ['latest', 'chaos']:
                cmd.append(version)
            print(f'upgrade {app}: {domain} from version {deployed_version} to version "{version}"')
            upgrade_cmds.append((app_config, cmd))

    # Nothing to execute: either a dry run or no app needs an upgrade, so do not prompt.
    if dry_run or not upgrade_cmds:
        return
    if input("Do you really want to upgrade these apps? Type YES: ") != "YES":
        return
    for app_config, cmd in upgrade_cmds:
        print(abra("app", *cmd))
        if run_cmds:
            # Read the domain from the app being processed; the loop variable of the
            # planning loop above would always name the last planned domain.
            logging.info(f"execute commands for {app_config['app_domain']}")
            execute_cmds(app_config)
@cli.command()
@click.option('-a', '--apps', multiple=True)
def undeploy(apps: List[str]) -> None:
    """
    Undeploys the selected apps with 'abra app undeploy' after an interactive confirmation.
    Apps that are not currently deployed are reported and skipped.
    Args:
        apps (list): Names of the apps to undeploy. If empty, all configured apps are undeployed.
    """
    print_all_apps(apps)
    answer = input("Do you really want to undeploy these apps? Type YES: ")
    if answer != "YES":
        return

    deployed_domains = get_deployed_apps(apps)
    for instance_config in CONFIGS.values():
        selected_apps = [app for app in apps if app in instance_config] if apps else instance_config.keys()
        for app in selected_apps:
            domain = instance_config[app]['app_domain']
            if domain in deployed_domains:
                print(f'undeploy {domain}')
                print(abra("app", "undeploy", "-n", domain))
            else:
                print(f"{domain} is not deployed")
@cli.command()
@click.option('-a', '--apps', multiple=True)
def cmds(apps: List[str]) -> None:
    """
    Executes the post-deployment commands of the selected apps.
    Only apps that are currently deployed are handled; all others are reported and skipped.
    Args:
        apps (list): Names of the apps whose post-deployment commands should be executed. If empty, all configured apps are handled.
    Returns:
        None
    """
    deployed_domains = get_deployed_apps(apps)
    for instance_config in CONFIGS.values():
        selected_apps = [app for app in apps if app in instance_config] if apps else instance_config.keys()
        for app in selected_apps:
            app_config = instance_config[app]
            domain = app_config['app_domain']
            if domain in deployed_domains:
                logging.info(f'execute commands for {domain}')
                execute_cmds(app_config)
            else:
                print(f"{domain} is not deployed")
@cli.command()
@click.option('-a', '--apps', multiple=True)
def list_cmds(apps: List[str]) -> None:
    """
    Lists the post-deployment commands of the selected apps.
    The configuration of every selected app is handed to list_commands, which allows reviewing the commands without executing them.
    Args:
        apps (list): Names of the apps to list the commands for. If empty, all configured apps are listed.
    """
    for instance_config in CONFIGS.values():
        if apps:
            selected_apps = [name for name in apps if name in instance_config]
        else:
            selected_apps = instance_config.keys()
        for name in selected_apps:
            list_commands(instance_config[name])
@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge(apps: List[str]) -> None:
    """
    Removes the selected apps with 'abra app rm' after an interactive confirmation.
    The selected apps are printed first. If no app matches the selection, a warning is logged and nothing is removed.
    Args:
        apps (list): Names of the apps to purge. If empty, all configured apps are purged.
    Returns:
        None
    """
    # TODO: check for deployed apps
    group_apps = print_all_apps(apps)
    # Flatten the per-instance (app, domain) tuples; unlike indexing into zip(*...),
    # this also works when no app matched the selection.
    domains = [domain for instance_apps in group_apps.values() for _, domain in instance_apps]
    if not domains:
        logging.warning('no matching apps found, nothing to purge')
        return
    if input("Do you really want to purge these apps? Type YES: ") != "YES":
        return
    for domain in domains:
        logging.info(f'purge {domain}')
        abra("app", "rm", "-n", domain)
        print(f"{domain} purged")
@cli.command()
@click.option('-a', '--apps', multiple=True)
def ls(apps: List[str]) -> None:
    """
    Prints the selected apps together with their domains, grouped by instance.
    Args:
        apps (list): Names of the apps to list. If empty, all configured apps are listed.
    """
    # The grouped result returned by print_all_apps is not needed here; only its output is.
    print_all_apps(apps)
def print_all_apps(apps: List[str]) -> Dict[str, Any]:
    """
    Prints the selected apps and their domains as one table per instance.
    For every instance the instance name, a table of its (app, domain) pairs and an empty line are printed.
    Args:
        apps (list): Names of the apps to print. If empty, all configured apps are printed.
    Returns:
        dict: The result of list_apps, i.e. the (app, domain) pairs grouped by instance.
    """
    group_apps = list_apps(apps)
    for instance in group_apps:
        print(instance)
        print(tabulate(group_apps[instance]))
        print()
    return group_apps
def list_apps(apps: Optional[List[str]] = None) -> Dict[str, List[tuple]]:
    """
    Collects the selected apps and their domains from the configuration, grouped by instance.
    Instances that contain none of the selected apps are left out of the result.
    Args:
        apps (list): Names of the apps to collect. If empty or None, all apps of each instance are collected.
    Returns:
        dict: Maps each instance name to a list of (app name, domain) tuples.
    """
    group_apps = {}
    for instance, instance_config in CONFIGS.items():
        names = [name for name in apps if name in instance_config] if apps else instance_config.keys()
        app_domains = [(name, instance_config[name]['app_domain']) for name in names]
        if app_domains:
            group_apps[instance] = app_domains
    return group_apps
@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge_secrets(apps: List[str]) -> None:
    """
    Removes all secrets of the selected apps with 'abra app secret rm' after an interactive confirmation.
    The selected apps are printed first. If no app matches the selection, a warning is logged and nothing is removed.
    Args:
        apps (list): Names of the apps whose secrets are to be purged. If empty, all configured apps are handled.
    """
    # TODO: check for deployed apps
    group_apps = print_all_apps(apps)
    # Flatten the per-instance (app, domain) tuples; unlike indexing into zip(*...),
    # this also works when no app matched the selection.
    domains = [domain for instance_apps in group_apps.values() for _, domain in instance_apps]
    if not domains:
        logging.warning('no matching apps found, no secrets to purge')
        return
    if input("Do you really want to purge the secrets for these apps? Type YES: ") != "YES":
        return
    for domain in domains:
        logging.info(f'purge {domain}')
        abra("app", "secret", "rm", "-a", domain)
        print(f"Secrets for {domain} purged")
@cli.command()
@click.option('-a', '--apps', multiple=True)
def purge_volumes(apps: List[str]) -> None:
    """
    Removes all volumes of the selected apps with 'abra app volume rm' after an interactive confirmation.
    The selected apps are printed first. If no app matches the selection, a warning is logged and nothing is removed.
    Args:
        apps (list): Names of the apps whose volumes are to be purged. If empty, all configured apps are handled.
    """
    # TODO: check for deployed apps
    group_apps = print_all_apps(apps)
    # Flatten the per-instance (app, domain) tuples; unlike indexing into zip(*...),
    # this also works when no app matched the selection.
    domains = [domain for instance_apps in group_apps.values() for _, domain in instance_apps]
    if not domains:
        logging.warning('no matching apps found, no volumes to purge')
        return
    if input("Do you really want to purge the volumes for these apps? Type YES: ") != "YES":
        return
    for domain in domains:
        logging.info(f'purge {domain}')
        abra("app", "volume", "rm", "-n", domain)
        print(f"Volumes for {domain} purged")
@cli.command()
@click.option('-a', '--apps', multiple=True)
def check(apps: List[str]) -> None:
    """
    Checks the health status of the containers of all selected apps.
    For every selected app the containers are queried with 'abra app ps'. A line is printed for each container whose status starts with "health: starting", "unknown" or "unhealthy", or whose state is not 'running'. If no app matches the selection, a warning is logged and nothing is checked.
    Args:
        apps (list): Names of the apps to check. If empty, all configured apps are checked.
    """
    group_apps = list_apps(apps)
    # Flatten the per-instance (app, domain) tuples; unlike indexing into zip(*...),
    # this also works when no app matched the selection.
    domains = [domain for instance_apps in group_apps.values() for _, domain in instance_apps]
    if not domains:
        logging.warning('no matching apps found, nothing to check')
        return
    # NOTE(review): match() only matches at the start of the status string. If abra reports
    # Docker-style statuses such as "Up 2 hours (unhealthy)", search() may be intended - confirm
    # against abra's machine-readable output.
    bad_status = re.compile("health: starting|unknown|unhealthy")
    for domain in domains:
        logging.info(f'check {domain}')
        containers = abra("app", "ps", "-m", domain, machine_output=True)
        for container in containers.values():
            if bad_status.match(container['status']) or container['state'] != 'running':
                print(f"{domain} - {container['service name']}: {container['status']}")
# Run the click command group only when this file is executed as a script.
if __name__ == "__main__":
    cli()