1492 lines
67 KiB
Python
Executable File
1492 lines
67 KiB
Python
Executable File
#!/bin/python3
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional, Union, Set, Tuple
|
|
import subprocess
|
|
import re
|
|
|
|
from tabulate import tabulate
|
|
import click
|
|
import dotenv
|
|
from icecream import ic
|
|
from git import Repo
|
|
from colorama import Fore, init
|
|
from git.exc import InvalidGitRepositoryError
|
|
from jinja2 import Template
|
|
from ruamel.yaml import YAML
|
|
from ruamel.yaml.constructor import SafeConstructor
|
|
from ruamel.yaml.nodes import ScalarNode
|
|
from packaging import version
|
|
from uptime_kuma_api import UptimeKumaApi, MonitorType
|
|
from time import sleep
|
|
|
|
COMBINE_PATH = os.path.dirname(os.path.realpath(__file__)) + "/combine.yml"
|
|
# INSTANCE_CONFIGS: dict: contains all app organized by recipe names and instance domains
|
|
# The structure of the dictionary is as follows:
|
|
# {
|
|
# "instance_domain": {
|
|
# "recipe_name": {
|
|
# "subdomain": ...,
|
|
# "server": ...,
|
|
# "version": ...,
|
|
# ...
|
|
# },
|
|
# ...
|
|
# },
|
|
INSTANCE_CONFIGS = {}
|
|
# ALL_CONFIGS: same like INSTANCE_CONFIGS but not filtered by group path and exclude paths
|
|
ALL_CONFIGS = {}
|
|
|
|
# SETTINGS: dics: contains global settings for alakazam
|
|
#{
|
|
# 'root': <path that contains all the instance config files>,
|
|
# 'uptime_kuma':
|
|
# {
|
|
# "url": ...,
|
|
# "user": ...,
|
|
# "password": ...,
|
|
# "paramter": {...},
|
|
# ...
|
|
# },
|
|
# ...
|
|
#}
|
|
SETTINGS = {}
|
|
SETTINGS_PATH = "~/.config/alakazam.yml"
|
|
|
|
|
|
class MySafeConstructor(SafeConstructor):
|
|
"""
|
|
A custom YAML constructor that preserves quotes in YAML strings when parsing.
|
|
"""
|
|
def construct_yaml_str(self, node):
|
|
"""
|
|
Overrides the base scalar constructor to preserve the quotes around the YAML strings based on the original style of the node.
|
|
"""
|
|
value = self.construct_scalar(node)
|
|
if isinstance(node, ScalarNode) and (node.style == '"' or node.style == "'"):
|
|
# If node was quoted, keep quotes in the returned value
|
|
return '{}{}{}'.format(node.style, value, node.style)
|
|
# Otherwise, return value as is
|
|
return value
|
|
|
|
MySafeConstructor.add_constructor(
|
|
'tag:yaml.org,2002:str',
|
|
MySafeConstructor.construct_yaml_str)
|
|
|
|
|
|
def read_config(configpath: str) -> Dict[str, Any]:
|
|
"""
|
|
Reads a YAML configuration file from the specified configpath and returns the configuration as a dictionary, including processing any Jinja2 templating specified within the file. It handles the loading of YAML files, processes any Jinja2 templating using the 'GLOBALS' section for variables, and converts the file into a dictionary format.
|
|
|
|
Args:
|
|
configpath (str): The path to the YAML configuration file. The file should contain valid YAML and optional Jinja2 templating.
|
|
|
|
Returns:
|
|
dict: The configuration data extracted and processed from the YAML file, structured as a dictionary.
|
|
"""
|
|
filepath = Path(configpath).expanduser()
|
|
if not filepath.exists():
|
|
logging.warning(f"config file {filepath} does not exist")
|
|
return {}
|
|
yaml = YAML(typ='safe', pure=True) # Set type to 'safe' and use pure Python mode
|
|
yaml.Constructor = MySafeConstructor
|
|
with open(filepath) as file:
|
|
try:
|
|
yaml_config = yaml.load(file)
|
|
except Exception as e:
|
|
logging.error(f"Error reading config file {filepath}: {str(e)}")
|
|
return {}
|
|
|
|
if not yaml_config:
|
|
logging.warning(f"config file {filepath} is empty")
|
|
return {}
|
|
return yaml_config
|
|
# globals = yaml_config.get('GLOBALS')
|
|
# jinja = Environment(loader=FileSystemLoader(
|
|
# ['/', '.']), trim_blocks=True, lstrip_blocks=True)
|
|
# template = jinja.get_template(filepath.as_posix(), globals=globals)
|
|
# return yaml.load(template.render())
|
|
|
|
|
|
def get_value(dictionary: Dict[Any, Any], *keys: str) -> Optional[Any]:
|
|
"""
|
|
Retrieves a nested value from a dictionary using a series of keys. This function is useful for fetching deep values in a dictionary structure without manually checking each level of the dictionary.
|
|
|
|
Args:
|
|
dict (dict): The dictionary from which to extract the value.
|
|
*keys: A series of keys that define the path to the desired value within the dictionary.
|
|
|
|
Returns:
|
|
The value found at the specified path in the dictionary, or None if the path is not present.
|
|
"""
|
|
_element = dictionary
|
|
for key in keys:
|
|
try:
|
|
_element = _element[key]
|
|
except KeyError:
|
|
return
|
|
return _element
|
|
|
|
|
|
def merge_dict(dict1: Dict[Any, Any], dict2: Dict[Any, Any], reverse_list_order: bool = False) -> Dict[Any, Any]:
|
|
"""
|
|
Merges two dictionaries, combining their key-value pairs. Where keys overlap, values from dict2 will overwrite those from dict1.
|
|
If reverse_list_order is True and the values are lists, the order of elements in lists from dict2 is reversed before merging.
|
|
|
|
Args:
|
|
dict1 (dict): The first dictionary, whose values are overwritten by dict2's values for overlapping keys.
|
|
dict2 (dict): The second dictionary, whose values will overwrite dict1's for overlapping keys.
|
|
reverse_list_order (bool): If True, reverses the order of any lists found as values before merging.
|
|
|
|
Returns:
|
|
dict: A new dictionary containing the merged key-value pairs.
|
|
"""
|
|
merged_dict = dict1.copy()
|
|
for key, value in dict2.items():
|
|
if key in merged_dict and isinstance(value, dict) and isinstance(merged_dict[key], dict):
|
|
merged_dict[key] = merge_dict(merged_dict[key], value, reverse_list_order)
|
|
elif key in merged_dict and isinstance(value, list) and isinstance(merged_dict[key], list):
|
|
if reverse_list_order:
|
|
merged_list = value + merged_dict[key]
|
|
else:
|
|
merged_list = merged_dict[key] + value
|
|
unique_list = list(dict.fromkeys(merged_list)) # remove duplicates
|
|
merged_dict[key] = unique_list
|
|
elif value is not None:
|
|
merged_dict[key] = value
|
|
return merged_dict
|
|
|
|
|
|
def merge_all_group_configs(root_path: Path) -> Dict[str, Dict[str, Any]]:
|
|
"""
|
|
Recursively merges 'alaka.yml/alaka-*.yml' files within a specified directory into a single comprehensive configuration dictionary.
|
|
Configurations at higher directory levels get inherited and potentially overridden by configurations in lower directory levels. The merged configurations at each recursion level are accessible by their directory path relative to the root path.
|
|
|
|
Args:
|
|
root_path (str): The path to the directory containing hierarchical 'alaka.yml/alaka-*.yml' configuration files. The directory should follow an organizational structure where each subdirectory can contain 'alaka.yml/alaka-*.yml' files that inherits and potentially overrides settings from its parent directory's 'alaka.yml/alaka-*.yml' files.
|
|
|
|
Returns:
|
|
dict: A dictionary representing the merged configurations from all the 'alaka.yml/alaka-*.yml' files found in the directory hierarchy.
|
|
The dictionary's keys are the paths of the directories relative to the root directory specified by 'root_path', indicating the source of the configurations. Each key maps to its respective merged configuration dictionary, which includes all inherited and overridden settings from higher-level directories down to the specified directory.
|
|
"""
|
|
merged_configs = {}
|
|
for root, _, files in os.walk(root_path):
|
|
no_config_found = True
|
|
for file in files:
|
|
if re.match(r'^alaka(-.*)?\.ya?ml$', file):
|
|
file_path = os.path.join(root, file)
|
|
config = read_config(file_path)
|
|
if dir_config := merged_configs.get(root):
|
|
# Merge the config with the merged config from the current dir
|
|
merged_configs[root] = merge_dict(dir_config, config)
|
|
elif par_config := merged_configs.get(os.path.dirname(root)):
|
|
# Merge the config with the merged config from the parent dir
|
|
merged_configs[root] = merge_dict(par_config, config)
|
|
else:
|
|
merged_configs[root] = config
|
|
no_config_found = False
|
|
if no_config_found:
|
|
merged_configs[root] = merged_configs.get(os.path.dirname(root))
|
|
return merged_configs
|
|
|
|
|
|
def substitute_jinja_variable(jinja_dict, subs_dict) -> None:
|
|
"""
|
|
This function recursively traverses the given jinja_dict and wherever it finds a jinja template variable, it replaces it with the corresponding value from the subs_dict.
|
|
|
|
Args:
|
|
jinja_dict (dict): The dictionary which may contain jinja template variables. Can be a nested dictionary.
|
|
subs_dict (dict): The dictionary containing the substitutions for the jinja template variables.
|
|
"""
|
|
for key, value in jinja_dict.items():
|
|
if isinstance(value, dict): # If value itself is dictionary
|
|
substitute_jinja_variable(value, subs_dict) # Recursive call
|
|
else:
|
|
if "{{" in str(value) and "}}" in str(value): # If value is a jinja template
|
|
template = Template(str(value))
|
|
jinja_dict[key] = template.render(subs_dict)
|
|
|
|
|
|
def merge_instance_configs(group_config: Dict[str, Any], instance_domain: str, instance_config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Merge instance-specific configurations with group configurations to produce a consolidated configuration dictionary.
|
|
This considers domain-specific overrides and merges them appropriately with the defaults specified at the group level.
|
|
|
|
Args:
|
|
group_config (dict): A dictionary containing the group configurations from higher hierarchy levels.
|
|
instance_domain (str): The domain associated with the instance, used for subdomain mappings.
|
|
instance_config (dict): The configuration dictionary specific to an instance.
|
|
|
|
Returns:
|
|
dict: A dictionary with merged configuration for the instance, including domain mappings and server settings.
|
|
"""
|
|
merged_config = {}
|
|
for app, app_config in instance_config.items():
|
|
if app_config and group_config.get(app):
|
|
merged_config[app] = merge_dict(group_config[app], app_config)
|
|
elif app_config:
|
|
merged_config[app] = app_config
|
|
elif group_config.get(app):
|
|
merged_config[app] = group_config[app].copy()
|
|
else:
|
|
merged_config[app] = {}
|
|
merged_config[app]['app_domain'] = map_subdomain(app, instance_domain, merged_config[app])
|
|
if not ((server:= get_value(merged_config, 'GLOBALS', 'server')) or (server:= get_value(group_config, 'GLOBALS', 'server'))):
|
|
server = instance_domain
|
|
if not merged_config[app].get('server'):
|
|
merged_config[app]['server'] = server
|
|
if not (global_vars:= merged_config.get('GLOBALS')):
|
|
global_vars = group_config.get('GLOBALS')
|
|
substitute_jinja_variable(merged_config, global_vars)
|
|
if merged_config.get('GLOBALS'):
|
|
merged_config.pop('GLOBALS')
|
|
return merged_config
|
|
|
|
|
|
def map_subdomain(recipe: str, instance_domain: str, app_config: Dict[str, Any]) -> str:
|
|
"""
|
|
Maps a subdomain for an app based on the recipe, instance domain, and specific app configuration.
|
|
It dynamically replaces the placeholder domain with the actual instance domain or constructs
|
|
a subdomain using the recipe name and instance domain.
|
|
|
|
Args:
|
|
recipe (str): The name of the app recipe.
|
|
instance_domain (str): The domain associated with the instance.
|
|
app_config (dict): Configuration for the specific app which might include a subdomain setting.
|
|
|
|
Returns:
|
|
str: The fully qualified domain name for the app.
|
|
"""
|
|
if subdomain:= app_config.get('subdomain'):
|
|
domain = subdomain.replace("example.com", instance_domain)
|
|
else:
|
|
domain = f"{recipe}.{instance_domain}"
|
|
return domain
|
|
|
|
|
|
def get_merged_instance_configs(config_path: Path, group_configs: Dict[str, Any], exclude_paths: List[str]) -> Dict[str, Dict[str, Any]]:
|
|
"""
|
|
Traverse a directory structure to read and merge all YAML configuration files for each instance.
|
|
This function supports a hierarchical configuration approach by aggregating configurations across directories.
|
|
|
|
Args:
|
|
config_path (str): The path to the directory containing instance-specific YAML files.
|
|
group_configs (dict): A dictionary containing group configurations from higher hierarchy levels.
|
|
|
|
Returns:
|
|
dict: A dictionary with domains as keys and their respective merged configurations as values.
|
|
"""
|
|
if config_path.is_file():
|
|
parent_path = os.path.dirname(config_path)
|
|
instance_config = read_config(str(config_path))
|
|
domain = config_path.name.removesuffix('.yml').removesuffix('.yaml')
|
|
merged_config = merge_instance_configs(group_configs[parent_path], domain, instance_config)
|
|
return {domain: merged_config}
|
|
instances = {}
|
|
for root, _, files in os.walk(config_path):
|
|
if any(root.startswith(p) for p in exclude_paths):
|
|
continue
|
|
for file in files:
|
|
if any(f"{root}/{file}".startswith(p) for p in exclude_paths):
|
|
continue
|
|
# This pattern matches for files of the format "<domain>.yml" or "<domain>.yaml"
|
|
pattern = r"^(?:[A-Za-z0-9](?:[A-Za-z0-9\-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,6}(?:\.yaml|\.yml)$"
|
|
if re.match(pattern, file):
|
|
instance_config = read_config(f'{root}/{file}')
|
|
domain = file.removesuffix('.yml').removesuffix('.yaml')
|
|
merged_config = merge_instance_configs(group_configs[root], domain, instance_config)
|
|
instances[domain] = merged_config
|
|
return instances
|
|
|
|
|
|
def merge_connection_configs(configs: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Merge connection configurations from the 'combine.yml' to extend instance configurations with inter-app secrets and settings.
|
|
This involves integrating shared secrets and other connection-specific settings between applications within the same instance.
|
|
|
|
Args:
|
|
configs (dict): The initial instance configurations before integrating connection-specific adjustments.
|
|
|
|
Returns:
|
|
dict: The updated instance configurations after applying the connection settings.
|
|
"""
|
|
connection_config = read_config(COMBINE_PATH)
|
|
extend_shared_secrets(connection_config)
|
|
merged_configs = configs.copy()
|
|
for _, instance_config in merged_configs.items():
|
|
for target_app, source_apps in connection_config.items():
|
|
for source_app, target_conf in source_apps.items():
|
|
if target_app in instance_config and source_app in instance_config:
|
|
instance_config[target_app] = merge_dict(target_conf, instance_config[target_app], reverse_list_order=True)
|
|
return merged_configs
|
|
|
|
|
|
def extend_shared_secrets(connection_config: Dict[str, Any]) -> None:
|
|
"""
|
|
Extends connection configurations by embedding source app details into the shared secrets configuration.
|
|
This modifies the existing connection configurations in place, adding a layer of source app information to shared secrets.
|
|
|
|
Args:
|
|
connection_config (dict): Connection configurations which involve shared secrets.
|
|
"""
|
|
for _, source_apps in connection_config.items():
|
|
for source_app, target_conf in source_apps.items():
|
|
if shared_secrets:= target_conf.get('shared_secrets'):
|
|
target_conf['shared_secrets'] = {source_app: shared_secrets}
|
|
|
|
|
|
def abra(*args: str, machine_output: bool = False, ignore_error: bool = False) -> Union[str,Dict]:
|
|
"""
|
|
Execute the 'abra' command with the specified arguments. This function acts as a wrapper around the 'abra' CLI tool. It allows for capturing the output and optionally returning it as machine-readable JSON.
|
|
|
|
Args:
|
|
*args: Variable length argument list representing the 'abra' command and its parameters.
|
|
machine_output (bool): If True, expects the output in JSON format and parses it before returning.
|
|
ignore_error (bool): If True, suppresses the raising of errors on non-zero return codes, otherwise an exception is raised.
|
|
|
|
Returns:
|
|
str or dict: Returns the output from the 'abra' command. If machine_output is True, returns a dictionary, otherwise returns raw output as a string.
|
|
|
|
Raises:
|
|
RuntimeError: If the 'abra' command exits with a non-zero return code and ignore_error is False, including the command output in the error.
|
|
"""
|
|
command = ["abra", *args]
|
|
#remove empty elements
|
|
command = [arg for arg in command if arg]
|
|
if machine_output:
|
|
command.append("-m")
|
|
logging.debug(f"run command: {' '.join(command)}")
|
|
process = subprocess.run(command, capture_output=True)
|
|
if process.stderr and ignore_error:
|
|
logging.debug(process.stderr.decode())
|
|
if process.stdout:
|
|
logging.debug(process.stdout.decode())
|
|
if process.returncode and not ignore_error:
|
|
#breakpoint()
|
|
raise RuntimeError(
|
|
f'{" ".join(command)} \n STDOUT: \n {process.stdout.decode()} \n STDERR: {process.stderr.decode()}')
|
|
if machine_output:
|
|
return json.loads(process.stdout.decode())
|
|
return process.stdout.decode()
|
|
|
|
|
|
def write_env_header(path: Path) -> None:
|
|
"""
|
|
Writes a header comment at the top of an environment file to indicate that it is generated automatically and should not be manually edited.
|
|
This function ensures that anyone modifying the .env file is aware that changes might be overwritten by subsequent automated processes.
|
|
|
|
Args:
|
|
path (str): Path to the .env file that will receive the header.
|
|
"""
|
|
logging.debug(f'write header to {path}')
|
|
header = """################################################################################
|
|
# DO NOT EDIT THIS FILE, IT IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN #
|
|
################################################################################
|
|
|
|
"""
|
|
with open(path, "r+") as file:
|
|
old_content = file.read()
|
|
file.seek(0)
|
|
file.write(header + old_content)
|
|
|
|
|
|
def new_app(recipe: str, domain: str, server: str, version: str) -> None:
|
|
"""
|
|
Generate a new application .env by calling the 'abra' command with the specified parameters.
|
|
If the app .env already exists, it is removed before the new app is created.
|
|
|
|
Args:
|
|
recipe (str): The recipe name for the application.
|
|
domain (str): The domain under which the app will be deployed.
|
|
server (str): The server on which the app will be deployed.
|
|
version (str): The version of the app to deploy. If set to 'chaos' or None, it deploys the chaos version.
|
|
|
|
Raises:
|
|
RuntimeError: If the app creation command does not successfully report creation.
|
|
"""
|
|
path = get_env_path(server, domain)
|
|
if path.exists():
|
|
print(f'remove {path}')
|
|
path.unlink()
|
|
logging.info(f'create {recipe} config on {server} at {domain}')
|
|
if version in ['chaos', None]:
|
|
version = '--chaos'
|
|
abra("app", "new", recipe, version, "-n", "-s", server, "-D", domain )
|
|
# TODO: write new check if app has been created
|
|
#if not "app has been created" in out:
|
|
# raise RuntimeError(f'App "{recipe}" creation failed')
|
|
#else:
|
|
write_env_header(path)
|
|
logging.info(f'{recipe} created on {server} at {domain}')
|
|
|
|
|
|
def update_configs(path: Path, config: Dict[str, Any]) -> None:
|
|
"""
|
|
Update the .env configuration files at the specified path according to the provided configuration dictionary.
|
|
This function manages the commenting, uncommenting, and setting of environment variables as specified in the config dictionary.
|
|
|
|
Args:
|
|
path (str): Path to the .env configuration file to be updated.
|
|
config (dict): A dictionary containing the configurations to apply, which may include keys for 'uncomment', 'comment', and 'env' to manage the environment variables and other settings.
|
|
"""
|
|
if uncomment_keys := config.get("uncomment"):
|
|
uncomment(uncomment_keys, path, True)
|
|
if comment_keys := config.get("comment"):
|
|
comment(comment_keys, path, True)
|
|
if envs := config.get("env"):
|
|
uncomment(envs.keys(), path)
|
|
for key, value in envs.items():
|
|
logging.debug(f'set {key}={value} in {path}')
|
|
if isinstance(value, dict):
|
|
value=json.dumps(value)
|
|
dotenv.set_key(path, key, value, quote_mode="never")
|
|
|
|
|
|
def generate_all_secrets(domain: str) -> None:
|
|
"""
|
|
Generates all secrets for the app specified by its domain using the 'abra' command.
|
|
|
|
Args:
|
|
domain (str): The domain associated with the application for which secrets need to be generated.
|
|
|
|
Returns:
|
|
None: This function outputs directly to the console the results of the secrets generation process.
|
|
"""
|
|
stored_secrets = abra("app", "secret", "ls", domain).splitlines()
|
|
if any("false" in line for line in stored_secrets):
|
|
logging.info(f"Generate all secrets for {domain}")
|
|
generated_secrets = abra("app", "secret", "generate", "-a", domain)
|
|
print(f"secrets for {domain} generated")
|
|
print(generated_secrets)
|
|
|
|
|
|
def get_env_path(server: str, domain: str) -> Path:
|
|
return Path(f"~/.abra/servers/{server}/{domain}.env").expanduser()
|
|
|
|
|
|
def exchange_secrets(app1: str, instance_config: Dict[str, Any], apps: Tuple[str]) -> None:
|
|
"""
|
|
Facilitates the exchange of shared secrets between apps within the same instance based on the configuration.
|
|
This function checks for shared secrets configurations and applies them to ensure that secrets are synchronized between apps.
|
|
|
|
Args:
|
|
app1 (str): The first app to participate in the secret exchange.
|
|
instance_config (dict): Configuration of the instance including all apps.
|
|
apps (list): List of other apps in the instance to potentially share secrets with.
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
app1_config = instance_config[app1]
|
|
app1_domain = app1_config['app_domain']
|
|
for app2 in apps:
|
|
app2_config = instance_config[app2]
|
|
app2_domain = app2_config['app_domain']
|
|
if app1_shared_secrets := get_value(app1_config, "shared_secrets", app2):
|
|
logging.info(f'share secrets between {app1_domain} and {app2_domain}')
|
|
share_secrets(app1_domain, app2_domain, app1_shared_secrets)
|
|
if app2_shared_secrets := get_value(app2_config, "shared_secrets", app1):
|
|
logging.info(f'share secrets between {app1_domain} and {app2_domain}')
|
|
share_secrets(app2_domain, app1_domain, app2_shared_secrets)
|
|
|
|
|
|
def str2bool(value: str) -> bool:
|
|
return value.lower() in ("yes", "true", "t", "1")
|
|
|
|
def share_secrets(app1_domain: str, app2_domain: str, secrets: Dict[str, str]) -> None:
|
|
"""
|
|
Facilitates the sharing of secrets between two applications.
|
|
This function checks and transfers secrets from one applications to another if one applications possesses a secret that the other lacks, ensuring both applications maintain synchronized secret configurations.
|
|
|
|
Args:
|
|
app1_domain (str): The domain of the first application.
|
|
app2_domain (str): The domain of the second application.
|
|
secrets (dict): A dictionary mapping secret names in app2 to their corresponding names in app1.
|
|
"""
|
|
app1_stored_secrets = abra("app", "secret", "ls", "-m", "-C", app1_domain, machine_output=True)
|
|
app1_stored_secrets = {x['name']: str2bool(x['created on server']) for x in app1_stored_secrets}
|
|
app2_stored_secrets = abra("app", "secret", "ls", "-m", "-C", app2_domain, machine_output=True)
|
|
app2_stored_secrets = {x['name']: str2bool(x['created on server']) for x in app2_stored_secrets}
|
|
try:
|
|
app1_container = abra("app" ,"ps", "-m", app1_domain, machine_output=True)
|
|
app1_container = list(app1_container)
|
|
except RuntimeError:
|
|
# App1 is not deployed
|
|
app1_container = []
|
|
try:
|
|
app2_container = abra("app" ,"ps", "-m", app2_domain, machine_output=True)
|
|
app2_container = list(app2_container)
|
|
except RuntimeError:
|
|
# App2 is not deployed
|
|
app2_container = []
|
|
for app2_secret in secrets:
|
|
app1_secret = secrets[app2_secret]
|
|
# TODO: test if both apps have the secret available
|
|
try:
|
|
app1_secret_is_stored = app1_stored_secrets[app1_secret]
|
|
except KeyError:
|
|
logging.error(f"{app1_domain} does not contain secret {app1_secret}")
|
|
continue
|
|
try:
|
|
app2_secret_is_stored = app2_stored_secrets[app2_secret]
|
|
except KeyError:
|
|
logging.error(f"{app2_domain} does not contain secret {app2_secret}")
|
|
continue
|
|
if app1_secret_is_stored and not app2_secret_is_stored:
|
|
if not app1_container:
|
|
logging.error(f"{app1_domain} already contains the secrets {app1_secret} but it's not running, so it's not possible to export the secret.")
|
|
secret = get_secret(app1_domain, app1_secret, app1_container)
|
|
insert_secret(app2_domain, app2_secret, secret)
|
|
logging.info(f"Shared secret {app1_secret} from {app1_domain} to {app2_domain} as {app2_secret}")
|
|
elif app2_secret_is_stored and not app1_secret_is_stored:
|
|
if not app2_container:
|
|
logging.error(f"{app2_domain} already contains the secrets {app2_secret} but it's not running, so it's not possible to export the secret.")
|
|
secret = get_secret(app2_domain, app2_secret, app2_container)
|
|
insert_secret(app1_domain, app1_secret, secret)
|
|
logging.info(f"Shared secret {app2_secret} from {app2_domain} to {app1_domain} as {app1_secret}")
|
|
elif not any([app1_secret_is_stored, app2_secret_is_stored]):
|
|
secret = generate_secret(app1_domain, app1_secret)
|
|
insert_secret(app2_domain, app2_secret, secret)
|
|
logging.info(f"Generated secrets exachanged secret {app1_secret} and {app2_secret} in {app1_domain} and {app2_domain}")
|
|
|
|
|
|
def get_secret(domain: str, secret_name: str, containers: List) -> str:
|
|
"""
|
|
Retrieves a specified secret from the application specified by its domain using the 'abra' command.
|
|
This function is useful for fetching secrets that need to be shared or updated between different applications.
|
|
|
|
Args:
|
|
domain (str): The app domain from which the secret is to be retrieved.
|
|
secret_name (str): The name of the secret to retrieve.
|
|
|
|
Returns:
|
|
str: The value of the retrieved secret.
|
|
"""
|
|
for container in containers:
|
|
try:
|
|
secret = abra("app", "run", domain, container, "cat", f"/var/run/secrets/{secret_name}")
|
|
return str(secret)
|
|
except RuntimeError:
|
|
continue
|
|
raise RuntimeError(f"{secret_name} not found for {domain}")
|
|
|
|
def generate_secret(domain: str, secret_name: str) -> str:
|
|
"""
|
|
Generates a new secret with a specified name for a given app domain. This function is utilized when a required secret is missing or needs to be regenerated.
|
|
|
|
Args:
|
|
domain (str): The app domain for which the secret is to be generated.
|
|
secret_name (str): The name of the secret to generate.
|
|
|
|
Returns:
|
|
str: The value of the newly generated secret.
|
|
"""
|
|
secret = abra("app", "secret", "generate", domain, secret_name, "v1", machine_output=True)
|
|
return secret[0]['value']
|
|
|
|
|
|
def insert_secrets_from_conf(domain: str, app_config: Dict[str, Any]) -> None:
|
|
"""
|
|
Inserts secrets into the specified app based on a configuration dictionary.
|
|
|
|
Args:
|
|
domain (str): The app domain into which the secrets are to be inserted.
|
|
app_config (dict): A dictionary containing the secrets and their corresponding values to insert.
|
|
"""
|
|
logging.info(f"Insert secrets for {domain}")
|
|
if secrets := app_config.get("secrets"):
|
|
for secret_name, secret in secrets.items():
|
|
insert_secret(domain, secret_name, secret)
|
|
|
|
|
|
def run_secret_hooks(domain: str, app_config: Dict[str, Any]) -> None:
|
|
"""
|
|
Run local abra.sh commands to generate secrets.
|
|
|
|
Args:
|
|
domain (str): The app domain into which the secrets are to be inserted.
|
|
app_config (dict): A dictionary containing the secrets hooks and their corresponding values to insert.
|
|
"""
|
|
logging.info(f"Run secret hooks for {domain}")
|
|
if secret_hooks := app_config.get("secret_hooks"):
|
|
for cmd in secret_hooks:
|
|
print(f"Run '{cmd}' in {domain}")
|
|
print(abra("app", "cmd", "--local", domain, cmd, ignore_error=True))
|
|
|
|
|
|
def unquote_strings(s: str) -> str:
|
|
"""
|
|
Removes surrounding single or double quotes from a string if present.
|
|
This is used to clean the quotes from strings extracted from configurations or secrets, ensuring that the values can be used directly without formatting issues.
|
|
|
|
Args:
|
|
s (str): The string from which quotes are to be removed.
|
|
|
|
Returns:
|
|
str: The unquoted string.
|
|
"""
|
|
if s.startswith('"') and s.endswith('"'):
|
|
return s[1:-1]
|
|
elif s.startswith("'") and s.endswith("'"):
|
|
return s[1:-1]
|
|
else:
|
|
return s
|
|
|
|
|
|
def insert_secret(domain: str, secret_name: str, secret: str) -> None:
|
|
"""
|
|
Inserts a secret for a specific app only if the secret does not already exists.
|
|
This function interacts with the 'abra' command to manage secrets on the server.
|
|
|
|
Args:
|
|
domain (str): The app domain within which the secret is sinserted.
|
|
secret_name (str): The name of the secret to insert.
|
|
secret (str): The value of the secret to be inserted.
|
|
"""
|
|
# TODO parse json
|
|
stored_secrets = abra("app", "secret", "ls", "-C", domain).splitlines()
|
|
# Fix extra quotes around secrets
|
|
secret = unquote_strings(secret)
|
|
if not any(secret_name in line and "true" in line for line in stored_secrets):
|
|
logging.debug(f"Insert secret {secret_name}: {secret} into {domain}")
|
|
abra("app", "secret", "insert", domain, secret_name, "v1", secret)
|
|
|
|
|
|
def uncomment(keys: List[str], path: str, match_all: bool = False) -> None:
|
|
"""
|
|
Uncomments lines in a configuration file that contain specified keys.
|
|
If 'match_all' is True, it matches against the entire line, otherwise, it matches only against the key.
|
|
|
|
Args:
|
|
keys (list of str): The keys corresponding to the lines to be uncommented.
|
|
path (str): Path to the file where lines will be uncommented.
|
|
match_all (bool): Whether to match the keys against the entire line or just the beginning.
|
|
"""
|
|
logging.debug(f'Uncomment {keys} in {path}')
|
|
with open(path, "r") as file:
|
|
lines = file.readlines()
|
|
with open(path, "w") as file:
|
|
for line in lines:
|
|
line_match = line.split("=")[0] # Match only keys
|
|
if match_all:
|
|
line_match = line
|
|
if ('=' in line) and any(key in line_match for key in keys):
|
|
line = line.lstrip("#").lstrip()
|
|
file.write(line)
|
|
|
|
|
|
def comment(keys: List[str], path: str, match_all: bool = False) -> None:
|
|
"""
|
|
Comments lines in a configuration file that contain specified keys.
|
|
If 'match_all' is True, it matches against the entire line, otherwise, it matches only against the key.
|
|
|
|
Args:
|
|
keys (list of str): The keys corresponding to the lines to be commented.
|
|
path (str): Path to the file where lines will be commented.
|
|
match_all (bool): Whether to match the keys against the entire line or just the beginning.
|
|
"""
|
|
logging.debug(f'Comment {keys} in {path}')
|
|
with open(path, "r") as file:
|
|
lines = file.readlines()
|
|
with open(path, "w") as file:
|
|
for line in lines:
|
|
line_match = line.split("=")[0] # Match only keys
|
|
if match_all:
|
|
line_match = line
|
|
if any(key in line_match for key in keys):
|
|
line = line.lstrip("#").lstrip()
|
|
line = f"#{line}"
|
|
file.write(line)
|
|
|
|
|
|
def exchange_domains(instance_domain: str, instance_config: Dict[str, Any], path: Path) -> None:
|
|
"""
|
|
Replaces all domain references in the specified .env configuration file based on the instance configuration. It is used to ensure that all references to an application's domain are consistent across various configuration files.
|
|
|
|
Args:
|
|
instance_domain (str): The domain of the instance.
|
|
instance_config (dict): Configuration details for the instance, including domain mappings.
|
|
path (str): File path of the .env configuration file to be updated.
|
|
|
|
Returns:
|
|
None: The function modifies the file in place and does not return a value.
|
|
"""
|
|
# Replace all app sepcific subdomains
|
|
for app in instance_config:
|
|
old_app_domain = f'{app}.example.com'
|
|
new_app_domain = instance_config[app]['app_domain']
|
|
replace_domains(path, old_app_domain, new_app_domain)
|
|
# Replace all instance domains
|
|
replace_domains(path, 'example.com', instance_domain)
|
|
|
|
|
|
|
|
def replace_domains(path: Path, old_domain: str, new_domain: str) -> None:
|
|
"""
|
|
Replaces occurrences of an old domain with a new domain in a configuration file.
|
|
|
|
Args:
|
|
path (str): The file path of the .env configuration file where domains will be replaced.
|
|
old_domain (str): The old domain name to be replaced.
|
|
new_domain (str): The new domain name to replace the old domain name.
|
|
"""
|
|
logging.debug(f'replace all {old_domain} with {new_domain} in {path}')
|
|
with open(path, "r") as file:
|
|
content = file.read()
|
|
content = content.replace(f"{old_domain}", new_domain)
|
|
with open(path, "w") as file:
|
|
file.write(content)
|
|
|
|
|
|
def execute_cmds(app_config: Dict[str, Any], commands: Tuple[str] = tuple(), initial: bool = False, deploy: bool = False, upgrade: bool = False, dry_run: bool = False, chaos: bool = False) -> None:
|
|
"""
|
|
Execute post-deployment commands for an application based on the provided configuration.
|
|
This can include running scripts or commands inside the application's environment.
|
|
|
|
Args:
|
|
app_config (dict): A dictionary containing the deployment configuration and commands for an application.
|
|
commands (list): A list of commands that should be executed in the following format:
|
|
'<container_name> <abra.sh command> <optional arguments>'
|
|
initial (bool): execute initial-hooks
|
|
deploy (bool): execute deploy-hooks
|
|
upgrade (bool): execute upgrade-hooks
|
|
dry-run(bool): only show cmds, don't execute them
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
domain = app_config['app_domain']
|
|
all_cmds = []
|
|
if initial and (initial_hooks:= app_config.get('initial-hooks')):
|
|
all_cmds = all_cmds + initial_hooks
|
|
if deploy and (deploy_hooks:= app_config.get('deploy-hooks')):
|
|
all_cmds = all_cmds + deploy_hooks
|
|
if upgrade and (upgrade_hooks:= app_config.get('upgrade-hooks')):
|
|
all_cmds = all_cmds + upgrade_hooks
|
|
if commands:
|
|
all_cmds = all_cmds + list(commands)
|
|
chaos_flag = ""
|
|
if chaos:
|
|
chaos_flag = "-C"
|
|
for cmd in all_cmds:
|
|
container = cmd.split()[0]
|
|
cmd = cmd.split()[1:]
|
|
print(f"Run '{cmd}' in {domain}:{container}")
|
|
if dry_run:
|
|
continue
|
|
if container == "local":
|
|
print(abra("app", "cmd", "--local", chaos_flag, domain, *cmd, ignore_error=True))
|
|
else:
|
|
print(abra("app", "cmd", chaos_flag, domain, container, *cmd, ignore_error=True))
|
|
|
|
|
|
@click.group(context_settings={"help_option_names": ['-h', '--help']})
|
|
@click.option('-l', '--log', 'loglevel', help='Desired logging level ("debug", "info", "warning", "error", "critical")')
|
|
@click.option('-e', '--exclude', help='Path to a directory that contains a group of instance configurations to be excluded.', multiple=True, type=click.Path(exists=True))
|
|
@click.argument('group_path', type=click.Path(exists=True))
|
|
def cli(loglevel: str, group_path: str, exclude:Tuple[str]) -> None:
|
|
"""
|
|
Alakazam is a meta-configuration app-connector and an abra wrapper, designed as a proof-of-concept to simplify the management of environment configuration files across multiple instances.
|
|
|
|
GROUP_PATH: path to the directory that contains a group of instance configurations or the instance configurations itself.
|
|
"""
|
|
global INSTANCE_CONFIGS
|
|
global ALL_CONFIGS
|
|
global SETTINGS
|
|
SETTINGS = read_config(SETTINGS_PATH)
|
|
if not (root_path:= SETTINGS.get('root')):
|
|
root_path = os.getcwd()
|
|
logging.warning(f"There is no 'root' path defined in '{SETTINGS_PATH}', use current path '{root_path}'instead")
|
|
_group_path = Path(group_path).expanduser().absolute()
|
|
_root_path = Path(root_path).expanduser().absolute()
|
|
if not str(_group_path).startswith(str(_root_path)):
|
|
logging.error(f"{_root_path} does not contain {_group_path}?")
|
|
exit(1)
|
|
all_group_configs = merge_all_group_configs(_root_path)
|
|
exclude_paths = list(map(lambda p: str(Path(p).absolute()), exclude))
|
|
instance_configs = get_merged_instance_configs(_group_path, all_group_configs, exclude_paths)
|
|
INSTANCE_CONFIGS = merge_connection_configs(instance_configs)
|
|
all_configs = get_merged_instance_configs(_root_path, all_group_configs, [])
|
|
ALL_CONFIGS = merge_connection_configs(all_configs)
|
|
if loglevel:
|
|
numeric_level = getattr(logging, loglevel.upper(), None)
|
|
if not isinstance(numeric_level, int):
|
|
raise ValueError('Invalid log level: %s' % loglevel)
|
|
logging.basicConfig(level=numeric_level)
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
def show_config(recipes: Tuple[str]) -> None:
|
|
"""
|
|
Show the merged instance configurations.
|
|
"""
|
|
filtered_configs = INSTANCE_CONFIGS
|
|
if recipes:
|
|
filtered_configs = {server: {recipe_name: app_config for recipe_name, app_config in app_configs.items() if recipe_name in recipes}
|
|
for server, app_configs in INSTANCE_CONFIGS.items()}
|
|
print(json.dumps(filtered_configs, indent=2))
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
def config(recipes: Tuple[str]) -> None:
|
|
"""
|
|
Generates and updates .env configuration files for a specified list of applications.
|
|
This function reads the necessary configurations from the global settings and applies these to create or update .env files for each app.
|
|
"""
|
|
for instance, instance_config in INSTANCE_CONFIGS.items():
|
|
if recipes:
|
|
selected_apps = []
|
|
for app in recipes:
|
|
if app in instance_config.keys():
|
|
selected_apps.append(app)
|
|
else:
|
|
logging.info(f' App config \'{app}\' not found for {instance}!')
|
|
continue
|
|
else:
|
|
selected_apps = instance_config.keys()
|
|
for app in selected_apps:
|
|
app_config = instance_config[app]
|
|
domain = app_config['app_domain']
|
|
server = app_config["server"]
|
|
path = get_env_path(server, domain)
|
|
version = app_config.get('version')
|
|
print(f'Setup {app} ({version}) config on {server} at {domain}')
|
|
new_app(app, domain, server, version)
|
|
logging.info(f'set configs for {app} at {instance}')
|
|
update_configs(path, app_config)
|
|
exchange_domains(instance, instance_config, path)
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
def secrets(recipes: Tuple[str]) -> None:
|
|
"""
|
|
Generates and inserts secrets for specified apps.
|
|
This function handles the generation of new secrets, the syncronisation of shared secrets and the insertion of existing secrets from the configuration into the appropriate locations for each application.
|
|
"""
|
|
for _, instance_config in INSTANCE_CONFIGS.items():
|
|
instance_apps = instance_config.keys()
|
|
if recipes:
|
|
selected_apps = [app for app in recipes if app in instance_config.keys()]
|
|
else:
|
|
selected_apps = instance_config.keys()
|
|
for app in selected_apps:
|
|
app_config = instance_config[app]
|
|
domain = app_config['app_domain']
|
|
print(f"Create secrets for {domain}")
|
|
insert_secrets_from_conf(domain, app_config)
|
|
run_secret_hooks(domain, app_config)
|
|
exchange_secrets(app, instance_config, instance_apps)
|
|
generate_all_secrets(domain)
|
|
|
|
|
|
def get_deployed_apps(apps: Tuple[str]) -> Dict[str, str]:
|
|
"""
|
|
Retrieves a list of deployed apps and their versions.
|
|
This function utilizes the 'abra' command-line tool to fetch deployment information.
|
|
|
|
Args:
|
|
apps (list): List of apps to check for deployment status.
|
|
|
|
Returns:
|
|
dict: A dictionary containing app names as keys and their versions as values.
|
|
"""
|
|
deployed_apps = {}
|
|
processed_server = []
|
|
for _, instance_config in INSTANCE_CONFIGS.items():
|
|
if apps:
|
|
selected_apps = [app for app in apps if app in instance_config.keys()]
|
|
else:
|
|
selected_apps = instance_config.keys()
|
|
for app in selected_apps:
|
|
server = instance_config[app]['server']
|
|
if server in processed_server:
|
|
continue
|
|
processed_server.append(server)
|
|
deployed = abra("app", "ls", "-S", "-s", server, "-m", machine_output=True)
|
|
deployed_app_versions = {app["appName"]: app["version"] for app in deployed[server]["apps"] if app["status"] == "deployed"}
|
|
deployed_apps.update(deployed_app_versions)
|
|
return deployed_apps
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
@click.option('-e', '--execute-hooks', is_flag=True, help='run post-deployment commands.')
|
|
@click.option('-f', '--force', is_flag=True, help='force redeployment even if the application is already deployed.')
|
|
@click.option('-c', '--converge-checks', is_flag=True, help='perform convergence checks during deployment.')
|
|
def deploy(recipes: Tuple[str], execute_hooks: bool, force: bool, converge_checks: bool) -> None:
|
|
"""
|
|
Deploys applications as specified in the configuration.
|
|
"""
|
|
if force:
|
|
instance_apps = get_apps(recipes)
|
|
else:
|
|
instance_apps = get_apps_by_deployment(recipes, deployed=False)
|
|
print_all_apps(instance_apps)
|
|
if not instance_apps or input(f"Do you really want to deploy these apps? Type YES: ") != "YES":
|
|
return
|
|
for instance, apps_to_deploy in instance_apps.items():
|
|
for app, domain in apps_to_deploy:
|
|
app_config = get_value(INSTANCE_CONFIGS, instance, app)
|
|
version = app_config.get('version')
|
|
if not version:
|
|
version = 'latest'
|
|
cmd = ["deploy", "-n"]
|
|
if version == 'chaos':
|
|
cmd.append("--chaos")
|
|
if force:
|
|
cmd.append("--force")
|
|
if not execute_hooks and not converge_checks:
|
|
cmd.append("--no-converge-checks")
|
|
cmd.append(domain)
|
|
if version not in ['latest', 'chaos']:
|
|
cmd.append(version)
|
|
print(f'deploy {domain} with version "{version}"')
|
|
print(abra("app", *cmd))
|
|
if execute_hooks:
|
|
logging.info(f'execute commands for {domain}')
|
|
execute_cmds(app_config, deploy=True)
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
@click.option('-e', '--execute-hooks', is_flag=True, help='run post-upgrade commands.')
|
|
@click.option('-d', '--dry-run', is_flag=True, help="don't execute the upgrade process")
|
|
@click.option('-rd', '--redeploy', is_flag=True, help="use undeploy and deploy for the updating process")
|
|
def upgrade(recipes: Tuple[str], execute_hooks: bool, dry_run: bool, redeploy: bool) -> None:
|
|
"""
|
|
Upgrades specified applications by executing the upgrade commands via the 'abra' command-line interface.
|
|
It checks the current deployment status of the apps and performs upgrades only where necessary, with options to execute additional commands or perform a dry run. It either took the target version from the configuration or it uses the latest available version.
|
|
"""
|
|
deployed_instance_apps = get_apps_by_deployment(recipes, deployed=True)
|
|
upgrade_cmds = []
|
|
release_notes = {}
|
|
for instance, deployed_apps in deployed_instance_apps.items():
|
|
upgrade_apps = []
|
|
for app_details in deployed_apps:
|
|
app, domain, deployed_version = app_details
|
|
app_config = get_value(INSTANCE_CONFIGS, instance, app)
|
|
upgrade_version = app_config.get('version')
|
|
if not upgrade_version:
|
|
upgrade_version = get_latest_version(app)
|
|
upgrade_cmd = ["upgrade", "-n"]
|
|
if not execute_hooks:
|
|
upgrade_cmd.append("--no-converge-checks")
|
|
upgrade_cmd.append(domain)
|
|
if deployed_version == 'unknown':
|
|
logging.warning(f"Cold not detect deployed version of {domain}")
|
|
continue
|
|
try:
|
|
version.parse(deployed_version)
|
|
except version.InvalidVersion:
|
|
logging.warning(f"Cold not detect deployed version of {domain}: {deployed_version}")
|
|
continue
|
|
upgrade_cmd.append(upgrade_version)
|
|
if version.parse(upgrade_version) <= version.parse(deployed_version):
|
|
logging.info(f"{domain} is already at version {upgrade_version}")
|
|
continue
|
|
app_details.append(upgrade_version)
|
|
upgrade_apps.append(app_details)
|
|
logging.info(f'upgrade {app}: {domain} from version {deployed_version} to version "{upgrade_version}"')
|
|
upgrade_cmds.append((app_config, upgrade_cmd))
|
|
release_note_cmd = upgrade_cmd.copy()
|
|
release_note_cmd.insert(1, '-r')
|
|
release_note = abra("app", *release_note_cmd, ignore_error=True)
|
|
if (not (note:= release_notes.get(app))) or len(note) <= len(release_note):
|
|
release_notes[app] = release_note
|
|
deployed_instance_apps[instance] = upgrade_apps
|
|
print_all_apps(deployed_instance_apps)
|
|
input("Press any key to show release notes.")
|
|
for app, note in release_notes.items():
|
|
print(app)
|
|
print(note)
|
|
if not dry_run and input(f"Do you really want to upgrade these apps? Type YES: ") == "YES":
|
|
for app_config, upgrade_cmd in upgrade_cmds:
|
|
app_domain = app_config.get('app_domain')
|
|
if redeploy:
|
|
upgrade_cmd.pop(0)
|
|
print(f'undeploy {app_domain}')
|
|
upgrade_cmd.insert(0, 'deploy')
|
|
print(abra("app", "undeploy", '--no-input', app_domain))
|
|
sleep(20)
|
|
print(f'deploy {app_domain}')
|
|
print(abra("app", *upgrade_cmd))
|
|
if execute_hooks:
|
|
logging.info(f'execute commands for {app_domain}')
|
|
execute_cmds(app_config, upgrade=True)
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
def undeploy(recipes: Tuple[str]) -> None:
|
|
"""
|
|
Undeploys multiple applications at once.
|
|
"""
|
|
instance_apps = get_apps(recipes)
|
|
print_all_apps(instance_apps)
|
|
if input(f"Do you really want to undeploy these apps? Type YES: ") != "YES":
|
|
return
|
|
deployed_domains = get_deployed_apps(recipes)
|
|
for _, instance_config in INSTANCE_CONFIGS.items():
|
|
if recipes:
|
|
selected_apps = [app for app in recipes if app in instance_config.keys()]
|
|
else:
|
|
selected_apps = instance_config.keys()
|
|
for app in selected_apps:
|
|
app_config = instance_config[app]
|
|
domain = app_config['app_domain']
|
|
if domain not in deployed_domains:
|
|
print(f"{domain} is not deployed")
|
|
continue
|
|
print(f'undeploy {domain}')
|
|
print(abra("app", "undeploy", "-n", domain))
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
@click.option('commands', '-c', '--command', multiple=True, metavar="'<container_name> <abra.sh command> <optional arguments>'", help='commands that should be executed')
|
|
@click.option('-i', '--initial', is_flag=True, help='execute initial-hooks from config')
|
|
@click.option('-d', '--deploy', is_flag=True, help='execute deploy-hooks from config')
|
|
@click.option('-u', '--upgrade', is_flag=True, help='execute upgrade-hooks from config')
|
|
@click.option('-l', '--list-cmds', is_flag=True, help="only show cmds, don't execute them")
|
|
@click.option('-C', '--chaos', is_flag=True, help="execute in chaos mode")
|
|
def cmd(recipes: Tuple[str], commands: Tuple[str], initial: bool, deploy: bool, upgrade: bool, list_cmds: bool = False, chaos: bool = False) -> None:
|
|
"""
|
|
Execute commands for all specified applications based on the provided configuration.
|
|
"""
|
|
deployed_domains = get_deployed_apps(recipes)
|
|
for _, instance_config in INSTANCE_CONFIGS.items():
|
|
if recipes:
|
|
selected_apps = [app for app in recipes if app in instance_config.keys()]
|
|
else:
|
|
selected_apps = instance_config.keys()
|
|
for app in selected_apps:
|
|
app_config = instance_config[app]
|
|
domain = app_config['app_domain']
|
|
if domain not in deployed_domains:
|
|
print(f"{domain} is not deployed")
|
|
continue
|
|
logging.info(f'execute commands for {domain}')
|
|
execute_cmds(app_config, commands, initial, deploy, upgrade, list_cmds, chaos)
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
def purge(recipes: Tuple[str]) -> None:
|
|
"""
|
|
Completely removes applications and their configurations. This function is used to clean up all traces of an application from the server.
|
|
"""
|
|
# TODO: check for deployed apps
|
|
instance_apps = get_apps(recipes)
|
|
print_all_apps(instance_apps)
|
|
domains = list(zip(*sum(instance_apps.values(), [])))[1]
|
|
if input(f"Do you really want to purge these apps? Type YES: ") == "YES":
|
|
for domain in domains:
|
|
logging.info(f'purge {domain}')
|
|
abra("app", "rm", "-n", domain)
|
|
print(f"{domain} purged")
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
@click.option('-d', '--deployed', is_flag=True, help='show only deployed apps.')
|
|
@click.option('-u', '--undeployed', is_flag=True, help='show only undeployed apps.')
|
|
@click.option('--domains', is_flag=True, help='list only domains.')
|
|
@click.option('-s' , '--server', is_flag=True, help='list only the server.')
|
|
def ls(recipes: Tuple[str], deployed: bool, undeployed: bool, domains: bool, server: bool) -> None:
|
|
"""
|
|
Lists all selected applications along with their domains.
|
|
"""
|
|
if server:
|
|
for s in get_server(recipes):
|
|
print(s)
|
|
return
|
|
if deployed:
|
|
instance_apps = get_apps_by_deployment(recipes, deployed=True)
|
|
elif undeployed:
|
|
instance_apps = get_apps_by_deployment(recipes, deployed=False)
|
|
else:
|
|
instance_apps = get_apps(recipes)
|
|
if domains:
|
|
print(list(zip(*sum(instance_apps.values(), [])))[1])
|
|
else:
|
|
print_all_apps(instance_apps)
|
|
|
|
|
|
def print_all_apps(instance_apps: Dict[str, List[List]]) -> None:
|
|
"""
|
|
Prints a detailed list of all specified applications including their instances and domains.
|
|
|
|
Args:
|
|
apps (list): List of applications to print details for.
|
|
|
|
Returns:
|
|
dict: Returns a dictionary with application details grouped by instance.
|
|
"""
|
|
for instance, apps in instance_apps.items():
|
|
print(tabulate(apps, headers=["",instance, "", ""], tablefmt='rounded_outline'))
|
|
print()
|
|
|
|
def get_apps(apps: Optional[Tuple[str]] = None) -> Dict[str, List[List]]:
|
|
"""
|
|
Retrieves a dict of applications and their associated domains from the configuration.
|
|
This function provides an organized view of the applications within their respective instances.
|
|
|
|
Args:
|
|
apps (list): List of applications to list, if specified; otherwise, lists all applications.
|
|
|
|
Returns:
|
|
dict: Dictionary containing instances as keys and lists of lists [app name, domain] as values.
|
|
"""
|
|
instance_apps = {}
|
|
for instance, instance_config in INSTANCE_CONFIGS.items():
|
|
instance_app_domains = []
|
|
if apps:
|
|
selected_apps = [app for app in apps if app in instance_config.keys()]
|
|
else:
|
|
selected_apps = instance_config.keys()
|
|
for app in selected_apps:
|
|
if app in instance_config:
|
|
domain = instance_config[app]['app_domain']
|
|
instance_app_domains.append([app, domain])
|
|
if instance_app_domains:
|
|
instance_apps[instance] = instance_app_domains
|
|
return instance_apps
|
|
|
|
|
|
def get_server(apps: Optional[Tuple[str]] = None) -> Set[str]:
|
|
"""
|
|
Retrieves a list of server for the selected apps.
|
|
|
|
Args:
|
|
apps (list): List of applications, if not specified use all applications.
|
|
|
|
Returns:
|
|
Set: Set containing the server domains.
|
|
"""
|
|
server = set()
|
|
for _, instance_config in INSTANCE_CONFIGS.items():
|
|
if apps:
|
|
selected_apps = [app for app in apps if app in instance_config.keys()]
|
|
else:
|
|
selected_apps = instance_config.keys()
|
|
for app in selected_apps:
|
|
if app in instance_config:
|
|
server.add(instance_config[app]['server'])
|
|
return server
|
|
|
|
|
|
def get_server_apps(apps: Optional[Tuple[str]] = None, all: bool = False) -> Dict[str, List[str]]:
|
|
"""
|
|
Retrieves a dict of server associated with the app domains for the selected apps.
|
|
|
|
Args:
|
|
apps (list): List of applications, if not specified use all applications.
|
|
all: get server->appdomain mapping from all instances
|
|
|
|
Returns:
|
|
dict: Dictionary containing server as keys and lists of app domains as values.
|
|
|
|
"""
|
|
server_apps = dict()
|
|
instance_configs = INSTANCE_CONFIGS.items()
|
|
if all:
|
|
instance_configs = ALL_CONFIGS.items()
|
|
for _, instance_config in instance_configs:
|
|
if apps:
|
|
selected_apps = [app for app in apps if app in instance_config.keys()]
|
|
else:
|
|
selected_apps = instance_config.keys()
|
|
for app in selected_apps:
|
|
if app in instance_config:
|
|
server = instance_config[app]['server']
|
|
if server in server_apps:
|
|
server_apps[server].append(instance_config[app]['app_domain'])
|
|
else:
|
|
server_apps[server] = [instance_config[app]['app_domain']]
|
|
return server_apps
|
|
|
|
|
|
|
|
def get_apps_by_deployment(apps: Tuple[str], deployed: bool = True) -> Dict[str, List[List]]:
|
|
"""
|
|
Retrieves a dict of deployed/undeployed apps and their associated domains.
|
|
|
|
Args:
|
|
apps (list): List of applications to list, if specified; otherwise, lists all applications.
|
|
deployed (bool): filter for deployed apps if true and for undeployed apps if false.
|
|
|
|
Returns:
|
|
dict: Dictionary containing instances as keys and lists of lists [app name, domain, version] as values. The version is only appended if the deployed flag is True.
|
|
"""
|
|
instance_apps = get_apps(apps)
|
|
deployed_domains = get_deployed_apps(apps)
|
|
filtered_instance_apps = {}
|
|
for instance, apps_domains in instance_apps.items():
|
|
if deployed:
|
|
filtered_apps = [[app_name, domain, deployed_domains[domain]] for app_name, domain in apps_domains if domain in deployed_domains]
|
|
else:
|
|
filtered_apps = [[app_name, domain] for app_name, domain in apps_domains if domain not in deployed_domains]
|
|
if filtered_apps:
|
|
filtered_instance_apps[instance] = filtered_apps
|
|
return filtered_instance_apps
|
|
|
|
|
|
def get_latest_version(app: str) -> str:
|
|
"""
|
|
Function to get the latest version of an application.
|
|
|
|
Args:
|
|
app (str): Name of the application.
|
|
Returns:
|
|
str: The latest version of the application as a string.
|
|
"""
|
|
versions = abra('recipe', 'versions', app, machine_output=True)
|
|
latest = sorted([(version.parse(v['version']), v['version']) for v in versions])[-1][1]
|
|
logging.debug(f'latest version of {app}: {latest}')
|
|
return latest
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
@click.option('-s', '--secret', multiple=False, help='list of secrets to be purged.')
|
|
def purge_secrets(recipes: Tuple[str], secret:str) -> None:
|
|
"""
|
|
Purges all secrets associated with specified applications.
|
|
"""
|
|
# TODO: check for deployed apps
|
|
instance_apps = get_apps(recipes)
|
|
print_all_apps(instance_apps)
|
|
domains = list(zip(*sum(instance_apps.values(), [])))[1]
|
|
if input(f"Do you really want to purge the secret {secret} for these apps? Type YES: ") == "YES":
|
|
for domain in domains:
|
|
logging.info(f'purge {domain}')
|
|
if not secret:
|
|
abra("app","secret" ,"rm", "-a", domain)
|
|
print(f"Secrets for {domain} purged")
|
|
else:
|
|
abra("app","secret" ,"rm", domain, secret)
|
|
print(f"Secret {secret} for {domain} purged")
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
def purge_volumes(recipes: Tuple[str]) -> None:
|
|
"""
|
|
Purges all volumes associated with specified applications, ensuring that all data stored within the app's volumes is cleanly removed.
|
|
"""
|
|
# TODO: check for deployed apps
|
|
instance_apps = get_apps(recipes)
|
|
print_all_apps(instance_apps)
|
|
domains = list(zip(*sum(instance_apps.values(), [])))[1]
|
|
if input(f"Do you really want to purge the volumes for these apps? Type YES: ") == "YES":
|
|
for domain in domains:
|
|
logging.info(f'purge {domain}')
|
|
abra("app","volume" ,"rm", "-n", domain)
|
|
print(f"Volumes for {domain} purged")
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
@click.option('-w', '--watch', is_flag=True, help='repeat checking.')
|
|
@click.option('-v', '--verbose', is_flag=True, help='show more info columns for each container')
|
|
def ps(recipes: Tuple[str], watch: bool, verbose: bool) -> None:
|
|
"""
|
|
Check the health status of all apps inside the selected group.
|
|
"""
|
|
while True:
|
|
instance_apps = get_apps_by_deployment(recipes, deployed=True)
|
|
if not instance_apps:
|
|
print("No apps deployed")
|
|
break
|
|
domains = list(zip(*sum(instance_apps.values(), [])))[1]
|
|
rows = [['App', 'Service', 'Status', 'Version', 'Chaos', 'State', 'Image']]
|
|
for domain in domains:
|
|
logging.info(f'check {domain}')
|
|
containers = abra("app" ,"ps", "-m", domain, machine_output=True)
|
|
for container in containers.values():
|
|
status = container['status']
|
|
if re.match(".*(starting|unknown|unhealthy).*", container['status']):
|
|
status = f'\033[31;1;4m{status}\033[0m'
|
|
row = [domain, container['service'], status, container['version']]
|
|
if verbose:
|
|
row.append(container['chaos'])
|
|
row.append(container['state'])
|
|
row.append(container['image'])
|
|
rows.append(row)
|
|
if watch:
|
|
os.system('cls' if os.name == 'nt' else 'clear')
|
|
print(tabulate(rows, 'firstrow', 'rounded_outline'), flush=True)
|
|
if not watch:
|
|
break
|
|
|
|
|
|
@cli.command()
|
|
def install() -> None:
|
|
"""
|
|
Reinstall the alakazam dependencies.
|
|
"""
|
|
pass
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
@click.option('message','-m', '--message', metavar='<Message>', help='create a commit with the specified Message')
|
|
@click.option('-p', '--push', is_flag=True, help='Push all changes to the remote')
|
|
def git(recipes: Tuple[str], message: str, push: bool = False) -> None:
|
|
"""
|
|
Run multiple git commands on each ~/.abra/servers. Without any specified options each repository will only be pulled.
|
|
"""
|
|
servers = get_server(recipes)
|
|
for server in servers:
|
|
server_path = Path(f"~/.abra/servers/{server}").expanduser()
|
|
try:
|
|
repo = Repo(server_path)
|
|
except InvalidGitRepositoryError:
|
|
logging.error(f'{server_path} is not a git repository')
|
|
continue
|
|
print(f"\033[1m{server_path}\033[0m")
|
|
if repo.remotes:
|
|
repo.remote().pull()
|
|
print(f"➜➜ Pulled Repository")
|
|
repo.git.add('.')
|
|
status = repo.git.status()
|
|
if not ("Your branch is up to date" in status and "working tree clean" in status):
|
|
print(status)
|
|
if not message:
|
|
print("➜➜ no commit message provided, skip committing")
|
|
elif repo.commit().message == message:
|
|
print(f"➜➜ commit already committed: {message}")
|
|
else:
|
|
repo.index.commit(message)
|
|
print(f"➜➜ Created commit {message}")
|
|
if push and repo.remotes:
|
|
repo.remote().push()
|
|
print(f"➜➜ Pushed Repository")
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
def diff(recipes: Tuple[str]) -> None:
|
|
"""
|
|
Show the changes in the .env repositories inside ~/.abra/servers.
|
|
"""
|
|
init(autoreset=True)
|
|
servers = get_server(recipes)
|
|
for server in servers:
|
|
server_path = Path(f"~/.abra/servers/{server}").expanduser()
|
|
try:
|
|
repo = Repo(server_path)
|
|
except InvalidGitRepositoryError:
|
|
logging.error(f'{server_path} is not a git repository')
|
|
continue
|
|
diff_output = repo.git.diff()
|
|
for line in diff_output.splitlines():
|
|
if line.startswith('-'):
|
|
print(Fore.RED + line)
|
|
elif line.startswith('+'):
|
|
print(Fore.GREEN + line)
|
|
else:
|
|
print(line)
|
|
if repo.untracked_files:
|
|
print("\n Untracked Files:")
|
|
for file in repo.untracked_files:
|
|
print(f'\t {file}')
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
@click.option('-u', '--update', is_flag=True, help='update the existing uptime kuma monitors.')
|
|
@click.option('missing', '-m', '--get-missing', is_flag=True, help='only print missing uptime kuma monitors.')
|
|
def uptime(recipes: Tuple[str], update: bool, missing: bool) -> None:
|
|
"""
|
|
Add the app domains to an uptime kuma instance.
|
|
"""
|
|
if not (uptime_config:= (SETTINGS.get('uptime_kuma'))):
|
|
logging.error(f"No uptime kuma settings provided in {SETTINGS_PATH}")
|
|
exit(1)
|
|
if not uptime_config.get('url'):
|
|
logging.error(f"No uptime kuma 'url' provided in {SETTINGS_PATH}")
|
|
exit(1)
|
|
if not uptime_config.get('user'):
|
|
logging.error(f"No uptime kuma 'user' provided in {SETTINGS_PATH}")
|
|
exit(1)
|
|
if not uptime_config.get('password'):
|
|
logging.error(f"No uptime kuma 'password' provided in {SETTINGS_PATH}")
|
|
exit(1)
|
|
uptime_kuma_url = uptime_config.get('url')
|
|
api = UptimeKumaApi(uptime_kuma_url)
|
|
api.login(uptime_config.get('user'), uptime_config.get('password'))
|
|
|
|
monitor_parameter = uptime_config.get('parameter')
|
|
if not monitor_parameter:
|
|
monitor_parameter = {}
|
|
|
|
instance_apps = get_apps(recipes)
|
|
if not missing:
|
|
print_all_apps(instance_apps)
|
|
else:
|
|
print(f"The following domains are missing in {uptime_kuma_url}")
|
|
if not instance_apps or (not missing and input(f"Do you really want to add these apps to {uptime_kuma_url}? Type YES: ") != "YES"):
|
|
return
|
|
domains = list(zip(*sum(instance_apps.values(), [])))[1]
|
|
|
|
monitors = api.get_monitors()
|
|
added_monitors = {m['name']:m['id'] for m in monitors if m['type'] == MonitorType.HTTP}
|
|
|
|
excluded_subdomains = ['traefik', 'monitoring', 'backup']
|
|
for domain in domains:
|
|
if any(domain.startswith(s) for s in excluded_subdomains):
|
|
logging.debug(f"Skip {domain}")
|
|
continue
|
|
if 'matrix' in domain:
|
|
url = f"https://{domain}/.well-known/matrix/server"
|
|
elif 'collab' in domain:
|
|
url = f"https://{domain}/hosting/discovery"
|
|
else:
|
|
url = f"https://{domain}"
|
|
if (monitor:= added_monitors.get(domain)):
|
|
if not missing and not update:
|
|
print(f"{domain} already exists.")
|
|
if not not missing or not update:
|
|
continue
|
|
response = api.edit_monitor(monitor, **monitor_parameter)
|
|
elif missing:
|
|
print(domain)
|
|
continue
|
|
else:
|
|
response = api.add_monitor(type=MonitorType.HTTP, name=domain, url=url, **monitor_parameter)
|
|
print(f"{domain}: {response.get('msg')}")
|
|
|
|
|
|
@cli.command()
|
|
@click.option('recipes', '-r', '--recipe', multiple=True, metavar='<RecipeName>', help='Filter for selcted recipes, this option can be specified multiple times.')
|
|
def backup(recipes: Tuple[str]) -> None:
|
|
"""
|
|
Create a backup for the slected apps. If there are multiple selected apps for one server, a full server backup will be created.
|
|
"""
|
|
server_apps = get_server_apps(recipes)
|
|
server_backupbots = get_server_apps(('backup-bot-two',), all=True)
|
|
for server, apps in server_apps.items():
|
|
backupbot = server_backupbots[server]
|
|
if not backupbot:
|
|
logging.error(f"There is no backupbot for server {server}")
|
|
continue
|
|
backupbot = backupbot[0]
|
|
cmd = ["app" ,"run", backupbot, 'app', '--', 'backup', '-l', 'info']
|
|
if len(apps) == 1:
|
|
cmd = cmd + ['-h', apps[0]]
|
|
cmd.append('create')
|
|
print(f"Run backups on {server}")
|
|
logging.info(f'Run backup: \n\t{" ".join(cmd)}')
|
|
output = abra(*cmd)
|
|
print(output)
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
cli()
|