#!/usr/bin/python3
"""backupbot: restic-based backup/restore for Docker Swarm stacks.

Discovers services via ``backupbot.*`` labels to decide which volumes to
back up, which volume paths to include/exclude, and which pre/post hook
commands to run inside the running containers.
"""

import io
import json
import logging
import os
import subprocess
import sys
import tarfile
from datetime import datetime, timezone
from pathlib import Path
from shutil import copyfile, rmtree

import click
import docker
import restic
from pythonjsonlogger import jsonlogger
from restic.errors import ResticFailedError

VOLUME_PATH = "/var/lib/docker/volumes/"
SECRET_PATH = '/secrets/'
SERVICE = 'ALL'

logger = logging.getLogger("backupbot")
# Custom SUMMARY level above CRITICAL so summary lines are always emitted.
logging.addLevelName(55, 'SUMMARY')
setattr(logging, 'SUMMARY', 55)
setattr(logger, 'summary',
        lambda message, *args, **kwargs: logger.log(55, message, *args, **kwargs))


def handle_exception(exc_type, exc_value, exc_traceback):
    """Route uncaught exceptions through the logger instead of bare stderr."""
    if issubclass(exc_type, KeyboardInterrupt):
        # Let Ctrl-C terminate normally without a CRITICAL log entry.
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return
    logger.critical("Uncaught exception",
                    exc_info=(exc_type, exc_value, exc_traceback))


sys.excepthook = handle_exception


@click.group()
@click.option('-l', '--log', 'loglevel')
@click.option('-m', '--machine-logs', 'machine_logs', is_flag=True)
@click.option('service', '--host', '-h', envvar='SERVICE')
@click.option('repository', '--repo', '-r', envvar='RESTIC_REPOSITORY')
def cli(loglevel, service, repository, machine_logs):
    """Root command group: configures logging, secrets and the restic repo."""
    global SERVICE
    if service:
        # Swarm stack names use '_' where hostnames use '.'
        SERVICE = service.replace('.', '_')
    if repository:
        os.environ['RESTIC_REPOSITORY'] = repository
    if loglevel:
        numeric_level = getattr(logging, loglevel.upper(), None)
        if not isinstance(numeric_level, int):
            raise ValueError('Invalid log level: %s' % loglevel)
        logger.setLevel(numeric_level)
    logHandler = logging.StreamHandler()
    if machine_logs:
        # Structured JSON output for machine consumption.
        formatter = jsonlogger.JsonFormatter(
            "%(levelname)s %(filename)s %(lineno)s %(process)d %(message)s",
            rename_fields={"levelname": "message_type"})
        logHandler.setFormatter(formatter)
    logger.addHandler(logHandler)
    export_secrets()
    init_repo()


def init_repo():
    """Point restic at the configured repository, initializing it if empty."""
    if repo := os.environ.get('RESTIC_REPOSITORY_FILE'):
        # RESTIC_REPOSITORY_FILE and RESTIC_REPOSITORY are mutually exclusive.
        # pop() with a default: RESTIC_REPOSITORY may legitimately be unset
        # here, and `del` would raise KeyError in that case.
        os.environ.pop('RESTIC_REPOSITORY', None)
    else:
        repo = os.environ['RESTIC_REPOSITORY']
    restic.repository = repo
    logger.debug(f"set restic repository location: {repo}")
    restic.password_file = '/var/run/secrets/restic_password'
    try:
        restic.cat.config()
    except ResticFailedError as error:
        if 'unable to open config file' in str(error):
            # Repository does not exist yet: create it.
            result = restic.init()
            logger.info(f"Initialized restic repo: {result}")
        else:
            # Bare raise preserves the original traceback.
            raise


def export_secrets():
    """Materialize every *_FILE env var into its plain counterpart.

    For each environment variable ending in FILE (except COMPOSE_FILE),
    read the referenced file and export its content under the same name
    without the _FILE suffix.
    """
    for env in os.environ:
        if env.endswith('FILE') and "COMPOSE_FILE" not in env:
            logger.debug(f"exported secret: {env}")
            with open(os.environ[env]) as file:
                secret = file.read()
            os.environ[env.removesuffix('_FILE')] = secret
            # Never log the secret value itself.


@cli.command()
@click.option('retries', '--retries', '-r', envvar='RETRIES', default=1)
def create(retries):
    """Run a backup: hooks, secrets and volume paths for the selected apps."""
    app_settings = parse_backup_labels()
    pre_commands, post_commands, backup_paths, apps = get_backup_details(app_settings)
    copy_secrets(apps)
    backup_paths.append(Path(SECRET_PATH))
    run_commands(pre_commands)
    backup_volumes(backup_paths, apps, int(retries))
    run_commands(post_commands)


