From 54e32ab4222c28a0db91f09d3675561605511873 Mon Sep 17 00:00:00 2001 From: Moritz Date: Wed, 14 May 2025 12:26:20 +0200 Subject: [PATCH] backupbot formatting --- .gitignore | 2 +- backupbot.py | 398 +++++++++++++++++++++++++++++---------------------- 2 files changed, 230 insertions(+), 170 deletions(-) diff --git a/.gitignore b/.gitignore index 1dd0e0a..1d17dae 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1 @@ -/testing +.venv diff --git a/backupbot.py b/backupbot.py index d47c5d8..63dad7d 100755 --- a/backupbot.py +++ b/backupbot.py @@ -17,47 +17,53 @@ from pathlib import Path from shutil import copyfile, rmtree VOLUME_PATH = "/var/lib/docker/volumes/" -SECRET_PATH = '/secrets/' -SERVICE = 'ALL' +SECRET_PATH = "/secrets/" +SERVICE = "ALL" logger = logging.getLogger("backupbot") -logging.addLevelName(55, 'SUMMARY') -setattr(logging, 'SUMMARY', 55) -setattr(logger, 'summary', lambda message, *args, ** - kwargs: logger.log(55, message, *args, **kwargs)) +logging.addLevelName(55, "SUMMARY") +setattr(logging, "SUMMARY", 55) +setattr( + logger, + "summary", + lambda message, *args, **kwargs: logger.log(55, message, *args, **kwargs), +) def handle_exception(exc_type, exc_value, exc_traceback): if issubclass(exc_type, KeyboardInterrupt): sys.__excepthook__(exc_type, exc_value, exc_traceback) return - logger.critical("Uncaught exception", exc_info=( - exc_type, exc_value, exc_traceback)) + logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback)) sys.excepthook = handle_exception @click.group() -@click.option('-l', '--log', 'loglevel') -@click.option('-m', '--machine-logs', 'machine_logs', is_flag=True, envvar='MACHINE_LOGS') -@click.option('service', '--host', '-h', envvar='SERVICE') -@click.option('repository', '--repo', '-r', envvar='RESTIC_REPOSITORY') +@click.option("-l", "--log", "loglevel") +@click.option( + "-m", "--machine-logs", "machine_logs", is_flag=True, envvar="MACHINE_LOGS" +) +@click.option("service", "--host", "-h", envvar="SERVICE") +@click.option("repository", "--repo", "-r", envvar="RESTIC_REPOSITORY") def cli(loglevel, service, repository, machine_logs): global SERVICE if service: - SERVICE = service.replace('.', '_') + SERVICE = service.replace(".", "_") if repository: - os.environ['RESTIC_REPOSITORY'] = repository + os.environ["RESTIC_REPOSITORY"] = repository if loglevel: numeric_level = getattr(logging, loglevel.upper(), None) if not isinstance(numeric_level, int): - raise ValueError('Invalid log level: %s' % loglevel) + raise ValueError("Invalid log level: %s" % loglevel) logger.setLevel(numeric_level) logHandler = logging.StreamHandler() if machine_logs: formatter = jsonlogger.JsonFormatter( - "%(levelname)s %(filename)s %(lineno)s %(process)d %(message)s", rename_fields={"levelname": "message_type"}) + "%(levelname)s %(filename)s %(lineno)s %(process)d %(message)s", + rename_fields={"levelname": "message_type"}, + ) logHandler.setFormatter(formatter) logger.addHandler(logHandler) @@ -66,18 +72,18 @@ def cli(loglevel, service, repository, machine_logs): def init_repo(): - if repo:= os.environ.get('RESTIC_REPOSITORY_FILE'): + if repo := os.environ.get("RESTIC_REPOSITORY_FILE"): # RESTIC_REPOSITORY_FILE and RESTIC_REPOSITORY are mutually exclusive - del os.environ['RESTIC_REPOSITORY'] + del os.environ["RESTIC_REPOSITORY"] else: - repo = os.environ['RESTIC_REPOSITORY'] + repo = os.environ["RESTIC_REPOSITORY"] restic.repository = repo logger.debug(f"set restic repository location: {repo}") - restic.password_file = '/var/run/secrets/restic_password' + restic.password_file = "/var/run/secrets/restic_password" try: restic.cat.config() except ResticFailedError as error: - if 'unable to open config file' in str(error): + if "unable to open config file" in str(error): result = restic.init() logger.info(f"Initialized restic repo: {result}") else: @@ -86,19 +92,21 @@ def init_repo(): def export_secrets(): for env in os.environ: - if env.endswith('FILE') and not "COMPOSE_FILE" in env: + if env.endswith("FILE") and not "COMPOSE_FILE" in env: logger.debug(f"exported secret: {env}") with open(os.environ[env]) as file: secret = file.read() - os.environ[env.removesuffix('_FILE')] = secret + os.environ[env.removesuffix("_FILE")] = secret # logger.debug(f"Read secret value: {secret}") @cli.command() -@click.option('retries', '--retries', '-r', envvar='RETRIES', default=1) +@click.option("retries", "--retries", "-r", envvar="RETRIES", default=1) def create(retries): app_settings = parse_backup_labels() - pre_commands, post_commands, backup_paths, apps_versions = get_backup_details(app_settings) + pre_commands, post_commands, backup_paths, apps_versions = get_backup_details( + app_settings + ) copy_secrets(apps_versions) backup_paths.append(Path(SECRET_PATH)) run_commands(pre_commands) @@ -107,29 +115,37 @@ def create(retries): @cli.command() -@click.option('snapshot_id', '--snapshot', '-s', envvar='SNAPSHOT', default='latest') -@click.option('target', '--target', '-t', envvar='TARGET', default='/') -@click.option('noninteractive', '--noninteractive', envvar='NONINTERACTIVE', is_flag=True) -@click.option('volumes', '--volumes', '-v', envvar='VOLUMES', multiple=True) -@click.option('container', '--container', '-c', envvar='CONTAINER', multiple=True) -@click.option('no_commands', '--no-commands', envvar='NO_COMMANDS', is_flag=True) +@click.option("snapshot_id", "--snapshot", "-s", envvar="SNAPSHOT", default="latest") +@click.option("target", "--target", "-t", envvar="TARGET", default="/") +@click.option( + "noninteractive", "--noninteractive", envvar="NONINTERACTIVE", is_flag=True +) +@click.option("volumes", "--volumes", "-v", envvar="VOLUMES", multiple=True) +@click.option("container", "--container", "-c", envvar="CONTAINER", multiple=True) +@click.option("no_commands", "--no-commands", envvar="NO_COMMANDS", is_flag=True) def restore(snapshot_id, target, noninteractive, volumes, container, no_commands): - app_settings = parse_backup_labels('restore', container) - if SERVICE != 'ALL': + app_settings = parse_backup_labels("restore", container) + if SERVICE != "ALL": if not app_settings.get(SERVICE): - logger.error(f"The app {SERVICE} is not running, use the restore-path argument to restore paths of undeployed apps") + logger.error( + f"The app {SERVICE} is not running, use the restore-path argument to restore paths of undeployed apps" + ) exit(1) app_settings = {SERVICE: app_settings.get(SERVICE)} - pre_commands, post_commands, backup_paths, apps_versions = get_backup_details(app_settings, volumes) + pre_commands, post_commands, backup_paths, apps_versions = get_backup_details( + app_settings, volumes + ) snapshots = get_snapshots(snapshot_id) if not snapshots: - logger.error(f"No Snapshots with ID {snapshot_id} for {apps_versions.keys()} found.") + logger.error( + f"No Snapshots with ID {snapshot_id} for {apps_versions.keys()} found." + ) exit(1) snapshot = snapshots[0] - snapshot_id = snapshot['short_id'] + snapshot_id = snapshot["short_id"] if not noninteractive: print(f"Snapshot to restore: \t{snapshot_id}") - restore_app_versions = app_versions_from_tags(snapshot.get('tags')) + restore_app_versions = app_versions_from_tags(snapshot.get("tags")) print("Apps:") for app, version in apps_versions.items(): restore_version = restore_app_versions.get(app) @@ -138,17 +154,19 @@ def restore(snapshot_id, target, noninteractive, volumes, container, no_commands print(f"WARNING!!! The running app is deployed with version {version}") print("The following volume paths will be restored:") for p in backup_paths: - print(f'\t{p}') + print(f"\t{p}") if not no_commands: print("The following commands will be executed:") - for container, cmd in list(pre_commands.items()) + list(post_commands.items()): + for container, cmd in list(pre_commands.items()) + list( + post_commands.items() + ): print(f"\t{container.labels['com.docker.swarm.service.name']}:\t{cmd}") - snapshot_date = datetime.fromisoformat(snapshot['time']) + snapshot_date = datetime.fromisoformat(snapshot["time"]) delta = datetime.now(tz=timezone.utc) - snapshot_date print(f"This snapshot is {delta} old") print("\nTHIS COMMAND WILL IRREVERSIBLY OVERWRITES FILES") prompt = input("Type YES (uppercase) to continue: ") - if prompt != 'YES': + if prompt != "YES": logger.error("Restore aborted") exit(1) print(f"Restoring Snapshot {snapshot_id} at {target}") @@ -156,69 +174,78 @@ def restore(snapshot_id, target, noninteractive, volumes, container, no_commands print(f"Run pre commands.") run_commands(pre_commands) if backup_paths: - result = restic_restore(snapshot_id=snapshot_id, include=backup_paths, target_dir=target) + result = restic_restore( + snapshot_id=snapshot_id, include=backup_paths, target_dir=target + ) logger.debug(result) else: - print('No paths to restore.') + print("No paths to restore.") if not no_commands and post_commands: print(f"Run post commands.") run_commands(post_commands) @cli.command() -@click.option('snapshot_id', '--snapshot', '-s', envvar='SNAPSHOT', default='latest') -@click.option('target', '--target', '-t', envvar='TARGET', default='/') -@click.option('noninteractive', '--noninteractive', envvar='NONINTERACTIVE', is_flag=True) -@click.argument('paths', nargs=-1, required=True, envvar='INCLUDE_PATH') +@click.option("snapshot_id", "--snapshot", "-s", envvar="SNAPSHOT", default="latest") +@click.option("target", "--target", "-t", envvar="TARGET", default="/") +@click.option( + "noninteractive", "--noninteractive", envvar="NONINTERACTIVE", is_flag=True +) +@click.argument("paths", nargs=-1, required=True, envvar="INCLUDE_PATH") def restore_path(snapshot_id, target, noninteractive, paths): - """ PATHS: list of paths to restore""" + """PATHS: list of paths to restore""" snapshots = get_snapshots(snapshot_id) if not snapshots: logger.error(f"No Snapshots with ID {snapshot_id} for app {SERVICE} found.") exit(1) snapshot = snapshots[0] - snapshot_id = snapshot['short_id'] + snapshot_id = snapshot["short_id"] if not noninteractive: print(f"Snapshot to restore: \t{snapshot_id}") - restore_app_versions = app_versions_from_tags(snapshot.get('tags')) + restore_app_versions = app_versions_from_tags(snapshot.get("tags")) print("Apps:") for app, version in restore_app_versions.items(): - if SERVICE == 'ALL' or SERVICE == app: + if SERVICE == "ALL" or SERVICE == app: print(f"\t{app} \t {version}") print("The following paths will be restored:") for p in paths: - print(f'\t{p}') - snapshot_date = datetime.fromisoformat(snapshot['time']) + print(f"\t{p}") + snapshot_date = datetime.fromisoformat(snapshot["time"]) delta = datetime.now(tz=timezone.utc) - snapshot_date print(f"This snapshot is {delta} old") print("\nTHIS COMMAND WILL IRREVERSIBLY OVERWRITES FILES") prompt = input("Type YES (uppercase) to continue: ") - if prompt != 'YES': + if prompt != "YES": logger.error("Restore aborted") exit(1) print(f"Restoring Snapshot {snapshot_id} at {target}") result = restic_restore(snapshot_id=snapshot_id, include=paths, target_dir=target) logger.debug(result) + def restic_restore(snapshot_id, include=[], target_dir=None): - cmd = restic.cat.base_command() + ['restore', snapshot_id] + cmd = restic.cat.base_command() + ["restore", snapshot_id] for path in include: - cmd.extend(['--include', path]) + cmd.extend(["--include", path]) if target_dir: - cmd.extend(['--target', target_dir]) + cmd.extend(["--target", target_dir]) return restic.internal.command_executor.execute(cmd) def get_snapshots(snapshot_id=None): - if snapshot_id and snapshot_id != 'latest': + if snapshot_id and snapshot_id != "latest": snapshots = restic.snapshots(snapshot_id=snapshot_id) - if not SERVICE in app_versions_from_tags(snapshots[0].get('tags')): - logger.error(f'Snapshot with ID {snapshot_id} does not contain {SERVICE}') + if not SERVICE in app_versions_from_tags(snapshots[0].get("tags")): + logger.error(f"Snapshot with ID {snapshot_id} does not contain {SERVICE}") exit(1) else: snapshots = restic.snapshots() - snapshots = list(filter(lambda x: SERVICE in app_versions_from_tags(x.get('tags')), snapshots)) - if snapshot_id == 'latest': + snapshots = list( + filter( + lambda x: SERVICE in app_versions_from_tags(x.get("tags")), snapshots + ) + ) + if snapshot_id == "latest": return snapshots[-1:] else: return snapshots @@ -226,54 +253,63 @@ def get_snapshots(snapshot_id=None): def app_versions_from_tags(tags): if tags: - app_versions = map(lambda x: x.split(':'), tags) + app_versions = map(lambda x: x.split(":"), tags) return {i[0]: i[1] if len(i) > 1 else None for i in app_versions} else: return {} + def str2bool(value: str) -> bool: - return value.lower() in ("yes", "true", "t", "1") + return value.lower() in ("yes", "true", "t", "1") -def parse_backup_labels(hook_type='backup', selected_container=[]): +def parse_backup_labels(hook_type="backup", selected_container=[]): client = docker.from_env() container_by_service = { - c.labels.get('com.docker.swarm.service.name'): c for c in client.containers.list()} + c.labels.get("com.docker.swarm.service.name"): c + for c in client.containers.list() + } services = client.services.list() app_settings = {} for s in services: - specs = s.attrs['Spec'] - labels = specs['Labels'] - stack_name = labels['com.docker.stack.namespace'] + specs = s.attrs["Spec"] + labels = specs["Labels"] + stack_name = labels["com.docker.stack.namespace"] container_name = s.name.removeprefix(f"{stack_name}_") - version = labels.get(f'coop-cloud.{stack_name}.version') + version = labels.get(f"coop-cloud.{stack_name}.version") settings = app_settings[stack_name] = app_settings.get(stack_name) or {} - if (backup := labels.get('backupbot.backup')) and str2bool(backup): - settings['enabled'] = True + if (backup := labels.get("backupbot.backup")) and str2bool(backup): + settings["enabled"] = True if version: - settings['version'] = version + settings["version"] = version if selected_container and container_name not in selected_container: logger.debug(f"Skipping {s.name} because it's not a selected container") continue - if mounts:= specs['TaskTemplate']['ContainerSpec'].get('Mounts'): + if mounts := specs["TaskTemplate"]["ContainerSpec"].get("Mounts"): volumes = parse_volumes(stack_name, mounts) - volumes.update(settings.get('volumes') or {}) - settings['volumes'] = volumes + volumes.update(settings.get("volumes") or {}) + settings["volumes"] = volumes excluded_volumes, included_volume_paths = parse_excludes_includes(labels) - settings['excluded_volumes'] = excluded_volumes.union(settings.get('excluded_volumes') or set()) - settings['included_volume_paths'] = included_volume_paths.union(settings.get('included_volume_paths') or set()) + settings["excluded_volumes"] = excluded_volumes.union( + settings.get("excluded_volumes") or set() + ) + settings["included_volume_paths"] = included_volume_paths.union( + settings.get("included_volume_paths") or set() + ) if container := container_by_service.get(s.name): - if command := labels.get(f'backupbot.{hook_type}.pre-hook'): - if not (pre_hooks:= settings.get('pre_hooks')): - pre_hooks = settings['pre_hooks'] = {} + if command := labels.get(f"backupbot.{hook_type}.pre-hook"): + if not (pre_hooks := settings.get("pre_hooks")): + pre_hooks = settings["pre_hooks"] = {} pre_hooks[container] = command - if command := labels.get(f'backupbot.{hook_type}.post-hook'): - if not (post_hooks:= settings.get('post_hooks')): - post_hooks = settings['post_hooks'] = {} + if command := labels.get(f"backupbot.{hook_type}.post-hook"): + if not (post_hooks := settings.get("post_hooks")): + post_hooks = settings["post_hooks"] = {} post_hooks[container] = command else: logger.debug(f"Container {s.name} is not running.") - if labels.get(f'backupbot.{hook_type}.pre-hook') or labels.get(f'backupbot.{hook_type}.post-hook'): + if labels.get(f"backupbot.{hook_type}.pre-hook") or labels.get( + f"backupbot.{hook_type}.post-hook" + ): logger.error(f"Container {s.name} contain hooks but it's not running") return app_settings @@ -281,47 +317,53 @@ def parse_backup_labels(hook_type='backup', selected_container=[]): def get_backup_details(app_settings, volumes=[]): backup_paths = set() backup_apps_versions = {} - pre_hooks= {} + pre_hooks = {} post_hooks = {} for app, settings in app_settings.items(): - if settings.get('enabled'): - if SERVICE != 'ALL' and SERVICE != app: + if settings.get("enabled"): + if SERVICE != "ALL" and SERVICE != app: continue - backup_apps_versions[app] = settings.get('version') + backup_apps_versions[app] = settings.get("version") add_backup_paths(backup_paths, settings, app, volumes) - if hooks:= settings.get('pre_hooks'): + if hooks := settings.get("pre_hooks"): pre_hooks.update(hooks) - if hooks:= settings.get('post_hooks'): + if hooks := settings.get("post_hooks"): post_hooks.update(hooks) return pre_hooks, post_hooks, list(backup_paths), backup_apps_versions def add_backup_paths(backup_paths, settings, app, selected_volumes): - if (volumes := settings.get('volumes')): - if includes:= settings.get('included_volume_paths'): + if volumes := settings.get("volumes"): + if includes := settings.get("included_volume_paths"): included_volumes = list(zip(*includes))[0] for volume, rel_paths in includes: - if not (volume_path:= volumes.get(volume)): - logger.error(f'Can not find volume with the name {volume} for {app}') + if not (volume_path := volumes.get(volume)): + logger.error( + f"Can not find volume with the name {volume} for {app}" + ) continue if selected_volumes and volume not in selected_volumes: - logger.debug(f'Skipping {volume}:{rel_paths} because the volume is not selected') + logger.debug( + f"Skipping {volume}:{rel_paths} because the volume is not selected" + ) continue for p in rel_paths: absolute_path = Path(f"{volume_path}/{p}") backup_paths.add(absolute_path) else: included_volumes = [] - excluded_volumes = settings.get('excluded_volumes') or [] + excluded_volumes = settings.get("excluded_volumes") or [] for name, path in volumes.items(): if selected_volumes and name not in selected_volumes: - logger.debug(f'Skipping volume: {name} because the volume is not selected') + logger.debug( + f"Skipping volume: {name} because the volume is not selected" + ) continue if name in excluded_volumes: - logger.debug(f'Skipping volume: {name} because the volume is excluded') + logger.debug(f"Skipping volume: {name} because the volume is excluded") continue if name in included_volumes: - logger.debug(f'Skipping volume: {name} because a path is selected') + logger.debug(f"Skipping volume: {name} because a path is selected") continue backup_paths.add(path) else: @@ -331,10 +373,10 @@ def add_backup_paths(backup_paths, settings, app, selected_volumes): def parse_volumes(stack_name, mounts): volumes = {} for m in mounts: - if m['Type'] != 'volume': + if m["Type"] != "volume": continue - relative_path = m['Source'] - name = relative_path.removeprefix(stack_name + '_') + relative_path = m["Source"] + name = relative_path.removeprefix(stack_name + "_") absolute_path = Path(f"{VOLUME_PATH}{relative_path}/_data/") volumes[name] = absolute_path return volumes @@ -344,10 +386,12 @@ def parse_excludes_includes(labels): excluded_volumes = set() included_volume_paths = set() for label, value in labels.items(): - if label.startswith('backupbot.backup.volumes.'): - volume_name = label.removeprefix('backupbot.backup.volumes.').removesuffix('.path') - if label.endswith('path'): - relative_paths = tuple(value.split(',')) + if label.startswith("backupbot.backup.volumes."): + volume_name = label.removeprefix("backupbot.backup.volumes.").removesuffix( + ".path" + ) + if label.endswith("path"): + relative_paths = tuple(value.split(",")) included_volume_paths.add((volume_name, relative_paths)) elif not str2bool(value): excluded_volumes.add(volume_name) @@ -360,24 +404,29 @@ def copy_secrets(apps): os.mkdir(SECRET_PATH) client = docker.from_env() container_by_service = { - c.labels.get('com.docker.swarm.service.name'): c for c in client.containers.list()} + c.labels.get("com.docker.swarm.service.name"): c + for c in client.containers.list() + } services = client.services.list() for s in services: - app_name = s.attrs['Spec']['Labels']['com.docker.stack.namespace'] - if (app_name in apps and - (app_secs := s.attrs['Spec']['TaskTemplate']['ContainerSpec'].get('Secrets'))): + app_name = s.attrs["Spec"]["Labels"]["com.docker.stack.namespace"] + if app_name in apps and ( + app_secs := s.attrs["Spec"]["TaskTemplate"]["ContainerSpec"].get("Secrets") + ): if not container_by_service.get(s.name): logger.warning( - f"Container {s.name} is not running, secrets can not be copied.") + f"Container {s.name} is not running, secrets can not be copied." + ) continue container_id = container_by_service[s.name].id for sec in app_secs: - src = f'/var/lib/docker/containers/{container_id}/mounts/secrets/{sec["SecretID"]}' + src = f"/var/lib/docker/containers/{container_id}/mounts/secrets/{sec['SecretID']}" if not Path(src).exists(): logger.error( - f"For the secret {sec['SecretName']} the file {src} does not exist for {s.name}") + f"For the secret {sec['SecretName']} the file {src} does not exist for {s.name}" + ) continue - dst = SECRET_PATH + sec['SecretName'] + dst = SECRET_PATH + sec["SecretName"] logger.debug(f"Copy Secret {sec['SecretName']}") copyfile(src, dst) @@ -387,9 +436,15 @@ def run_commands(commands): if not command: continue # Remove bash/sh wrapping - command = command.removeprefix('bash -c').removeprefix('sh -c').removeprefix(' ') + command = ( + command.removeprefix("bash -c").removeprefix("sh -c").removeprefix(" ") + ) # Remove quotes surrounding the command - if (len(command) >= 2 and command[0] == command[-1] and (command[0] == "'" or command[0] == '"')): + if ( + len(command) >= 2 + and command[0] == command[-1] + and (command[0] == "'" or command[0] == '"') + ): command = command[1:-1] # Use bash's pipefail to return exit codes inside a pipe to prevent silent failure command = f"bash -c 'set -o pipefail;{command}'" @@ -398,7 +453,8 @@ def run_commands(commands): result = container.exec_run(command) if result.exit_code: logger.error( - f"Failed to run command {command} in {container.name}: {result.output.decode()}") + f"Failed to run command {command} in {container.name}: {result.output.decode()}" + ) else: logger.debug(result.output.decode()) @@ -410,15 +466,17 @@ def backup_volumes(backup_paths, apps_versions, retries, dry_run=False): logger.info("\n".join(map(str, backup_paths))) backup_paths = list(filter(path_exists, backup_paths)) cmd = restic.cat.base_command() - parent = get_snapshots('latest') + parent = get_snapshots("latest") if parent: # https://restic.readthedocs.io/en/stable/040_backup.html#file-change-detection - cmd.extend(['--parent', parent[0]['short_id']]) - tags = [f"{app}:{version}" for app,version in apps_versions.items()] - if SERVICE == 'ALL': + cmd.extend(["--parent", parent[0]["short_id"]]) + tags = [f"{app}:{version}" for app, version in apps_versions.items()] + if SERVICE == "ALL": tags.append(SERVICE) logger.info("Start volume backup") - result = restic.internal.backup.run(cmd, backup_paths, dry_run=dry_run, tags=tags) + result = restic.internal.backup.run( + cmd, backup_paths, dry_run=dry_run, tags=tags + ) logger.summary("backup finished", extra=result) return except ResticFailedError as error: @@ -432,7 +490,7 @@ def backup_volumes(backup_paths, apps_versions, retries, dry_run=False): def path_exists(path): if not path.exists(): - logger.error(f'{path} does not exist') + logger.error(f"{path} does not exist") return path.exists() @@ -440,37 +498,39 @@ def path_exists(path): def snapshots(): snapshots = get_snapshots() for snap in snapshots: - output = [snap['time'].split('.')[0], snap['short_id']] - if tags:= snap.get('tags'): + output = [snap["time"].split(".")[0], snap["short_id"]] + if tags := snap.get("tags"): app_versions = app_versions_from_tags(tags) - if version:= app_versions.get(SERVICE): + if version := app_versions.get(SERVICE): output.append(version) print(*output) if not snapshots: err_msg = "No Snapshots found" - if SERVICE != 'ALL': - service_name = SERVICE.replace('_', '.') - err_msg += f' for app {service_name}' + if SERVICE != "ALL": + service_name = SERVICE.replace("_", ".") + err_msg += f" for app {service_name}" logger.warning(err_msg) @cli.command() -@click.option('snapshot', '--snapshot', '-s', envvar='SNAPSHOT', default='latest') -@click.option('show_all', '--all', '-a', envvar='SHOW_ALL', is_flag=True) -@click.option('timestamps', '--timestamps', '-t', envvar='TIMESTAMPS', is_flag=True) -@click.argument('path', required=False, default='/var/lib/docker/volumes/', envvar='INCLUDE_PATH') +@click.option("snapshot", "--snapshot", "-s", envvar="SNAPSHOT", default="latest") +@click.option("show_all", "--all", "-a", envvar="SHOW_ALL", is_flag=True) +@click.option("timestamps", "--timestamps", "-t", envvar="TIMESTAMPS", is_flag=True) +@click.argument( + "path", required=False, default="/var/lib/docker/volumes/", envvar="INCLUDE_PATH" +) def ls(snapshot, show_all, timestamps, path): - if snapshot == 'latest': - latest_snapshot = get_snapshots('latest') + if snapshot == "latest": + latest_snapshot = get_snapshots("latest") if not latest_snapshot: logger.error(f"There is no latest snapshot for {SERVICE}") exit(1) - snapshot = latest_snapshot[0]['short_id'] + snapshot = latest_snapshot[0]["short_id"] if show_all: path = None results = list_files(snapshot, path) for r in results: - if r.get('path'): + if r.get("path"): if timestamps: print(f"{r['ctime']}\t{r['path']}") else: @@ -478,94 +538,93 @@ def ls(snapshot, show_all, timestamps, path): def list_files(snapshot, path): - cmd = restic.cat.base_command() + ['ls'] + cmd = restic.cat.base_command() + ["ls"] cmd.append(snapshot) if path: cmd.append(path) try: output = restic.internal.command_executor.execute(cmd) except ResticFailedError as error: - if 'no snapshot found' in str(error): + if "no snapshot found" in str(error): err_msg = f'There is no snapshot "{snapshot}"' - if SERVICE != 'ALL': + if SERVICE != "ALL": err_msg += f' for the app "{SERVICE}"' logger.error(err_msg) exit(1) else: raise error - output = output.replace('}\n{', '}|{') - results = list(map(json.loads, output.split('|'))) + output = output.replace("}\n{", "}|{") + results = list(map(json.loads, output.split("|"))) return results @cli.command() -@click.option('snapshot', '--snapshot', '-s', envvar='SNAPSHOT', default='latest') -@click.option('path', '--path', '-p', envvar='INCLUDE_PATH') -@click.option('volumes', '--volumes', '-v', envvar='VOLUMES') -@click.option('secrets', '--secrets', '-c', is_flag=True, envvar='SECRETS') +@click.option("snapshot", "--snapshot", "-s", envvar="SNAPSHOT", default="latest") +@click.option("path", "--path", "-p", envvar="INCLUDE_PATH") +@click.option("volumes", "--volumes", "-v", envvar="VOLUMES") +@click.option("secrets", "--secrets", "-c", is_flag=True, envvar="SECRETS") def download(snapshot, path, volumes, secrets): file_dumps = [] - if snapshot == 'latest': - latest_snapshot = get_snapshots('latest') + if snapshot == "latest": + latest_snapshot = get_snapshots("latest") if not latest_snapshot: logger.error(f"There is no latest snapshot for {SERVICE}") exit(1) - snapshot = latest_snapshot[0]['short_id'] + snapshot = latest_snapshot[0]["short_id"] if not any([path, volumes, secrets]): volumes = secrets = True if path: - path = path.removesuffix('/') + path = path.removesuffix("/") binary_output = dump(snapshot, path) files = list_files(snapshot, path) - filetype = [f.get('type') for f in files if f.get('path') == path][0] + filetype = [f.get("type") for f in files if f.get("path") == path][0] filename = Path(path).name - if filetype == 'dir': + if filetype == "dir": filename = filename + ".tar" tarinfo = tarfile.TarInfo(name=filename) tarinfo.size = len(binary_output) file_dumps.append((binary_output, tarinfo)) if volumes: - if SERVICE == 'ALL': + if SERVICE == "ALL": logger.error("Please specify '--host' when using '--volumes'") exit(1) files = list_files(snapshot, VOLUME_PATH) for f in files[1:]: - path = f['path'] - if Path(path).name.startswith(SERVICE) and f['type'] == 'dir': + path = f["path"] + if Path(path).name.startswith(SERVICE) and f["type"] == "dir": binary_output = dump(snapshot, path) filename = f"{Path(path).name}.tar" tarinfo = tarfile.TarInfo(name=filename) tarinfo.size = len(binary_output) file_dumps.append((binary_output, tarinfo)) if secrets: - if SERVICE == 'ALL': + if SERVICE == "ALL": logger.error("Please specify '--host' when using '--secrets'") exit(1) filename = f"{SERVICE}.json" files = list_files(snapshot, SECRET_PATH) secrets = {} for f in files[1:]: - path = f['path'] - if Path(path).name.startswith(SERVICE) and f['type'] == 'file': + path = f["path"] + if Path(path).name.startswith(SERVICE) and f["type"] == "file": secret = dump(snapshot, path).decode() - secret_name = path.removeprefix(f'{SECRET_PATH}{SERVICE}_') + secret_name = path.removeprefix(f"{SECRET_PATH}{SERVICE}_") secrets[secret_name] = secret binary_output = json.dumps(secrets).encode() tarinfo = tarfile.TarInfo(name=filename) tarinfo.size = len(binary_output) file_dumps.append((binary_output, tarinfo)) - with tarfile.open('/tmp/backup.tar.gz', "w:gz") as tar: + with tarfile.open("/tmp/backup.tar.gz", "w:gz") as tar: print(f"Writing files to /tmp/backup.tar.gz...") for binary_output, tarinfo in file_dumps: tar.addfile(tarinfo, fileobj=io.BytesIO(binary_output)) - size = get_formatted_size('/tmp/backup.tar.gz') - print( - f"Backup has been written to /tmp/backup.tar.gz with a size of {size}") + size = get_formatted_size("/tmp/backup.tar.gz") + print(f"Backup has been written to /tmp/backup.tar.gz with a size of {size}") def get_formatted_size(file_path): file_size = os.path.getsize(file_path) - units = ['Bytes', 'KB', 'MB', 'GB', 'TB'] + units = ["Bytes", "KB", "MB", "GB", "TB"] for unit in units: if file_size < 1024: return f"{round(file_size, 3)} {unit}" @@ -574,16 +633,17 @@ def get_formatted_size(file_path): def dump(snapshot, path): - cmd = restic.cat.base_command() + ['dump'] + cmd = restic.cat.base_command() + ["dump"] cmd = cmd + [snapshot, path] print(f"Dumping {path} from snapshot '{snapshot}'") output = subprocess.run(cmd, capture_output=True) if output.returncode: logger.error( - f"error while dumping {path} from snapshot '{snapshot}': {output.stderr}") + f"error while dumping {path} from snapshot '{snapshot}': {output.stderr}" + ) exit(1) return output.stdout -if __name__ == '__main__': +if __name__ == "__main__": cli()