backup-bot-two/backupbot.py
#!/usr/bin/python3
import os
import sys
import click
import json
import subprocess
import logging
import docker
import restic
import tarfile
import io
from pythonjsonlogger import jsonlogger
from datetime import datetime, timezone
from restic.errors import ResticFailedError
from pathlib import Path
from shutil import copyfile, rmtree
VOLUME_PATH = "/var/lib/docker/volumes/"
SECRET_PATH = '/secrets/'
SERVICE = 'ALL'
logger = logging.getLogger("backupbot")
logging.addLevelName(55, 'SUMMARY')
setattr(logging, 'SUMMARY', 55)
setattr(logger, 'summary',
        lambda message, *args, **kwargs: logger.log(55, message, *args, **kwargs))
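# Log any uncaught exception through the logger; KeyboardInterrupt is handed to
# the default excepthook so Ctrl-C still exits quietly.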
def handle_exception(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
logger.critical("Uncaught exception", exc_info=(
exc_type, exc_value, exc_traceback))
sys.excepthook = handle_exception
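# Top-level click group: configures logging (optionally machine-readable JSON),
# selects the app via --host/SERVICE (dots become underscores), sets the restic
# repository, exports *_FILE secrets into the environment and initializes the repo.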
@click.group()
@click.option('-l', '--log', 'loglevel')
@click.option('-m', '--machine-logs', 'machine_logs', is_flag=True, envvar='MACHINE_LOGS')
@click.option('service', '--host', '-h', envvar='SERVICE')
@click.option('repository', '--repo', '-r', envvar='RESTIC_REPOSITORY')
def cli(loglevel, service, repository, machine_logs):
global SERVICE
if service:
SERVICE = service.replace('.', '_')
if repository:
os.environ['RESTIC_REPOSITORY'] = repository
if loglevel:
numeric_level = getattr(logging, loglevel.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: %s' % loglevel)
logger.setLevel(numeric_level)
logHandler = logging.StreamHandler()
if machine_logs:
formatter = jsonlogger.JsonFormatter(
"%(levelname)s %(filename)s %(lineno)s %(process)d %(message)s", rename_fields={"levelname": "message_type"})
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
export_secrets()
init_repo()
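# Point restic at the configured repository (RESTIC_REPOSITORY_FILE takes
# precedence over RESTIC_REPOSITORY) and initialize it if it does not exist yet.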
def init_repo():
if repo:= os.environ.get('RESTIC_REPOSITORY_FILE'):
# RESTIC_REPOSITORY_FILE and RESTIC_REPOSITORY are mutually exclusive
        os.environ.pop('RESTIC_REPOSITORY', None)
else:
repo = os.environ['RESTIC_REPOSITORY']
restic.repository = repo
logger.debug(f"set restic repository location: {repo}")
restic.password_file = '/var/run/secrets/restic_password'
try:
restic.cat.config()
except ResticFailedError as error:
if 'unable to open config file' in str(error):
result = restic.init()
logger.info(f"Initialized restic repo: {result}")
else:
raise error
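# For every environment variable ending in FILE (except COMPOSE_FILE), read the
# referenced file and export its content under the name without the _FILE suffix.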
def export_secrets():
for env in os.environ:
        if env.endswith('FILE') and "COMPOSE_FILE" not in env:
logger.debug(f"exported secret: {env}")
with open(os.environ[env]) as file:
secret = file.read()
os.environ[env.removesuffix('_FILE')] = secret
# logger.debug(f"Read secret value: {secret}")
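# 'create' command: back up all enabled apps (or only SERVICE): copy their Docker
# secrets to /secrets/, run the pre-hooks, back up the collected volume paths with
# restic (retrying on failure) and run the post-hooks afterwards.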
@cli.command()
@click.option('retries', '--retries', '-r', envvar='RETRIES', default=1)
def create(retries):
app_settings = parse_backup_labels()
pre_commands, post_commands, backup_paths, apps_versions = get_backup_details(app_settings)
copy_secrets(apps_versions)
backup_paths.append(Path(SECRET_PATH))
run_commands(pre_commands)
backup_volumes(backup_paths, apps_versions, int(retries))
run_commands(post_commands)
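# 'restore' command: restore the volumes of one or all apps from a snapshot,
# optionally limited to specific volumes/containers, with an interactive
# confirmation and pre/post restore hooks unless --no-commands is given.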
@cli.command()
@click.option('snapshot_id', '--snapshot', '-s', envvar='SNAPSHOT', default='latest')
@click.option('target', '--target', '-t', envvar='TARGET', default='/')
@click.option('noninteractive', '--noninteractive', envvar='NONINTERACTIVE', is_flag=True)
@click.option('volumes', '--volumes', '-v', envvar='VOLUMES', multiple=True)
@click.option('container', '--container', '-c', envvar='CONTAINER', multiple=True)
@click.option('no_commands', '--no-commands', envvar='NO_COMMANDS', is_flag=True)
def restore(snapshot_id, target, noninteractive, volumes, container, no_commands):
app_settings = parse_backup_labels('restore', container)
if SERVICE != 'ALL':
if not app_settings.get(SERVICE):
            logger.error(f"The app {SERVICE} is not running; use the restore-path command to restore paths of undeployed apps")
exit(1)
app_settings = {SERVICE: app_settings.get(SERVICE)}
pre_commands, post_commands, backup_paths, apps_versions = get_backup_details(app_settings, volumes)
snapshots = get_snapshots(snapshot_id)
if not snapshots:
        logger.error(f"No snapshots with ID {snapshot_id} found for {list(apps_versions.keys())}.")
exit(1)
snapshot = snapshots[0]
snapshot_id = snapshot['short_id']
if not noninteractive:
print(f"Snapshot to restore: \t{snapshot_id}")
restore_app_versions = app_versions_from_tags(snapshot.get('tags'))
print("Apps:")
for app, version in apps_versions.items():
restore_version = restore_app_versions.get(app)
print(f"\t{app} \t {restore_version}")
if version != restore_version:
print(f"WARNING!!! The running app is deployed with version {version}")
print("The following volume paths will be restored:")
for p in backup_paths:
print(f'\t{p}')
if not no_commands:
print("The following commands will be executed:")
for container, cmd in list(pre_commands.items()) + list(post_commands.items()):
print(f"\t{container.labels['com.docker.swarm.service.name']}:\t{cmd}")
snapshot_date = datetime.fromisoformat(snapshot['time'])
delta = datetime.now(tz=timezone.utc) - snapshot_date
print(f"This snapshot is {delta} old")
        print("\nTHIS COMMAND WILL IRREVERSIBLY OVERWRITE FILES")
prompt = input("Type YES (uppercase) to continue: ")
if prompt != 'YES':
logger.error("Restore aborted")
exit(1)
print(f"Restoring Snapshot {snapshot_id} at {target}")
if not no_commands and pre_commands:
        print("Running pre-hook commands.")
run_commands(pre_commands)
if backup_paths:
result = restic_restore(snapshot_id=snapshot_id, include=backup_paths, target_dir=target)
logger.debug(result)
else:
print('No paths to restore.')
if not no_commands and post_commands:
        print("Running post-hook commands.")
run_commands(post_commands)
@cli.command()
@click.option('snapshot_id', '--snapshot', '-s', envvar='SNAPSHOT', default='latest')
@click.option('target', '--target', '-t', envvar='TARGET', default='/')
@click.option('noninteractive', '--noninteractive', envvar='NONINTERACTIVE', is_flag=True)
@click.argument('paths', nargs=-1, required=True, envvar='INCLUDE_PATH')
def restore_path(snapshot_id, target, noninteractive, paths):
""" PATHS: list of paths to restore"""
snapshots = get_snapshots(snapshot_id)
if not snapshots:
        logger.error(f"No snapshots with ID {snapshot_id} found for app {SERVICE}.")
exit(1)
snapshot = snapshots[0]
snapshot_id = snapshot['short_id']
if not noninteractive:
print(f"Snapshot to restore: \t{snapshot_id}")
restore_app_versions = app_versions_from_tags(snapshot.get('tags'))
print("Apps:")
for app, version in restore_app_versions.items():
if SERVICE == 'ALL' or SERVICE == app:
print(f"\t{app} \t {version}")
print("The following paths will be restored:")
for p in paths:
print(f'\t{p}')
snapshot_date = datetime.fromisoformat(snapshot['time'])
delta = datetime.now(tz=timezone.utc) - snapshot_date
print(f"This snapshot is {delta} old")
        print("\nTHIS COMMAND WILL IRREVERSIBLY OVERWRITE FILES")
prompt = input("Type YES (uppercase) to continue: ")
if prompt != 'YES':
logger.error("Restore aborted")
exit(1)
print(f"Restoring Snapshot {snapshot_id} at {target}")
result = restic_restore(snapshot_id=snapshot_id, include=paths, target_dir=target)
logger.debug(result)
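# Run 'restic restore <snapshot>' with one --include flag per path and an
# optional --target directory.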
def restic_restore(snapshot_id, include=[], target_dir=None):
cmd = restic.cat.base_command() + ['restore', snapshot_id]
for path in include:
cmd.extend(['--include', path])
if target_dir:
cmd.extend(['--target', target_dir])
return restic.internal.command_executor.execute(cmd)
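# Return snapshots whose tags contain SERVICE. An explicit snapshot ID is fetched
# and checked (exiting if SERVICE is missing); 'latest' returns only the newest
# matching snapshot as a single-element list.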
def get_snapshots(snapshot_id=None):
if snapshot_id and snapshot_id != 'latest':
snapshots = restic.snapshots(snapshot_id=snapshot_id)
        if SERVICE not in app_versions_from_tags(snapshots[0].get('tags')):
logger.error(f'Snapshot with ID {snapshot_id} does not contain {SERVICE}')
exit(1)
else:
snapshots = restic.snapshots()
snapshots = list(filter(lambda x: SERVICE in app_versions_from_tags(x.get('tags')), snapshots))
if snapshot_id == 'latest':
return snapshots[-1:]
else:
return snapshots
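# Parse a list of 'app:version' snapshot tags into an {app: version} dict;
# the version is None for tags without a colon.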
def app_versions_from_tags(tags):
if tags:
app_versions = map(lambda x: x.split(':'), tags)
return {i[0]: i[1] if len(i) > 1 else None for i in app_versions}
else:
return {}
def str2bool(value: str) -> bool:
return value.lower() in ("yes", "true", "t", "1")
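# Inspect all swarm services and collect their backupbot.* labels into per-stack
# settings: whether backups are enabled, the app version, volume paths, excluded
# volumes, included volume paths and pre/post hook commands keyed by the running
# container.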
def parse_backup_labels(hook_type='backup', selected_container=[]):
client = docker.from_env()
container_by_service = {
c.labels.get('com.docker.swarm.service.name'): c for c in client.containers.list()}
services = client.services.list()
app_settings = {}
for s in services:
specs = s.attrs['Spec']
labels = specs['Labels']
stack_name = labels['com.docker.stack.namespace']
container_name = s.name.removeprefix(f"{stack_name}_")
version = labels.get(f'coop-cloud.{stack_name}.version')
settings = app_settings[stack_name] = app_settings.get(stack_name) or {}
if (backup := labels.get('backupbot.backup')) and str2bool(backup):
settings['enabled'] = True
if version:
settings['version'] = version
if selected_container and container_name not in selected_container:
logger.debug(f"Skipping {s.name} because it's not a selected container")
continue
if mounts:= specs['TaskTemplate']['ContainerSpec'].get('Mounts'):
volumes = parse_volumes(stack_name, mounts)
volumes.update(settings.get('volumes') or {})
settings['volumes'] = volumes
excluded_volumes, included_volume_paths = parse_excludes_includes(labels)
settings['excluded_volumes'] = excluded_volumes.union(settings.get('excluded_volumes') or set())
settings['included_volume_paths'] = included_volume_paths.union(settings.get('included_volume_paths') or set())
if container := container_by_service.get(s.name):
if command := labels.get(f'backupbot.{hook_type}.pre-hook'):
if not (pre_hooks:= settings.get('pre_hooks')):
pre_hooks = settings['pre_hooks'] = {}
pre_hooks[container] = command
if command := labels.get(f'backupbot.{hook_type}.post-hook'):
if not (post_hooks:= settings.get('post_hooks')):
post_hooks = settings['post_hooks'] = {}
post_hooks[container] = command
else:
logger.debug(f"Container {s.name} is not running.")
if labels.get(f'backupbot.{hook_type}.pre-hook') or labels.get(f'backupbot.{hook_type}.post-hook'):
                logger.error(f"Container {s.name} defines hooks but is not running")
return app_settings
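# Aggregate the parsed app settings into what a backup/restore run needs:
# pre/post hooks, the paths to back up and the versions of the selected apps.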
def get_backup_details(app_settings, volumes=[]):
backup_paths = set()
backup_apps_versions = {}
pre_hooks= {}
post_hooks = {}
for app, settings in app_settings.items():
if settings.get('enabled'):
if SERVICE != 'ALL' and SERVICE != app:
continue
backup_apps_versions[app] = settings.get('version')
add_backup_paths(backup_paths, settings, app, volumes)
if hooks:= settings.get('pre_hooks'):
pre_hooks.update(hooks)
if hooks:= settings.get('post_hooks'):
post_hooks.update(hooks)
return pre_hooks, post_hooks, list(backup_paths), backup_apps_versions
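# Collect the backup paths for one app: explicitly included volume paths win;
# otherwise whole volumes are added unless they are excluded or already covered
# by an included path.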
def add_backup_paths(backup_paths, settings, app, selected_volumes):
if (volumes := settings.get('volumes')):
if includes:= settings.get('included_volume_paths'):
included_volumes = list(zip(*includes))[0]
for volume, rel_paths in includes:
if not (volume_path:= volumes.get(volume)):
                    logger.error(f'Cannot find a volume named {volume}')
continue
if selected_volumes and volume not in selected_volumes:
logger.debug(f'Skipping {volume}:{rel_paths} because the volume is not selected')
continue
for p in rel_paths:
absolute_path = Path(f"{volume_path}/{p}")
backup_paths.add(absolute_path)
else:
included_volumes = []
excluded_volumes = settings.get('excluded_volumes') or []
for name, path in volumes.items():
if selected_volumes and name not in selected_volumes:
logger.debug(f'Skipping volume: {name} because the volume is not selected')
continue
if name in excluded_volumes:
logger.debug(f'Skipping volume: {name} because the volume is excluded')
continue
if name in included_volumes:
                logger.debug(f'Skipping volume: {name} because only specific paths within it are selected')
continue
backup_paths.add(path)
else:
logger.warning(f"{app} does not contain any volumes")
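# Map a service's volume mounts to {volume name (without the stack prefix):
# absolute path of the volume's _data directory}.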
def parse_volumes(stack_name, mounts):
volumes = {}
for m in mounts:
if m['Type'] != 'volume':
continue
relative_path = m['Source']
name = relative_path.removeprefix(stack_name + '_')
absolute_path = Path(f"{VOLUME_PATH}{relative_path}/_data/")
volumes[name] = absolute_path
return volumes
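# Read 'backupbot.backup.volumes.<name>.path' labels (comma-separated paths to
# include) and 'backupbot.backup.volumes.<name>=false' labels (volumes to exclude).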
def parse_excludes_includes(labels):
excluded_volumes = set()
included_volume_paths = set()
for label, value in labels.items():
if label.startswith('backupbot.backup.volumes.'):
volume_name = label.removeprefix('backupbot.backup.volumes.').removesuffix('.path')
if label.endswith('path'):
relative_paths = tuple(value.split(','))
included_volume_paths.add((volume_name, relative_paths))
elif not str2bool(value):
excluded_volumes.add(volume_name)
return excluded_volumes, included_volume_paths
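# Copy the Docker secrets of all running services of the selected apps from the
# containers' secret mounts into SECRET_PATH so they are included in the backup.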
def copy_secrets(apps):
# TODO: check if it is deployed
rmtree(SECRET_PATH, ignore_errors=True)
os.mkdir(SECRET_PATH)
client = docker.from_env()
container_by_service = {
c.labels.get('com.docker.swarm.service.name'): c for c in client.containers.list()}
services = client.services.list()
for s in services:
app_name = s.attrs['Spec']['Labels']['com.docker.stack.namespace']
if (app_name in apps and
(app_secs := s.attrs['Spec']['TaskTemplate']['ContainerSpec'].get('Secrets'))):
if not container_by_service.get(s.name):
                logger.warning(
                    f"Container {s.name} is not running, secrets cannot be copied.")
continue
container_id = container_by_service[s.name].id
for sec in app_secs:
src = f'/var/lib/docker/containers/{container_id}/mounts/secrets/{sec["SecretID"]}'
if not Path(src).exists():
                    logger.error(
                        f"Secret file {src} for secret {sec['SecretName']} of {s.name} does not exist")
continue
dst = SECRET_PATH + sec['SecretName']
logger.debug(f"Copy Secret {sec['SecretName']}")
copyfile(src, dst)
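# Execute hook commands inside their containers: strip any existing bash/sh -c
# wrapper and surrounding quotes, then re-wrap with "bash -c 'set -o pipefail;...'"
# so failures inside pipes are not silently ignored.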
def run_commands(commands):
for container, command in commands.items():
if not command:
continue
# Remove bash/sh wrapping
command = command.removeprefix('bash -c').removeprefix('sh -c').removeprefix(' ')
# Remove quotes surrounding the command
if (len(command) >= 2 and command[0] == command[-1] and (command[0] == "'" or command[0] == '"')):
command = command[1:-1]
# Use bash's pipefail to return exit codes inside a pipe to prevent silent failure
command = f"bash -c 'set -o pipefail;{command}'"
logger.info(f"run command in {container.name}:")
logger.info(command)
result = container.exec_run(command)
if result.exit_code:
logger.error(
f"Failed to run command {command} in {container.name}: {result.output.decode()}")
else:
logger.debug(result.output.decode())
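# Run the restic backup of all collected paths, using the latest matching snapshot
# as --parent for file change detection and tagging the new snapshot with
# 'app:version' for every backed-up app (plus 'ALL' when everything is backed up).
# Retries up to 'retries' times on failure.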
def backup_volumes(backup_paths, apps_versions, retries, dry_run=False):
while True:
try:
logger.info("Backup these paths:")
logger.info("\n".join(map(str, backup_paths)))
backup_paths = list(filter(path_exists, backup_paths))
cmd = restic.cat.base_command()
parent = get_snapshots('latest')
if parent:
# https://restic.readthedocs.io/en/stable/040_backup.html#file-change-detection
cmd.extend(['--parent', parent[0]['short_id']])
tags = [f"{app}:{version}" for app,version in apps_versions.items()]
if SERVICE == 'ALL':
tags.append(SERVICE)
logger.info("Start volume backup")
result = restic.internal.backup.run(cmd, backup_paths, dry_run=dry_run, tags=tags)
logger.summary("backup finished", extra=result)
return
except ResticFailedError as error:
logger.error(f"Backup failed for {SERVICE}.")
logger.error(error, exc_info=True)
if retries > 0:
retries -= 1
else:
exit(1)
def path_exists(path):
if not path.exists():
logger.error(f'{path} does not exist')
return path.exists()
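# 'snapshots' command: print time, short ID and (if available) the app version of
# every snapshot matching SERVICE.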
@cli.command()
def snapshots():
snapshots = get_snapshots()
for snap in snapshots:
output = [snap['time'].split('.')[0], snap['short_id']]
if tags:= snap.get('tags'):
app_versions = app_versions_from_tags(tags)
if version:= app_versions.get(SERVICE):
output.append(version)
print(*output)
if not snapshots:
        err_msg = "No snapshots found"
if SERVICE != 'ALL':
service_name = SERVICE.replace('_', '.')
err_msg += f' for app {service_name}'
logger.warning(err_msg)
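# 'ls' command: list the files of a snapshot under a path (the Docker volume
# directory by default), optionally the whole snapshot (--all) and with timestamps.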
@cli.command()
@click.option('snapshot', '--snapshot', '-s', envvar='SNAPSHOT', default='latest')
@click.option('show_all', '--all', '-a', envvar='SHOW_ALL', is_flag=True)
@click.option('timestamps', '--timestamps', '-t', envvar='TIMESTAMPS', is_flag=True)
@click.argument('path', required=False, default='/var/lib/docker/volumes/', envvar='INCLUDE_PATH')
def ls(snapshot, show_all, timestamps, path):
if snapshot == 'latest':
latest_snapshot = get_snapshots('latest')
if not latest_snapshot:
logger.error(f"There is no latest snapshot for {SERVICE}")
exit(1)
snapshot = latest_snapshot[0]['short_id']
if show_all:
path = None
results = list_files(snapshot, path)
for r in results:
if r.get('path'):
if timestamps:
print(f"{r['ctime']}\t{r['path']}")
else:
print(f"{r['path']}")
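# Run 'restic ls' for a snapshot (optionally limited to a path) and parse its
# JSON output, one object per line, into a list of dicts.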
def list_files(snapshot, path):
cmd = restic.cat.base_command() + ['ls']
cmd.append(snapshot)
if path:
cmd.append(path)
try:
output = restic.internal.command_executor.execute(cmd)
except ResticFailedError as error:
if 'no snapshot found' in str(error):
err_msg = f'There is no snapshot "{snapshot}"'
if SERVICE != 'ALL':
err_msg += f' for the app "{SERVICE}"'
logger.error(err_msg)
exit(1)
else:
raise error
output = output.replace('}\n{', '}|{')
results = list(map(json.loads, output.split('|')))
return results
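# 'download' command: dump a single path, the volumes of SERVICE and/or its
# secrets (bundled into a JSON file) from a snapshot and write everything to
# /tmp/backup.tar.gz.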
@cli.command()
@click.option('snapshot', '--snapshot', '-s', envvar='SNAPSHOT', default='latest')
@click.option('path', '--path', '-p', envvar='INCLUDE_PATH')
@click.option('volumes', '--volumes', '-v', envvar='VOLUMES')
@click.option('secrets', '--secrets', '-c', is_flag=True, envvar='SECRETS')
def download(snapshot, path, volumes, secrets):
file_dumps = []
if snapshot == 'latest':
latest_snapshot = get_snapshots('latest')
if not latest_snapshot:
logger.error(f"There is no latest snapshot for {SERVICE}")
exit(1)
snapshot = latest_snapshot[0]['short_id']
if not any([path, volumes, secrets]):
volumes = secrets = True
if path:
path = path.removesuffix('/')
binary_output = dump(snapshot, path)
files = list_files(snapshot, path)
filetype = [f.get('type') for f in files if f.get('path') == path][0]
filename = Path(path).name
if filetype == 'dir':
filename = filename + ".tar"
tarinfo = tarfile.TarInfo(name=filename)
tarinfo.size = len(binary_output)
file_dumps.append((binary_output, tarinfo))
if volumes:
if SERVICE == 'ALL':
logger.error("Please specify '--host' when using '--volumes'")
exit(1)
files = list_files(snapshot, VOLUME_PATH)
for f in files[1:]:
path = f['path']
if Path(path).name.startswith(SERVICE) and f['type'] == 'dir':
binary_output = dump(snapshot, path)
filename = f"{Path(path).name}.tar"
tarinfo = tarfile.TarInfo(name=filename)
tarinfo.size = len(binary_output)
file_dumps.append((binary_output, tarinfo))
if secrets:
if SERVICE == 'ALL':
logger.error("Please specify '--host' when using '--secrets'")
exit(1)
filename = f"{SERVICE}.json"
files = list_files(snapshot, SECRET_PATH)
secrets = {}
for f in files[1:]:
path = f['path']
if Path(path).name.startswith(SERVICE) and f['type'] == 'file':
secret = dump(snapshot, path).decode()
secret_name = path.removeprefix(f'{SECRET_PATH}{SERVICE}_')
secrets[secret_name] = secret
binary_output = json.dumps(secrets).encode()
tarinfo = tarfile.TarInfo(name=filename)
tarinfo.size = len(binary_output)
file_dumps.append((binary_output, tarinfo))
with tarfile.open('/tmp/backup.tar.gz', "w:gz") as tar:
print(f"Writing files to /tmp/backup.tar.gz...")
for binary_output, tarinfo in file_dumps:
tar.addfile(tarinfo, fileobj=io.BytesIO(binary_output))
size = get_formatted_size('/tmp/backup.tar.gz')
print(
f"Backup has been written to /tmp/backup.tar.gz with a size of {size}")
def get_formatted_size(file_path):
file_size = os.path.getsize(file_path)
units = ['Bytes', 'KB', 'MB', 'GB', 'TB']
for unit in units:
if file_size < 1024:
return f"{round(file_size, 3)} {unit}"
file_size /= 1024
return f"{round(file_size, 3)} {units[-1]}"
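# Run 'restic dump <snapshot> <path>' and return the raw bytes; exits on error.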
def dump(snapshot, path):
cmd = restic.cat.base_command() + ['dump']
cmd = cmd + [snapshot, path]
print(f"Dumping {path} from snapshot '{snapshot}'")
output = subprocess.run(cmd, capture_output=True)
if output.returncode:
        logger.error(
            f"Error while dumping {path} from snapshot '{snapshot}': {output.stderr.decode()}")
exit(1)
return output.stdout
if __name__ == '__main__':
cli()