@cli.command()
@click.option('snapshot', '--snapshot', '-s', envvar='SNAPSHOT', default='latest')
@click.option('target', '--target', '-t', envvar='TARGET', default='/')
@click.option('noninteractive', '--noninteractive', envvar='NONINTERACTIVE', is_flag=True)
@click.option('volumes', '--volumes', '-v', envvar='VOLUMES', multiple=True)
@click.option('container', '--container', '-c', envvar='CONTAINER', multiple=True)
def restore(snapshot, target, noninteractive, volumes, container):
    """Restore a snapshot's volume paths to `target`, running restore hooks."""
    app_settings = parse_backup_labels('restore', container)
    if SERVICE != 'ALL':
        app_settings = {SERVICE: app_settings[SERVICE]}
    pre_commands, post_commands, backup_paths, apps = get_backup_details(
        app_settings, volumes)
    snapshots = get_snapshots(snapshot_id=snapshot)
    # BUGFIX: previously tested `snapshot` (the CLI option, default 'latest',
    # always truthy) so a missing snapshot was never detected; the message
    # also lacked its f-prefix, printing the placeholders literally.
    if not snapshots:
        logger.error(f"No Snapshots with ID {snapshot} for {apps} found.")
        exit(1)
    if not noninteractive:
        snapshot_date = datetime.fromisoformat(snapshots[0]['time'])
        delta = datetime.now(tz=timezone.utc) - snapshot_date
        print(f"You are going to restore Snapshot {snapshot} of {apps} at {target}")
        print("The following volume paths will be restored:")
        for p in backup_paths:
            print(f'\t{p}')
        print(f"This snapshot is {delta} old")
        print(
            f"THIS COMMAND WILL IRREVERSIBLY OVERWRITE FILES AT {target}")
        prompt = input("Type YES (uppercase) to continue: ")
        if prompt != 'YES':
            logger.error("Restore aborted")
            exit(1)
    print(f"Restoring Snapshot {snapshot} at {target}")
    run_commands(pre_commands)
    result = restic_restore(snapshot_id=snapshot,
                            include=backup_paths, target_dir=target)
    run_commands(post_commands)
    logger.debug(result)


def restic_restore(snapshot_id='latest', include=None, target_dir=None):
    """Run `restic restore` for `snapshot_id`, limited to `include` paths.

    `include=None` (instead of a mutable `[]` default) avoids the shared
    default-argument pitfall; semantics are unchanged.
    """
    cmd = restic.cat.base_command() + ['restore', snapshot_id]
    for path in (include or []):
        cmd.extend(['--include', path])
    if target_dir:
        cmd.extend(['--target', target_dir])
    return restic.internal.command_executor.execute(cmd)


def get_snapshots(snapshot_id=None):
    """Return snapshots for the current SERVICE tag.

    With an explicit id: verify the snapshot carries the SERVICE tag.
    With 'latest': return a one-element list holding the newest match.
    """
    if snapshot_id and snapshot_id != 'latest':
        snapshots = restic.snapshots(snapshot_id=snapshot_id)
        if SERVICE not in snapshots[0]['tags']:
            logger.error(f'Snapshot with ID {snapshot_id} does not contain {SERVICE}')
            exit(1)
    else:
        snapshots = restic.snapshots()
        snapshots = list(filter(
            lambda x: x.get('tags') and SERVICE in x.get('tags'), snapshots))
    if snapshot_id == 'latest':
        return snapshots[-1:]
    else:
        return snapshots


def parse_backup_labels(hook_type='backup', selected_container=None):
    """Collect per-stack backup settings from swarm service labels.

    Returns a dict keyed by stack name with keys: 'enabled', 'volumes',
    'excluded_volumes', 'included_volume_paths', 'pre_hooks', 'post_hooks'.
    """
    selected_container = selected_container or []
    client = docker.from_env()
    container_by_service = {
        c.labels.get('com.docker.swarm.service.name'): c
        for c in client.containers.list()}
    services = client.services.list()
    app_settings = {}
    for s in services:
        specs = s.attrs['Spec']
        labels = specs['Labels']
        stack_name = labels['com.docker.stack.namespace']
        container_name = s.name.removeprefix(f"{stack_name}_")
        settings = app_settings[stack_name] = app_settings.get(stack_name) or {}
        # The walrus assignment already tests truthiness; the former extra
        # `and bool(backup)` was redundant.
        if labels.get('backupbot.backup'):
            settings['enabled'] = True
        if selected_container and container_name not in selected_container:
            logger.debug(f"Skipping {s.name} because it's not a selected container")
            continue
        if mounts := specs['TaskTemplate']['ContainerSpec'].get('Mounts'):
            volumes = parse_volumes(stack_name, mounts)
            volumes.update(settings.get('volumes') or {})
            settings['volumes'] = volumes
            excluded_volumes, included_volume_paths = parse_excludes_includes(labels)
            settings['excluded_volumes'] = excluded_volumes.union(
                settings.get('excluded_volumes') or set())
            settings['included_volume_paths'] = included_volume_paths.union(
                settings.get('included_volume_paths') or set())
        if container := container_by_service.get(s.name):
            if command := labels.get(f'backupbot.{hook_type}.pre-hook'):
                if not (pre_hooks := settings.get('pre_hooks')):
                    pre_hooks = settings['pre_hooks'] = {}
                pre_hooks[container] = command
            if command := labels.get(f'backupbot.{hook_type}.post-hook'):
                if not (post_hooks := settings.get('post_hooks')):
                    post_hooks = settings['post_hooks'] = {}
                post_hooks[container] = command
        else:
            logger.error(
                f"Container {s.name} is not running, hooks can not be executed")
    return app_settings


def get_backup_details(app_settings, volumes=None):
    """Flatten app settings into (pre_hooks, post_hooks, paths, app names)."""
    backup_paths = set()
    backup_apps = []
    pre_hooks = {}
    post_hooks = {}
    for app, settings in app_settings.items():
        if settings.get('enabled'):
            if SERVICE != 'ALL' and SERVICE != app:
                continue
            backup_apps.append(app)
            add_backup_paths(backup_paths, settings, app, volumes or [])
            if hooks := settings.get('pre_hooks'):
                pre_hooks.update(hooks)
            if hooks := settings.get('post_hooks'):
                post_hooks.update(hooks)
    return pre_hooks, post_hooks, list(backup_paths), backup_apps


def add_backup_paths(backup_paths, settings, app, selected_volumes):
    """Add the app's backupable paths to `backup_paths` (mutated in place).

    Volumes with an explicit include-path list contribute only those
    sub-paths; all other non-excluded volumes contribute their whole
    `_data` directory.
    """
    if (volumes := settings.get('volumes')):
        if includes := settings.get('included_volume_paths'):
            # Volume names that have explicit path includes.
            included_volumes = list(zip(*includes))[0]
            for volume, rel_paths in includes:
                if not (volume_path := volumes.get(volume)):
                    logger.error(f'Can not find volume with the name {volume}')
                    continue
                if selected_volumes and volume not in selected_volumes:
                    logger.debug(
                        f'Skipping {volume}:{rel_paths} because the volume is not selected')
                    continue
                for p in rel_paths:
                    absolute_path = Path(f"{volume_path}/{p}")
                    backup_paths.add(absolute_path)
        else:
            included_volumes = []
        excluded_volumes = settings.get('excluded_volumes') or []
        for name, path in volumes.items():
            if selected_volumes and name not in selected_volumes:
                logger.debug(
                    f'Skipping volume: {name} because the volume is not selected')
                continue
            if name in excluded_volumes:
                logger.debug(
                    f'Skipping volume: {name} because the volume is excluded')
                continue
            if name in included_volumes:
                # Already covered by its explicit include paths above.
                logger.debug(
                    f'Skipping volume: {name} because a path is selected')
                continue
            backup_paths.add(path)
    else:
        logger.warning(f"{app} does not contain any volumes")


def parse_volumes(stack_name, mounts):
    """Map volume short names (stack prefix stripped) to host data paths."""
    volumes = {}
    for m in mounts:
        if m['Type'] != 'volume':
            continue
        relative_path = m['Source']
        name = relative_path.removeprefix(stack_name + '_')
        absolute_path = Path(f"{VOLUME_PATH}{relative_path}/_data/")
        volumes[name] = absolute_path
    return volumes


def parse_excludes_includes(labels):
    """Parse `backupbot.backup.volumes.*` labels.

    Returns (excluded_volumes, included_volume_paths) where the latter is
    a set of (volume_name, (relative_path, ...)) tuples.
    """
    excluded_volumes = set()
    included_volume_paths = set()
    for label, value in labels.items():
        if label.startswith('backupbot.backup.volumes.'):
            volume_name = label.removeprefix(
                'backupbot.backup.volumes.').removesuffix('.path')
            if label.endswith('path'):
                relative_paths = tuple(value.split(','))
                included_volume_paths.add((volume_name, relative_paths))
            elif bool(value):
                # A truthy value without '.path' marks the volume excluded.
                excluded_volumes.add(volume_name)
    return excluded_volumes, included_volume_paths


def copy_secrets(apps):
    """Copy each app's Docker secrets into SECRET_PATH for inclusion.

    The directory is recreated from scratch on every run.
    """
    # TODO: check if it is deployed
    rmtree(SECRET_PATH, ignore_errors=True)
    os.mkdir(SECRET_PATH)
    client = docker.from_env()
    container_by_service = {
        c.labels.get('com.docker.swarm.service.name'): c
        for c in client.containers.list()}
    services = client.services.list()
    for s in services:
        app_name = s.attrs['Spec']['Labels']['com.docker.stack.namespace']
        if (app_name in apps and
                (app_secs := s.attrs['Spec']['TaskTemplate']['ContainerSpec'].get('Secrets'))):
            if not container_by_service.get(s.name):
                logger.warning(
                    f"Container {s.name} is not running, secrets can not be copied.")
                continue
            container_id = container_by_service[s.name].id
            for sec in app_secs:
                src = f'/var/lib/docker/containers/{container_id}/mounts/secrets/{sec["SecretID"]}'
                if not Path(src).exists():
                    logger.error(
                        f"For the secret {sec['SecretName']} the file {src} does not exist for {s.name}")
                    continue
                dst = SECRET_PATH + sec['SecretName']
                logger.debug(f"Copy Secret {sec['SecretName']}")
                copyfile(src, dst)


def run_commands(commands):
    """Execute hook commands in their containers, logging failures."""
    for container, command in commands.items():
        if not command:
            continue
        # Remove bash/sh wrapping
        command = command.removeprefix('bash -c').removeprefix('sh -c').removeprefix(' ')
        # Remove quotes surrounding the command
        if (len(command) >= 2 and command[0] == command[-1]
                and (command[0] == "'" or command[0] == '"')):
            command = command[1:-1]
        # Use bash's pipefail to return exit codes inside a pipe to prevent silent failure
        command = f"bash -c 'set -o pipefail;{command}'"
        logger.info(f"run command in {container.name}:")
        logger.info(command)
        result = container.exec_run(command)
        if result.exit_code:
            logger.error(
                f"Failed to run command {command} in {container.name}: {result.output.decode()}")
        else:
            logger.info(result.output.decode())


def backup_volumes(backup_paths, apps, retries, dry_run=False):
    """Back up `backup_paths`, retrying on ResticFailedError `retries` times."""
    while True:
        try:
            logger.info("Backup these paths:")
            logger.debug("\n".join(map(str, backup_paths)))
            backup_paths = list(filter(path_exists, backup_paths))
            cmd = restic.cat.base_command()
            parent = get_snapshots('latest')
            if parent:
                # https://restic.readthedocs.io/en/stable/040_backup.html#file-change-detection
                cmd.extend(['--parent', parent[0]['short_id']])
            tags = set(apps + [SERVICE])
            logger.info("Start volume backup")
            result = restic.internal.backup.run(
                cmd, backup_paths, dry_run=dry_run, tags=tags)
            logger.summary("backup finished", extra=result)
            return
        except ResticFailedError as error:
            logger.error(
                f"Backup failed for {apps}. Could not Backup these paths: {backup_paths}")
            logger.error(error, exc_info=True)
            if retries > 0:
                retries -= 1
            else:
                exit(1)


def path_exists(path):
    """Predicate used to filter backup paths; logs missing ones."""
    if not path.exists():
        logger.error(f'{path} does not exist')
    return path.exists()


@cli.command()
def snapshots():
    """List snapshots (time and id) for the selected app, or all."""
    snapshots = get_snapshots()
    for snap in snapshots:
        print(snap['time'], snap['id'])
    if not snapshots:
        err_msg = "No Snapshots found"
        if SERVICE != 'ALL':
            service_name = SERVICE.replace('_', '.')
            err_msg += f' for app {service_name}'
        logger.warning(err_msg)


@cli.command()
@click.option('snapshot', '--snapshot', '-s', envvar='SNAPSHOT', default='latest')
@click.option('path', '--path', '-p', envvar='INCLUDE_PATH')
def ls(snapshot, path):
    """List files contained in a snapshot, optionally under `path`."""
    results = list_files(snapshot, path)
    for r in results:
        if r.get('path'):
            print(f"{r['ctime']}\t{r['path']}")


def list_files(snapshot, path):
    """Return parsed JSON entries from `restic ls` for the SERVICE tag."""
    cmd = restic.cat.base_command() + ['ls']
    cmd = cmd + ['--tag', SERVICE]
    cmd.append(snapshot)
    if path:
        cmd.append(path)
    try:
        output = restic.internal.command_executor.execute(cmd)
    except ResticFailedError as error:
        if 'no snapshot found' in str(error):
            err_msg = f'There is no snapshot "{snapshot}"'
            if SERVICE != 'ALL':
                err_msg += f' for the app "{SERVICE}"'
            logger.error(err_msg)
            exit(1)
        else:
            raise
    # restic emits one JSON object per line; join them into a splittable list.
    output = output.replace('}\n{', '}|{')
    results = list(map(json.loads, output.split('|')))
    return results


@cli.command()
@click.option('snapshot', '--snapshot', '-s', envvar='SNAPSHOT', default='latest')
@click.option('path', '--path', '-p', envvar='INCLUDE_PATH')
@click.option('volumes', '--volumes', '-v', envvar='VOLUMES')
@click.option('secrets', '--secrets', '-c', is_flag=True, envvar='SECRETS')
def download(snapshot, path, volumes, secrets):
    """Dump paths/volumes/secrets from a snapshot into /tmp/backup.tar.gz."""
    file_dumps = []
    if not any([path, volumes, secrets]):
        # No selection given: download everything for the service.
        volumes = secrets = True
    if path:
        path = path.removesuffix('/')
        binary_output = dump(snapshot, path)
        files = list_files(snapshot, path)
        filetype = [f.get('type') for f in files if f.get('path') == path][0]
        filename = Path(path).name
        if filetype == 'dir':
            filename = filename + ".tar"
        tarinfo = tarfile.TarInfo(name=filename)
        tarinfo.size = len(binary_output)
        file_dumps.append((binary_output, tarinfo))
    if volumes:
        if SERVICE == 'ALL':
            logger.error("Please specify '--host' when using '--volumes'")
            exit(1)
        files = list_files(snapshot, VOLUME_PATH)
        for f in files[1:]:
            path = f['path']
            if Path(path).name.startswith(SERVICE) and f['type'] == 'dir':
                binary_output = dump(snapshot, path)
                filename = f"{Path(path).name}.tar"
                tarinfo = tarfile.TarInfo(name=filename)
                tarinfo.size = len(binary_output)
                file_dumps.append((binary_output, tarinfo))
    if secrets:
        if SERVICE == 'ALL':
            logger.error("Please specify '--host' when using '--secrets'")
            exit(1)
        filename = f"{SERVICE}.json"
        files = list_files(snapshot, SECRET_PATH)
        secrets = {}
        for f in files[1:]:
            path = f['path']
            if Path(path).name.startswith(SERVICE) and f['type'] == 'file':
                secret = dump(snapshot, path).decode()
                secret_name = path.removeprefix(f'{SECRET_PATH}{SERVICE}_')
                secrets[secret_name] = secret
        binary_output = json.dumps(secrets).encode()
        tarinfo = tarfile.TarInfo(name=filename)
        tarinfo.size = len(binary_output)
        file_dumps.append((binary_output, tarinfo))
    with tarfile.open('/tmp/backup.tar.gz', "w:gz") as tar:
        print(f"Writing files to /tmp/backup.tar.gz...")
        for binary_output, tarinfo in file_dumps:
            tar.addfile(tarinfo, fileobj=io.BytesIO(binary_output))
    size = get_formatted_size('/tmp/backup.tar.gz')
    print(
        f"Backup has been written to /tmp/backup.tar.gz with a size of {size}")


def get_formatted_size(file_path):
    """Return the file's size as a human-readable string (Bytes..TB)."""
    file_size = os.path.getsize(file_path)
    units = ['Bytes', 'KB', 'MB', 'GB', 'TB']
    for unit in units:
        if file_size < 1024:
            return f"{round(file_size, 3)} {unit}"
        file_size /= 1024
    return f"{round(file_size, 3)} {units[-1]}"


def dump(snapshot, path):
    """Return the raw bytes of `path` from `snapshot` via `restic dump`."""
    cmd = restic.cat.base_command() + ['dump']
    cmd = cmd + ['--tag', SERVICE]
    cmd = cmd + [snapshot, path]
    print(f"Dumping {path} from snapshot '{snapshot}'")
    output = subprocess.run(cmd, capture_output=True)
    if output.returncode:
        logger.error(
            f"error while dumping {path} from snapshot '{snapshot}': {output.stderr}")
        exit(1)
    return output.stdout


if __name__ == '__main__':
    cli()